1use crate::LazyParsed::Raw;
2use crate::MltError::{
3 BufferUnderflow, GeometryWithoutStreams, InvalidSharedDictStreamCount, MissingGeometry,
4 MultipleGeometryColumns, MultipleIdColumns, SharedDictRequiresStreams, TrailingLayerData,
5 UnexpectedStructChildCount, UnsupportedStringStreamCount,
6};
7use crate::codecs::varint::parse_varint;
8use crate::decoder::{
9 Column, ColumnType, DictionaryType, Geometry, Id, Layer01, ParsedLayer01, RawFsstData,
10 RawGeometry, RawId, RawIdValue, RawPlainData, RawPresence, RawProperty, RawScalar,
11 RawSharedDict, RawSharedDictEncoding, RawSharedDictItem, RawStream, RawStrings,
12 RawStringsEncoding, StreamType,
13};
14use crate::errors::AsMltError as _;
15use crate::utils::{AsUsize as _, SetOptionOnce as _, parse_string};
16use crate::{Layer, Lazy, MltError, MltRefResult, MltResult, ParsedLayer};
17
/// Memory limit, in bytes (20 MiB), used by `MemBudget::default()`.
const DEFAULT_MAX_BYTES: u32 = 20 * 1024 * 1024;
20
/// Stateful decoder that turns lazily parsed layers into parsed layers while
/// charging memory usage against a [`MemBudget`].
///
/// `Decoder::default()` uses the default budget and empty scratch buffers.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Decoder {
    /// Byte budget charged by `alloc`, `consume` and `consume_items`.
    budget: MemBudget,
    /// `u32` scratch buffer.
    /// NOTE(review): presumably reused between decode calls to avoid
    /// reallocating — confirm against the stream decoders that use it.
    pub(crate) buffer_u32: Vec<u32>,
    /// `u64` scratch buffer (same caveat as `buffer_u32`).
    pub(crate) buffer_u64: Vec<u64>,
}
48
49impl Decoder {
50 #[must_use]
52 pub fn with_max_size(max_bytes: u32) -> Self {
53 Self {
54 budget: MemBudget::with_max_size(max_bytes),
55 ..Default::default()
56 }
57 }
58
59 pub fn decode_all<'a>(
60 &mut self,
61 layers: impl IntoIterator<Item = Layer<'a>>,
62 ) -> MltResult<Vec<ParsedLayer<'a>>> {
63 layers
64 .into_iter()
65 .map(|l| l.decode_all(self))
66 .collect::<MltResult<_>>()
67 }
68
69 #[inline]
72 pub(crate) fn alloc<T>(&mut self, capacity: usize) -> MltResult<Vec<T>> {
73 let bytes = capacity.checked_mul(size_of::<T>()).or_overflow()?;
74 let bytes_u32 = u32::try_from(bytes).or_overflow()?;
75 self.budget.consume(bytes_u32)?;
76 Ok(Vec::with_capacity(capacity))
77 }
78
79 #[inline]
82 pub(crate) fn consume(&mut self, size: u32) -> MltResult<()> {
83 self.budget.consume(size)
84 }
85
86 #[inline]
88 pub(crate) fn consume_items<T>(&mut self, count: usize) -> MltResult<()> {
89 let bytes = count.checked_mul(size_of::<T>()).or_overflow()?;
90 self.budget.consume(u32::try_from(bytes).or_overflow()?)
91 }
92
93 #[inline]
94 pub(crate) fn adjust(&mut self, adjustment: u32) {
95 self.budget.adjust(adjustment);
96 }
97
98 #[inline]
106 pub(crate) fn adjust_alloc<T>(&mut self, buf: &[T], alloc_size: usize) -> MltResult<()> {
107 if buf.len() > alloc_size {
108 return Err(MltError::InvalidDecodingStreamSize(buf.len(), alloc_size));
109 }
110 let unused = (alloc_size - buf.len()) * size_of::<T>();
112 #[expect(
115 clippy::cast_possible_truncation,
116 reason = "unused <= alloc_size * size_of::<T>() which was verified to fit in u32 by alloc()"
117 )]
118 self.budget.adjust(unused as u32);
119 Ok(())
120 }
121
122 #[must_use]
123 pub fn consumed(&self) -> u32 {
124 self.budget.consumed()
125 }
126
127 pub fn reset_budget(&mut self) {
142 self.budget.reset();
143 }
144}
145
146impl MemBudget {
147 fn reset(&mut self) {
152 self.bytes_used = 0;
153 }
154}
/// Parser state shared across layers: a [`MemBudget`] against which parsing
/// code reserves memory via `reserve`.
///
/// `Parser::default()` uses the default budget.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Parser {
    /// Byte budget charged by `reserve`.
    budget: MemBudget,
}
173
174impl Parser {
175 #[must_use]
177 pub fn with_max_size(max_bytes: u32) -> Self {
178 Self {
179 budget: MemBudget::with_max_size(max_bytes),
180 }
181 }
182
183 pub fn parse_layers<'a>(&mut self, mut input: &'a [u8]) -> MltResult<Vec<Layer<'a>>> {
185 let mut result = Vec::new();
186 while !input.is_empty() {
187 let layer;
188 (input, layer) = Layer::from_bytes(input, self)?;
189 result.push(layer);
190 }
191 Ok(result)
192 }
193
194 #[inline]
196 pub(crate) fn reserve(&mut self, size: u32) -> MltResult<()> {
197 self.budget.consume(size)
198 }
199
200 #[must_use]
201 pub fn reserved(&self) -> u32 {
202 self.budget.consumed()
203 }
204}
205
/// Running byte counter with an upper limit, shared by `Decoder` and `Parser`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemBudget {
    /// Maximum number of bytes that may be recorded as used at any one time.
    pub max_bytes: u32,
    /// Bytes currently recorded as used; raised by `consume`, lowered by `adjust`.
    pub bytes_used: u32,
}
213
214impl Default for MemBudget {
215 fn default() -> Self {
217 Self::with_max_size(DEFAULT_MAX_BYTES)
218 }
219}
220
221impl MemBudget {
222 #[must_use]
224 fn with_max_size(max_bytes: u32) -> Self {
225 Self {
226 max_bytes,
227 bytes_used: 0,
228 }
229 }
230
231 #[inline]
233 fn adjust(&mut self, adjustment: u32) {
234 self.bytes_used = self.bytes_used.checked_sub(adjustment).unwrap();
235 }
236
237 #[inline]
239 fn consume(&mut self, size: u32) -> MltResult<()> {
240 let accumulator = &mut self.bytes_used;
241 let max_bytes = self.max_bytes;
242 if let Some(new_value) = accumulator
243 .checked_add(size)
244 .and_then(|v| if v > max_bytes { None } else { Some(v) })
245 {
246 *accumulator = new_value;
247 Ok(())
248 } else {
249 Err(MltError::MemoryLimitExceeded {
250 limit: max_bytes,
251 used: *accumulator,
252 requested: size,
253 })
254 }
255 }
256
257 fn consumed(&self) -> u32 {
258 self.bytes_used
259 }
260}
261
262impl<'a> Layer01<'a, Lazy> {
263 pub(crate) fn from_bytes(input: &'a [u8], parser: &mut Parser) -> MltResult<Self> {
265 let (input, layer_name) = parse_string(input)?;
266 let (input, extent) = parse_varint::<u32>(input)?;
267 let (input, column_count) = parse_varint::<u32>(input)?;
268
269 if input.len() < column_count.as_usize() {
271 return Err(BufferUnderflow(column_count, input.len()));
272 }
273
274 let (mut input, (col_info, prop_count)) = parse_columns_meta(input, column_count, parser)?;
277 #[cfg(fuzzing)]
278 let layer_order = col_info
279 .iter()
280 .map(|column| column.typ)
281 .map(crate::decoder::fuzzing::LayerOrdering::from)
282 .collect();
283
284 let mut properties = Vec::with_capacity(prop_count.as_usize());
285 let mut id_column: Option<Id> = None;
286 let mut geometry: Option<Geometry> = None;
287
288 for column in col_info {
289 use crate::decoder::RawProperty as RP;
290
291 let opt;
292 let value;
293 let name = column.name.unwrap_or("");
294
295 match column.typ {
296 ColumnType::Id | ColumnType::OptId => {
297 (input, opt) = parse_optional(column.typ, input, parser)?;
298 (input, value) = RawStream::from_bytes(input, parser)?;
299 id_column.set_once(Raw(RawId {
300 presence: RawPresence(opt),
301 value: RawIdValue::Id32(value),
302 }))?;
303 }
304 ColumnType::LongId | ColumnType::OptLongId => {
305 (input, opt) = parse_optional(column.typ, input, parser)?;
306 (input, value) = RawStream::from_bytes(input, parser)?;
307 id_column.set_once(Raw(RawId {
308 presence: RawPresence(opt),
309 value: RawIdValue::Id64(value),
310 }))?;
311 }
312 ColumnType::Geometry => {
313 input = parse_geometry_column(input, &mut geometry, parser)?;
314 }
315 ColumnType::Bool | ColumnType::OptBool => {
316 (input, opt) = parse_optional(column.typ, input, parser)?;
317 (input, value) = RawStream::parse_bool(input, parser)?;
318 properties.push(Raw(RP::Bool(scalar(name, opt, value))));
319 }
320 ColumnType::I8 | ColumnType::OptI8 => {
321 (input, opt) = parse_optional(column.typ, input, parser)?;
322 (input, value) = RawStream::from_bytes(input, parser)?;
323 properties.push(Raw(RP::I8(scalar(name, opt, value))));
324 }
325 ColumnType::U8 | ColumnType::OptU8 => {
326 (input, opt) = parse_optional(column.typ, input, parser)?;
327 (input, value) = RawStream::from_bytes(input, parser)?;
328 properties.push(Raw(RP::U8(scalar(name, opt, value))));
329 }
330 ColumnType::I32 | ColumnType::OptI32 => {
331 (input, opt) = parse_optional(column.typ, input, parser)?;
332 (input, value) = RawStream::from_bytes(input, parser)?;
333 properties.push(Raw(RP::I32(scalar(name, opt, value))));
334 }
335 ColumnType::U32 | ColumnType::OptU32 => {
336 (input, opt) = parse_optional(column.typ, input, parser)?;
337 (input, value) = RawStream::from_bytes(input, parser)?;
338 properties.push(Raw(RP::U32(scalar(name, opt, value))));
339 }
340 ColumnType::I64 | ColumnType::OptI64 => {
341 (input, opt) = parse_optional(column.typ, input, parser)?;
342 (input, value) = RawStream::from_bytes(input, parser)?;
343 properties.push(Raw(RP::I64(scalar(name, opt, value))));
344 }
345 ColumnType::U64 | ColumnType::OptU64 => {
346 (input, opt) = parse_optional(column.typ, input, parser)?;
347 (input, value) = RawStream::from_bytes(input, parser)?;
348 properties.push(Raw(RP::U64(scalar(name, opt, value))));
349 }
350 ColumnType::F32 | ColumnType::OptF32 => {
351 (input, opt) = parse_optional(column.typ, input, parser)?;
352 (input, value) = RawStream::from_bytes(input, parser)?;
353 properties.push(Raw(RP::F32(scalar(name, opt, value))));
354 }
355 ColumnType::F64 | ColumnType::OptF64 => {
356 (input, opt) = parse_optional(column.typ, input, parser)?;
357 (input, value) = RawStream::from_bytes(input, parser)?;
358 properties.push(Raw(RP::F64(scalar(name, opt, value))));
359 }
360 ColumnType::Str | ColumnType::OptStr => {
361 let prop;
362 (input, prop) = parse_str_column(input, name, column.typ, parser)?;
363 properties.push(Raw(prop));
364 }
365 ColumnType::SharedDict => {
366 let prop;
367 (input, prop) = parse_shared_dict_column(input, &column, parser)?;
368 properties.push(Raw(prop));
369 }
370 }
371 }
372 if input.is_empty() {
373 Ok(Layer01 {
374 name: layer_name,
375 extent,
376 id: id_column,
377 geometry: geometry.ok_or(MissingGeometry)?,
378 properties,
379 #[cfg(fuzzing)]
380 layer_order,
381 })
382 } else {
383 Err(TrailingLayerData(input.len()))
384 }
385 }
386
387 pub fn decode_all(self, dec: &mut Decoder) -> MltResult<ParsedLayer01<'a>> {
392 Ok(Layer01 {
393 name: self.name,
394 extent: self.extent,
395 id: self.id.map(|id| id.into_parsed(dec)).transpose()?,
396 geometry: self.geometry.into_parsed(dec)?,
397 properties: self
398 .properties
399 .into_iter()
400 .map(|p| p.into_parsed(dec))
401 .collect::<MltResult<Vec<_>>>()?,
402 #[cfg(fuzzing)]
403 layer_order: self.layer_order,
404 })
405 }
406}
407
408fn parse_struct_children<'a>(
409 mut input: &'a [u8],
410 column: &Column<'a>,
411 parser: &mut Parser,
412) -> MltRefResult<'a, Vec<RawSharedDictItem<'a>>> {
413 let mut children = Vec::with_capacity(column.children.len());
414 for child in &column.children {
415 let (inp, sc) = parse_varint::<u32>(input)?;
416 let (inp, child_optional) = parse_optional(child.typ, inp, parser)?;
417 let optional_stream_count = u32::from(child_optional.is_some());
418 if let Some(data_count) = sc.checked_sub(optional_stream_count)
419 && data_count != 1
420 {
421 return Err(UnexpectedStructChildCount(data_count));
422 }
423 let (inp, child_data) = RawStream::from_bytes(inp, parser)?;
424 children.push(RawSharedDictItem {
425 name: child.name.unwrap_or(""),
426 presence: RawPresence(child_optional),
427 data: child_data,
428 });
429 input = inp;
430 }
431 Ok((input, children))
432}
433
434fn parse_optional<'a>(
435 typ: ColumnType,
436 input: &'a [u8],
437 parser: &mut Parser,
438) -> MltRefResult<'a, Option<RawStream<'a>>> {
439 if typ.is_optional() {
440 let (input, optional) = RawStream::parse_bool(input, parser)?;
441 Ok((input, Some(optional)))
442 } else {
443 Ok((input, None))
444 }
445}
446
447fn parse_geometry_column<'a>(
448 input: &'a [u8],
449 geometry: &mut Option<Geometry<'a>>,
450 parser: &mut Parser,
451) -> MltResult<&'a [u8]> {
452 let (input, stream_count) = parse_varint::<u32>(input)?;
453 if stream_count == 0 {
454 return Err(GeometryWithoutStreams);
455 }
456 let stream_count_capa = stream_count.as_usize();
458 if input.len() < stream_count_capa {
459 return Err(BufferUnderflow(stream_count, input.len()));
460 }
461 let (input, meta) = RawStream::from_bytes(input, parser)?;
463 let (input, items) = RawStream::parse_multiple(input, stream_count_capa - 1, parser)?;
465 geometry.set_once(Raw(RawGeometry { meta, items }))?;
466 Ok(input)
467}
468
469fn parse_str_column<'a>(
470 mut input: &'a [u8],
471 name: &'a str,
472 typ: ColumnType,
473 parser: &mut Parser,
474) -> MltRefResult<'a, RawProperty<'a>> {
475 let mut stream_count = {
476 let stream_count_u32;
477 (input, stream_count_u32) = parse_varint::<u32>(input)?;
478 stream_count_u32.as_usize()
479 };
480 let presence;
481 (input, presence) = parse_optional(typ, input, parser)?;
482 if presence.is_some() {
483 if stream_count == 0 {
484 return Err(UnsupportedStringStreamCount(stream_count));
485 }
486 stream_count -= 1;
487 }
488 let mut str_streams = [None, None, None, None, None];
489 if stream_count > str_streams.len() {
490 return Err(UnsupportedStringStreamCount(stream_count));
491 }
492 for slot in str_streams.iter_mut().take(stream_count) {
493 let stream;
494 (input, stream) = RawStream::from_bytes(input, parser)?;
495 *slot = Some(stream);
496 }
497 let encoding = match str_streams {
498 [Some(s1), Some(s2), None, None, None] => {
499 RawStringsEncoding::plain(RawPlainData::new(s1, s2)?)
500 }
501 [Some(s1), Some(s2), Some(s3), None, None] => {
502 RawStringsEncoding::dictionary(RawPlainData::new(s1, s3)?, s2)?
503 }
504 [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
505 RawStringsEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
506 }
507 [Some(s1), Some(s2), Some(s3), Some(s4), Some(s5)] => {
508 RawStringsEncoding::fsst_dictionary(RawFsstData::new(s1, s2, s3, s4)?, s5)?
509 }
510 _ => Err(UnsupportedStringStreamCount(stream_count))?,
511 };
512 Ok((
513 input,
514 RawProperty::Str(RawStrings {
515 name,
516 presence: RawPresence(presence),
517 encoding,
518 }),
519 ))
520}
521
522fn parse_shared_dict_column<'a>(
523 mut input: &'a [u8],
524 column: &Column<'a>,
525 parser: &mut Parser,
526) -> MltRefResult<'a, RawProperty<'a>> {
527 let stream_count;
529 (input, stream_count) = parse_varint::<u32>(input)?;
530 let mut dict_streams = [None, None, None, None, None];
531 let mut streams_taken = 0_usize;
532 while streams_taken < stream_count.as_usize() {
533 let stream;
534 (input, stream) = RawStream::from_bytes(input, parser)?;
535 let is_last = matches!(
536 stream.meta.stream_type,
537 StreamType::Data(DictionaryType::Single | DictionaryType::Shared)
538 );
539 dict_streams[streams_taken] = Some(stream);
540 streams_taken += 1;
541 if is_last {
542 break;
543 } else if streams_taken >= dict_streams.len() {
544 return Err(UnsupportedStringStreamCount(streams_taken + 1));
545 }
546 }
547 let children;
548 (input, children) = parse_struct_children(input, column, parser)?;
549
550 let children_n = u32::try_from(children.len()).or_overflow()?;
552 let optional_n = children
553 .iter()
554 .filter(|c| c.presence.0.is_some())
555 .count()
556 .try_into()
557 .or_overflow()?;
558 let dict_n = u32::try_from(streams_taken).or_overflow()?;
559 let expected = crate::utils::checked_sum3(dict_n, children_n, optional_n)?;
560 let java_legacy = expected.checked_add(1).or_overflow()?;
563 if stream_count != expected && stream_count != java_legacy {
564 return Err(InvalidSharedDictStreamCount {
565 actual: stream_count,
566 expected,
567 });
568 }
569
570 let name = column.name.unwrap_or("");
571 let encoding = match dict_streams {
572 [Some(s1), Some(s2), None, None, None] => {
573 RawSharedDictEncoding::plain(RawPlainData::new(s1, s2)?)
574 }
575 [Some(s1), Some(s2), Some(s3), Some(s4), None] => {
576 RawSharedDictEncoding::fsst_plain(RawFsstData::new(s1, s2, s3, s4)?)
577 }
578 _ => Err(SharedDictRequiresStreams(streams_taken))?,
579 };
580 Ok((
581 input,
582 RawProperty::SharedDict(RawSharedDict {
583 name,
584 encoding,
585 children,
586 }),
587 ))
588}
589
590fn parse_columns_meta<'a>(
591 mut input: &'a [u8],
592 column_count: u32,
593 parser: &mut Parser,
594) -> MltRefResult<'a, (Vec<Column<'a>>, u32)> {
595 use crate::decoder::ColumnType::{Geometry, Id, LongId, OptId, OptLongId, SharedDict};
596
597 let mut col_info = Vec::with_capacity(column_count.as_usize());
598 let mut geometries = 0;
599 let mut ids = 0;
600 for _ in 0..column_count {
601 let mut typ;
602 (input, typ) = Column::from_bytes(input, parser)?;
603 match typ.typ {
604 Geometry => geometries += 1,
605 Id | OptId | LongId | OptLongId => ids += 1,
606 SharedDict => {
607 let child_column_count;
609 (input, child_column_count) = parse_varint::<u32>(input)?;
610
611 let child_col_capacity = child_column_count.as_usize();
613 if input.len() < child_col_capacity {
614 return Err(BufferUnderflow(child_column_count, input.len()));
615 }
616 let mut children = Vec::with_capacity(child_col_capacity);
617 for _ in 0..child_column_count {
618 let child;
619 (input, child) = Column::from_bytes(input, parser)?;
620 children.push(child);
621 }
622 typ.children = children;
623 }
624 _ => {}
625 }
626 col_info.push(typ);
627 }
628 if geometries > 1 {
629 return Err(MultipleGeometryColumns);
630 }
631 if ids > 1 {
632 return Err(MultipleIdColumns);
633 }
634
635 Ok((input, (col_info, column_count - geometries - ids)))
636}
637
638fn scalar<'a>(name: &'a str, opt: Option<RawStream<'a>>, value: RawStream<'a>) -> RawScalar<'a> {
639 RawScalar {
640 name,
641 presence: RawPresence(opt),
642 data: value,
643 }
644}