Skip to main content

mlt_core/decoder/
root.rs

1use crate::LazyParsed::Raw;
2use crate::MltError::{
3    BufferUnderflow, GeometryWithoutStreams, InvalidSharedDictStreamCount, MissingGeometry,
4    MultipleGeometryColumns, MultipleIdColumns, SharedDictRequiresStreams, TrailingLayerData,
5    UnexpectedStructChildCount, UnsupportedStringStreamCount,
6};
7use crate::codecs::varint::parse_varint;
8use crate::decoder::{
9    Column, ColumnType, DictionaryType, Geometry, Id, Layer01, ParsedLayer01, RawFsstData,
10    RawGeometry, RawId, RawIdValue, RawPlainData, RawPresence, RawProperty, RawScalar,
11    RawSharedDict, RawSharedDictEncoding, RawSharedDictItem, RawStream, RawStrings,
12    RawStringsEncoding, StreamType,
13};
14use crate::errors::AsMltError as _;
15use crate::utils::{AsUsize as _, SetOptionOnce as _, parse_string};
16use crate::{Layer, Lazy, MltError, MltRefResult, MltResult, ParsedLayer};
17
/// Memory budget used when no explicit limit is given: 20 MiB, in bytes.
const DEFAULT_MAX_BYTES: u32 = 20 * (1 << 20);
20
/// Stateful decoder that enforces a per-tile memory budget during decoding.
///
/// Pass a `Decoder` to every `raw.decode()` / `into_tile()` call and to
/// `from_bytes`-style parsers. Each method charges the budget before
/// performing heap allocations, so the total heap used never exceeds `max_bytes`
/// (in bytes).
///
/// ```
/// use mlt_core::Decoder;
///
/// // Default: 20 MiB budget.
/// let mut dec = Decoder::default();
///
/// // Custom budget.
/// let mut dec = Decoder::with_max_size(64 * 1024 * 1024);
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Decoder {
    /// Keep track of the memory used when decoding a tile: raw->parsed transition
    budget: MemBudget,
    /// Reusable scratch buffer for the physical u32 decode pass.
    /// Held here so its heap allocation is reused across streams without extra cost.
    pub(crate) buffer_u32: Vec<u32>,
    /// Reusable scratch buffer for the physical u64 decode pass.
    /// Held here so its heap allocation is reused across streams without extra cost.
    pub(crate) buffer_u64: Vec<u64>,
}
48
49impl Decoder {
50    /// Create a decoder with a custom memory budget (in bytes).
51    #[must_use]
52    pub fn with_max_size(max_bytes: u32) -> Self {
53        Self {
54            budget: MemBudget::with_max_size(max_bytes),
55            ..Default::default()
56        }
57    }
58
59    pub fn decode_all<'a>(
60        &mut self,
61        layers: impl IntoIterator<Item = Layer<'a>>,
62    ) -> MltResult<Vec<ParsedLayer<'a>>> {
63        layers
64            .into_iter()
65            .map(|l| l.decode_all(self))
66            .collect::<MltResult<_>>()
67    }
68
69    /// Allocate a `Vec<T>` with the given capacity, charging the decoder's budget for
70    /// `capacity * size_of::<T>()` bytes. Use this instead of `Vec::with_capacity` in decode paths.
71    #[inline]
72    pub(crate) fn alloc<T>(&mut self, capacity: usize) -> MltResult<Vec<T>> {
73        let bytes = capacity.checked_mul(size_of::<T>()).or_overflow()?;
74        let bytes_u32 = u32::try_from(bytes).or_overflow()?;
75        self.budget.consume(bytes_u32)?;
76        Ok(Vec::with_capacity(capacity))
77    }
78
79    /// Charge the budget for `size` raw bytes. Prefer [`consume_items`][Self::consume_items]
80    /// when charging for a known-type collection.
81    #[inline]
82    pub(crate) fn consume(&mut self, size: u32) -> MltResult<()> {
83        self.budget.consume(size)
84    }
85
86    /// Charge the budget for `count` items of type `T` (`count * size_of::<T>()` bytes).
87    #[inline]
88    pub(crate) fn consume_items<T>(&mut self, count: usize) -> MltResult<()> {
89        let bytes = count.checked_mul(size_of::<T>()).or_overflow()?;
90        self.budget.consume(u32::try_from(bytes).or_overflow()?)
91    }
92
93    #[inline]
94    pub(crate) fn adjust(&mut self, adjustment: u32) {
95        self.budget.adjust(adjustment);
96    }
97
98    /// Return the unused portion of a pre-charged allocation budget.
99    ///
100    /// Call this after fully populating a `Vec<T>` that was pre-allocated with [`Decoder::alloc`],
101    /// passing the same `alloc_size` that was given to `alloc`.
102    ///
103    /// Returns an error if the vector grew beyond `alloc_size` (malformed input caused more items
104    /// than declared). Subtracts `(alloc_size - buf.len()) * size_of::<T>()` from the budget.
105    #[inline]
106    pub(crate) fn adjust_alloc<T>(&mut self, buf: &[T], alloc_size: usize) -> MltResult<()> {
107        if buf.len() > alloc_size {
108            return Err(MltError::InvalidDecodingStreamSize(buf.len(), alloc_size));
109        }
110        // Return the unused portion of the pre-charged budget.
111        let unused = (alloc_size - buf.len()) * size_of::<T>();
112        // unused fits in u32: it's at most alloc_size * size_of::<T>(), which was checked to fit
113        // in u32 when alloc() was called. Using saturating_cast to avoid a fallible conversion.
114        #[expect(
115            clippy::cast_possible_truncation,
116            reason = "unused <= alloc_size * size_of::<T>() which was verified to fit in u32 by alloc()"
117        )]
118        self.budget.adjust(unused as u32);
119        Ok(())
120    }
121
122    #[must_use]
123    pub fn consumed(&self) -> u32 {
124        self.budget.consumed()
125    }
126
127    /// Reset the memory budget to zero, keeping scratch buffers allocated.
128    ///
129    /// Call this between tiles when reusing a single `Decoder` for multiple
130    /// decodes — the per-tile budget is enforced fresh, but the internal
131    /// `buffer_u32` / `buffer_u64` scratch space is retained so it doesn't
132    /// need to be re-allocated.
133    ///
134    /// # Safety / correctness precondition
135    ///
136    /// Only call this after dropping any decoded allocations returned from the
137    /// previous tile. Resetting the budget while earlier decoded outputs are
138    /// still alive makes the budget enforceable only per-tile and can bypass
139    /// the stronger guarantee that total live heap tracked by this decoder
140    /// never exceeds the configured maximum.
141    pub fn reset_budget(&mut self) {
142        self.budget.reset();
143    }
144}
145
146impl MemBudget {
147    /// Reset tracked usage for a new decode window.
148    ///
149    /// Callers must ensure that allocations accounted for by the previous
150    /// window are no longer live before resetting.
151    fn reset(&mut self) {
152        self.bytes_used = 0;
153    }
154}
/// Stateful parser that enforces a memory budget during parsing (binary → raw structures).
///
/// The parse chain reserves memory before allocations so total heap stays within the limit.
///
/// ```
/// use mlt_core::Parser;
///
/// # let bytes: &[u8] = &[];
/// let mut parser = Parser::default();
/// let layers = parser.parse_layers(bytes).expect("parse");
///
/// // Or with a custom limit:
/// let mut parser = Parser::with_max_size(64 * 1024 * 1024);
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Parser {
    /// Running total of bytes reserved by the parse chain, with its ceiling.
    budget: MemBudget,
}
173
174impl Parser {
175    /// Create a parser with a custom memory budget (in bytes).
176    #[must_use]
177    pub fn with_max_size(max_bytes: u32) -> Self {
178        Self {
179            budget: MemBudget::with_max_size(max_bytes),
180        }
181    }
182
183    /// Parse a sequence of binary layers, reserving decoded memory against this parser's budget.
184    pub fn parse_layers<'a>(&mut self, mut input: &'a [u8]) -> MltResult<Vec<Layer<'a>>> {
185        let mut result = Vec::new();
186        while !input.is_empty() {
187            let layer;
188            (input, layer) = Layer::from_bytes(input, self)?;
189            result.push(layer);
190        }
191        Ok(result)
192    }
193
194    /// Reserve `size` bytes from the parse budget. Used internally by the parse chain.
195    #[inline]
196    pub(crate) fn reserve(&mut self, size: u32) -> MltResult<()> {
197        self.budget.consume(size)
198    }
199
200    #[must_use]
201    pub fn reserved(&self) -> u32 {
202        self.budget.consumed()
203    }
204}
205
/// Byte counter with a hard ceiling, used by both `Decoder` and `Parser`
/// to bound the memory they account for.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemBudget {
    /// Hard ceiling: total decoded bytes may not exceed this value.
    pub max_bytes: u32,
    /// Running total of used bytes so far.
    pub bytes_used: u32,
}
213
214impl Default for MemBudget {
215    /// Create a decoder with the default 10 MiB memory budget.
216    fn default() -> Self {
217        Self::with_max_size(DEFAULT_MAX_BYTES)
218    }
219}
220
221impl MemBudget {
222    /// Create a decoder with a custom memory budget (in bytes).
223    #[must_use]
224    fn with_max_size(max_bytes: u32) -> Self {
225        Self {
226            max_bytes,
227            bytes_used: 0,
228        }
229    }
230
231    /// Adjust previous consumption by `- adjustment` bytes.  Will panic if used incorrectly.
232    #[inline]
233    fn adjust(&mut self, adjustment: u32) {
234        self.bytes_used = self.bytes_used.checked_sub(adjustment).unwrap();
235    }
236
237    /// Take `size` bytes from the allocation budget. Call this before the actual allocation.
238    #[inline]
239    fn consume(&mut self, size: u32) -> MltResult<()> {
240        let accumulator = &mut self.bytes_used;
241        let max_bytes = self.max_bytes;
242        if let Some(new_value) = accumulator
243            .checked_add(size)
244            .and_then(|v| if v > max_bytes { None } else { Some(v) })
245        {
246            *accumulator = new_value;
247            Ok(())
248        } else {
249            Err(MltError::MemoryLimitExceeded {
250                limit: max_bytes,
251                used: *accumulator,
252                requested: size,
253            })
254        }
255    }
256
257    fn consumed(&self) -> u32 {
258        self.bytes_used
259    }
260}
261
impl<'a> Layer01<'a, Lazy> {
    /// Parse `v01::Layer` metadata, reserving decoded memory against the parser's budget.
    ///
    /// Reads, in order: the layer name, the extent and the column count, then the
    /// metadata of every column, and finally each column's streams in the order the
    /// columns were declared. Column data is kept in its raw (not yet decoded) form.
    ///
    /// # Errors
    ///
    /// Besides errors propagated from the helper parsers, this returns:
    /// - `BufferUnderflow` if fewer bytes remain than the declared column count,
    /// - `MissingGeometry` if no geometry column was present,
    /// - `TrailingLayerData` if bytes remain after the last column was parsed.
    pub(crate) fn from_bytes(input: &'a [u8], parser: &mut Parser) -> MltResult<Self> {
        let (input, layer_name) = parse_string(input)?;
        let (input, extent) = parse_varint::<u32>(input)?;
        let (input, column_count) = parse_varint::<u32>(input)?;

        // Each column requires at least 1 byte (column type)
        if input.len() < column_count.as_usize() {
            return Err(BufferUnderflow(column_count, input.len()));
        }

        // !!!!!!!
        // WARNING: make sure to never use `let (input, ...)` after this point: input var is reused
        let (mut input, (col_info, prop_count)) = parse_columns_meta(input, column_count, parser)?;
        // Fuzzing builds additionally record the order of column types.
        #[cfg(fuzzing)]
        let layer_order = col_info
            .iter()
            .map(|column| column.typ)
            .map(crate::decoder::fuzzing::LayerOrdering::from)
            .collect();

        // `prop_count` is the column count minus the id and geometry columns.
        let mut properties = Vec::with_capacity(prop_count.as_usize());
        let mut id_column: Option<Id> = None;
        let mut geometry: Option<Geometry> = None;

        for column in col_info {
            use crate::decoder::RawProperty as RP;

            // Assigned inside the match arms below via destructuring assignment.
            let opt;
            let value;
            // Unnamed columns are stored with an empty name.
            let name = column.name.unwrap_or("");

            match column.typ {
                // 32-bit ids: optional presence stream followed by one data stream.
                ColumnType::Id | ColumnType::OptId => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    id_column.set_once(Raw(RawId {
                        presence: RawPresence(opt),
                        value: RawIdValue::Id32(value),
                    }))?;
                }
                // 64-bit ids: same layout, tagged as `Id64`.
                ColumnType::LongId | ColumnType::OptLongId => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    id_column.set_once(Raw(RawId {
                        presence: RawPresence(opt),
                        value: RawIdValue::Id64(value),
                    }))?;
                }
                ColumnType::Geometry => {
                    input = parse_geometry_column(input, &mut geometry, parser)?;
                }
                // Booleans are the only scalar whose data stream is read with `parse_bool`.
                ColumnType::Bool | ColumnType::OptBool => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::parse_bool(input, parser)?;
                    properties.push(Raw(RP::Bool(scalar(name, opt, value))));
                }
                // Remaining scalars: optional presence stream, then one data stream.
                ColumnType::I8 | ColumnType::OptI8 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::I8(scalar(name, opt, value))));
                }
                ColumnType::U8 | ColumnType::OptU8 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::U8(scalar(name, opt, value))));
                }
                ColumnType::I32 | ColumnType::OptI32 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::I32(scalar(name, opt, value))));
                }
                ColumnType::U32 | ColumnType::OptU32 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::U32(scalar(name, opt, value))));
                }
                ColumnType::I64 | ColumnType::OptI64 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::I64(scalar(name, opt, value))));
                }
                ColumnType::U64 | ColumnType::OptU64 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::U64(scalar(name, opt, value))));
                }
                ColumnType::F32 | ColumnType::OptF32 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::F32(scalar(name, opt, value))));
                }
                ColumnType::F64 | ColumnType::OptF64 => {
                    (input, opt) = parse_optional(column.typ, input, parser)?;
                    (input, value) = RawStream::from_bytes(input, parser)?;
                    properties.push(Raw(RP::F64(scalar(name, opt, value))));
                }
                // String columns carry their own stream count and are handled by a helper.
                ColumnType::Str | ColumnType::OptStr => {
                    let prop;
                    (input, prop) = parse_str_column(input, name, column.typ, parser)?;
                    properties.push(Raw(prop));
                }
                // Shared dictionaries need the whole column (for its child columns).
                ColumnType::SharedDict => {
                    let prop;
                    (input, prop) = parse_shared_dict_column(input, &column, parser)?;
                    properties.push(Raw(prop));
                }
            }
        }
        // The layer must be consumed exactly; leftover bytes are an error.
        if input.is_empty() {
            Ok(Layer01 {
                name: layer_name,
                extent,
                id: id_column,
                geometry: geometry.ok_or(MissingGeometry)?,
                properties,
                #[cfg(fuzzing)]
                layer_order,
            })
        } else {
            Err(TrailingLayerData(input.len()))
        }
    }

    /// Decode all columns and transition to [`Layer01<Parsed>`].
    ///
    /// Consumes `self` (a `Layer01<Lazy>`) and returns a `Layer01<Parsed>` where every
    /// column field holds its parsed value directly, enabling infallible readonly access.
    ///
    /// The id (if any), the geometry and then each property are converted with
    /// `into_parsed(dec)`; the name and extent are carried over unchanged.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by any `into_parsed` call.
    pub fn decode_all(self, dec: &mut Decoder) -> MltResult<ParsedLayer01<'a>> {
        Ok(Layer01 {
            name: self.name,
            extent: self.extent,
            id: self.id.map(|id| id.into_parsed(dec)).transpose()?,
            geometry: self.geometry.into_parsed(dec)?,
            properties: self
                .properties
                .into_iter()
                .map(|p| p.into_parsed(dec))
                .collect::<MltResult<Vec<_>>>()?,
            #[cfg(fuzzing)]
            layer_order: self.layer_order,
        })
    }
}
407
408fn parse_struct_children<'a>(
409    mut input: &'a [u8],
410    column: &Column<'a>,
411    parser: &mut Parser,
412) -> MltRefResult<'a, Vec<RawSharedDictItem<'a>>> {
413    let mut children = Vec::with_capacity(column.children.len());
414    for child in &column.children {
415        let (inp, sc) = parse_varint::<u32>(input)?;
416        let (inp, child_optional) = parse_optional(child.typ, inp, parser)?;
417        let optional_stream_count = u32::from(child_optional.is_some());
418        if let Some(data_count) = sc.checked_sub(optional_stream_count)
419            && data_count != 1
420        {
421            return Err(UnexpectedStructChildCount(data_count));
422        }
423        let (inp, child_data) = RawStream::from_bytes(inp, parser)?;
424        children.push(RawSharedDictItem {
425            name: child.name.unwrap_or(""),
426            presence: RawPresence(child_optional),
427            data: child_data,
428        });
429        input = inp;
430    }
431    Ok((input, children))
432}
433
434fn parse_optional<'a>(
435    typ: ColumnType,
436    input: &'a [u8],
437    parser: &mut Parser,
438) -> MltRefResult<'a, Option<RawStream<'a>>> {
439    if typ.is_optional() {
440        let (input, optional) = RawStream::parse_bool(input, parser)?;
441        Ok((input, Some(optional)))
442    } else {
443        Ok((input, None))
444    }
445}
446
447fn parse_geometry_column<'a>(
448    input: &'a [u8],
449    geometry: &mut Option<Geometry<'a>>,
450    parser: &mut Parser,
451) -> MltResult<&'a [u8]> {
452    let (input, stream_count) = parse_varint::<u32>(input)?;
453    if stream_count == 0 {
454        return Err(GeometryWithoutStreams);
455    }
456    // Each stream requires at least 1 byte (physical stream type)
457    let stream_count_capa = stream_count.as_usize();
458    if input.len() < stream_count_capa {
459        return Err(BufferUnderflow(stream_count, input.len()));
460    }
461    // metadata
462    let (input, meta) = RawStream::from_bytes(input, parser)?;
463    // geometry items
464    let (input, items) = RawStream::parse_multiple(input, stream_count_capa - 1, parser)?;
465    geometry.set_once(Raw(RawGeometry { meta, items }))?;
466    Ok(input)
467}
468
469fn parse_str_column<'a>(
470    mut input: &'a [u8],
471    name: &'a str,
472    typ: ColumnType,
473    parser: &mut Parser,
474) -> MltRefResult<'a, RawProperty<'a>> {
475    let mut stream_count = {
476        let stream_count_u32;
477        (input, stream_count_u32) = parse_varint::<u32>(input)?;
478        stream_count_u32.as_usize()
479    };
480    let presence;
481    (input, presence) = parse_optional(typ, input, parser)?;
482    if presence.is_some() {
483        if stream_count == 0 {
484            return Err(UnsupportedStringStreamCount(stream_count));
485        }
486        stream_count -= 1;
487    }
488    let mut str_streams = [None, None, None, None, None];
489    if stream_count > str_streams.len() {
490        return Err(UnsupportedStringStreamCount(stream_count));
491    }
492    for slot in str_streams.iter_mut().take(stream_count) {
493        let stream;
494        (input, stream) = RawStream::from_bytes(input, parser)?;
495        *slot = Some(stream);
496    }
497    let encoding = match str_streams {
498        [Some(s1), Some(s2), None, None, None] => {
499            RawStringsEncoding::plain(RawPlainData::new(s1, s2)?)
500        }
501        [Some(s1), Some(s2), Some(s3), None, None] => {
502            RawStringsEncoding::dictionary(RawPlainData::new(s1, s3)?, s2)?
503        }
504        [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
505            RawStringsEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
506        }
507        [Some(s1), Some(s2), Some(s3), Some(s4), Some(s5)] => {
508            RawStringsEncoding::fsst_dictionary(RawFsstData::new(s1, s2, s3, s4)?, s5)?
509        }
510        _ => Err(UnsupportedStringStreamCount(stream_count))?,
511    };
512    Ok((
513        input,
514        RawProperty::Str(RawStrings {
515            name,
516            presence: RawPresence(presence),
517            encoding,
518        }),
519    ))
520}
521
522fn parse_shared_dict_column<'a>(
523    mut input: &'a [u8],
524    column: &Column<'a>,
525    parser: &mut Parser,
526) -> MltRefResult<'a, RawProperty<'a>> {
527    // Read header streams until we hit the dictionary DATA(Single|Shared) stream.
528    let stream_count;
529    (input, stream_count) = parse_varint::<u32>(input)?;
530    let mut dict_streams = [None, None, None, None, None];
531    let mut streams_taken = 0_usize;
532    while streams_taken < stream_count.as_usize() {
533        let stream;
534        (input, stream) = RawStream::from_bytes(input, parser)?;
535        let is_last = matches!(
536            stream.meta.stream_type,
537            StreamType::Data(DictionaryType::Single | DictionaryType::Shared)
538        );
539        dict_streams[streams_taken] = Some(stream);
540        streams_taken += 1;
541        if is_last {
542            break;
543        } else if streams_taken >= dict_streams.len() {
544            return Err(UnsupportedStringStreamCount(streams_taken + 1));
545        }
546    }
547    let children;
548    (input, children) = parse_struct_children(input, column, parser)?;
549
550    // Validate stream_count: must equal dict_streams + children + optional_children.
551    let children_n = u32::try_from(children.len()).or_overflow()?;
552    let optional_n = children
553        .iter()
554        .filter(|c| c.presence.0.is_some())
555        .count()
556        .try_into()
557        .or_overflow()?;
558    let dict_n = u32::try_from(streams_taken).or_overflow()?;
559    let expected = crate::utils::checked_sum3(dict_n, children_n, optional_n)?;
560    // Java's encoder had a bug (fixed) that overcounted by 1: dict + 2*N + 1.
561    // Accept that value too so that files produced by older Java encoders still parse.
562    let java_legacy = expected.checked_add(1).or_overflow()?;
563    if stream_count != expected && stream_count != java_legacy {
564        return Err(InvalidSharedDictStreamCount {
565            actual: stream_count,
566            expected,
567        });
568    }
569
570    let name = column.name.unwrap_or("");
571    let encoding = match dict_streams {
572        [Some(s1), Some(s2), None, None, None] => {
573            RawSharedDictEncoding::plain(RawPlainData::new(s1, s2)?)
574        }
575        [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
576            RawSharedDictEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
577        }
578        _ => Err(SharedDictRequiresStreams(streams_taken))?,
579    };
580    Ok((
581        input,
582        RawProperty::SharedDict(RawSharedDict {
583            name,
584            encoding,
585            children,
586        }),
587    ))
588}
589
590fn parse_columns_meta<'a>(
591    mut input: &'a [u8],
592    column_count: u32,
593    parser: &mut Parser,
594) -> MltRefResult<'a, (Vec<Column<'a>>, u32)> {
595    use crate::decoder::ColumnType::{Geometry, Id, LongId, OptId, OptLongId, SharedDict};
596
597    let mut col_info = Vec::with_capacity(column_count.as_usize());
598    let mut geometries = 0;
599    let mut ids = 0;
600    for _ in 0..column_count {
601        let mut typ;
602        (input, typ) = Column::from_bytes(input, parser)?;
603        match typ.typ {
604            Geometry => geometries += 1,
605            Id | OptId | LongId | OptLongId => ids += 1,
606            SharedDict => {
607                // Yes, we need to parse children right here; otherwise this messes up the next column
608                let child_column_count;
609                (input, child_column_count) = parse_varint::<u32>(input)?;
610
611                // Each column requires at least 1 byte (ColumnType without a name)
612                let child_col_capacity = child_column_count.as_usize();
613                if input.len() < child_col_capacity {
614                    return Err(BufferUnderflow(child_column_count, input.len()));
615                }
616                let mut children = Vec::with_capacity(child_col_capacity);
617                for _ in 0..child_column_count {
618                    let child;
619                    (input, child) = Column::from_bytes(input, parser)?;
620                    children.push(child);
621                }
622                typ.children = children;
623            }
624            _ => {}
625        }
626        col_info.push(typ);
627    }
628    if geometries > 1 {
629        return Err(MultipleGeometryColumns);
630    }
631    if ids > 1 {
632        return Err(MultipleIdColumns);
633    }
634
635    Ok((input, (col_info, column_count - geometries - ids)))
636}
637
638fn scalar<'a>(name: &'a str, opt: Option<RawStream<'a>>, value: RawStream<'a>) -> RawScalar<'a> {
639    RawScalar {
640        name,
641        presence: RawPresence(opt),
642        data: value,
643    }
644}