Skip to main content

mlt_core/decoder/
root.rs

1use crate::LazyParsed::Raw;
2use crate::MltError::{
3    BufferUnderflow, GeometryWithoutStreams, InvalidSharedDictStreamCount, MissingGeometry,
4    MultipleGeometryColumns, MultipleIdColumns, SharedDictRequiresStreams, TrailingLayerData,
5    UnexpectedStructChildCount, UnsupportedStringStreamCount,
6};
7use crate::codecs::varint::parse_varint;
8use crate::decoder::{
9    Column, ColumnType, DictionaryType, Geometry, GeometryValues, Id, IdValues, Layer01,
10    ParsedLayer01, RawFsstData, RawGeometry, RawId, RawIdValue, RawPlainData, RawPresence,
11    RawProperty, RawScalar, RawSharedDict, RawSharedDictEncoding, RawSharedDictItem, RawStream,
12    RawStrings, RawStringsEncoding, StreamType,
13};
14use crate::errors::AsMltError as _;
15use crate::utils::{AsUsize as _, SetOptionOnce as _, parse_string};
16use crate::{Layer, Lazy, MltError, MltRefResult, MltResult, ParsedLayer};
17
/// Default memory budget in bytes (20 MiB), used when no explicit limit is given.
const DEFAULT_MAX_BYTES: u32 = 20 * (1 << 20);
20
21/// Stateful decoder that enforces a per-tile memory budget during decoding.
22///
23/// Pass a `Decoder` to every `raw.decode()` / `into_tile()` call and to
24/// `from_bytes`-style parsers. Each method charges the budget before
25/// performing heap allocations, so the total heap used never exceeds `max_bytes`
26/// (in bytes).
27///
28/// ```
29/// use mlt_core::Decoder;
30///
31/// // Default: 10 MiB budget.
32/// let mut dec = Decoder::default();
33///
34/// // Custom budget.
35/// let mut dec = Decoder::with_max_size(64 * 1024 * 1024);
36/// ```
37#[derive(Debug, Clone, PartialEq, Eq, Default)]
38pub struct Decoder {
39    /// Keep track of the memory used when decoding a tile: raw->parsed transition
40    budget: MemBudget,
41    /// Reusable scratch buffer for the physical u32 decode pass.
42    /// Held here so its heap allocation is reused across streams without extra cost.
43    pub(crate) buffer_u32: Vec<u32>,
44    /// Reusable scratch buffer for the physical u64 decode pass.
45    /// Held here so its heap allocation is reused across streams without extra cost.
46    pub(crate) buffer_u64: Vec<u64>,
47}
48
49impl Decoder {
50    /// Create a decoder with a custom memory budget (in bytes).
51    #[must_use]
52    pub fn with_max_size(max_bytes: u32) -> Self {
53        Self {
54            budget: MemBudget::with_max_size(max_bytes),
55            ..Default::default()
56        }
57    }
58
59    pub fn decode_all<'a>(
60        &mut self,
61        layers: impl IntoIterator<Item = Layer<'a>>,
62    ) -> MltResult<Vec<ParsedLayer<'a>>> {
63        layers
64            .into_iter()
65            .map(|l| l.decode_all(self))
66            .collect::<MltResult<_>>()
67    }
68
69    /// Allocate a `Vec<T>` with the given capacity, charging the decoder's budget for
70    /// `capacity * size_of::<T>()` bytes. Use this instead of `Vec::with_capacity` in decode paths.
71    #[inline]
72    pub(crate) fn alloc<T>(&mut self, capacity: usize) -> MltResult<Vec<T>> {
73        let bytes = capacity.checked_mul(size_of::<T>()).or_overflow()?;
74        let bytes_u32 = u32::try_from(bytes).or_overflow()?;
75        self.budget.consume(bytes_u32)?;
76        Ok(Vec::with_capacity(capacity))
77    }
78
79    /// Charge the budget for `size` raw bytes. Prefer [`consume_items`][Self::consume_items]
80    /// when charging for a known-type collection.
81    #[inline]
82    pub(crate) fn consume(&mut self, size: u32) -> MltResult<()> {
83        self.budget.consume(size)
84    }
85
86    /// Charge the budget for `count` items of type `T` (`count * size_of::<T>()` bytes).
87    #[inline]
88    pub(crate) fn consume_items<T>(&mut self, count: usize) -> MltResult<()> {
89        let bytes = count.checked_mul(size_of::<T>()).or_overflow()?;
90        self.budget.consume(u32::try_from(bytes).or_overflow()?)
91    }
92
93    #[inline]
94    pub(crate) fn adjust(&mut self, adjustment: u32) {
95        self.budget.adjust(adjustment);
96    }
97
98    /// Return the unused portion of a pre-charged allocation budget.
99    ///
100    /// Call this after fully populating a `Vec<T>` that was pre-allocated with [`Decoder::alloc`],
101    /// passing the same `alloc_size` that was given to `alloc`.
102    ///
103    /// Returns an error if the vector grew beyond `alloc_size` (malformed input caused more items
104    /// than declared). Subtracts `(alloc_size - buf.len()) * size_of::<T>()` from the budget.
105    #[inline]
106    pub(crate) fn adjust_alloc<T>(&mut self, buf: &[T], alloc_size: usize) -> MltResult<()> {
107        if buf.len() > alloc_size {
108            return Err(MltError::InvalidDecodingStreamSize(buf.len(), alloc_size));
109        }
110        // Return the unused portion of the pre-charged budget.
111        let unused = (alloc_size - buf.len()) * size_of::<T>();
112        // unused fits in u32: it's at most alloc_size * size_of::<T>(), which was checked to fit
113        // in u32 when alloc() was called. Using saturating_cast to avoid a fallible conversion.
114        #[expect(
115            clippy::cast_possible_truncation,
116            reason = "unused <= alloc_size * size_of::<T>() which was verified to fit in u32 by alloc()"
117        )]
118        self.budget.adjust(unused as u32);
119        Ok(())
120    }
121
122    #[must_use]
123    pub fn consumed(&self) -> u32 {
124        self.budget.consumed()
125    }
126
127    /// Reset the memory budget to zero, keeping scratch buffers allocated.
128    ///
129    /// Call this between tiles when reusing a single `Decoder` for multiple
130    /// decodes — the per-tile budget is enforced fresh, but the internal
131    /// `buffer_u32` / `buffer_u64` scratch space is retained so it doesn't
132    /// need to be re-allocated.
133    ///
134    /// # Safety / correctness precondition
135    ///
136    /// Only call this after dropping any decoded allocations returned from the
137    /// previous tile. Resetting the budget while earlier decoded outputs are
138    /// still alive makes the budget enforceable only per-tile and can bypass
139    /// the stronger guarantee that total live heap tracked by this decoder
140    /// never exceeds the configured maximum.
141    pub fn reset_budget(&mut self) {
142        self.budget.reset();
143    }
144}
145
146impl MemBudget {
147    /// Reset tracked usage for a new decode window.
148    ///
149    /// Callers must ensure that allocations accounted for by the previous
150    /// window are no longer live before resetting.
151    fn reset(&mut self) {
152        self.bytes_used = 0;
153    }
154}
155/// Stateful parser that enforces a memory budget during parsing (binary → raw structures).
156///
157/// The parse chain reserves memory before allocations so total heap stays within the limit.
158///
159/// ```
160/// use mlt_core::Parser;
161///
162/// # let bytes: &[u8] = &[];
163/// let mut parser = Parser::default();
164/// let layers = parser.parse_layers(bytes).expect("parse");
165///
166/// // Or with a custom limit:
167/// let mut parser = Parser::with_max_size(64 * 1024 * 1024);
168/// ```
169#[derive(Debug, Clone, PartialEq, Eq, Default)]
170pub struct Parser {
171    budget: MemBudget,
172}
173
174impl Parser {
175    /// Create a parser with a custom memory budget (in bytes).
176    #[must_use]
177    pub fn with_max_size(max_bytes: u32) -> Self {
178        Self {
179            budget: MemBudget::with_max_size(max_bytes),
180        }
181    }
182
183    /// Parse a sequence of binary layers, reserving decoded memory against this parser's budget.
184    pub fn parse_layers<'a>(&mut self, mut input: &'a [u8]) -> MltResult<Vec<Layer<'a>>> {
185        let mut result = Vec::new();
186        while !input.is_empty() {
187            let layer;
188            (input, layer) = Layer::from_bytes(input, self)?;
189            result.push(layer);
190        }
191        Ok(result)
192    }
193
194    /// Reserve `size` bytes from the parse budget. Used internally by the parse chain.
195    #[inline]
196    pub(crate) fn reserve(&mut self, size: u32) -> MltResult<()> {
197        self.budget.consume(size)
198    }
199
200    #[must_use]
201    pub fn reserved(&self) -> u32 {
202        self.budget.consumed()
203    }
204}
205
/// Byte counter with a fixed ceiling, shared by the parser and the decoder.
#[derive(Clone, Debug, PartialEq, Eq)]
struct MemBudget {
    /// Upper limit: `bytes_used` is never allowed to grow past this value.
    pub max_bytes: u32,
    /// Number of bytes charged so far.
    pub bytes_used: u32,
}
213
214impl Default for MemBudget {
215    /// Create a decoder with the default 10 MiB memory budget.
216    fn default() -> Self {
217        Self::with_max_size(DEFAULT_MAX_BYTES)
218    }
219}
220
221impl MemBudget {
222    /// Create a decoder with a custom memory budget (in bytes).
223    #[must_use]
224    fn with_max_size(max_bytes: u32) -> Self {
225        Self {
226            max_bytes,
227            bytes_used: 0,
228        }
229    }
230
231    /// Adjust previous consumption by `- adjustment` bytes.  Will panic if used incorrectly.
232    #[inline]
233    fn adjust(&mut self, adjustment: u32) {
234        self.bytes_used = self.bytes_used.checked_sub(adjustment).unwrap();
235    }
236
237    /// Take `size` bytes from the allocation budget. Call this before the actual allocation.
238    #[inline]
239    fn consume(&mut self, size: u32) -> MltResult<()> {
240        let accumulator = &mut self.bytes_used;
241        let max_bytes = self.max_bytes;
242        if let Some(new_value) = accumulator
243            .checked_add(size)
244            .and_then(|v| if v > max_bytes { None } else { Some(v) })
245        {
246            *accumulator = new_value;
247            Ok(())
248        } else {
249            Err(MltError::MemoryLimitExceeded {
250                limit: max_bytes,
251                used: *accumulator,
252                requested: size,
253            })
254        }
255    }
256
257    fn consumed(&self) -> u32 {
258        self.bytes_used
259    }
260}
261
262impl<'a> Layer01<'a, Lazy> {
263    /// Parse `v01::Layer` metadata, reserving decoded memory against the parser's budget.
264    pub fn from_bytes(input: &'a [u8], parser: &mut Parser) -> MltResult<Self> {
265        let (input, layer_name) = parse_string(input)?;
266        let (input, extent) = parse_varint::<u32>(input)?;
267        let (input, column_count) = parse_varint::<u32>(input)?;
268
269        // Each column requires at least 1 byte (column type)
270        if input.len() < column_count.as_usize() {
271            return Err(BufferUnderflow(column_count, input.len()));
272        }
273
274        // !!!!!!!
275        // WARNING: make sure to never use `let (input, ...)` after this point: input var is reused
276        let (mut input, (col_info, prop_count)) = parse_columns_meta(input, column_count, parser)?;
277        #[cfg(fuzzing)]
278        let layer_order = col_info
279            .iter()
280            .map(|column| column.typ)
281            .map(crate::decoder::fuzzing::LayerOrdering::from)
282            .collect();
283
284        let mut properties = Vec::with_capacity(prop_count.as_usize());
285        let mut id_column: Option<Id> = None;
286        let mut geometry: Option<Geometry> = None;
287
288        for column in col_info {
289            use crate::decoder::RawProperty as RP;
290
291            let opt;
292            let value;
293            let name = column.name.unwrap_or("");
294
295            match column.typ {
296                ColumnType::Id | ColumnType::OptId => {
297                    (input, opt) = parse_optional(column.typ, input, parser)?;
298                    (input, value) = RawStream::from_bytes(input, parser)?;
299                    id_column.set_once(Raw(RawId {
300                        presence: RawPresence(opt),
301                        value: RawIdValue::Id32(value),
302                    }))?;
303                }
304                ColumnType::LongId | ColumnType::OptLongId => {
305                    (input, opt) = parse_optional(column.typ, input, parser)?;
306                    (input, value) = RawStream::from_bytes(input, parser)?;
307                    id_column.set_once(Raw(RawId {
308                        presence: RawPresence(opt),
309                        value: RawIdValue::Id64(value),
310                    }))?;
311                }
312                ColumnType::Geometry => {
313                    input = parse_geometry_column(input, &mut geometry, parser)?;
314                }
315                ColumnType::Bool | ColumnType::OptBool => {
316                    (input, opt) = parse_optional(column.typ, input, parser)?;
317                    (input, value) = RawStream::parse_bool(input, parser)?;
318                    properties.push(Raw(RP::Bool(scalar(name, opt, value))));
319                }
320                ColumnType::I8 | ColumnType::OptI8 => {
321                    (input, opt) = parse_optional(column.typ, input, parser)?;
322                    (input, value) = RawStream::from_bytes(input, parser)?;
323                    properties.push(Raw(RP::I8(scalar(name, opt, value))));
324                }
325                ColumnType::U8 | ColumnType::OptU8 => {
326                    (input, opt) = parse_optional(column.typ, input, parser)?;
327                    (input, value) = RawStream::from_bytes(input, parser)?;
328                    properties.push(Raw(RP::U8(scalar(name, opt, value))));
329                }
330                ColumnType::I32 | ColumnType::OptI32 => {
331                    (input, opt) = parse_optional(column.typ, input, parser)?;
332                    (input, value) = RawStream::from_bytes(input, parser)?;
333                    properties.push(Raw(RP::I32(scalar(name, opt, value))));
334                }
335                ColumnType::U32 | ColumnType::OptU32 => {
336                    (input, opt) = parse_optional(column.typ, input, parser)?;
337                    (input, value) = RawStream::from_bytes(input, parser)?;
338                    properties.push(Raw(RP::U32(scalar(name, opt, value))));
339                }
340                ColumnType::I64 | ColumnType::OptI64 => {
341                    (input, opt) = parse_optional(column.typ, input, parser)?;
342                    (input, value) = RawStream::from_bytes(input, parser)?;
343                    properties.push(Raw(RP::I64(scalar(name, opt, value))));
344                }
345                ColumnType::U64 | ColumnType::OptU64 => {
346                    (input, opt) = parse_optional(column.typ, input, parser)?;
347                    (input, value) = RawStream::from_bytes(input, parser)?;
348                    properties.push(Raw(RP::U64(scalar(name, opt, value))));
349                }
350                ColumnType::F32 | ColumnType::OptF32 => {
351                    (input, opt) = parse_optional(column.typ, input, parser)?;
352                    (input, value) = RawStream::from_bytes(input, parser)?;
353                    properties.push(Raw(RP::F32(scalar(name, opt, value))));
354                }
355                ColumnType::F64 | ColumnType::OptF64 => {
356                    (input, opt) = parse_optional(column.typ, input, parser)?;
357                    (input, value) = RawStream::from_bytes(input, parser)?;
358                    properties.push(Raw(RP::F64(scalar(name, opt, value))));
359                }
360                ColumnType::Str | ColumnType::OptStr => {
361                    let prop;
362                    (input, prop) = parse_str_column(input, name, column.typ, parser)?;
363                    properties.push(Raw(prop));
364                }
365                ColumnType::SharedDict => {
366                    let prop;
367                    (input, prop) = parse_shared_dict_column(input, &column, parser)?;
368                    properties.push(Raw(prop));
369                }
370            }
371        }
372        if input.is_empty() {
373            Ok(Layer01 {
374                name: layer_name,
375                extent,
376                id: id_column,
377                geometry: geometry.ok_or(MissingGeometry)?,
378                properties,
379                #[cfg(fuzzing)]
380                layer_order,
381            })
382        } else {
383            Err(TrailingLayerData(input.len()))
384        }
385    }
386
387    /// Decode only the ID column, leaving other columns in their encoded form.
388    ///
389    /// Use this instead of [`Self::decode_all`] when other columns will be accessed lazily.
390    pub fn decode_id(&mut self, dec: &mut Decoder) -> MltResult<Option<&mut IdValues>> {
391        Ok(if let Some(id) = &mut self.id {
392            Some(id.decode(dec)?)
393        } else {
394            None
395        })
396    }
397
398    /// Decode only the geometry column, leaving other columns in their encoded form.
399    ///
400    /// Use this instead of [`Self::decode_all`] when other columns will be accessed lazily.
401    pub fn decode_geometry(&mut self, dec: &mut Decoder) -> MltResult<&mut GeometryValues> {
402        self.geometry.decode(dec)
403    }
404
405    /// Decode only the property columns, leaving other columns in their encoded form.
406    ///
407    /// Use this instead of [`Self::decode_all`] when other columns will be accessed lazily.
408    pub fn decode_properties(&mut self, dec: &mut Decoder) -> MltResult<()> {
409        for prop in &mut self.properties {
410            prop.decode(dec)?;
411        }
412        Ok(())
413    }
414
415    /// Decode all columns and transition to [`Layer01<Parsed>`].
416    ///
417    /// Consumes `self` (a `Layer01<Lazy>`) and returns a `Layer01<Parsed>` where every
418    /// column field holds its parsed value directly, enabling infallible readonly access.
419    pub fn decode_all(self, dec: &mut Decoder) -> MltResult<ParsedLayer01<'a>> {
420        Ok(Layer01 {
421            name: self.name,
422            extent: self.extent,
423            id: self.id.map(|id| id.into_parsed(dec)).transpose()?,
424            geometry: self.geometry.into_parsed(dec)?,
425            properties: self
426                .properties
427                .into_iter()
428                .map(|p| p.into_parsed(dec))
429                .collect::<MltResult<Vec<_>>>()?,
430            #[cfg(fuzzing)]
431            layer_order: self.layer_order,
432        })
433    }
434}
435
436fn parse_struct_children<'a>(
437    mut input: &'a [u8],
438    column: &Column<'a>,
439    parser: &mut Parser,
440) -> MltRefResult<'a, Vec<RawSharedDictItem<'a>>> {
441    let mut children = Vec::with_capacity(column.children.len());
442    for child in &column.children {
443        let (inp, sc) = parse_varint::<u32>(input)?;
444        let (inp, child_optional) = parse_optional(child.typ, inp, parser)?;
445        let optional_stream_count = u32::from(child_optional.is_some());
446        if let Some(data_count) = sc.checked_sub(optional_stream_count)
447            && data_count != 1
448        {
449            return Err(UnexpectedStructChildCount(data_count));
450        }
451        let (inp, child_data) = RawStream::from_bytes(inp, parser)?;
452        children.push(RawSharedDictItem {
453            name: child.name.unwrap_or(""),
454            presence: RawPresence(child_optional),
455            data: child_data,
456        });
457        input = inp;
458    }
459    Ok((input, children))
460}
461
462fn parse_optional<'a>(
463    typ: ColumnType,
464    input: &'a [u8],
465    parser: &mut Parser,
466) -> MltRefResult<'a, Option<RawStream<'a>>> {
467    if typ.is_optional() {
468        let (input, optional) = RawStream::parse_bool(input, parser)?;
469        Ok((input, Some(optional)))
470    } else {
471        Ok((input, None))
472    }
473}
474
475fn parse_geometry_column<'a>(
476    input: &'a [u8],
477    geometry: &mut Option<Geometry<'a>>,
478    parser: &mut Parser,
479) -> MltResult<&'a [u8]> {
480    let (input, stream_count) = parse_varint::<u32>(input)?;
481    if stream_count == 0 {
482        return Err(GeometryWithoutStreams);
483    }
484    // Each stream requires at least 1 byte (physical stream type)
485    let stream_count_capa = stream_count.as_usize();
486    if input.len() < stream_count_capa {
487        return Err(BufferUnderflow(stream_count, input.len()));
488    }
489    // metadata
490    let (input, meta) = RawStream::from_bytes(input, parser)?;
491    // geometry items
492    let (input, items) = RawStream::parse_multiple(input, stream_count_capa - 1, parser)?;
493    geometry.set_once(Raw(RawGeometry { meta, items }))?;
494    Ok(input)
495}
496
497fn parse_str_column<'a>(
498    mut input: &'a [u8],
499    name: &'a str,
500    typ: ColumnType,
501    parser: &mut Parser,
502) -> MltRefResult<'a, RawProperty<'a>> {
503    let mut stream_count = {
504        let stream_count_u32;
505        (input, stream_count_u32) = parse_varint::<u32>(input)?;
506        stream_count_u32.as_usize()
507    };
508    let presence;
509    (input, presence) = parse_optional(typ, input, parser)?;
510    if presence.is_some() {
511        if stream_count == 0 {
512            return Err(UnsupportedStringStreamCount(stream_count));
513        }
514        stream_count -= 1;
515    }
516    let mut str_streams = [None, None, None, None, None];
517    if stream_count > str_streams.len() {
518        return Err(UnsupportedStringStreamCount(stream_count));
519    }
520    for slot in str_streams.iter_mut().take(stream_count) {
521        let stream;
522        (input, stream) = RawStream::from_bytes(input, parser)?;
523        *slot = Some(stream);
524    }
525    let encoding = match str_streams {
526        [Some(s1), Some(s2), None, None, None] => {
527            RawStringsEncoding::plain(RawPlainData::new(s1, s2)?)
528        }
529        [Some(s1), Some(s2), Some(s3), None, None] => {
530            RawStringsEncoding::dictionary(RawPlainData::new(s1, s3)?, s2)?
531        }
532        [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
533            RawStringsEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
534        }
535        [Some(s1), Some(s2), Some(s3), Some(s4), Some(s5)] => {
536            RawStringsEncoding::fsst_dictionary(RawFsstData::new(s1, s2, s3, s4)?, s5)?
537        }
538        _ => Err(UnsupportedStringStreamCount(stream_count))?,
539    };
540    Ok((
541        input,
542        RawProperty::Str(RawStrings {
543            name,
544            presence: RawPresence(presence),
545            encoding,
546        }),
547    ))
548}
549
550fn parse_shared_dict_column<'a>(
551    mut input: &'a [u8],
552    column: &Column<'a>,
553    parser: &mut Parser,
554) -> MltRefResult<'a, RawProperty<'a>> {
555    // Read header streams until we hit the dictionary DATA(Single|Shared) stream.
556    let stream_count;
557    (input, stream_count) = parse_varint::<u32>(input)?;
558    let mut dict_streams = [None, None, None, None, None];
559    let mut streams_taken = 0_usize;
560    while streams_taken < stream_count.as_usize() {
561        let stream;
562        (input, stream) = RawStream::from_bytes(input, parser)?;
563        let is_last = matches!(
564            stream.meta.stream_type,
565            StreamType::Data(DictionaryType::Single | DictionaryType::Shared)
566        );
567        dict_streams[streams_taken] = Some(stream);
568        streams_taken += 1;
569        if is_last {
570            break;
571        } else if streams_taken >= dict_streams.len() {
572            return Err(UnsupportedStringStreamCount(streams_taken + 1));
573        }
574    }
575    let children;
576    (input, children) = parse_struct_children(input, column, parser)?;
577
578    // Validate stream_count: must equal dict_streams + children + optional_children.
579    let children_n = u32::try_from(children.len()).or_overflow()?;
580    let optional_n = children
581        .iter()
582        .filter(|c| c.presence.0.is_some())
583        .count()
584        .try_into()
585        .or_overflow()?;
586    let dict_n = u32::try_from(streams_taken).or_overflow()?;
587    let expected = crate::utils::checked_sum3(dict_n, children_n, optional_n)?;
588    // Java's encoder had a bug (fixed) that overcounted by 1: dict + 2*N + 1.
589    // Accept that value too so that files produced by older Java encoders still parse.
590    let java_legacy = expected.checked_add(1).or_overflow()?;
591    if stream_count != expected && stream_count != java_legacy {
592        return Err(InvalidSharedDictStreamCount {
593            actual: stream_count,
594            expected,
595        });
596    }
597
598    let name = column.name.unwrap_or("");
599    let encoding = match dict_streams {
600        [Some(s1), Some(s2), None, None, None] => {
601            RawSharedDictEncoding::plain(RawPlainData::new(s1, s2)?)
602        }
603        [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
604            RawSharedDictEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
605        }
606        _ => Err(SharedDictRequiresStreams(streams_taken))?,
607    };
608    Ok((
609        input,
610        RawProperty::SharedDict(RawSharedDict {
611            name,
612            encoding,
613            children,
614        }),
615    ))
616}
617
618fn parse_columns_meta<'a>(
619    mut input: &'a [u8],
620    column_count: u32,
621    parser: &mut Parser,
622) -> MltRefResult<'a, (Vec<Column<'a>>, u32)> {
623    use crate::decoder::ColumnType::{Geometry, Id, LongId, OptId, OptLongId, SharedDict};
624
625    let mut col_info = Vec::with_capacity(column_count.as_usize());
626    let mut geometries = 0;
627    let mut ids = 0;
628    for _ in 0..column_count {
629        let mut typ;
630        (input, typ) = Column::from_bytes(input, parser)?;
631        match typ.typ {
632            Geometry => geometries += 1,
633            Id | OptId | LongId | OptLongId => ids += 1,
634            SharedDict => {
635                // Yes, we need to parse children right here; otherwise this messes up the next column
636                let child_column_count;
637                (input, child_column_count) = parse_varint::<u32>(input)?;
638
639                // Each column requires at least 1 byte (ColumnType without a name)
640                let child_col_capacity = child_column_count.as_usize();
641                if input.len() < child_col_capacity {
642                    return Err(BufferUnderflow(child_column_count, input.len()));
643                }
644                let mut children = Vec::with_capacity(child_col_capacity);
645                for _ in 0..child_column_count {
646                    let child;
647                    (input, child) = Column::from_bytes(input, parser)?;
648                    children.push(child);
649                }
650                typ.children = children;
651            }
652            _ => {}
653        }
654        col_info.push(typ);
655    }
656    if geometries > 1 {
657        return Err(MultipleGeometryColumns);
658    }
659    if ids > 1 {
660        return Err(MultipleIdColumns);
661    }
662
663    Ok((input, (col_info, column_count - geometries - ids)))
664}
665
666fn scalar<'a>(name: &'a str, opt: Option<RawStream<'a>>, value: RawStream<'a>) -> RawScalar<'a> {
667    RawScalar {
668        name,
669        presence: RawPresence(opt),
670        data: value,
671    }
672}