1use crate::checksum::compute_xxh64;
8use crate::error::{CrousError, Result};
9use crate::header::{FileHeader, HEADER_SIZE};
10use crate::limits::Limits;
11use crate::value::{CrousValue, Value};
12use crate::varint::{decode_signed_varint, decode_varint};
13use crate::wire::{BlockType, CompressionType, WireType};
14
15pub struct Decoder<'a> {
30 data: &'a [u8],
32 pos: usize,
34 header: Option<FileHeader>,
36 limits: Limits,
38 depth: usize,
40 current_block: Option<(usize, usize)>,
42 block_pos: usize,
44 memory_used: usize,
46 str_slices: Vec<&'a str>,
48 owned_strings: Vec<String>,
50 dict_preloaded: bool,
53 decompressed_buf: Option<Vec<u8>>,
56}
57
58impl<'a> Decoder<'a> {
59 pub fn new(data: &'a [u8]) -> Self {
61 Self {
62 data,
63 pos: 0,
64 header: None,
65 limits: Limits::default(),
66 depth: 0,
67 current_block: None,
68 block_pos: 0,
69 memory_used: 0,
70 str_slices: Vec::new(),
71 owned_strings: Vec::new(),
72 dict_preloaded: false,
73 decompressed_buf: None,
74 }
75 }
76
77 pub fn with_limits(data: &'a [u8], limits: Limits) -> Self {
79 Self {
80 limits,
81 ..Self::new(data)
82 }
83 }
84
85 fn track_alloc(&mut self, bytes: usize) -> Result<()> {
87 self.memory_used = self.memory_used.saturating_add(bytes);
88 if self.memory_used > self.limits.max_memory {
89 return Err(CrousError::MemoryLimitExceeded(
90 self.memory_used,
91 self.limits.max_memory,
92 ));
93 }
94 Ok(())
95 }
96
97 pub fn skip_value_at(&mut self, block_end: usize) -> Result<()> {
104 if self.block_pos >= block_end {
105 return Err(CrousError::UnexpectedEof(self.block_pos));
106 }
107
108 let tag = self.data[self.block_pos];
109 self.block_pos += 1;
110
111 let wire_type = WireType::from_tag(tag).ok_or(CrousError::InvalidWireType(tag))?;
112
113 match wire_type {
114 WireType::Null => {} WireType::EndObject | WireType::EndArray => {} WireType::Bool => {
118 if self.block_pos >= block_end {
119 return Err(CrousError::UnexpectedEof(self.block_pos));
120 }
121 self.block_pos += 1;
122 }
123
124 WireType::VarUInt | WireType::VarInt | WireType::Reference => {
125 if self.block_pos >= block_end {
126 return Err(CrousError::UnexpectedEof(self.block_pos));
127 }
128 let (_val, consumed) = decode_varint(self.data, self.block_pos)?;
129 if self.block_pos + consumed > block_end {
130 return Err(CrousError::UnexpectedEof(self.block_pos + consumed));
131 }
132 self.block_pos += consumed;
133 }
134
135 WireType::Fixed64 => {
136 if self.block_pos + 8 > block_end {
137 return Err(CrousError::UnexpectedEof(self.block_pos));
138 }
139 self.block_pos += 8;
140 }
141
142 WireType::LenDelimited => {
143 if self.block_pos >= block_end {
144 return Err(CrousError::UnexpectedEof(self.block_pos));
145 }
146 self.block_pos += 1; let (len, consumed) = decode_varint(self.data, self.block_pos)?;
148 self.block_pos += consumed;
149 let len = len as usize;
150 if self.block_pos + len > block_end {
151 return Err(CrousError::UnexpectedEof(self.block_pos + len));
152 }
153 self.block_pos += len;
154 }
155
156 WireType::StartArray => {
157 let (count, consumed) = decode_varint(self.data, self.block_pos)?;
158 self.block_pos += consumed;
159 let count = count as usize;
160 if count > self.limits.max_items {
162 return Err(CrousError::TooManyItems(count, self.limits.max_items));
163 }
164 self.depth += 1;
165 if self.depth > self.limits.max_nesting_depth {
166 self.depth -= 1;
167 return Err(CrousError::NestingTooDeep(
168 self.depth,
169 self.limits.max_nesting_depth,
170 ));
171 }
172 for _ in 0..count {
173 self.skip_value_at(block_end)?;
174 }
175 self.depth -= 1;
176 if self.block_pos < block_end
178 && self.data[self.block_pos] == WireType::EndArray.to_tag()
179 {
180 self.block_pos += 1;
181 }
182 }
183
184 WireType::StartObject => {
185 let (count, consumed) = decode_varint(self.data, self.block_pos)?;
186 self.block_pos += consumed;
187 let count = count as usize;
188 if count > self.limits.max_items {
190 return Err(CrousError::TooManyItems(count, self.limits.max_items));
191 }
192 self.depth += 1;
193 if self.depth > self.limits.max_nesting_depth {
194 self.depth -= 1;
195 return Err(CrousError::NestingTooDeep(
196 self.depth,
197 self.limits.max_nesting_depth,
198 ));
199 }
200 for _ in 0..count {
201 let (key_len, kc) = decode_varint(self.data, self.block_pos)?;
203 self.block_pos += kc;
204 let key_len = key_len as usize;
205 if self.block_pos + key_len > block_end {
206 return Err(CrousError::UnexpectedEof(self.block_pos + key_len));
207 }
208 self.block_pos += key_len;
209 self.skip_value_at(block_end)?;
211 }
212 self.depth -= 1;
213 if self.block_pos < block_end
215 && self.data[self.block_pos] == WireType::EndObject.to_tag()
216 {
217 self.block_pos += 1;
218 }
219 }
220 }
221
222 Ok(())
223 }
224
225 fn ensure_header(&mut self) -> Result<()> {
227 if self.header.is_none() {
228 let hdr = FileHeader::decode(self.data)?;
229 self.header = Some(hdr);
230 self.pos = HEADER_SIZE;
231 }
232 Ok(())
233 }
234
235 pub fn header(&mut self) -> Result<&FileHeader> {
237 self.ensure_header()?;
238 Ok(self.header.as_ref().unwrap())
239 }
240
241 fn read_next_block(&mut self) -> Result<Option<(BlockType, usize, usize)>> {
253 self.ensure_header()?;
254
255 loop {
256 if self.pos >= self.data.len() {
257 return Ok(None);
258 }
259
260 let block_type_byte = self.data[self.pos];
262 self.pos += 1;
263
264 let block_type = BlockType::from_byte(block_type_byte)
265 .ok_or(CrousError::InvalidBlockType(block_type_byte))?;
266
267 if block_type == BlockType::Trailer {
268 return Ok(None); }
270
271 let (block_len, varint_bytes) = decode_varint(self.data, self.pos)?;
272 self.pos += varint_bytes;
273 let block_len = block_len as usize;
274
275 if block_len > self.limits.max_block_size {
276 return Err(CrousError::BlockTooLarge(
277 block_len,
278 self.limits.max_block_size,
279 ));
280 }
281
282 let comp_byte = self.data[self.pos];
283 self.pos += 1;
284 let comp_type = CompressionType::from_byte(comp_byte)
285 .ok_or(CrousError::UnknownCompression(comp_byte))?;
286
287 if self.pos + 8 > self.data.len() {
289 return Err(CrousError::UnexpectedEof(self.pos));
290 }
291 let expected_checksum =
292 u64::from_le_bytes(self.data[self.pos..self.pos + 8].try_into().unwrap());
293 self.pos += 8;
294
295 let payload_start = self.pos;
297 let payload_end = self.pos + block_len;
298 if payload_end > self.data.len() {
299 return Err(CrousError::UnexpectedEof(payload_end));
300 }
301
302 self.pos = payload_end;
303
304 if block_type == BlockType::StringDict {
305 let actual_checksum = compute_xxh64(&self.data[payload_start..payload_end]);
307 if actual_checksum != expected_checksum {
308 return Err(CrousError::ChecksumMismatch {
309 expected: expected_checksum,
310 actual: actual_checksum,
311 });
312 }
313 self.decode_string_dict_block(&self.data[payload_start..payload_end])?;
314 continue; }
316
317 if comp_type != CompressionType::None {
318 let wire_payload = &self.data[payload_start..payload_end];
320 let decompressed = self.decompress_block(comp_type, wire_payload)?;
321
322 let actual_checksum = compute_xxh64(&decompressed);
324 if actual_checksum != expected_checksum {
325 return Err(CrousError::ChecksumMismatch {
326 expected: expected_checksum,
327 actual: actual_checksum,
328 });
329 }
330
331 let len = decompressed.len();
332 self.decompressed_buf = Some(decompressed);
333 return Ok(Some((block_type, 0, len)));
334 } else {
335 let actual_checksum = compute_xxh64(&self.data[payload_start..payload_end]);
337 if actual_checksum != expected_checksum {
338 return Err(CrousError::ChecksumMismatch {
339 expected: expected_checksum,
340 actual: actual_checksum,
341 });
342 }
343
344 self.decompressed_buf = None;
345 return Ok(Some((block_type, payload_start, payload_end)));
346 }
347 }
348 }
349
350 fn decode_string_dict_block(&mut self, payload: &[u8]) -> Result<()> {
362 let mut pos = 0;
363 let (count, consumed) = decode_varint(payload, pos)?;
364 pos += consumed;
365 let count = count as usize;
366
367 if count > self.limits.max_items {
368 return Err(CrousError::TooManyItems(count, self.limits.max_items));
369 }
370
371 let mut entries: Vec<(usize, String)> = Vec::with_capacity(count.min(4096));
373 let mut prev = String::new();
374
375 for _ in 0..count {
376 let (original_idx, c1) = decode_varint(payload, pos)?;
377 pos += c1;
378 let original_idx = original_idx as usize;
379
380 if original_idx >= count {
382 return Err(CrousError::InvalidData(format!(
383 "StringDict entry has original_idx {original_idx} >= count {count}"
384 )));
385 }
386
387 let (prefix_len, c2) = decode_varint(payload, pos)?;
388 pos += c2;
389 let prefix_len = prefix_len as usize;
390
391 let (suffix_len, c3) = decode_varint(payload, pos)?;
392 pos += c3;
393 let suffix_len = suffix_len as usize;
394
395 if pos.checked_add(suffix_len).is_none_or(|end| end > payload.len()) {
396 return Err(CrousError::UnexpectedEof(pos));
397 }
398 let suffix = &payload[pos..pos + suffix_len];
399 pos += suffix_len;
400
401 let prefix_end = prefix_len.min(prev.len());
403 let prefix_end = prev.floor_char_boundary(prefix_end);
406 let mut full = String::with_capacity(prefix_end + suffix_len);
407 full.push_str(&prev[..prefix_end]);
408 full.push_str(std::str::from_utf8(suffix).map_err(|_| CrousError::InvalidUtf8(pos))?);
409
410 prev.clone_from(&full);
411 entries.push((original_idx, full));
412 }
413
414 let max_idx = entries.iter().map(|(idx, _)| *idx).max().unwrap_or(0);
416 let table_size = max_idx + 1;
417
418 self.owned_strings.clear();
420 self.owned_strings.resize(table_size, String::new());
421 for (idx, s) in &entries {
422 if *idx < table_size {
423 self.owned_strings[*idx].clone_from(s);
424 }
425 }
426
427 self.dict_preloaded = true;
428 Ok(())
429 }
430
431 #[allow(unused_variables)]
436 fn decompress_block(&self, comp_type: CompressionType, wire: &[u8]) -> Result<Vec<u8>> {
437 let (uncomp_len, prefix_consumed) = decode_varint(wire, 0)?;
439 let uncomp_len = uncomp_len as usize;
440
441 if uncomp_len > self.limits.max_block_size {
442 return Err(CrousError::BlockTooLarge(
443 uncomp_len,
444 self.limits.max_block_size,
445 ));
446 }
447
448 let compressed = &wire[prefix_consumed..];
449
450 match comp_type {
451 CompressionType::None => Ok(compressed.to_vec()),
452 CompressionType::Zstd => {
453 #[cfg(feature = "zstd")]
454 {
455 zstd::decode_all(std::io::Cursor::new(compressed))
456 .map_err(|e| CrousError::DecompressionError(format!("zstd: {e}")))
457 }
458 #[cfg(not(feature = "zstd"))]
459 {
460 Err(CrousError::DecompressionError(
461 "zstd decompression not available (enable 'zstd' feature)".into(),
462 ))
463 }
464 }
465 CompressionType::Snappy => {
466 #[cfg(feature = "snappy")]
467 {
468 let mut dec = snap::raw::Decoder::new();
469 dec.decompress_vec(compressed)
470 .map_err(|e| CrousError::DecompressionError(format!("snappy: {e}")))
471 }
472 #[cfg(not(feature = "snappy"))]
473 {
474 Err(CrousError::DecompressionError(
475 "snappy decompression not available (enable 'snappy' feature)".into(),
476 ))
477 }
478 }
479 CompressionType::Lz4 => {
480 #[cfg(feature = "lz4")]
481 {
482 lz4_flex::decompress_size_prepended(compressed)
484 .map_err(|e| CrousError::DecompressionError(format!("lz4: {e}")))
485 }
486 #[cfg(not(feature = "lz4"))]
487 {
488 Err(CrousError::DecompressionError(
489 "lz4 decompression not available (enable 'lz4' feature)".into(),
490 ))
491 }
492 }
493 }
494 }
495
496 #[inline]
501 fn block_data(&self) -> &[u8] {
502 self.decompressed_buf.as_deref().unwrap_or(self.data)
503 }
504
505 pub fn decode_next(&mut self) -> Result<CrousValue<'a>> {
514 if self.current_block.is_none() {
516 match self.read_next_block()? {
517 Some((BlockType::Data, start, end)) => {
518 if self.decompressed_buf.is_some() {
521 return Err(CrousError::DecompressionError(
522 "zero-copy decode_next() cannot borrow from decompressed block; \
523 use decode_all_owned() or decode_next_owned() instead"
524 .into(),
525 ));
526 }
527 self.current_block = Some((start, end));
528 self.block_pos = start;
529 self.str_slices.clear();
532 self.dict_preloaded = false;
533 }
534 Some(_) => {
535 return self.decode_next();
537 }
538 None => {
539 return Err(CrousError::UnexpectedEof(self.pos));
540 }
541 }
542 }
543
544 let (block_start, block_end) = self.current_block.unwrap();
545 let _ = block_start;
546
547 if self.block_pos >= block_end {
548 self.current_block = None;
550 return self.decode_next();
551 }
552
553 self.decode_value_at(block_end)
554 }
555
556 pub fn decode_next_owned(&mut self) -> Result<Value> {
561 if self.current_block.is_none() {
563 match self.read_next_block()? {
564 Some((BlockType::Data, start, end)) => {
565 self.current_block = Some((start, end));
566 self.block_pos = start;
567 self.str_slices.clear();
568 if !self.dict_preloaded {
570 self.owned_strings.clear();
571 }
572 self.dict_preloaded = false;
574 }
575 Some(_) => {
576 return self.decode_next_owned();
577 }
578 None => {
579 return Err(CrousError::UnexpectedEof(self.pos));
580 }
581 }
582 }
583
584 let (_block_start, block_end) = self.current_block.unwrap();
585
586 if self.block_pos >= block_end {
587 self.current_block = None;
588 return self.decode_next_owned();
589 }
590
591 self.decode_value_owned_at(block_end)
592 }
593
594 fn decode_value_owned_at(&mut self, block_end: usize) -> Result<Value> {
598 let data = self.block_data();
599 if self.block_pos >= block_end {
600 return Err(CrousError::UnexpectedEof(self.block_pos));
601 }
602
603 let tag = data[self.block_pos];
604 self.block_pos += 1;
605
606 let wire_type = WireType::from_tag(tag).ok_or(CrousError::InvalidWireType(tag))?;
607
608 match wire_type {
609 WireType::Null => Ok(Value::Null),
610
611 WireType::Bool => {
612 if self.block_pos >= block_end {
613 return Err(CrousError::UnexpectedEof(self.block_pos));
614 }
615 let data = self.block_data();
616 let b = data[self.block_pos] != 0;
617 self.block_pos += 1;
618 Ok(Value::Bool(b))
619 }
620
621 WireType::VarUInt => {
622 let data = self.block_data();
623 let (val, consumed) = decode_varint(data, self.block_pos)?;
624 self.block_pos += consumed;
625 Ok(Value::UInt(val))
626 }
627
628 WireType::VarInt => {
629 let data = self.block_data();
630 let (val, consumed) = decode_signed_varint(data, self.block_pos)?;
631 self.block_pos += consumed;
632 Ok(Value::Int(val))
633 }
634
635 WireType::Fixed64 => {
636 let data = self.block_data();
637 if self.block_pos + 8 > block_end {
638 return Err(CrousError::UnexpectedEof(self.block_pos));
639 }
640 let bytes: [u8; 8] = data[self.block_pos..self.block_pos + 8].try_into().unwrap();
641 self.block_pos += 8;
642 Ok(Value::Float(f64::from_le_bytes(bytes)))
643 }
644
645 WireType::LenDelimited => {
646 let data = self.block_data();
647 if self.block_pos >= block_end {
648 return Err(CrousError::UnexpectedEof(self.block_pos));
649 }
650 let sub_type = data[self.block_pos];
651 self.block_pos += 1;
652
653 let data = self.block_data();
654 let (len, consumed) = decode_varint(data, self.block_pos)?;
655 self.block_pos += consumed;
656 let len = len as usize;
657
658 if len > self.limits.max_string_length {
659 return Err(CrousError::MemoryLimitExceeded(
660 len,
661 self.limits.max_string_length,
662 ));
663 }
664 self.track_alloc(len)?;
665 if self.block_pos + len > block_end {
666 return Err(CrousError::UnexpectedEof(self.block_pos + len));
667 }
668
669 let data = self.block_data();
670 let payload_slice = data[self.block_pos..self.block_pos + len].to_vec();
671 self.block_pos += len;
672
673 match sub_type {
674 0x00 => {
675 let s = std::str::from_utf8(&payload_slice)
676 .map_err(|_| CrousError::InvalidUtf8(self.block_pos - len))?
677 .to_string();
678 self.owned_strings.push(s.clone());
680 Ok(Value::Str(s))
681 }
682 0x01 => Ok(Value::Bytes(payload_slice)),
683 _ => Ok(Value::Bytes(payload_slice)),
684 }
685 }
686
687 WireType::StartArray => {
688 if self.depth >= self.limits.max_nesting_depth {
689 return Err(CrousError::NestingTooDeep(
690 self.depth,
691 self.limits.max_nesting_depth,
692 ));
693 }
694 let data = self.block_data();
695 let (count, consumed) = decode_varint(data, self.block_pos)?;
696 self.block_pos += consumed;
697 let count = count as usize;
698
699 if count > self.limits.max_items {
700 return Err(CrousError::TooManyItems(count, self.limits.max_items));
701 }
702
703 self.depth += 1;
704 let mut items = Vec::with_capacity(count.min(1024));
705 for _ in 0..count {
706 items.push(self.decode_value_owned_at(block_end)?);
707 }
708 self.depth -= 1;
709
710 let data = self.block_data();
711 if self.block_pos < block_end && data[self.block_pos] == WireType::EndArray.to_tag()
712 {
713 self.block_pos += 1;
714 }
715
716 Ok(Value::Array(items))
717 }
718
719 WireType::StartObject => {
720 if self.depth >= self.limits.max_nesting_depth {
721 return Err(CrousError::NestingTooDeep(
722 self.depth,
723 self.limits.max_nesting_depth,
724 ));
725 }
726 let data = self.block_data();
727 let (count, consumed) = decode_varint(data, self.block_pos)?;
728 self.block_pos += consumed;
729 let count = count as usize;
730
731 if count > self.limits.max_items {
732 return Err(CrousError::TooManyItems(count, self.limits.max_items));
733 }
734
735 self.depth += 1;
736 let mut entries = Vec::with_capacity(count.min(1024));
737 for _ in 0..count {
738 let data = self.block_data();
739 let (key_len, kc) = decode_varint(data, self.block_pos)?;
740 self.block_pos += kc;
741 let key_len = key_len as usize;
742
743 if self.block_pos + key_len > block_end {
744 return Err(CrousError::UnexpectedEof(self.block_pos + key_len));
745 }
746 let data = self.block_data();
747 let key_bytes = data[self.block_pos..self.block_pos + key_len].to_vec();
748 let key = std::str::from_utf8(&key_bytes)
749 .map_err(|_| CrousError::InvalidUtf8(self.block_pos))?
750 .to_string();
751 self.block_pos += key_len;
752
753 let val = self.decode_value_owned_at(block_end)?;
754 entries.push((key, val));
755 }
756 self.depth -= 1;
757
758 let data = self.block_data();
759 if self.block_pos < block_end
760 && data[self.block_pos] == WireType::EndObject.to_tag()
761 {
762 self.block_pos += 1;
763 }
764
765 Ok(Value::Object(entries))
766 }
767
768 WireType::EndObject | WireType::EndArray => Err(CrousError::InvalidWireType(tag)),
769
770 WireType::Reference => {
771 let data = self.block_data();
772 let (ref_id, consumed) = decode_varint(data, self.block_pos)?;
773 self.block_pos += consumed;
774 let ref_id = ref_id as usize;
775
776 if let Some(s) = self.owned_strings.get(ref_id) {
778 Ok(Value::Str(s.clone()))
779 } else {
780 Ok(Value::UInt(ref_id as u64))
782 }
783 }
784 }
785 }
786
787 fn decode_value_at(&mut self, block_end: usize) -> Result<CrousValue<'a>> {
789 if self.block_pos >= block_end {
790 return Err(CrousError::UnexpectedEof(self.block_pos));
791 }
792
793 let tag = self.data[self.block_pos];
794 self.block_pos += 1;
795
796 let wire_type = WireType::from_tag(tag).ok_or(CrousError::InvalidWireType(tag))?;
797
798 match wire_type {
799 WireType::Null => Ok(CrousValue::Null),
800
801 WireType::Bool => {
802 if self.block_pos >= block_end {
803 return Err(CrousError::UnexpectedEof(self.block_pos));
804 }
805 let b = self.data[self.block_pos] != 0;
806 self.block_pos += 1;
807 Ok(CrousValue::Bool(b))
808 }
809
810 WireType::VarUInt => {
811 let (val, consumed) = decode_varint(self.data, self.block_pos)?;
812 self.block_pos += consumed;
813 Ok(CrousValue::UInt(val))
814 }
815
816 WireType::VarInt => {
817 let (val, consumed) = decode_signed_varint(self.data, self.block_pos)?;
818 self.block_pos += consumed;
819 Ok(CrousValue::Int(val))
820 }
821
822 WireType::Fixed64 => {
823 if self.block_pos + 8 > block_end {
824 return Err(CrousError::UnexpectedEof(self.block_pos));
825 }
826 let bytes: [u8; 8] = self.data[self.block_pos..self.block_pos + 8]
827 .try_into()
828 .unwrap();
829 self.block_pos += 8;
830 Ok(CrousValue::Float(f64::from_le_bytes(bytes)))
831 }
832
833 WireType::LenDelimited => {
834 if self.block_pos >= block_end {
835 return Err(CrousError::UnexpectedEof(self.block_pos));
836 }
837 let sub_type = self.data[self.block_pos];
838 self.block_pos += 1;
839
840 let (len, consumed) = decode_varint(self.data, self.block_pos)?;
841 self.block_pos += consumed;
842 let len = len as usize;
843
844 if len > self.limits.max_string_length {
845 return Err(CrousError::MemoryLimitExceeded(
846 len,
847 self.limits.max_string_length,
848 ));
849 }
850 self.track_alloc(len)?;
851 if self.block_pos + len > block_end {
852 return Err(CrousError::UnexpectedEof(self.block_pos + len));
853 }
854
855 let payload = &self.data[self.block_pos..self.block_pos + len];
856 self.block_pos += len;
857
858 match sub_type {
859 0x00 => {
860 let s = std::str::from_utf8(payload)
862 .map_err(|_| CrousError::InvalidUtf8(self.block_pos - len))?;
863 self.str_slices.push(s);
865 Ok(CrousValue::Str(s))
866 }
867 0x01 => {
868 Ok(CrousValue::Bytes(payload))
870 }
871 _ => {
872 Ok(CrousValue::Bytes(payload))
874 }
875 }
876 }
877
878 WireType::StartArray => {
879 if self.depth >= self.limits.max_nesting_depth {
880 return Err(CrousError::NestingTooDeep(
881 self.depth,
882 self.limits.max_nesting_depth,
883 ));
884 }
885 let (count, consumed) = decode_varint(self.data, self.block_pos)?;
886 self.block_pos += consumed;
887 let count = count as usize;
888
889 if count > self.limits.max_items {
890 return Err(CrousError::TooManyItems(count, self.limits.max_items));
891 }
892
893 let alloc = count.min(1024) * std::mem::size_of::<CrousValue>();
894 self.track_alloc(alloc)?;
895
896 self.depth += 1;
897 let mut items = Vec::with_capacity(count.min(1024)); for _ in 0..count {
899 items.push(self.decode_value_at(block_end)?);
900 }
901 self.depth -= 1;
902
903 if self.block_pos < block_end
905 && self.data[self.block_pos] == WireType::EndArray.to_tag()
906 {
907 self.block_pos += 1;
908 }
909
910 Ok(CrousValue::Array(items))
911 }
912
913 WireType::StartObject => {
914 if self.depth >= self.limits.max_nesting_depth {
915 return Err(CrousError::NestingTooDeep(
916 self.depth,
917 self.limits.max_nesting_depth,
918 ));
919 }
920 let (count, consumed) = decode_varint(self.data, self.block_pos)?;
921 self.block_pos += consumed;
922 let count = count as usize;
923
924 if count > self.limits.max_items {
925 return Err(CrousError::TooManyItems(count, self.limits.max_items));
926 }
927
928 let alloc = count.min(1024)
929 * (std::mem::size_of::<&str>() + std::mem::size_of::<CrousValue>());
930 self.track_alloc(alloc)?;
931
932 self.depth += 1;
933 let mut entries = Vec::with_capacity(count.min(1024));
934 for _ in 0..count {
935 let (key_len, kc) = decode_varint(self.data, self.block_pos)?;
937 self.block_pos += kc;
938 let key_len = key_len as usize;
939
940 if self.block_pos + key_len > block_end {
941 return Err(CrousError::UnexpectedEof(self.block_pos + key_len));
942 }
943 let key =
944 std::str::from_utf8(&self.data[self.block_pos..self.block_pos + key_len])
945 .map_err(|_| CrousError::InvalidUtf8(self.block_pos))?;
946 self.block_pos += key_len;
947
948 let val = self.decode_value_at(block_end)?;
950 entries.push((key, val));
951 }
952 self.depth -= 1;
953
954 if self.block_pos < block_end
956 && self.data[self.block_pos] == WireType::EndObject.to_tag()
957 {
958 self.block_pos += 1;
959 }
960
961 Ok(CrousValue::Object(entries))
962 }
963
964 WireType::EndObject | WireType::EndArray => {
965 Err(CrousError::InvalidWireType(tag))
967 }
968
969 WireType::Reference => {
970 let (ref_id, consumed) = decode_varint(self.data, self.block_pos)?;
972 self.block_pos += consumed;
973 let ref_id = ref_id as usize;
974
975 if let Some(&s) = self.str_slices.get(ref_id) {
977 Ok(CrousValue::Str(s))
978 } else {
979 Ok(CrousValue::UInt(ref_id as u64))
981 }
982 }
983 }
984 }
985
986 pub fn decode_all(&mut self) -> Result<Vec<CrousValue<'a>>> {
988 let mut values = Vec::new();
989 loop {
990 match self.decode_next() {
991 Ok(v) => values.push(v),
992 Err(CrousError::UnexpectedEof(_)) => break,
993 Err(e) => return Err(e),
994 }
995 }
996 Ok(values)
997 }
998
999 pub fn decode_all_owned(&mut self) -> Result<Vec<Value>> {
1003 let mut values = Vec::new();
1004 loop {
1005 match self.decode_next_owned() {
1006 Ok(v) => values.push(v),
1007 Err(CrousError::UnexpectedEof(_)) => break,
1008 Err(e) => return Err(e),
1009 }
1010 }
1011 Ok(values)
1012 }
1013
1014 pub fn position(&self) -> usize {
1016 self.pos
1017 }
1018
1019 pub fn memory_used(&self) -> usize {
1021 self.memory_used
1022 }
1023}
1024
/// A `Decoder` paired with a `bumpalo` arena.
///
/// Decoding is delegated entirely to the wrapped decoder. The arena is used only
/// through `alloc_bytes` and `alloc_str`.
#[cfg(feature = "fast-alloc")]
pub struct BumpDecoder<'a> {
    /// Arena backing `alloc_bytes` / `alloc_str`.
    arena: bumpalo::Bump,
    /// Decoder that performs all value decoding.
    decoder: Decoder<'a>,
}
1055
#[cfg(feature = "fast-alloc")]
impl<'a> BumpDecoder<'a> {
    /// Initial arena capacity, in bytes, used by `new` and `with_limits`.
    const DEFAULT_ARENA_CAPACITY: usize = 4096;

