Skip to main content

mlt_core/frames/v01/
root.rs

1use crate::analyse::{Analyze, StatType};
2use crate::codecs::varint::parse_varint;
3use crate::utils::{AsUsize as _, SetOptionOnce as _, parse_string};
4use crate::v01::{
5    Column, ColumnType, DictionaryType, Geometry, GeometryValues, Id, IdValues, Layer01, Property,
6    RawFsstData, RawIdValue, RawPlainData, RawPresence, RawProperty, RawScalar, RawSharedDict,
7    RawSharedDictEncoding, RawSharedDictItem, RawStream, RawStrings, RawStringsEncoding,
8    StreamMeta, StreamType,
9};
10use crate::{Decoder, MltError, MltRefResult, MltResult, Parser};
11
12impl Analyze for Layer01<'_> {
13    fn collect_statistic(&self, stat: StatType) -> usize {
14        match stat {
15            StatType::DecodedMetaSize => self.name.len() + size_of::<u32>(),
16            StatType::DecodedDataSize => {
17                self.id.as_ref().map_or(0, |id| id.collect_statistic(stat))
18                    + self.geometry.collect_statistic(stat)
19                    + self.properties.collect_statistic(stat)
20            }
21            StatType::FeatureCount => self.geometry.collect_statistic(stat),
22        }
23    }
24
25    fn for_each_stream(&self, cb: &mut dyn FnMut(StreamMeta)) {
26        if let Some(ref id) = self.id {
27            id.for_each_stream(cb);
28        }
29        self.geometry.for_each_stream(cb);
30        self.properties.for_each_stream(cb);
31    }
32}
33
34impl Layer01<'_> {
35    /// Parse `v01::Layer` metadata, reserving decoded memory against the parser's budget.
36    pub fn from_bytes<'a>(input: &'a [u8], parser: &mut Parser) -> Result<Layer01<'a>, MltError> {
37        let (input, layer_name) = parse_string(input)?;
38        let (input, extent) = parse_varint::<u32>(input)?;
39        let (input, column_count) = parse_varint::<u32>(input)?;
40
41        // Each column requires at least 1 byte (column type)
42        if input.len() < column_count.as_usize() {
43            return Err(MltError::BufferUnderflow(column_count, input.len()));
44        }
45
46        // !!!!!!!
47        // WARNING: make sure to never use `let (input, ...)` after this point: input var is reused
48        let (mut input, (col_info, prop_count)) = parse_columns_meta(input, column_count, parser)?;
49        #[cfg(fuzzing)]
50        let layer_order = col_info
51            .iter()
52            .map(|column| column.typ)
53            .map(crate::frames::v01::fuzzing::LayerOrdering::from)
54            .collect();
55
56        let mut properties = Vec::with_capacity(prop_count.as_usize());
57        let mut id_column: Option<Id> = None;
58        let mut geometry: Option<Geometry> = None;
59
60        for column in col_info {
61            use crate::v01::RawProperty as RP;
62
63            let opt;
64            let value;
65            let name = column.name.unwrap_or("");
66
67            match column.typ {
68                ColumnType::Id | ColumnType::OptId => {
69                    (input, opt) = parse_optional(column.typ, input, parser)?;
70                    (input, value) = RawStream::from_bytes(input, parser)?;
71                    id_column.set_once(Id::new_raw(RawPresence(opt), RawIdValue::Id32(value)))?;
72                }
73                ColumnType::LongId | ColumnType::OptLongId => {
74                    (input, opt) = parse_optional(column.typ, input, parser)?;
75                    (input, value) = RawStream::from_bytes(input, parser)?;
76                    id_column.set_once(Id::new_raw(RawPresence(opt), RawIdValue::Id64(value)))?;
77                }
78                ColumnType::Geometry => {
79                    input = parse_geometry_column(input, &mut geometry, parser)?;
80                }
81                ColumnType::Bool | ColumnType::OptBool => {
82                    (input, opt) = parse_optional(column.typ, input, parser)?;
83                    (input, value) = RawStream::parse_bool(input, parser)?;
84                    properties.push(Property::Raw(RP::Bool(scalar(name, opt, value))));
85                }
86                ColumnType::I8 | ColumnType::OptI8 => {
87                    (input, opt) = parse_optional(column.typ, input, parser)?;
88                    (input, value) = RawStream::from_bytes(input, parser)?;
89                    properties.push(Property::Raw(RP::I8(scalar(name, opt, value))));
90                }
91                ColumnType::U8 | ColumnType::OptU8 => {
92                    (input, opt) = parse_optional(column.typ, input, parser)?;
93                    (input, value) = RawStream::from_bytes(input, parser)?;
94                    properties.push(Property::Raw(RP::U8(scalar(name, opt, value))));
95                }
96                ColumnType::I32 | ColumnType::OptI32 => {
97                    (input, opt) = parse_optional(column.typ, input, parser)?;
98                    (input, value) = RawStream::from_bytes(input, parser)?;
99                    properties.push(Property::Raw(RP::I32(scalar(name, opt, value))));
100                }
101                ColumnType::U32 | ColumnType::OptU32 => {
102                    (input, opt) = parse_optional(column.typ, input, parser)?;
103                    (input, value) = RawStream::from_bytes(input, parser)?;
104                    properties.push(Property::Raw(RP::U32(scalar(name, opt, value))));
105                }
106                ColumnType::I64 | ColumnType::OptI64 => {
107                    (input, opt) = parse_optional(column.typ, input, parser)?;
108                    (input, value) = RawStream::from_bytes(input, parser)?;
109                    properties.push(Property::Raw(RP::I64(scalar(name, opt, value))));
110                }
111                ColumnType::U64 | ColumnType::OptU64 => {
112                    (input, opt) = parse_optional(column.typ, input, parser)?;
113                    (input, value) = RawStream::from_bytes(input, parser)?;
114                    properties.push(Property::Raw(RP::U64(scalar(name, opt, value))));
115                }
116                ColumnType::F32 | ColumnType::OptF32 => {
117                    (input, opt) = parse_optional(column.typ, input, parser)?;
118                    (input, value) = RawStream::from_bytes(input, parser)?;
119                    properties.push(Property::Raw(RP::F32(scalar(name, opt, value))));
120                }
121                ColumnType::F64 | ColumnType::OptF64 => {
122                    (input, opt) = parse_optional(column.typ, input, parser)?;
123                    (input, value) = RawStream::from_bytes(input, parser)?;
124                    properties.push(Property::Raw(RP::F64(scalar(name, opt, value))));
125                }
126                ColumnType::Str | ColumnType::OptStr => {
127                    let prop;
128                    (input, prop) = parse_str_column(input, name, column.typ, parser)?;
129                    properties.push(Property::Raw(prop));
130                }
131                ColumnType::SharedDict => {
132                    let prop;
133                    (input, prop) = parse_shared_dict_column(input, &column, parser)?;
134                    properties.push(Property::Raw(prop));
135                }
136            }
137        }
138        if input.is_empty() {
139            Ok(Layer01 {
140                name: layer_name,
141                extent,
142                id: id_column,
143                geometry: geometry.ok_or(MltError::MissingGeometry)?,
144                properties,
145                #[cfg(fuzzing)]
146                layer_order,
147            })
148        } else {
149            Err(MltError::TrailingLayerData(input.len()))
150        }
151    }
152
153    /// Decode only the ID column, leaving other columns in their encoded form.
154    ///
155    /// Use this instead of [`Self::decode_all`] when other columns will be accessed lazily.
156    pub fn decode_id(&mut self, dec: &mut Decoder) -> Result<Option<&mut IdValues>, MltError> {
157        Ok(if let Some(id) = &mut self.id {
158            Some(id.decode(dec)?)
159        } else {
160            None
161        })
162    }
163
164    /// Decode only the geometry column, leaving other columns in their encoded form.
165    ///
166    /// Use this instead of [`Self::decode_all`] when other columns will be accessed lazily.
167    pub fn decode_geometry(&mut self, dec: &mut Decoder) -> MltResult<&mut GeometryValues> {
168        self.geometry.decode(dec)
169    }
170
171    /// Decode only the property columns, leaving other columns in their encoded form.
172    ///
173    /// Use this instead of [`Self::decode_all`] when other columns will be accessed lazily.
174    pub fn decode_properties(&mut self, dec: &mut Decoder) -> MltResult<()> {
175        for prop in &mut self.properties {
176            prop.decode(dec)?;
177        }
178        Ok(())
179    }
180
181    pub fn decode_all(&mut self, dec: &mut Decoder) -> MltResult<()> {
182        self.decode_id(dec)?;
183        self.decode_geometry(dec)?;
184        self.decode_properties(dec)?;
185        Ok(())
186    }
187}
188
189fn parse_struct_children<'a>(
190    mut input: &'a [u8],
191    column: &Column<'a>,
192    parser: &mut Parser,
193) -> MltRefResult<'a, Vec<RawSharedDictItem<'a>>> {
194    let mut children = Vec::with_capacity(column.children.len());
195    for child in &column.children {
196        let (inp, sc) = parse_varint::<u32>(input)?;
197        let (inp, child_optional) = parse_optional(child.typ, inp, parser)?;
198        let optional_stream_count = u32::from(child_optional.is_some());
199        if let Some(data_count) = sc.checked_sub(optional_stream_count)
200            && data_count != 1
201        {
202            return Err(MltError::UnexpectedStructChildCount(data_count));
203        }
204        let (inp, child_data) = RawStream::from_bytes(inp, parser)?;
205        children.push(RawSharedDictItem {
206            name: child.name.unwrap_or(""),
207            presence: RawPresence(child_optional),
208            data: child_data,
209        });
210        input = inp;
211    }
212    Ok((input, children))
213}
214
215fn parse_optional<'a>(
216    typ: ColumnType,
217    input: &'a [u8],
218    parser: &mut Parser,
219) -> MltRefResult<'a, Option<RawStream<'a>>> {
220    if typ.is_optional() {
221        let (input, optional) = RawStream::parse_bool(input, parser)?;
222        Ok((input, Some(optional)))
223    } else {
224        Ok((input, None))
225    }
226}
227
228fn parse_geometry_column<'a>(
229    input: &'a [u8],
230    geometry: &mut Option<Geometry<'a>>,
231    parser: &mut Parser,
232) -> MltResult<&'a [u8]> {
233    let (input, stream_count) = parse_varint::<u32>(input)?;
234    if stream_count == 0 {
235        return Err(MltError::GeometryWithoutStreams);
236    }
237    // Each stream requires at least 1 byte (physical stream type)
238    let stream_count_capa = stream_count.as_usize();
239    if input.len() < stream_count_capa {
240        return Err(MltError::BufferUnderflow(stream_count, input.len()));
241    }
242    // metadata
243    let (input, value) = RawStream::from_bytes(input, parser)?;
244    // geometry items
245    let (input, value_vec) = RawStream::parse_multiple(input, stream_count_capa - 1, parser)?;
246    geometry.set_once(Geometry::new_raw(value, value_vec))?;
247    Ok(input)
248}
249
250fn parse_str_column<'a>(
251    mut input: &'a [u8],
252    name: &'a str,
253    typ: ColumnType,
254    parser: &mut Parser,
255) -> MltRefResult<'a, RawProperty<'a>> {
256    let mut stream_count = {
257        let stream_count_u32;
258        (input, stream_count_u32) = parse_varint::<u32>(input)?;
259        stream_count_u32.as_usize()
260    };
261    let presence;
262    (input, presence) = parse_optional(typ, input, parser)?;
263    if presence.is_some() {
264        if stream_count == 0 {
265            return Err(MltError::UnsupportedStringStreamCount(stream_count));
266        }
267        stream_count -= 1;
268    }
269    let mut str_streams = [None, None, None, None, None];
270    if stream_count > str_streams.len() {
271        return Err(MltError::UnsupportedStringStreamCount(stream_count));
272    }
273    for slot in str_streams.iter_mut().take(stream_count) {
274        let stream;
275        (input, stream) = RawStream::from_bytes(input, parser)?;
276        *slot = Some(stream);
277    }
278    let encoding = match str_streams {
279        [Some(s1), Some(s2), None, None, None] => {
280            RawStringsEncoding::plain(RawPlainData::new(s1, s2)?)
281        }
282        [Some(s1), Some(s2), Some(s3), None, None] => {
283            RawStringsEncoding::dictionary(RawPlainData::new(s1, s3)?, s2)?
284        }
285        [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
286            RawStringsEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
287        }
288        [Some(s1), Some(s2), Some(s3), Some(s4), Some(s5)] => {
289            RawStringsEncoding::fsst_dictionary(RawFsstData::new(s1, s2, s3, s4)?, s5)?
290        }
291        _ => Err(MltError::UnsupportedStringStreamCount(stream_count))?,
292    };
293    Ok((
294        input,
295        RawProperty::Str(RawStrings {
296            name,
297            presence: RawPresence(presence),
298            encoding,
299        }),
300    ))
301}
302
303fn parse_shared_dict_column<'a>(
304    mut input: &'a [u8],
305    column: &Column<'a>,
306    parser: &mut Parser,
307) -> MltRefResult<'a, RawProperty<'a>> {
308    // Read header streams until we hit the dictionary DATA(Single|Shared) stream.
309    let stream_count;
310    (input, stream_count) = parse_varint::<u32>(input)?;
311    let mut dict_streams = [None, None, None, None, None];
312    let mut streams_taken = 0_usize;
313    while streams_taken < stream_count.as_usize() {
314        let stream;
315        (input, stream) = RawStream::from_bytes(input, parser)?;
316        let is_last = matches!(
317            stream.meta.stream_type,
318            StreamType::Data(DictionaryType::Single | DictionaryType::Shared)
319        );
320        dict_streams[streams_taken] = Some(stream);
321        streams_taken += 1;
322        if is_last {
323            break;
324        } else if streams_taken >= dict_streams.len() {
325            return Err(MltError::UnsupportedStringStreamCount(streams_taken + 1));
326        }
327    }
328    let children;
329    (input, children) = parse_struct_children(input, column, parser)?;
330    let name = column.name.unwrap_or("");
331    let encoding = match dict_streams {
332        [Some(s1), Some(s2), None, None, None] => {
333            RawSharedDictEncoding::plain(RawPlainData::new(s1, s2)?)
334        }
335        [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
336            RawSharedDictEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
337        }
338        _ => Err(MltError::SharedDictRequiresStreams(streams_taken))?,
339    };
340    Ok((
341        input,
342        RawProperty::SharedDict(RawSharedDict {
343            name,
344            encoding,
345            children,
346        }),
347    ))
348}
349
350fn parse_columns_meta<'a>(
351    mut input: &'a [u8],
352    column_count: u32,
353    parser: &mut Parser,
354) -> MltRefResult<'a, (Vec<Column<'a>>, u32)> {
355    use crate::v01::ColumnType::{Geometry, Id, LongId, OptId, OptLongId, SharedDict};
356
357    let mut col_info = Vec::with_capacity(column_count.as_usize());
358    let mut geometries = 0;
359    let mut ids = 0;
360    for _ in 0..column_count {
361        let mut typ;
362        (input, typ) = Column::from_bytes(input, parser)?;
363        match typ.typ {
364            Geometry => geometries += 1,
365            Id | OptId | LongId | OptLongId => ids += 1,
366            SharedDict => {
367                // Yes, we need to parse children right here; otherwise this messes up the next column
368                let child_column_count;
369                (input, child_column_count) = parse_varint::<u32>(input)?;
370
371                // Each column requires at least 1 byte (ColumnType without a name)
372                let child_col_capacity = child_column_count.as_usize();
373                if input.len() < child_col_capacity {
374                    return Err(MltError::BufferUnderflow(child_column_count, input.len()));
375                }
376                let mut children = Vec::with_capacity(child_col_capacity);
377                for _ in 0..child_column_count {
378                    let child;
379                    (input, child) = Column::from_bytes(input, parser)?;
380                    children.push(child);
381                }
382                typ.children = children;
383            }
384            _ => {}
385        }
386        col_info.push(typ);
387    }
388    if geometries > 1 {
389        return Err(MltError::MultipleGeometryColumns);
390    }
391    if ids > 1 {
392        return Err(MltError::MultipleIdColumns);
393    }
394
395    Ok((input, (col_info, column_count - geometries - ids)))
396}
397
398fn scalar<'a>(name: &'a str, opt: Option<RawStream<'a>>, value: RawStream<'a>) -> RawScalar<'a> {
399    RawScalar {
400        name,
401        presence: RawPresence(opt),
402        data: value,
403    }
404}