1use crate::LazyParsed::Raw;
2use crate::MltError::{
3 BufferUnderflow, GeometryWithoutStreams, InvalidSharedDictStreamCount, MissingGeometry,
4 MultipleGeometryColumns, MultipleIdColumns, SharedDictRequiresStreams, TrailingLayerData,
5 UnexpectedStructChildCount, UnsupportedStringStreamCount,
6};
7use crate::codecs::varint::parse_varint;
8use crate::decoder::{
9 Column, ColumnType, DictionaryType, Geometry, GeometryValues, Id, IdValues, Layer01,
10 ParsedLayer01, RawFsstData, RawGeometry, RawId, RawIdValue, RawPlainData, RawPresence,
11 RawProperty, RawScalar, RawSharedDict, RawSharedDictEncoding, RawSharedDictItem, RawStream,
12 RawStrings, RawStringsEncoding, StreamType,
13};
14use crate::errors::AsMltError as _;
15use crate::utils::{AsUsize as _, SetOptionOnce as _, parse_string};
16use crate::{Layer, Lazy, MltError, MltRefResult, MltResult, ParsedLayer};
17
/// Memory ceiling, in bytes (20 MiB), used by `MemBudget::default`.
const DEFAULT_MAX_BYTES: u32 = 20 << 20;
20
21#[derive(Debug, Clone, PartialEq, Eq, Default)]
38pub struct Decoder {
39 budget: MemBudget,
41 pub(crate) buffer_u32: Vec<u32>,
44 pub(crate) buffer_u64: Vec<u64>,
47}
48
49impl Decoder {
50 #[must_use]
52 pub fn with_max_size(max_bytes: u32) -> Self {
53 Self {
54 budget: MemBudget::with_max_size(max_bytes),
55 ..Default::default()
56 }
57 }
58
59 pub fn decode_all<'a>(
60 &mut self,
61 layers: impl IntoIterator<Item = Layer<'a>>,
62 ) -> MltResult<Vec<ParsedLayer<'a>>> {
63 layers
64 .into_iter()
65 .map(|l| l.decode_all(self))
66 .collect::<MltResult<_>>()
67 }
68
69 #[inline]
72 pub(crate) fn alloc<T>(&mut self, capacity: usize) -> MltResult<Vec<T>> {
73 let bytes = capacity.checked_mul(size_of::<T>()).or_overflow()?;
74 let bytes_u32 = u32::try_from(bytes).or_overflow()?;
75 self.budget.consume(bytes_u32)?;
76 Ok(Vec::with_capacity(capacity))
77 }
78
79 #[inline]
82 pub(crate) fn consume(&mut self, size: u32) -> MltResult<()> {
83 self.budget.consume(size)
84 }
85
86 #[inline]
88 pub(crate) fn consume_items<T>(&mut self, count: usize) -> MltResult<()> {
89 let bytes = count.checked_mul(size_of::<T>()).or_overflow()?;
90 self.budget.consume(u32::try_from(bytes).or_overflow()?)
91 }
92
93 #[inline]
94 pub(crate) fn adjust(&mut self, adjustment: u32) {
95 self.budget.adjust(adjustment);
96 }
97
98 #[inline]
106 pub(crate) fn adjust_alloc<T>(&mut self, buf: &[T], alloc_size: usize) -> MltResult<()> {
107 if buf.len() > alloc_size {
108 return Err(MltError::InvalidDecodingStreamSize(buf.len(), alloc_size));
109 }
110 let unused = (alloc_size - buf.len()) * size_of::<T>();
112 #[expect(
115 clippy::cast_possible_truncation,
116 reason = "unused <= alloc_size * size_of::<T>() which was verified to fit in u32 by alloc()"
117 )]
118 self.budget.adjust(unused as u32);
119 Ok(())
120 }
121
122 #[must_use]
123 pub fn consumed(&self) -> u32 {
124 self.budget.consumed()
125 }
126
127 pub fn reset_budget(&mut self) {
142 self.budget.reset();
143 }
144}
145
146impl MemBudget {
147 fn reset(&mut self) {
152 self.bytes_used = 0;
153 }
154}
155#[derive(Debug, Clone, PartialEq, Eq, Default)]
170pub struct Parser {
171 budget: MemBudget,
172}
173
174impl Parser {
175 #[must_use]
177 pub fn with_max_size(max_bytes: u32) -> Self {
178 Self {
179 budget: MemBudget::with_max_size(max_bytes),
180 }
181 }
182
183 pub fn parse_layers<'a>(&mut self, mut input: &'a [u8]) -> MltResult<Vec<Layer<'a>>> {
185 let mut result = Vec::new();
186 while !input.is_empty() {
187 let layer;
188 (input, layer) = Layer::from_bytes(input, self)?;
189 result.push(layer);
190 }
191 Ok(result)
192 }
193
194 #[inline]
196 pub(crate) fn reserve(&mut self, size: u32) -> MltResult<()> {
197 self.budget.consume(size)
198 }
199
200 #[must_use]
201 pub fn reserved(&self) -> u32 {
202 self.budget.consumed()
203 }
204}
205
/// Byte counter with an upper limit, shared by `Decoder` and `Parser`.
#[derive(Clone, Debug, Eq, PartialEq)]
struct MemBudget {
    /// Highest value `bytes_used` is allowed to reach.
    pub max_bytes: u32,
    /// Bytes charged so far.
    pub bytes_used: u32,
}
213
214impl Default for MemBudget {
215 fn default() -> Self {
217 Self::with_max_size(DEFAULT_MAX_BYTES)
218 }
219}
220
221impl MemBudget {
222 #[must_use]
224 fn with_max_size(max_bytes: u32) -> Self {
225 Self {
226 max_bytes,
227 bytes_used: 0,
228 }
229 }
230
231 #[inline]
233 fn adjust(&mut self, adjustment: u32) {
234 self.bytes_used = self.bytes_used.checked_sub(adjustment).unwrap();
235 }
236
237 #[inline]
239 fn consume(&mut self, size: u32) -> MltResult<()> {
240 let accumulator = &mut self.bytes_used;
241 let max_bytes = self.max_bytes;
242 if let Some(new_value) = accumulator
243 .checked_add(size)
244 .and_then(|v| if v > max_bytes { None } else { Some(v) })
245 {
246 *accumulator = new_value;
247 Ok(())
248 } else {
249 Err(MltError::MemoryLimitExceeded {
250 limit: max_bytes,
251 used: *accumulator,
252 requested: size,
253 })
254 }
255 }
256
257 fn consumed(&self) -> u32 {
258 self.bytes_used
259 }
260}
261
262impl<'a> Layer01<'a, Lazy> {
263 pub fn from_bytes(input: &'a [u8], parser: &mut Parser) -> MltResult<Self> {
265 let (input, layer_name) = parse_string(input)?;
266 let (input, extent) = parse_varint::<u32>(input)?;
267 let (input, column_count) = parse_varint::<u32>(input)?;
268
269 if input.len() < column_count.as_usize() {
271 return Err(BufferUnderflow(column_count, input.len()));
272 }
273
274 let (mut input, (col_info, prop_count)) = parse_columns_meta(input, column_count, parser)?;
277 #[cfg(fuzzing)]
278 let layer_order = col_info
279 .iter()
280 .map(|column| column.typ)
281 .map(crate::decoder::fuzzing::LayerOrdering::from)
282 .collect();
283
284 let mut properties = Vec::with_capacity(prop_count.as_usize());
285 let mut id_column: Option<Id> = None;
286 let mut geometry: Option<Geometry> = None;
287
288 for column in col_info {
289 use crate::decoder::RawProperty as RP;
290
291 let opt;
292 let value;
293 let name = column.name.unwrap_or("");
294
295 match column.typ {
296 ColumnType::Id | ColumnType::OptId => {
297 (input, opt) = parse_optional(column.typ, input, parser)?;
298 (input, value) = RawStream::from_bytes(input, parser)?;
299 id_column.set_once(Raw(RawId {
300 presence: RawPresence(opt),
301 value: RawIdValue::Id32(value),
302 }))?;
303 }
304 ColumnType::LongId | ColumnType::OptLongId => {
305 (input, opt) = parse_optional(column.typ, input, parser)?;
306 (input, value) = RawStream::from_bytes(input, parser)?;
307 id_column.set_once(Raw(RawId {
308 presence: RawPresence(opt),
309 value: RawIdValue::Id64(value),
310 }))?;
311 }
312 ColumnType::Geometry => {
313 input = parse_geometry_column(input, &mut geometry, parser)?;
314 }
315 ColumnType::Bool | ColumnType::OptBool => {
316 (input, opt) = parse_optional(column.typ, input, parser)?;
317 (input, value) = RawStream::parse_bool(input, parser)?;
318 properties.push(Raw(RP::Bool(scalar(name, opt, value))));
319 }
320 ColumnType::I8 | ColumnType::OptI8 => {
321 (input, opt) = parse_optional(column.typ, input, parser)?;
322 (input, value) = RawStream::from_bytes(input, parser)?;
323 properties.push(Raw(RP::I8(scalar(name, opt, value))));
324 }
325 ColumnType::U8 | ColumnType::OptU8 => {
326 (input, opt) = parse_optional(column.typ, input, parser)?;
327 (input, value) = RawStream::from_bytes(input, parser)?;
328 properties.push(Raw(RP::U8(scalar(name, opt, value))));
329 }
330 ColumnType::I32 | ColumnType::OptI32 => {
331 (input, opt) = parse_optional(column.typ, input, parser)?;
332 (input, value) = RawStream::from_bytes(input, parser)?;
333 properties.push(Raw(RP::I32(scalar(name, opt, value))));
334 }
335 ColumnType::U32 | ColumnType::OptU32 => {
336 (input, opt) = parse_optional(column.typ, input, parser)?;
337 (input, value) = RawStream::from_bytes(input, parser)?;
338 properties.push(Raw(RP::U32(scalar(name, opt, value))));
339 }
340 ColumnType::I64 | ColumnType::OptI64 => {
341 (input, opt) = parse_optional(column.typ, input, parser)?;
342 (input, value) = RawStream::from_bytes(input, parser)?;
343 properties.push(Raw(RP::I64(scalar(name, opt, value))));
344 }
345 ColumnType::U64 | ColumnType::OptU64 => {
346 (input, opt) = parse_optional(column.typ, input, parser)?;
347 (input, value) = RawStream::from_bytes(input, parser)?;
348 properties.push(Raw(RP::U64(scalar(name, opt, value))));
349 }
350 ColumnType::F32 | ColumnType::OptF32 => {
351 (input, opt) = parse_optional(column.typ, input, parser)?;
352 (input, value) = RawStream::from_bytes(input, parser)?;
353 properties.push(Raw(RP::F32(scalar(name, opt, value))));
354 }
355 ColumnType::F64 | ColumnType::OptF64 => {
356 (input, opt) = parse_optional(column.typ, input, parser)?;
357 (input, value) = RawStream::from_bytes(input, parser)?;
358 properties.push(Raw(RP::F64(scalar(name, opt, value))));
359 }
360 ColumnType::Str | ColumnType::OptStr => {
361 let prop;
362 (input, prop) = parse_str_column(input, name, column.typ, parser)?;
363 properties.push(Raw(prop));
364 }
365 ColumnType::SharedDict => {
366 let prop;
367 (input, prop) = parse_shared_dict_column(input, &column, parser)?;
368 properties.push(Raw(prop));
369 }
370 }
371 }
372 if input.is_empty() {
373 Ok(Layer01 {
374 name: layer_name,
375 extent,
376 id: id_column,
377 geometry: geometry.ok_or(MissingGeometry)?,
378 properties,
379 #[cfg(fuzzing)]
380 layer_order,
381 })
382 } else {
383 Err(TrailingLayerData(input.len()))
384 }
385 }
386
387 pub fn decode_id(&mut self, dec: &mut Decoder) -> MltResult<Option<&mut IdValues>> {
391 Ok(if let Some(id) = &mut self.id {
392 Some(id.decode(dec)?)
393 } else {
394 None
395 })
396 }
397
398 pub fn decode_geometry(&mut self, dec: &mut Decoder) -> MltResult<&mut GeometryValues> {
402 self.geometry.decode(dec)
403 }
404
405 pub fn decode_properties(&mut self, dec: &mut Decoder) -> MltResult<()> {
409 for prop in &mut self.properties {
410 prop.decode(dec)?;
411 }
412 Ok(())
413 }
414
415 pub fn decode_all(self, dec: &mut Decoder) -> MltResult<ParsedLayer01<'a>> {
420 Ok(Layer01 {
421 name: self.name,
422 extent: self.extent,
423 id: self.id.map(|id| id.into_parsed(dec)).transpose()?,
424 geometry: self.geometry.into_parsed(dec)?,
425 properties: self
426 .properties
427 .into_iter()
428 .map(|p| p.into_parsed(dec))
429 .collect::<MltResult<Vec<_>>>()?,
430 #[cfg(fuzzing)]
431 layer_order: self.layer_order,
432 })
433 }
434}
435
436fn parse_struct_children<'a>(
437 mut input: &'a [u8],
438 column: &Column<'a>,
439 parser: &mut Parser,
440) -> MltRefResult<'a, Vec<RawSharedDictItem<'a>>> {
441 let mut children = Vec::with_capacity(column.children.len());
442 for child in &column.children {
443 let (inp, sc) = parse_varint::<u32>(input)?;
444 let (inp, child_optional) = parse_optional(child.typ, inp, parser)?;
445 let optional_stream_count = u32::from(child_optional.is_some());
446 if let Some(data_count) = sc.checked_sub(optional_stream_count)
447 && data_count != 1
448 {
449 return Err(UnexpectedStructChildCount(data_count));
450 }
451 let (inp, child_data) = RawStream::from_bytes(inp, parser)?;
452 children.push(RawSharedDictItem {
453 name: child.name.unwrap_or(""),
454 presence: RawPresence(child_optional),
455 data: child_data,
456 });
457 input = inp;
458 }
459 Ok((input, children))
460}
461
462fn parse_optional<'a>(
463 typ: ColumnType,
464 input: &'a [u8],
465 parser: &mut Parser,
466) -> MltRefResult<'a, Option<RawStream<'a>>> {
467 if typ.is_optional() {
468 let (input, optional) = RawStream::parse_bool(input, parser)?;
469 Ok((input, Some(optional)))
470 } else {
471 Ok((input, None))
472 }
473}
474
475fn parse_geometry_column<'a>(
476 input: &'a [u8],
477 geometry: &mut Option<Geometry<'a>>,
478 parser: &mut Parser,
479) -> MltResult<&'a [u8]> {
480 let (input, stream_count) = parse_varint::<u32>(input)?;
481 if stream_count == 0 {
482 return Err(GeometryWithoutStreams);
483 }
484 let stream_count_capa = stream_count.as_usize();
486 if input.len() < stream_count_capa {
487 return Err(BufferUnderflow(stream_count, input.len()));
488 }
489 let (input, meta) = RawStream::from_bytes(input, parser)?;
491 let (input, items) = RawStream::parse_multiple(input, stream_count_capa - 1, parser)?;
493 geometry.set_once(Raw(RawGeometry { meta, items }))?;
494 Ok(input)
495}
496
497fn parse_str_column<'a>(
498 mut input: &'a [u8],
499 name: &'a str,
500 typ: ColumnType,
501 parser: &mut Parser,
502) -> MltRefResult<'a, RawProperty<'a>> {
503 let mut stream_count = {
504 let stream_count_u32;
505 (input, stream_count_u32) = parse_varint::<u32>(input)?;
506 stream_count_u32.as_usize()
507 };
508 let presence;
509 (input, presence) = parse_optional(typ, input, parser)?;
510 if presence.is_some() {
511 if stream_count == 0 {
512 return Err(UnsupportedStringStreamCount(stream_count));
513 }
514 stream_count -= 1;
515 }
516 let mut str_streams = [None, None, None, None, None];
517 if stream_count > str_streams.len() {
518 return Err(UnsupportedStringStreamCount(stream_count));
519 }
520 for slot in str_streams.iter_mut().take(stream_count) {
521 let stream;
522 (input, stream) = RawStream::from_bytes(input, parser)?;
523 *slot = Some(stream);
524 }
525 let encoding = match str_streams {
526 [Some(s1), Some(s2), None, None, None] => {
527 RawStringsEncoding::plain(RawPlainData::new(s1, s2)?)
528 }
529 [Some(s1), Some(s2), Some(s3), None, None] => {
530 RawStringsEncoding::dictionary(RawPlainData::new(s1, s3)?, s2)?
531 }
532 [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
533 RawStringsEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
534 }
535 [Some(s1), Some(s2), Some(s3), Some(s4), Some(s5)] => {
536 RawStringsEncoding::fsst_dictionary(RawFsstData::new(s1, s2, s3, s4)?, s5)?
537 }
538 _ => Err(UnsupportedStringStreamCount(stream_count))?,
539 };
540 Ok((
541 input,
542 RawProperty::Str(RawStrings {
543 name,
544 presence: RawPresence(presence),
545 encoding,
546 }),
547 ))
548}
549
550fn parse_shared_dict_column<'a>(
551 mut input: &'a [u8],
552 column: &Column<'a>,
553 parser: &mut Parser,
554) -> MltRefResult<'a, RawProperty<'a>> {
555 let stream_count;
557 (input, stream_count) = parse_varint::<u32>(input)?;
558 let mut dict_streams = [None, None, None, None, None];
559 let mut streams_taken = 0_usize;
560 while streams_taken < stream_count.as_usize() {
561 let stream;
562 (input, stream) = RawStream::from_bytes(input, parser)?;
563 let is_last = matches!(
564 stream.meta.stream_type,
565 StreamType::Data(DictionaryType::Single | DictionaryType::Shared)
566 );
567 dict_streams[streams_taken] = Some(stream);
568 streams_taken += 1;
569 if is_last {
570 break;
571 } else if streams_taken >= dict_streams.len() {
572 return Err(UnsupportedStringStreamCount(streams_taken + 1));
573 }
574 }
575 let children;
576 (input, children) = parse_struct_children(input, column, parser)?;
577
578 let children_n = u32::try_from(children.len()).or_overflow()?;
580 let optional_n = children
581 .iter()
582 .filter(|c| c.presence.0.is_some())
583 .count()
584 .try_into()
585 .or_overflow()?;
586 let dict_n = u32::try_from(streams_taken).or_overflow()?;
587 let expected = crate::utils::checked_sum3(dict_n, children_n, optional_n)?;
588 let java_legacy = expected.checked_add(1).or_overflow()?;
591 if stream_count != expected && stream_count != java_legacy {
592 return Err(InvalidSharedDictStreamCount {
593 actual: stream_count,
594 expected,
595 });
596 }
597
598 let name = column.name.unwrap_or("");
599 let encoding = match dict_streams {
600 [Some(s1), Some(s2), None, None, None] => {
601 RawSharedDictEncoding::plain(RawPlainData::new(s1, s2)?)
602 }
603 [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
604 RawSharedDictEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
605 }
606 _ => Err(SharedDictRequiresStreams(streams_taken))?,
607 };
608 Ok((
609 input,
610 RawProperty::SharedDict(RawSharedDict {
611 name,
612 encoding,
613 children,
614 }),
615 ))
616}
617
618fn parse_columns_meta<'a>(
619 mut input: &'a [u8],
620 column_count: u32,
621 parser: &mut Parser,
622) -> MltRefResult<'a, (Vec<Column<'a>>, u32)> {
623 use crate::decoder::ColumnType::{Geometry, Id, LongId, OptId, OptLongId, SharedDict};
624
625 let mut col_info = Vec::with_capacity(column_count.as_usize());
626 let mut geometries = 0;
627 let mut ids = 0;
628 for _ in 0..column_count {
629 let mut typ;
630 (input, typ) = Column::from_bytes(input, parser)?;
631 match typ.typ {
632 Geometry => geometries += 1,
633 Id | OptId | LongId | OptLongId => ids += 1,
634 SharedDict => {
635 let child_column_count;
637 (input, child_column_count) = parse_varint::<u32>(input)?;
638
639 let child_col_capacity = child_column_count.as_usize();
641 if input.len() < child_col_capacity {
642 return Err(BufferUnderflow(child_column_count, input.len()));
643 }
644 let mut children = Vec::with_capacity(child_col_capacity);
645 for _ in 0..child_column_count {
646 let child;
647 (input, child) = Column::from_bytes(input, parser)?;
648 children.push(child);
649 }
650 typ.children = children;
651 }
652 _ => {}
653 }
654 col_info.push(typ);
655 }
656 if geometries > 1 {
657 return Err(MultipleGeometryColumns);
658 }
659 if ids > 1 {
660 return Err(MultipleIdColumns);
661 }
662
663 Ok((input, (col_info, column_count - geometries - ids)))
664}
665
666fn scalar<'a>(name: &'a str, opt: Option<RawStream<'a>>, value: RawStream<'a>) -> RawScalar<'a> {
667 RawScalar {
668 name,
669 presence: RawPresence(opt),
670 data: value,
671 }
672}