    /// Creates a decoder with default limits and a 4096-byte initial arena.
    pub fn new(data: &'a [u8]) -> Self {
        Self::with_capacity(data, Self::DEFAULT_ARENA_CAPACITY)
    }

    /// Creates a decoder that enforces `limits`, with a 4096-byte initial arena.
    pub fn with_limits(data: &'a [u8], limits: Limits) -> Self {
        Self {
            decoder: Decoder::with_limits(data, limits),
            arena: bumpalo::Bump::with_capacity(Self::DEFAULT_ARENA_CAPACITY),
        }
    }

    /// Creates a decoder with default limits and an arena pre-sized to
    /// `arena_capacity` bytes.
    pub fn with_capacity(data: &'a [u8], arena_capacity: usize) -> Self {
        Self {
            decoder: Decoder::new(data),
            arena: bumpalo::Bump::with_capacity(arena_capacity),
        }
    }

    /// Resets the arena via `bumpalo::Bump::reset`, releasing everything allocated
    /// from it for reuse.
    pub fn reset_arena(&mut self) {
        self.arena.reset();
    }

    /// Returns the byte count reported by `bumpalo::Bump::allocated_bytes`.
    pub fn arena_allocated(&self) -> usize {
        self.arena.allocated_bytes()
    }

    /// Copies `src` into the arena and returns the arena-owned copy.
    pub fn alloc_bytes(&self, src: &[u8]) -> &[u8] {
        self.arena.alloc_slice_copy(src)
    }

