1use alloc::{boxed::Box, vec::Vec};
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12 CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13 match_generator::MatchGeneratorDriver,
14};
15use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
16
17use crate::io::{Read, Write};
18
19pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
39 uncompressed_data: Option<R>,
40 compressed_data: Option<W>,
41 compression_level: CompressionLevel,
42 dictionary: Option<crate::decoding::Dictionary>,
43 dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
44 source_size_hint: Option<u64>,
45 state: CompressState<M>,
46 #[cfg(feature = "hash")]
47 hasher: XxHash64,
48}
49
50#[derive(Clone, Default)]
51struct CachedDictionaryEntropy {
52 huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
53 ll_previous: Option<PreviousFseTable>,
54 ml_previous: Option<PreviousFseTable>,
55 of_previous: Option<PreviousFseTable>,
56}
57
58#[derive(Clone)]
59pub(crate) enum PreviousFseTable {
60 Default,
63 Custom(Box<FSETable>),
64}
65
66impl PreviousFseTable {
67 pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> &'a FSETable {
68 match self {
69 Self::Default => default,
70 Self::Custom(table) => table,
71 }
72 }
73}
74
75pub(crate) struct FseTables {
76 pub(crate) ll_default: FSETable,
77 pub(crate) ll_previous: Option<PreviousFseTable>,
78 pub(crate) ml_default: FSETable,
79 pub(crate) ml_previous: Option<PreviousFseTable>,
80 pub(crate) of_default: FSETable,
81 pub(crate) of_previous: Option<PreviousFseTable>,
82}
83
84impl FseTables {
85 pub fn new() -> Self {
86 Self {
87 ll_default: default_ll_table(),
88 ll_previous: None,
89 ml_default: default_ml_table(),
90 ml_previous: None,
91 of_default: default_of_table(),
92 of_previous: None,
93 }
94 }
95}
96
97pub(crate) struct CompressState<M: Matcher> {
98 pub(crate) matcher: M,
99 pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
100 pub(crate) fse_tables: FseTables,
101 pub(crate) offset_hist: [u32; 3],
104}
105
106impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
107 pub fn new(compression_level: CompressionLevel) -> Self {
109 Self {
110 uncompressed_data: None,
111 compressed_data: None,
112 compression_level,
113 dictionary: None,
114 dictionary_entropy_cache: None,
115 source_size_hint: None,
116 state: CompressState {
117 matcher: MatchGeneratorDriver::new(1024 * 128, 1),
118 last_huff_table: None,
119 fse_tables: FseTables::new(),
120 offset_hist: [1, 4, 8],
121 },
122 #[cfg(feature = "hash")]
123 hasher: XxHash64::with_seed(0),
124 }
125 }
126}
127
128impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
129 pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
131 Self {
132 uncompressed_data: None,
133 compressed_data: None,
134 dictionary: None,
135 dictionary_entropy_cache: None,
136 source_size_hint: None,
137 state: CompressState {
138 matcher,
139 last_huff_table: None,
140 fse_tables: FseTables::new(),
141 offset_hist: [1, 4, 8],
142 },
143 compression_level,
144 #[cfg(feature = "hash")]
145 hasher: XxHash64::with_seed(0),
146 }
147 }
148
149 pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
153 self.uncompressed_data.replace(uncompressed_data)
154 }
155
156 pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
160 self.compressed_data.replace(compressed_data)
161 }
162
163 pub fn set_source_size_hint(&mut self, size: u64) {
173 self.source_size_hint = Some(size);
174 }
175
176 pub fn compress(&mut self) {
187 let use_dictionary_state =
188 !matches!(self.compression_level, CompressionLevel::Uncompressed)
189 && self.state.matcher.supports_dictionary_priming();
190 if let Some(size_hint) = self.source_size_hint.take() {
191 self.state.matcher.set_source_size_hint(size_hint);
194 }
195 self.state.matcher.reset(self.compression_level);
197 self.state.offset_hist = [1, 4, 8];
198 let cached_entropy = if use_dictionary_state {
199 self.dictionary_entropy_cache.as_ref()
200 } else {
201 None
202 };
203 if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
204 self.state.offset_hist = dict.offset_hist;
207 self.state
208 .matcher
209 .prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
210 }
211 if let Some(cache) = cached_entropy {
212 self.state.last_huff_table.clone_from(&cache.huff);
213 } else {
214 self.state.last_huff_table = None;
215 }
216 if let Some(cache) = cached_entropy {
219 self.state
220 .fse_tables
221 .ll_previous
222 .clone_from(&cache.ll_previous);
223 self.state
224 .fse_tables
225 .ml_previous
226 .clone_from(&cache.ml_previous);
227 self.state
228 .fse_tables
229 .of_previous
230 .clone_from(&cache.of_previous);
231 } else {
232 self.state.fse_tables.ll_previous = None;
233 self.state.fse_tables.ml_previous = None;
234 self.state.fse_tables.of_previous = None;
235 }
236 #[cfg(feature = "hash")]
237 {
238 self.hasher = XxHash64::with_seed(0);
239 }
240 let source = self.uncompressed_data.as_mut().unwrap();
241 let drain = self.compressed_data.as_mut().unwrap();
242 let window_size = self.state.matcher.window_size();
243 assert!(
244 window_size != 0,
245 "matcher reported window_size == 0, which is invalid"
246 );
247 let mut all_blocks: Vec<u8> = Vec::with_capacity(1024 * 130);
250 let mut total_uncompressed: u64 = 0;
251 loop {
253 let mut uncompressed_data = self.state.matcher.get_next_space();
255 let mut read_bytes = 0;
256 let last_block;
257 'read_loop: loop {
258 let new_bytes = source.read(&mut uncompressed_data[read_bytes..]).unwrap();
259 if new_bytes == 0 {
260 last_block = true;
261 break 'read_loop;
262 }
263 read_bytes += new_bytes;
264 if read_bytes == uncompressed_data.len() {
265 last_block = false;
266 break 'read_loop;
267 }
268 }
269 uncompressed_data.resize(read_bytes, 0);
270 total_uncompressed += read_bytes as u64;
271 #[cfg(feature = "hash")]
273 self.hasher.write(&uncompressed_data);
274 if uncompressed_data.is_empty() {
276 let header = BlockHeader {
277 last_block: true,
278 block_type: crate::blocks::block::BlockType::Raw,
279 block_size: 0,
280 };
281 header.serialize(&mut all_blocks);
282 break;
283 }
284
285 match self.compression_level {
286 CompressionLevel::Uncompressed => {
287 let header = BlockHeader {
288 last_block,
289 block_type: crate::blocks::block::BlockType::Raw,
290 block_size: read_bytes.try_into().unwrap(),
291 };
292 header.serialize(&mut all_blocks);
293 all_blocks.extend_from_slice(&uncompressed_data);
294 }
295 CompressionLevel::Fastest
296 | CompressionLevel::Default
297 | CompressionLevel::Better
298 | CompressionLevel::Best
299 | CompressionLevel::Level(_) => compress_block_encoded(
300 &mut self.state,
301 last_block,
302 uncompressed_data,
303 &mut all_blocks,
304 ),
305 }
306 if last_block {
307 break;
308 }
309 }
310
311 let header = FrameHeader {
317 frame_content_size: Some(total_uncompressed),
318 single_segment: false,
319 content_checksum: cfg!(feature = "hash"),
320 dictionary_id: if use_dictionary_state {
321 self.dictionary.as_ref().map(|dict| dict.id as u64)
322 } else {
323 None
324 },
325 window_size: Some(window_size),
326 };
327 let mut header_buf: Vec<u8> = Vec::with_capacity(14);
330 header.serialize(&mut header_buf);
331 drain.write_all(&header_buf).unwrap();
332 drain.write_all(&all_blocks).unwrap();
333
334 #[cfg(feature = "hash")]
337 {
338 let content_checksum = self.hasher.finish();
341 drain
342 .write_all(&(content_checksum as u32).to_le_bytes())
343 .unwrap();
344 }
345 }
346
347 pub fn source_mut(&mut self) -> Option<&mut R> {
349 self.uncompressed_data.as_mut()
350 }
351
352 pub fn drain_mut(&mut self) -> Option<&mut W> {
354 self.compressed_data.as_mut()
355 }
356
357 pub fn source(&self) -> Option<&R> {
359 self.uncompressed_data.as_ref()
360 }
361
362 pub fn drain(&self) -> Option<&W> {
364 self.compressed_data.as_ref()
365 }
366
367 pub fn take_source(&mut self) -> Option<R> {
369 self.uncompressed_data.take()
370 }
371
372 pub fn take_drain(&mut self) -> Option<W> {
374 self.compressed_data.take()
375 }
376
377 pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
379 core::mem::swap(&mut match_generator, &mut self.state.matcher);
380 match_generator
381 }
382
383 pub fn set_compression_level(
385 &mut self,
386 compression_level: CompressionLevel,
387 ) -> CompressionLevel {
388 let old = self.compression_level;
389 self.compression_level = compression_level;
390 old
391 }
392
393 pub fn compression_level(&self) -> CompressionLevel {
395 self.compression_level
396 }
397
398 pub fn set_dictionary(
405 &mut self,
406 dictionary: crate::decoding::Dictionary,
407 ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
408 {
409 if dictionary.id == 0 {
410 return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
411 }
412 if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
413 return Err(
414 crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
415 index: index as u8,
416 },
417 );
418 }
419 self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
420 huff: dictionary.huf.table.to_encoder_table(),
421 ll_previous: dictionary
422 .fse
423 .literal_lengths
424 .to_encoder_table()
425 .map(|table| PreviousFseTable::Custom(Box::new(table))),
426 ml_previous: dictionary
427 .fse
428 .match_lengths
429 .to_encoder_table()
430 .map(|table| PreviousFseTable::Custom(Box::new(table))),
431 of_previous: dictionary
432 .fse
433 .offsets
434 .to_encoder_table()
435 .map(|table| PreviousFseTable::Custom(Box::new(table))),
436 });
437 Ok(self.dictionary.replace(dictionary))
438 }
439
440 pub fn set_dictionary_from_bytes(
442 &mut self,
443 raw_dictionary: &[u8],
444 ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
445 {
446 let dictionary = crate::decoding::Dictionary::decode_dict(raw_dictionary)?;
447 self.set_dictionary(dictionary)
448 }
449
450 pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
452 self.dictionary_entropy_cache = None;
453 self.dictionary.take()
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 #[cfg(all(feature = "dict_builder", feature = "std"))]
460 use alloc::format;
461 use alloc::vec;
462
463 use super::FrameCompressor;
464 use crate::common::MAGIC_NUM;
465 use crate::decoding::FrameDecoder;
466 use crate::encoding::{Matcher, Sequence};
467 use alloc::vec::Vec;
468
/// For several levels and input sizes, the frame header must carry a
/// non-empty frame-content-size field equal to the input length, and the
/// frame must decode with the C zstd bindings.
#[cfg(feature = "std")]
#[test]
fn fcs_header_written_and_c_zstd_compatible() {
    use crate::encoding::CompressionLevel;

    let levels = [
        CompressionLevel::Uncompressed,
        CompressionLevel::Fastest,
        CompressionLevel::Default,
        CompressionLevel::Better,
        CompressionLevel::Best,
    ];
    let fcs_2byte = vec![0xCDu8; 300];
    let large = vec![0xABu8; 100_000];
    let inputs: [&[u8]; 5] = [
        &[],
        &[0x00],
        b"abcdefghijklmnopqrstuvwxy\n",
        &fcs_2byte,
        &large,
    ];

    for level in levels {
        for &data in &inputs {
            let compressed = crate::encoding::compress_to_vec(data, level);
            let (header, _) =
                crate::decoding::frame::read_frame_header(compressed.as_slice()).unwrap();

            assert_eq!(
                header.frame_content_size(),
                data.len() as u64,
                "FCS mismatch for len={} level={:?}",
                data.len(),
                level,
            );
            assert_ne!(
                header.descriptor.frame_content_size_bytes().unwrap(),
                0,
                "FCS field must be present for len={} level={:?}",
                data.len(),
                level,
            );

            let mut decoded = Vec::new();
            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
            assert_eq!(
                decoded.as_slice(),
                data,
                "C zstd roundtrip failed for len={}",
                data.len()
            );
        }
    }
}
524
/// A small input compressed at `Fastest` with an exact size hint must still
/// decode with the C zstd bindings.
#[cfg(feature = "std")]
#[test]
fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
    let data = vec![0xABu8; 2047];

    let compressed = {
        let mut encoder = FrameCompressor::new(super::CompressionLevel::Fastest);
        encoder.set_source_size_hint(data.len() as u64);
        encoder.set_source(data.as_slice());
        let mut frame = Vec::new();
        encoder.set_drain(&mut frame);
        encoder.compress();
        frame
    };

    let mut decoded = Vec::new();
    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
    assert_eq!(decoded, data);
}
543
/// Seeds × sizes × levels matrix: pseudo-random inputs compressed with an
/// exact size hint must decode with the C zstd bindings.
#[cfg(feature = "std")]
#[test]
fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
    /// Deterministic pseudo-random bytes from a 64-bit LCG seeded with `seed`.
    fn generate_data(seed: u64, len: usize) -> Vec<u8> {
        let mut state = seed;
        (0..len)
            .map(|_| {
                state = state
                    .wrapping_mul(6364136223846793005)
                    .wrapping_add(1442695040888963407);
                (state >> 33) as u8
            })
            .collect()
    }

    let levels = [
        super::CompressionLevel::Fastest,
        super::CompressionLevel::Default,
        super::CompressionLevel::Better,
        super::CompressionLevel::Best,
        super::CompressionLevel::Level(-1),
        super::CompressionLevel::Level(2),
        super::CompressionLevel::Level(3),
        super::CompressionLevel::Level(4),
        super::CompressionLevel::Level(11),
    ];
    let sizes = [513usize, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191];

    for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
        // The seed actually used is offset by its position in the list.
        let effective_seed = seed + seed_idx as u64;
        for &size in &sizes {
            let data = generate_data(effective_seed, size);
            for &level in &levels {
                let compressed = {
                    let mut encoder = FrameCompressor::new(level);
                    encoder.set_source_size_hint(data.len() as u64);
                    encoder.set_source(data.as_slice());
                    let mut frame = Vec::new();
                    encoder.set_drain(&mut frame);
                    encoder.compress();
                    frame
                };

                let mut decoded = Vec::new();
                if let Err(e) = zstd::stream::copy_decode(compressed.as_slice(), &mut decoded) {
                    panic!(
                        "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
                        effective_seed
                    );
                }
                assert_eq!(
                    decoded,
                    data,
                    "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
                    effective_seed
                );
            }
        }
    }
}
605
/// Minimal test matcher that emits each committed buffer as one literals-only
/// sequence and does not override any dictionary-related trait method.
struct NoDictionaryMatcher {
    /// Buffer most recently handed back through `commit_space`.
    last_space: Vec<u8>,
    /// Value reported by `window_size` and used as the length of new buffers.
    window_size: u64,
}
610
611 impl NoDictionaryMatcher {
612 fn new(window_size: u64) -> Self {
613 Self {
614 last_space: Vec::new(),
615 window_size,
616 }
617 }
618 }
619
620 impl Matcher for NoDictionaryMatcher {
621 fn get_next_space(&mut self) -> Vec<u8> {
622 vec![0; self.window_size as usize]
623 }
624
625 fn get_last_space(&mut self) -> &[u8] {
626 self.last_space.as_slice()
627 }
628
629 fn commit_space(&mut self, space: Vec<u8>) {
630 self.last_space = space;
631 }
632
633 fn skip_matching(&mut self) {}
634
635 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
636 handle_sequence(Sequence::Literals {
637 literals: self.last_space.as_slice(),
638 });
639 }
640
641 fn reset(&mut self, _level: super::CompressionLevel) {
642 self.last_space.clear();
643 }
644
645 fn window_size(&self) -> u64 {
646 self.window_size
647 }
648 }
649
/// The emitted frame must begin with the little-endian magic number.
#[test]
fn frame_starts_with_magic_num() {
    let mock_data: &[u8] = &[1, 2, 3];
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data);
    compressor.set_drain(&mut output);

    compressor.compress();

    let magic = MAGIC_NUM.to_le_bytes();
    assert!(output.starts_with(&magic));
}
661
/// Smoke test: compressing three bytes in raw mode must not panic.
#[test]
fn very_simple_raw_compress() {
    let mock_data: &[u8] = &[1, 2, 3];
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data);
    compressor.set_drain(&mut output);

    compressor.compress();
}
672
/// Multi-block raw frame made of long byte runs must round-trip through both
/// the crate's decoder and the C zstd bindings.
#[test]
fn very_simple_compress() {
    // (byte value, run length) pairs, appended in order.
    let runs: [(u8, usize); 5] = [
        (0, 1 << 17),
        (1, (1 << 17) - 1),
        (2, (1 << 18) - 1),
        (2, 1 << 17),
        (3, (1 << 17) - 1),
    ];
    let mut mock_data: Vec<u8> = Vec::new();
    for (byte, run_len) in runs {
        mock_data.resize(mock_data.len() + run_len, byte);
    }

    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data.as_slice());
    compressor.set_drain(&mut output);

    compressor.compress();

    let mut decoder = FrameDecoder::new();
    let mut native_decoded = Vec::with_capacity(mock_data.len());
    decoder
        .decode_all_to_vec(&output, &mut native_decoded)
        .unwrap();
    assert_eq!(mock_data, native_decoded);

    let mut ffi_decoded = Vec::new();
    zstd::stream::copy_decode(output.as_slice(), &mut ffi_decoded).unwrap();
    assert_eq!(mock_data, ffi_decoded);
}
696
/// 512 KiB of zero bytes must round-trip through the crate's decoder.
#[test]
fn rle_compress() {
    let mock_data = vec![0u8; 1 << 19];
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data.as_slice());
    compressor.set_drain(&mut output);

    compressor.compress();

    let mut roundtripped = Vec::with_capacity(mock_data.len());
    let mut decoder = FrameDecoder::new();
    decoder
        .decode_all_to_vec(&output, &mut roundtripped)
        .unwrap();
    assert_eq!(mock_data, roundtripped);
}
712
/// A five-byte input must round-trip through both the crate's decoder and the
/// C zstd bindings.
#[test]
fn aaa_compress() {
    let mock_data: Vec<u8> = vec![0, 1, 3, 4, 5];
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data.as_slice());
    compressor.set_drain(&mut output);

    compressor.compress();

    let mut decoder = FrameDecoder::new();
    let mut native_decoded = Vec::with_capacity(mock_data.len());
    decoder
        .decode_all_to_vec(&output, &mut native_decoded)
        .unwrap();
    assert_eq!(mock_data, native_decoded);

    let mut ffi_decoded = Vec::new();
    zstd::stream::copy_decode(output.as_slice(), &mut ffi_decoded).unwrap();
    assert_eq!(mock_data, ffi_decoded);
}
732
/// With a dictionary attached, the frame must advertise the dictionary id,
/// fail to decode without the dictionary, and round-trip with it through both
/// the crate's decoder and the C zstd bindings.
#[test]
fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();

    // Payload: the first 2 KiB of dictionary content, repeated eight times.
    let mut data = Vec::new();
    for _ in 0..8 {
        data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
    }

    let mut with_dict = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let previous = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(
        previous.is_none(),
        "first dictionary insert should return None"
    );
    // Attaching a second time must hand back the dictionary set above.
    let replaced = compressor
        .set_dictionary(dict_for_encoder)
        .expect("valid dictionary should attach")
        .expect("set_dictionary_from_bytes inserted previous dictionary");
    assert_eq!(replaced.id, dict_for_decoder.id);

    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut with_dict);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));

    // Decoding without the dictionary has to be rejected.
    let mut bare_decoder = FrameDecoder::new();
    let mut missing_dict_target = Vec::with_capacity(data.len());
    let err = bare_decoder
        .decode_all_to_vec(&with_dict, &mut missing_dict_target)
        .unwrap_err();
    assert!(
        matches!(
            &err,
            crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
        ),
        "dict-compressed stream should require dictionary id, got: {err:?}"
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(data.len());
    decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, data);

    let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
    let mut ffi_decoded = Vec::with_capacity(data.len());
    let ffi_written = ffi_decoder
        .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
        .unwrap();
    assert_eq!(ffi_written, data.len());
    assert_eq!(ffi_decoded, data);
}
796
/// A dictionary trained by the crate's own builder must round-trip and must
/// produce a smaller frame than compressing the same payload without it.
#[cfg(all(feature = "dict_builder", feature = "std"))]
#[test]
fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
    use std::io::Cursor;

    let mut training = Vec::new();
    for idx in 0..256u32 {
        let line = format!("tenant=demo table=orders key={idx} region=eu\n");
        training.extend_from_slice(line.as_bytes());
    }

    let mut raw_dict = Vec::new();
    crate::dictionary::create_raw_dict_from_source(
        Cursor::new(training.as_slice()),
        training.len(),
        &mut raw_dict,
        4096,
    )
    .expect("dict_builder training should succeed");
    assert!(
        !raw_dict.is_empty(),
        "dict_builder produced an empty dictionary"
    );

    let dict_id = 0xD1C7_0008;
    let encoder_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
    let decoder_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();

    let mut payload = Vec::new();
    for idx in 0..96u32 {
        let line = format!(
            "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
        );
        payload.extend_from_slice(line.as_bytes());
    }

    // Baseline frame without any dictionary.
    let mut without_dict = Vec::new();
    let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
    baseline.set_source(payload.as_slice());
    baseline.set_drain(&mut without_dict);
    baseline.compress();

    // Same payload, same level, with the trained dictionary attached.
    let mut with_dict = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    compressor
        .set_dictionary(encoder_dict)
        .expect("valid dict_builder dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut with_dict);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_id));

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, payload);

    assert!(
        with_dict.len() < without_dict.len(),
        "trained dictionary should improve compression for this small payload"
    );
}
865
/// After compressing an empty input with a dictionary attached, the encoder
/// state must still hold the Huffman and FSE tables seeded from it.
#[test]
fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let mut output = Vec::new();
    let input: &[u8] = b"";

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let previous = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(previous.is_none());

    compressor.set_source(input);
    compressor.set_drain(&mut output);
    compressor.compress();

    let state = &compressor.state;
    assert!(
        state.last_huff_table.is_some(),
        "dictionary entropy should seed previous huffman table before first block"
    );
    assert!(
        state.fse_tables.ll_previous.is_some(),
        "dictionary entropy should seed previous ll table before first block"
    );
    assert!(
        state.fse_tables.ml_previous.is_some(),
        "dictionary entropy should seed previous ml table before first block"
    );
    assert!(
        state.fse_tables.of_previous.is_some(),
        "dictionary entropy should seed previous of table before first block"
    );
}
899
/// A dictionary whose id is `0` must be refused with `ZeroDictionaryId`.
#[test]
fn set_dictionary_rejects_zero_dictionary_id() {
    use crate::decoding::errors::DictionaryDecodeError;
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let invalid = crate::decoding::Dictionary {
        id: 0,
        fse: crate::decoding::scratch::FSEScratch::new(),
        huf: crate::decoding::scratch::HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [1, 4, 8],
    };

    let mut compressor: FrameCompressor<&[u8], Vec<u8>, MatchGeneratorDriver> =
        FrameCompressor::new(super::CompressionLevel::Fastest);
    let outcome = compressor.set_dictionary(invalid);
    assert!(matches!(
        outcome,
        Err(DictionaryDecodeError::ZeroDictionaryId)
    ));
}
921
/// A dictionary with a zero repeat offset must be refused, and the error must
/// name the index of the offending entry.
#[test]
fn set_dictionary_rejects_zero_repeat_offsets() {
    use crate::decoding::errors::DictionaryDecodeError;
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let invalid = crate::decoding::Dictionary {
        id: 1,
        fse: crate::decoding::scratch::FSEScratch::new(),
        huf: crate::decoding::scratch::HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [0, 4, 8],
    };

    let mut compressor: FrameCompressor<&[u8], Vec<u8>, MatchGeneratorDriver> =
        FrameCompressor::new(super::CompressionLevel::Fastest);
    let outcome = compressor.set_dictionary(invalid);
    assert!(matches!(
        outcome,
        Err(DictionaryDecodeError::ZeroRepeatOffsetInDictionary { index: 0 })
    ));
}
947
/// In `Uncompressed` mode an attached dictionary must not be advertised in
/// the header, and the frame must decode without any dictionary.
#[test]
fn uncompressed_mode_does_not_require_dictionary() {
    let dict_id = 0xABCD_0001;
    let dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
            .expect("raw dictionary should be valid");

    let payload = b"plain-bytes-that-should-stay-raw";
    let mut output = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor
        .set_dictionary(dict)
        .expect("dictionary should attach in uncompressed mode");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    let advertised_id = frame_header.dictionary_id();
    assert_eq!(
        advertised_id, None,
        "raw/uncompressed frames must not advertise dictionary dependency"
    );

    // No dictionary is registered on the decoder on purpose.
    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
978
/// With a payload longer than the advertised window, dictionary priming must
/// neither change the advertised window size nor break the round-trip.
#[test]
fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let dict_id = 0xABCD_0002;
    let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");
    let dict_for_decoder =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
            .expect("raw dictionary should be valid");

    let payload = b"abcdefgh".repeat(128 * 1024 / 8 + 64);
    let baseline_matcher = MatchGeneratorDriver::new(1024, 1);

    // Baseline: same matcher configuration, no dictionary.
    let mut no_dict_output = Vec::new();
    let mut no_dict_compressor =
        FrameCompressor::new_with_matcher(baseline_matcher, super::CompressionLevel::Fastest);
    no_dict_compressor.set_source(payload.as_slice());
    no_dict_compressor.set_drain(&mut no_dict_output);
    no_dict_compressor.compress();
    let (no_dict_frame_header, _) =
        crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
            .expect("baseline frame should have a header");
    let no_dict_window = no_dict_frame_header
        .window_size()
        .expect("window size should be present");

    // Subject: identical setup plus the dictionary.
    let mut output = Vec::new();
    let dict_matcher = MatchGeneratorDriver::new(1024, 1);
    let mut compressor =
        FrameCompressor::new_with_matcher(dict_matcher, super::CompressionLevel::Fastest);
    compressor
        .set_dictionary(dict)
        .expect("dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    let advertised_window = frame_header
        .window_size()
        .expect("window size should be present");
    assert_eq!(
        advertised_window, no_dict_window,
        "dictionary priming must not inflate advertised window size"
    );
    assert!(
        payload.len() > advertised_window as usize,
        "test must cross the advertised window boundary"
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
1039
/// A deliberately tiny size hint (1 byte) combined with a dictionary must not
/// enlarge the advertised window relative to the unhinted frame, and the
/// hinted frame must still round-trip.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
    // Reads the window size advertised in a frame's header.
    let advertised_window = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let dict_id = 0xABCD_0004;
    let dict_content = b"abcd".repeat(1024);
    let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
    let dict_for_decoder =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    let payload = b"abcdabcdabcdabcd".repeat(128);

    let mut hinted_output = Vec::new();
    let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
    hinted.set_dictionary(dict).unwrap();
    hinted.set_source_size_hint(1);
    hinted.set_source(payload.as_slice());
    hinted.set_drain(&mut hinted_output);
    hinted.compress();

    let mut no_hint_output = Vec::new();
    let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
    let no_hint_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    no_hint.set_dictionary(no_hint_dict).unwrap();
    no_hint.set_source(payload.as_slice());
    no_hint.set_drain(&mut no_hint_output);
    no_hint.compress();

    let hinted_window = advertised_window(hinted_output.as_slice());
    let no_hint_window = advertised_window(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
1092
/// An exact size hint for a 4 KiB payload combined with a dictionary must not
/// enlarge the advertised window relative to the unhinted frame, and the
/// hinted frame must still round-trip.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
    // Reads the window size advertised in a frame's header.
    let advertised_window = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let dict_id = 0xABCD_0005;
    let dict_content = b"abcd".repeat(1024);
    let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
    let dict_for_decoder =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    let payload = b"abcd".repeat(1024);
    let payload_len = payload.len() as u64;

    let mut hinted_output = Vec::new();
    let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
    hinted.set_dictionary(dict).unwrap();
    hinted.set_source_size_hint(payload_len);
    hinted.set_source(payload.as_slice());
    hinted.set_drain(&mut hinted_output);
    hinted.compress();

    let mut no_hint_output = Vec::new();
    let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
    let no_hint_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    no_hint.set_dictionary(no_hint_dict).unwrap();
    no_hint.set_source(payload.as_slice());
    no_hint.set_drain(&mut no_hint_output);
    no_hint.compress();

    let hinted_window = advertised_window(hinted_output.as_slice());
    let no_hint_window = advertised_window(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
1146
#[test]
/// Attaches a dictionary to a compressor driven by `NoDictionaryMatcher` and checks that the
/// emitted frame header carries no dictionary id and that the frame decodes without any
/// dictionary registered on the decoder.
fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
    let payload = b"abcdefghabcdefgh";
    let dict_id = 0xABCD_0003;
    let raw_dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");

    let mut frame: Vec<u8> = Vec::new();
    {
        let mut encoder = FrameCompressor::new_with_matcher(
            NoDictionaryMatcher::new(64),
            super::CompressionLevel::Fastest,
        );
        encoder
            .set_dictionary(raw_dict)
            .expect("dictionary should attach");
        encoder.set_source(payload.as_slice());
        encoder.set_drain(&mut frame);
        encoder.compress();
    }

    // Only the parsed header is needed; the second tuple element is discarded.
    let header = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("encoded frame should have a header")
        .0;
    assert_eq!(
        header.dictionary_id(),
        None,
        "matchers that do not support dictionary priming must not advertise dictionary dependency"
    );

    // Decode with a decoder that has no dictionary added.
    let mut plain_decoder = FrameDecoder::new();
    let mut roundtripped = Vec::with_capacity(payload.len());
    plain_decoder
        .decode_all_to_vec(&frame, &mut roundtripped)
        .unwrap();
    assert_eq!(roundtripped, payload);
}
1178
#[cfg(feature = "hash")]
#[test]
/// Compresses the same data twice with one reused compressor and checks that each frame
/// decodes correctly, that each frame's stored checksum matches the decoder's calculated
/// one, and that both frames store the same checksum.
fn checksum_two_frames_reused_compressor() {
    // Decodes a single frame to completion and returns the decoded bytes together with
    // the checksum read from the frame and the checksum the decoder calculated.
    fn decode_frame(mut frame: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut frame).unwrap();
        while !decoder.is_finished() {
            decoder
                .decode_blocks(&mut frame, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut plain = Vec::new();
        decoder.collect_to_writer(&mut plain).unwrap();
        let stored = decoder.get_checksum_from_data();
        let calculated = decoder.get_calculated_checksum();
        (plain, stored, calculated)
    }

    let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();

    // One compressor instance produces both frames.
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);

    let mut first_frame: Vec<u8> = Vec::new();
    encoder.set_source(data.as_slice());
    encoder.set_drain(&mut first_frame);
    encoder.compress();

    let mut second_frame: Vec<u8> = Vec::new();
    encoder.set_source(data.as_slice());
    encoder.set_drain(&mut second_frame);
    encoder.compress();

    let (first_plain, first_stored, first_calculated) = decode_frame(&first_frame);
    assert_eq!(first_plain, data, "frame 1: decoded data mismatch");
    assert_eq!(
        first_stored, first_calculated,
        "frame 1: checksum mismatch"
    );

    let (second_plain, second_stored, second_calculated) = decode_frame(&second_frame);
    assert_eq!(second_plain, data, "frame 2: decoded data mismatch");
    assert_eq!(
        second_stored, second_calculated,
        "frame 2: checksum mismatch"
    );

    // Identical input must yield identical stored checksums across the two frames.
    assert_eq!(
        first_stored, second_stored,
        "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
    );
}
1242
#[cfg(feature = "std")]
#[test]
/// Replays every regular file in `fuzz/artifacts/interop` (when that directory exists)
/// through both interop directions: frames produced by the `zstd` crate are decoded by this
/// crate's streaming and frame decoders, and frames produced by this crate at the
/// `Uncompressed` and `Fastest` levels are decoded by the `zstd` crate.
fn fuzz_targets() {
    use std::io::Read;

    // Decodes a frame with this crate's streaming decoder.
    fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
        let mut streaming = crate::decoding::StreamingDecoder::new(data).unwrap();
        let mut plain: Vec<u8> = Vec::new();
        streaming.read_to_end(&mut plain).expect("Decoding failed");
        plain
    }

    // Decodes a frame with this crate's frame decoder, draining output after each
    // bounded decode step.
    fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
        let mut frame_decoder = crate::decoding::FrameDecoder::new();
        frame_decoder.reset(&mut data).unwrap();
        let mut plain = Vec::new();
        while !frame_decoder.is_finished() || frame_decoder.can_collect() > 0 {
            let step = crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024);
            frame_decoder.decode_blocks(&mut data, step).unwrap();
            frame_decoder.collect_to_writer(&mut plain).unwrap();
        }
        plain
    }

    // Compresses with the `zstd` crate at level 3.
    fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        zstd::stream::encode_all(std::io::Cursor::new(data), 3)
    }

    // Reads the whole input and compresses it with this crate at the given level.
    fn encode_szstd(
        data: &mut dyn std::io::Read,
        level: crate::encoding::CompressionLevel,
    ) -> Vec<u8> {
        let mut raw = Vec::new();
        data.read_to_end(&mut raw).unwrap();
        crate::encoding::compress_to_vec(raw.as_slice(), level)
    }

    // Decompresses with the `zstd` crate.
    fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        let mut plain = Vec::new();
        zstd::stream::copy_decode(data, &mut plain).map(|_| plain)
    }

    const CORPUS_DIR: &str = "fuzz/artifacts/interop";

    // Nothing to replay when the artifact directory is absent (or cannot be probed).
    if !std::fs::exists(CORPUS_DIR).unwrap_or(false) {
        return;
    }

    for entry in std::fs::read_dir(CORPUS_DIR).unwrap() {
        let entry = entry.unwrap();
        if !entry.file_type().unwrap().is_file() {
            continue;
        }

        let contents = std::fs::read(entry.path()).unwrap();
        let original = contents.as_slice();

        // Direction 1: `zstd` crate encodes, this crate decodes (both decoder front ends).
        let reference_frame = encode_zstd(original).unwrap();
        let via_stream = decode_szstd(&mut reference_frame.as_slice());
        let via_writer = decode_szstd_writer(&mut reference_frame.as_slice());
        assert!(
            via_stream == original,
            "Decoded data did not match the original input during decompression"
        );
        assert_eq!(
            via_writer, original,
            "Decoded data did not match the original input during decompression"
        );

        // Direction 2: this crate encodes at each level, `zstd` crate decodes.
        for level in [
            crate::encoding::CompressionLevel::Uncompressed,
            crate::encoding::CompressionLevel::Fastest,
        ] {
            let mut reader = original;
            let frame = encode_szstd(&mut reader, level);
            let restored = decode_zstd(&frame).unwrap();
            assert_eq!(
                restored, original,
                "Decoded data did not match the original input during compression"
            );
        }
    }
}
1338}