Skip to main content

mlt_core/frames/v01/stream/
parse.rs

1use std::fmt::{self, Debug};
2use std::io::{self, Write};
3
4use integer_encoding::VarIntWriter as _;
5
6use crate::analyse::{Analyze, StatType};
7use crate::codecs::varint::parse_varint;
8use crate::errors::fail_if_invalid_stream_size;
9use crate::utils::{BinarySerializer as _, parse_u8, take};
10use crate::v01::{
11    IntEncoding, LogicalEncoding, LogicalTechnique, MortonMeta, PhysicalEncoding, RawStream,
12    RawStreamData, RleMeta, StreamMeta, StreamType,
13};
14use crate::{MltError, MltRefResult, MltResult, Parser};
15
16impl IntEncoding {
17    #[must_use]
18    pub const fn new(logical: LogicalEncoding, physical: PhysicalEncoding) -> Self {
19        Self { logical, physical }
20    }
21
22    #[must_use]
23    pub const fn none() -> Self {
24        Self::new(LogicalEncoding::None, PhysicalEncoding::None)
25    }
26}
27
28impl StreamMeta {
29    #[must_use]
30    pub fn new(stream_type: StreamType, encoding: IntEncoding, num_values: u32) -> Self {
31        Self {
32            stream_type,
33            encoding,
34            num_values,
35        }
36    }
37
38    /// Parse stream from the input
39    ///
40    /// If `is_bool` is true, compute RLE parameters for boolean streams
41    /// automatically instead of reading them from the input.
42    ///
43    /// Returns the stream metadata and the size of the stream in bytes.
44    /// Reserves an upper-bound estimate of decoded bytes (`num_values * 8`) on the parser
45    /// for all stream types. RLE uses `num_rle_values * 8` since that is the actual expanded count.
46    pub(super) fn from_bytes<'a>(
47        input: &'a [u8],
48        is_bool: bool,
49        parser: &mut Parser,
50    ) -> MltRefResult<'a, (Self, u32)> {
51        use crate::v01::LogicalTechnique as LT;
52
53        let (input, stream_type) = StreamType::from_bytes(input)?;
54        let (input, val) = parse_u8(input)?;
55        let logical1 = LT::parse(val >> 5)?;
56        let logical2 = LT::parse((val >> 2) & 0x7)?;
57        let physical_encoding = PhysicalEncoding::parse(val & 0x3)?;
58
59        let (input, num_values) = parse_varint::<u32>(input)?;
60        let (input, byte_length) = parse_varint::<u32>(input)?;
61
62        let mut input = input;
63        let logical_encoding = match (logical1, logical2) {
64            (LT::None | LT::Delta | LT::ComponentwiseDelta | LT::PseudoDecimal, LT::None) => {
65                // Reserve decoded memory upper bound: worst case u64 = 8 bytes per value
66                let decoded_bytes = num_values.saturating_mul(8);
67                parser.reserve(decoded_bytes)?;
68                match logical1 {
69                    LT::None => LogicalEncoding::None,
70                    LT::Delta => LogicalEncoding::Delta,
71                    LT::ComponentwiseDelta => LogicalEncoding::ComponentwiseDelta,
72                    _ => LogicalEncoding::PseudoDecimal,
73                }
74            }
75            (LT::Delta, LT::Rle) | (LT::Rle, LT::None) => {
76                let runs;
77                let num_rle_values;
78                if is_bool {
79                    runs = num_values.div_ceil(8);
80                    num_rle_values = byte_length;
81                } else {
82                    (input, runs) = parse_varint::<u32>(input)?;
83                    (input, num_rle_values) = parse_varint::<u32>(input)?;
84                }
85                // Reserve decoded memory (worst case: u64 = 8 bytes per value)
86                let decoded_bytes = num_rle_values.saturating_mul(8);
87                parser.reserve(decoded_bytes)?;
88                let rle = RleMeta {
89                    runs,
90                    num_rle_values,
91                };
92                if logical1 == LT::Rle {
93                    LogicalEncoding::Rle(rle)
94                } else {
95                    LogicalEncoding::DeltaRle(rle)
96                }
97            }
98            (LT::Morton, LT::None | LT::Rle | LT::Delta) => {
99                // Reserve decoded memory upper bound: worst case u64 = 8 bytes per value
100                let decoded_bytes = num_values.saturating_mul(8);
101                parser.reserve(decoded_bytes)?;
102                let num_bits;
103                let coordinate_shift;
104                (input, num_bits) = parse_varint::<u32>(input)?;
105                (input, coordinate_shift) = parse_varint::<u32>(input)?;
106                let meta = MortonMeta {
107                    num_bits,
108                    coordinate_shift,
109                };
110                match logical2 {
111                    LT::Rle => LogicalEncoding::MortonRle(meta),
112                    LT::Delta => LogicalEncoding::MortonDelta(meta),
113                    _ => LogicalEncoding::Morton(meta),
114                }
115            }
116            _ => Err(MltError::InvalidLogicalEncodings(logical1, logical2))?,
117        };
118
119        let meta = StreamMeta::new(
120            stream_type,
121            IntEncoding::new(logical_encoding, physical_encoding),
122            num_values,
123        );
124        Ok((input, (meta, byte_length)))
125    }
126
127    pub fn write_to<W: Write>(
128        &self,
129        writer: &mut W,
130        is_bool: bool,
131        byte_length: u32,
132    ) -> io::Result<()> {
133        use LogicalEncoding as LE;
134        use LogicalTechnique as LT;
135
136        writer.write_u8(self.stream_type.as_u8())?;
137        let logical_enc_u8: u8 = match self.encoding.logical {
138            LE::None => (LT::None as u8) << 5,
139            LE::Delta => (LT::Delta as u8) << 5,
140            LE::DeltaRle(_) => ((LT::Delta as u8) << 5) | ((LT::Rle as u8) << 2),
141            LE::ComponentwiseDelta => (LT::ComponentwiseDelta as u8) << 5,
142            LE::Rle(_) => (LT::Rle as u8) << 5,
143            LE::Morton(_) => (LT::Morton as u8) << 5,
144            LE::MortonRle(_) => (LT::Morton as u8) << 5 | ((LT::Rle as u8) << 2),
145            LE::MortonDelta(_) => (LT::Morton as u8) << 5 | ((LT::Delta as u8) << 2),
146            LE::PseudoDecimal => (LT::PseudoDecimal as u8) << 5,
147        };
148        let physical_enc_u8: u8 = match self.encoding.physical {
149            PhysicalEncoding::None => 0x0,
150            PhysicalEncoding::FastPFOR => 0x1,
151            PhysicalEncoding::VarInt => 0x2,
152            PhysicalEncoding::Alp => 0x3,
153        };
154        writer.write_u8(logical_enc_u8 | physical_enc_u8)?;
155        writer.write_varint(self.num_values)?;
156        writer.write_varint(byte_length)?;
157
158        // some encoding have settings inside them
159        match self.encoding.logical {
160            LE::DeltaRle(r) | LE::Rle(r) => {
161                if !is_bool {
162                    writer.write_varint(r.runs)?;
163                    writer.write_varint(r.num_rle_values)?;
164                }
165            }
166            LE::Morton(m) | LE::MortonDelta(m) | LE::MortonRle(m) => {
167                writer.write_varint(m.num_bits)?;
168                writer.write_varint(m.coordinate_shift)?;
169            }
170            LE::None | LE::Delta | LE::ComponentwiseDelta | LE::PseudoDecimal => {}
171        }
172        Ok(())
173    }
174}
175
176impl Analyze for StreamMeta {
177    fn collect_statistic(&self, stat: StatType) -> usize {
178        if stat == StatType::DecodedMetaSize {
179            size_of::<Self>()
180        } else {
181            0
182        }
183    }
184}
185
186impl Debug for StreamMeta {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        // ensure we process all fields, and format them without the alt field
189        let Self {
190            stream_type,
191            encoding,
192            num_values,
193        } = self;
194        f.debug_struct("StreamMeta")
195            .field("stream_type", &format_args!("{stream_type:?}"))
196            .field("logical_encoding", &format_args!("{:?}", encoding.logical))
197            .field(
198                "physical_encoding",
199                &format_args!("{:?}", encoding.physical),
200            )
201            .field("num_values", &format_args!("{num_values:?}"))
202            .finish()
203    }
204}
205
206impl<'a> RawStream<'a> {
207    #[must_use]
208    pub fn new(meta: StreamMeta, data: RawStreamData<'a>) -> Self {
209        Self { meta, data }
210    }
211
212    #[must_use]
213    pub fn as_bytes(&self) -> &'a [u8] {
214        match &self.data {
215            RawStreamData::Encoded(v) | RawStreamData::VarInt(v) => v,
216        }
217    }
218
219    pub fn from_bytes(input: &'a [u8], parser: &mut Parser) -> MltRefResult<'a, Self> {
220        Self::from_bytes_internal(input, false, parser)
221    }
222
223    pub fn parse_multiple(
224        mut input: &'a [u8],
225        count: usize,
226        parser: &mut Parser,
227    ) -> MltRefResult<'a, Vec<Self>> {
228        let mut result = Vec::with_capacity(count);
229        for _ in 0..count {
230            let stream;
231            (input, stream) = RawStream::from_bytes_internal(input, false, parser)?;
232            result.push(stream);
233        }
234        Ok((input, result))
235    }
236
237    pub fn parse_bool(input: &'a [u8], parser: &mut Parser) -> MltRefResult<'a, Self> {
238        Self::from_bytes_internal(input, true, parser)
239    }
240
241    /// Parse stream from the input
242    /// If `is_bool` is true, compute RLE parameters for boolean streams
243    /// automatically instead of reading them from the input.
244    /// For RLE streams with `VarInt` data, validates that run lengths sum to `num_rle_values`.
245    fn from_bytes_internal(
246        input: &'a [u8],
247        is_bool: bool,
248        parser: &mut Parser,
249    ) -> MltRefResult<'a, Self> {
250        use LogicalEncoding as LE;
251        use PhysicalEncoding as PD;
252
253        let (input, (meta, byte_length)) = StreamMeta::from_bytes(input, is_bool, parser)?;
254
255        let (input, data) = take(input, byte_length)?;
256
257        // For RLE with VarInt physical encoding, validate stream: run lengths must sum to num_rle_values
258        if let LE::Rle(r) | LE::DeltaRle(r) = meta.encoding.logical
259            && matches!(meta.encoding.physical, PD::VarInt)
260            && !is_bool
261        {
262            validate_rle_varint_stream(data, r.runs, r.num_rle_values)?;
263        }
264
265        let stream_data = match meta.encoding.physical {
266            PD::None | PD::FastPFOR => RawStreamData::Encoded(data),
267            PD::VarInt => RawStreamData::VarInt(data),
268            PD::Alp => return Err(MltError::UnsupportedPhysicalEncoding("ALP")),
269        };
270
271        Ok((input, RawStream::new(meta, stream_data)))
272    }
273}
274
275/// Validate RLE stream data: first `runs` varints must sum to `num_rle_values`.
276fn validate_rle_varint_stream(data: &[u8], runs: u32, num_rle_values: u32) -> MltResult<()> {
277    use crate::utils::AsUsize as _;
278    let mut rest = data;
279    let mut sum: u64 = 0;
280    for _ in 0..runs {
281        let (next, len) = parse_varint::<u32>(rest)?;
282        rest = next;
283        sum = sum
284            .checked_add(len.into())
285            .ok_or(MltError::IntegerOverflow)?;
286    }
287    if sum != u64::from(num_rle_values) {
288        let sum_usize = usize::try_from(sum).map_err(|_| MltError::IntegerOverflow)?;
289        fail_if_invalid_stream_size(sum_usize, num_rle_values.as_usize())?;
290    }
291    Ok(())
292}