    /// Copies `s` into the arena and returns the arena-owned copy.
    pub fn alloc_str(&self, s: &str) -> &str {
        self.arena.alloc_str(s)
    }

    /// Forwards to `Decoder::decode_next`.
    pub fn decode_next(&mut self) -> Result<CrousValue<'a>> {
        let Self { decoder, .. } = self;
        decoder.decode_next()
    }

    /// Forwards to `Decoder::decode_all`.
    pub fn decode_all(&mut self) -> Result<Vec<CrousValue<'a>>> {
        let Self { decoder, .. } = self;
        decoder.decode_all()
    }

    /// Forwards to `Decoder::decode_all_owned`.
    pub fn decode_all_owned(&mut self) -> Result<Vec<Value>> {
        let Self { decoder, .. } = self;
        decoder.decode_all_owned()
    }

    /// Returns a shared reference to the wrapped decoder.
    pub fn inner(&self) -> &Decoder<'a> {
        let Self { decoder, .. } = self;
        decoder
    }

    /// Returns a mutable reference to the wrapped decoder.
    pub fn inner_mut(&mut self) -> &mut Decoder<'a> {
        let Self { decoder, .. } = self;
        decoder
    }

    /// Returns the wrapped decoder's memory accounting (`Decoder::memory_used`).
    pub fn memory_used(&self) -> usize {
        self.inner().memory_used()
    }
}
1133
1134#[cfg(test)]
1135mod tests {
1136 use super::*;
1137 use crate::encoder::Encoder;
1138
1139 fn roundtrip(value: &Value) -> Value {
1141 let mut enc = Encoder::new();
1142 enc.encode_value(value).unwrap();
1143 let bytes = enc.finish().unwrap();
1144 let mut dec = Decoder::new(&bytes);
1145 dec.decode_next().unwrap().to_owned_value()
1146 }
1147
1148 #[test]
1149 fn roundtrip_null() {
1150 assert_eq!(roundtrip(&Value::Null), Value::Null);
1151 }
1152
1153 #[test]
1154 fn roundtrip_bool() {
1155 assert_eq!(roundtrip(&Value::Bool(true)), Value::Bool(true));
1156 assert_eq!(roundtrip(&Value::Bool(false)), Value::Bool(false));
1157 }
1158
1159 #[test]
1160 fn roundtrip_uint() {
1161 for &v in &[0u64, 1, 127, 128, 300, 65535, u64::MAX] {
1162 assert_eq!(
1163 roundtrip(&Value::UInt(v)),
1164 Value::UInt(v),
1165 "uint roundtrip failed for {v}"
1166 );
1167 }
1168 }
1169
1170 #[test]
1171 fn roundtrip_int() {
1172 for &v in &[0i64, 1, -1, 127, -128, 1000, -1000, i64::MAX, i64::MIN] {
1173 assert_eq!(
1174 roundtrip(&Value::Int(v)),
1175 Value::Int(v),
1176 "int roundtrip failed for {v}"
1177 );
1178 }
1179 }
1180
1181 #[test]
1182 fn roundtrip_float() {
1183 for &v in &[0.0f64, 1.0, -1.0, 3.125, f64::MAX, f64::MIN, f64::INFINITY] {
1184 assert_eq!(
1185 roundtrip(&Value::Float(v)),
1186 Value::Float(v),
1187 "float roundtrip failed for {v}"
1188 );
1189 }
1190 }
1191
1192 #[test]
1193 fn roundtrip_string() {
1194 let long_str = "a".repeat(1000);
1195 for s in &["", "hello", "こんにちは", long_str.as_str()] {
1196 assert_eq!(
1197 roundtrip(&Value::Str(s.to_string())),
1198 Value::Str(s.to_string()),
1199 "string roundtrip failed for {s:?}"
1200 );
1201 }
1202 }
1203
1204 #[test]
1205 fn roundtrip_bytes() {
1206 let data = vec![0xDE, 0xAD, 0xBE, 0xEF];
1207 assert_eq!(roundtrip(&Value::Bytes(data.clone())), Value::Bytes(data));
1208 }
1209
1210 #[test]
1211 fn roundtrip_array() {
1212 let arr = Value::Array(vec![
1213 Value::UInt(1),
1214 Value::Str("two".into()),
1215 Value::Bool(true),
1216 Value::Null,
1217 ]);
1218 assert_eq!(roundtrip(&arr), arr);
1219 }
1220
1221 #[test]
1222 fn roundtrip_object() {
1223 let obj = Value::Object(vec![
1224 ("name".into(), Value::Str("Alice".into())),
1225 ("age".into(), Value::UInt(30)),
1226 ("active".into(), Value::Bool(true)),
1227 ]);
1228 assert_eq!(roundtrip(&obj), obj);
1229 }
1230
1231 #[test]
1232 fn roundtrip_nested() {
1233 let val = Value::Object(vec![
1234 (
1235 "users".into(),
1236 Value::Array(vec![Value::Object(vec![
1237 ("name".into(), Value::Str("Bob".into())),
1238 (
1239 "scores".into(),
1240 Value::Array(vec![Value::UInt(100), Value::UInt(95), Value::UInt(87)]),
1241 ),
1242 ])]),
1243 ),
1244 ("count".into(), Value::UInt(1)),
1245 ]);
1246 assert_eq!(roundtrip(&val), val);
1247 }
1248
1249 #[test]
1250 fn checksum_verification() {
1251 let mut enc = Encoder::new();
1252 enc.encode_value(&Value::UInt(42)).unwrap();
1253 let mut bytes = enc.finish().unwrap();
1254
1255 let corrupt_pos = HEADER_SIZE + 12; if corrupt_pos < bytes.len() {
1258 bytes[corrupt_pos] ^= 0xFF;
1259 }
1260
1261 let mut dec = Decoder::new(&bytes);
1262 assert!(dec.decode_next().is_err());
1263 }
1264
1265 #[test]
1266 fn nesting_depth_limit() {
1267 let limits = Limits {
1268 max_nesting_depth: 2,
1269 ..Limits::default()
1270 };
1271 let val = Value::Array(vec![Value::Array(vec![Value::Array(vec![])])]);
1273 let mut enc = Encoder::with_limits(Limits::unlimited());
1274 enc.encode_value(&val).unwrap();
1275 let bytes = enc.finish().unwrap();
1276
1277 let mut dec = Decoder::with_limits(&bytes, limits);
1278 assert!(dec.decode_next().is_err());
1279 }
1280
1281 #[test]
1282 fn memory_tracking() {
1283 let big_str = "x".repeat(1000);
1285 let val = Value::Str(big_str);
1286 let mut enc = Encoder::new();
1287 enc.encode_value(&val).unwrap();
1288 let bytes = enc.finish().unwrap();
1289
1290 let mut dec = Decoder::new(&bytes);
1291 let _ = dec.decode_next().unwrap();
1292 assert!(
1293 dec.memory_used() >= 1000,
1294 "memory should track string allocation"
1295 );
1296 }
1297
1298 #[test]
1299 fn memory_limit_enforcement() {
1300 let big_str = "x".repeat(1000);
1301 let val = Value::Str(big_str);
1302 let mut enc = Encoder::new();
1303 enc.encode_value(&val).unwrap();
1304 let bytes = enc.finish().unwrap();
1305
1306 let limits = Limits {
1307 max_memory: 500,
1308 ..Limits::default()
1309 };
1310 let mut dec = Decoder::with_limits(&bytes, limits);
1311 assert!(
1312 dec.decode_next().is_err(),
1313 "should fail when memory limit exceeded"
1314 );
1315 }
1316
1317 #[test]
1318 fn skip_value_works() {
1319 let val = Value::Object(vec![
1321 ("name".into(), Value::Str("Alice".into())),
1322 (
1323 "scores".into(),
1324 Value::Array(vec![Value::UInt(1), Value::UInt(2)]),
1325 ),
1326 ]);
1327 let mut enc = Encoder::new();
1328 enc.encode_value(&val).unwrap();
1329 let bytes = enc.finish().unwrap();
1330
1331 let mut dec = Decoder::new(&bytes);
1333 let decoded = dec.decode_next().unwrap().to_owned_value();
1334 assert_eq!(decoded, val);
1335 }
1336
/// Two long strings alternated ten times: the dedup encoding must be smaller
/// than the plain one and must still decode to the identical value.
#[test]
fn string_dedup_roundtrip() {
    let pattern = ["hello_world_long_string", "another_reasonably_long_string"];
    let val = Value::Array(
        pattern
            .iter()
            .cycle()
            .take(10)
            .map(|s| Value::Str((*s).to_string()))
            .collect(),
    );

    // Same value, encoded with and without dedup (dedup first, as before).
    let encode = |dedup: bool| {
        let mut enc = Encoder::new();
        if dedup {
            enc.enable_dedup();
        }
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };
    let bytes = encode(true);
    let bytes_no_dedup = encode(false);

    let (with_len, without_len) = (bytes.len(), bytes_no_dedup.len());
    assert!(
        with_len < without_len,
        "dedup ({}) should be smaller than no-dedup ({})",
        with_len,
        without_len
    );

    let mut dec = Decoder::new(&bytes);
    let decoded = dec.decode_next().unwrap().to_owned_value();
    assert_eq!(
        decoded, val,
        "dedup roundtrip should produce identical value"
    );
}
1378
/// Dedup must also work for string values inside an object: "hello" appears
/// under two different keys and must come back intact under both.
#[test]
fn string_dedup_in_object() {
    let hello = || Value::Str("hello".into());
    let val = Value::Object(vec![
        ("greeting".into(), hello()),
        ("farewell".into(), Value::Str("goodbye".into())),
        ("echo".into(), hello()),
    ]);

    let bytes = {
        let mut enc = Encoder::new();
        enc.enable_dedup();
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };

    let decoded = Decoder::new(&bytes)
        .decode_next()
        .unwrap()
        .to_owned_value();
    assert_eq!(decoded, val, "dedup in object should roundtrip correctly");
}
1397
/// `BumpDecoder` (only built with the `fast-alloc` feature) must round-trip a
/// mixed array and report that its arena was actually used.
#[cfg(feature = "fast-alloc")]
#[test]
fn bump_decoder_roundtrip() {
    let nested = Value::Object(vec![("key".into(), Value::Bool(true))]);
    let val = Value::Array(vec![Value::Str("hello".into()), Value::UInt(42), nested]);

    let bytes = {
        let mut enc = Encoder::new();
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };

    let mut bump = BumpDecoder::new(&bytes);
    let all = bump.decode_all_owned().unwrap();
    assert_eq!(all.len(), 1);
    assert_eq!(all[0], val);
    assert!(bump.arena_allocated() > 0, "arena should have been used");
}
1416
/// With LZ4 block compression enabled (feature `lz4`), a highly repetitive
/// object must decode back to the original via the owned decoding path.
#[cfg(feature = "lz4")]
#[test]
fn lz4_compression_roundtrip() {
    let bio = "A long string that is repeated many times to test compression effectiveness. "
        .repeat(20);
    let scores = Value::Array(vec![Value::UInt(100); 50]);
    let val = Value::Object(vec![
        ("name".into(), Value::Str("Alice".into())),
        ("bio".into(), Value::Str(bio)),
        ("scores".into(), scores),
    ]);

    let bytes = {
        let mut enc = Encoder::new();
        enc.set_compression(CompressionType::Lz4);
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };

    let mut dec = Decoder::new(&bytes);
    let values = dec.decode_all_owned().unwrap();
    assert_eq!(values.len(), 1);
    assert_eq!(values[0], val);
}
1442
/// On an LZ4-compressed stream, the borrowing `decode_next` must return an
/// error, not panic, while a fresh decoder using `decode_all_owned` on the same
/// bytes still succeeds.
#[cfg(feature = "lz4")]
#[test]
fn lz4_zero_copy_errors_cleanly() {
    let val = Value::Str("A".repeat(1000));
    let bytes = {
        let mut enc = Encoder::new();
        enc.set_compression(CompressionType::Lz4);
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };

    let mut zero_copy = Decoder::new(&bytes);
    let zero_copy_failed = zero_copy.decode_next().is_err();
    assert!(
        zero_copy_failed,
        "zero-copy decode should fail on compressed block"
    );

    let mut owned = Decoder::new(&bytes);
    let values = owned.decode_all_owned().unwrap();
    assert_eq!(values.len(), 1);
    assert_eq!(values[0], val);
}
1467
/// `decode_next_owned` on an uncompressed stream yields the original value.
#[test]
fn decode_next_owned_uncompressed() {
    let point = Value::Object(vec![
        ("x".into(), Value::UInt(10)),
        ("y".into(), Value::Float(3.15)),
    ]);

    let bytes = {
        let mut enc = Encoder::new();
        enc.encode_value(&point).unwrap();
        enc.finish().unwrap()
    };

    let owned = Decoder::new(&bytes).decode_next_owned().unwrap();
    assert_eq!(owned, point);
}
1482
/// The owned decoding path must resolve dedup'd strings back to their full
/// values, including repeats of "alpha" and "beta".
#[test]
fn dedup_owned_roundtrip() {
    let words = ["alpha", "beta", "alpha", "beta", "gamma", "alpha"];
    let val = Value::Array(words.iter().map(|w| Value::Str((*w).to_string())).collect());

    let bytes = {
        let mut enc = Encoder::new();
        enc.enable_dedup();
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };

    let decoded = Decoder::new(&bytes).decode_next_owned().unwrap();
    assert_eq!(decoded, val, "owned decode should resolve dedup references");
}
1503
/// Dedup and LZ4 compression enabled together (feature `lz4`) must still
/// round-trip an object with repeated short values and one long repetitive one.
#[cfg(feature = "lz4")]
#[test]
fn dedup_plus_lz4_roundtrip() {
    let tokyo = || Value::Str("Tokyo".into());
    let japan = || Value::Str("Japan".into());
    let description = Value::Str("Tokyo is the capital of Japan. ".repeat(30));
    let val = Value::Object(vec![
        ("city".into(), tokyo()),
        ("country".into(), japan()),
        ("description".into(), description),
        ("origin".into(), tokyo()),
        ("nation".into(), japan()),
    ]);

    let bytes = {
        let mut enc = Encoder::new();
        enc.enable_dedup();
        enc.set_compression(CompressionType::Lz4);
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };

    let values = Decoder::new(&bytes).decode_all_owned().unwrap();
    assert_eq!(values.len(), 1);
    assert_eq!(values[0], val, "dedup + lz4 should roundtrip correctly");
}
1529
/// Ten config keys (five distinct) encoded with dedup: the output must be
/// smaller than without dedup. Both the borrowing path (`decode_next` +
/// `to_owned_value`) and the owned path (`decode_next_owned`) must reproduce
/// the input.
#[test]
fn string_dict_block_roundtrip() {
    let keys = [
        "config_database_host",
        "config_database_port",
        "config_database_name",
        "config_cache_host",
        "config_cache_port",
        "config_database_host",
        "config_database_port",
        "config_cache_host",
        "config_database_host",
        "config_cache_port",
    ];
    let val = Value::Array(keys.iter().map(|k| Value::Str((*k).to_string())).collect());

    let encode = |dedup: bool| {
        let mut enc = Encoder::new();
        if dedup {
            enc.enable_dedup();
        }
        enc.encode_value(&val).unwrap();
        enc.finish().unwrap()
    };
    let bytes = encode(true);
    let bytes_no_dedup = encode(false);

    assert!(
        bytes.len() < bytes_no_dedup.len(),
        "dedup with StringDict ({}) should be smaller than no-dedup ({})",
        bytes.len(),
        bytes_no_dedup.len()
    );

    // Borrowing decode, then converted to an owned value.
    let mut borrowing = Decoder::new(&bytes);
    let via_borrow = borrowing.decode_next().unwrap().to_owned_value();
    assert_eq!(
        via_borrow, val,
        "StringDict block roundtrip should produce identical value"
    );

    // Owned decode on a fresh decoder over the same bytes.
    let mut owning = Decoder::new(&bytes);
    let via_owned = owning.decode_next_owned().unwrap();
    assert_eq!(
        via_owned, val,
        "StringDict block owned roundtrip should produce identical value"
    );
}
1581
/// Strings sharing a long common prefix, encoded with dedup, must both shrink
/// the output and round-trip exactly.
///
/// NOTE(review): this checks overall size only; it does not isolate how much of
/// the saving comes from prefix-delta encoding versus plain deduplication.
#[test]
fn string_dict_prefix_delta_compression() {
    // Three distinct keys with the same "application_settings_" prefix, cycled
    // to 8 entries: theme, language, timezone, theme, language, timezone, theme,
    // language.
    let keys = [
        "application_settings_theme",
        "application_settings_language",
        "application_settings_timezone",
    ];
    let shared_prefix_val = Value::Array(
        keys.iter()
            .cycle()
            .take(8)
            .map(|k| Value::Str((*k).to_string()))
            .collect(),
    );

    let mut enc = Encoder::new();
    enc.enable_dedup();
    enc.encode_value(&shared_prefix_val).unwrap();
    let bytes = enc.finish().unwrap();

    // The test name promises compression, so compare against the plain
    // encoding instead of only checking the round-trip.
    let mut enc_no_dedup = Encoder::new();
    enc_no_dedup.encode_value(&shared_prefix_val).unwrap();
    let bytes_no_dedup = enc_no_dedup.finish().unwrap();
    assert!(
        bytes.len() < bytes_no_dedup.len(),
        "dedup with shared prefixes ({}) should be smaller than no-dedup ({})",
        bytes.len(),
        bytes_no_dedup.len()
    );

    let mut dec = Decoder::new(&bytes);
    let decoded = dec.decode_all_owned().unwrap();
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0], shared_prefix_val);
}
1609}