1use alloc::{boxed::Box, vec::Vec};
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12 CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13 match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
/// Compresses everything readable from a source (`R`) into a single zstd frame that is written
/// to a drain (`W`), using the match finder `M`.
///
/// Install the endpoints with `set_source` / `set_drain`, optionally attach a dictionary or a
/// source-size hint, then call `compress`.
pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
    /// Reader that `compress` pulls uncompressed bytes from.
    uncompressed_data: Option<R>,
    /// Writer that receives the finished frame.
    compressed_data: Option<W>,
    /// Level used for the next frame.
    compression_level: CompressionLevel,
    /// Optional dictionary; `compress` only applies it when the level is not `Uncompressed`
    /// and the matcher supports dictionary priming.
    dictionary: Option<crate::decoding::Dictionary>,
    /// Encoder-side entropy tables converted from `dictionary` when it was attached.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// Expected input size; taken (reset to `None`) by the next `compress` call.
    source_size_hint: Option<u64>,
    /// Match finder plus the entropy/offset state carried between blocks of a frame.
    state: CompressState<M>,
    /// Running XXH64 (seed 0) of the uncompressed input; its low 32 bits are appended to the
    /// frame as the content checksum.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
50
/// Encoder-ready entropy tables derived from a dictionary.
///
/// They are cached when the dictionary is attached so that `compress` can restore them at the
/// start of every frame that uses the dictionary.
#[derive(Clone, Default)]
struct CachedDictionaryEntropy {
    /// Literal Huffman table; `None` when the dictionary's table yields no encoder table.
    huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// Literal-length FSE table, stored as `PreviousFseTable::Custom` when present.
    ll_previous: Option<PreviousFseTable>,
    /// Match-length FSE table, stored as `PreviousFseTable::Custom` when present.
    ml_previous: Option<PreviousFseTable>,
    /// Offset FSE table, stored as `PreviousFseTable::Custom` when present.
    of_previous: Option<PreviousFseTable>,
}
58
59#[derive(Clone)]
60pub(crate) enum PreviousFseTable {
61 Default,
64 Custom(Box<FSETable>),
65 Rle(u8),
66}
67
68impl PreviousFseTable {
69 pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
70 match self {
71 Self::Default => Some(default),
72 Self::Custom(table) => Some(table),
73 Self::Rle(_) => None,
74 }
75 }
76}
77
78pub(crate) struct FseTables {
79 pub(crate) ll_default: FSETable,
80 pub(crate) ll_previous: Option<PreviousFseTable>,
81 pub(crate) ml_default: FSETable,
82 pub(crate) ml_previous: Option<PreviousFseTable>,
83 pub(crate) of_default: FSETable,
84 pub(crate) of_previous: Option<PreviousFseTable>,
85}
86
87impl FseTables {
88 pub fn new() -> Self {
89 Self {
90 ll_default: default_ll_table(),
91 ll_previous: None,
92 ml_default: default_ml_table(),
93 ml_previous: None,
94 of_default: default_of_table(),
95 of_previous: None,
96 }
97 }
98}
99
// Tuning knobs for the chunk-fingerprint block pre-splitter.

/// Smallest block length the pre-splitter may return.
const PRESPLIT_BLOCK_MIN: usize = 3500;
/// Number of bytes fingerprinted per chunk (8 KiB).
const PRESPLIT_CHUNK_SIZE: usize = 8 * 1024;

/// Denominator of the split-threshold ratio.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
/// Base numerator of the split-threshold ratio (denominator minus two).
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
/// Initial extra numerator; decremented (down to 0) after every chunk that does not split.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;

/// Widest fingerprint hash, in bits.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
/// Number of histogram slots, large enough for `PRESPLIT_HASH_LOG_MAX`.
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
/// Knuth multiplicative-hash multiplier.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
108
109#[derive(Clone)]
110struct PreSplitFingerprint {
111 events: [u32; PRESPLIT_HASH_TABLE_SIZE],
112 nb_events: usize,
113}
114
115impl Default for PreSplitFingerprint {
116 fn default() -> Self {
117 Self {
118 events: [0; PRESPLIT_HASH_TABLE_SIZE],
119 nb_events: 0,
120 }
121 }
122}
123
124fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
125 debug_assert!(hash_log >= 8);
126 if hash_log == 8 {
127 return bytes[0] as usize;
128 }
129 debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
130 let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
131 (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
132}
133
134fn presplit_record_fingerprint(
135 fp: &mut PreSplitFingerprint,
136 src: &[u8],
137 sampling_rate: usize,
138 hash_log: usize,
139) {
140 fp.events.fill(0);
141 fp.nb_events = 0;
142 if src.len() < 2 {
143 return;
144 }
145 let limit = src.len() - 1;
146 let mut n = 0usize;
147 while n < limit {
148 fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
149 n += sampling_rate;
150 }
151 fp.nb_events += limit / sampling_rate;
154}
155
156fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
157 let slots = 1usize << hash_log;
158 let mut distance = 0u64;
159 for idx in 0..slots {
160 let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
161 let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
162 distance = distance.saturating_add(left.abs_diff(right) as u64);
163 }
164 distance
165}
166
167fn presplit_fingerprints_differ(
168 reference: &PreSplitFingerprint,
169 new_fp: &PreSplitFingerprint,
170 penalty: i32,
171 hash_log: usize,
172) -> bool {
173 debug_assert!(reference.nb_events > 0);
174 debug_assert!(new_fp.nb_events > 0);
175 let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
176 let deviation = presplit_distance(reference, new_fp, hash_log);
177 let threshold = p50.saturating_mul(PRESPLIT_THRESHOLD_BASE + penalty as u64)
178 / PRESPLIT_THRESHOLD_PENALTY_RATE;
179 deviation >= threshold
180}
181
182fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
183 for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
184 acc.events[idx] = acc.events[idx].saturating_add(new_fp.events[idx]);
185 }
186 acc.nb_events = acc.nb_events.saturating_add(new_fp.nb_events);
187}
188
189fn donor_split_block_by_chunks(block: &[u8], level: usize) -> usize {
190 debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
191 debug_assert!((1..=4).contains(&level));
192 let (sampling_rate, hash_log) = match level - 1 {
193 0 => (43, 8),
194 1 => (11, 9),
195 2 => (5, 10),
196 _ => (1, 10),
197 };
198
199 let mut past = PreSplitFingerprint::default();
200 let mut new_events = PreSplitFingerprint::default();
201 let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
202 presplit_record_fingerprint(
203 &mut past,
204 &block[..PRESPLIT_CHUNK_SIZE],
205 sampling_rate,
206 hash_log,
207 );
208 let mut pos = PRESPLIT_CHUNK_SIZE;
209 while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
210 presplit_record_fingerprint(
211 &mut new_events,
212 &block[pos..pos + PRESPLIT_CHUNK_SIZE],
213 sampling_rate,
214 hash_log,
215 );
216 if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
217 return pos;
218 }
219 presplit_merge_events(&mut past, &new_events);
220 if penalty > 0 {
221 penalty -= 1;
222 }
223 pos += PRESPLIT_CHUNK_SIZE;
224 }
225 block.len()
226}
227
228fn donor_pre_split_level(level: CompressionLevel) -> Option<usize> {
229 match level {
230 CompressionLevel::Level(16..=22) => Some(4),
232 _ => None,
233 }
234}
235
236pub(crate) fn donor_optimal_block_size(
237 level: CompressionLevel,
238 block: &[u8],
239 remaining_src_size: usize,
240 block_size_max: usize,
241 savings: i64,
242) -> usize {
243 let Some(split_level) = donor_pre_split_level(level) else {
244 return remaining_src_size.min(block_size_max);
245 };
246 if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
247 return remaining_src_size.min(block_size_max);
248 }
249 if savings < 3 {
250 return MAX_BLOCK_SIZE as usize;
251 }
252 if block.len() < MAX_BLOCK_SIZE as usize {
253 return remaining_src_size.min(block_size_max);
254 }
255 donor_split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
256 .max(PRESPLIT_BLOCK_MIN)
257 .min(MAX_BLOCK_SIZE as usize)
258}
259
/// Mutable compression state that `compress` hands to the block encoder for every block.
pub(crate) struct CompressState<M: Matcher> {
    /// Match finder; reset at the start of each frame and optionally primed with a dictionary.
    pub(crate) matcher: M,
    /// Huffman table available for reuse; restored from the dictionary cache or cleared at the
    /// start of each frame. NOTE(review): presumably updated by the block encoder — confirm.
    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// Predefined and previously used FSE tables for the sequence streams.
    pub(crate) fse_tables: FseTables,
    /// Scratch storage for block encoding. NOTE(review): presumably reused across blocks.
    pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
    /// Repeat-offset history; reset to `[1, 4, 8]` or to the dictionary's history per frame.
    pub(crate) offset_hist: [u32; 3],
}
269
270impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
271 pub fn new(compression_level: CompressionLevel) -> Self {
273 Self {
274 uncompressed_data: None,
275 compressed_data: None,
276 compression_level,
277 dictionary: None,
278 dictionary_entropy_cache: None,
279 source_size_hint: None,
280 state: CompressState {
281 matcher: MatchGeneratorDriver::new(1024 * 128, 1),
282 last_huff_table: None,
283 fse_tables: FseTables::new(),
284 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
285 offset_hist: [1, 4, 8],
286 },
287 #[cfg(feature = "hash")]
288 hasher: XxHash64::with_seed(0),
289 }
290 }
291}
292
293impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
294 pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
296 Self {
297 uncompressed_data: None,
298 compressed_data: None,
299 dictionary: None,
300 dictionary_entropy_cache: None,
301 source_size_hint: None,
302 state: CompressState {
303 matcher,
304 last_huff_table: None,
305 fse_tables: FseTables::new(),
306 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
307 offset_hist: [1, 4, 8],
308 },
309 compression_level,
310 #[cfg(feature = "hash")]
311 hasher: XxHash64::with_seed(0),
312 }
313 }
314
315 pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
319 self.uncompressed_data.replace(uncompressed_data)
320 }
321
322 pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
326 self.compressed_data.replace(compressed_data)
327 }
328
329 pub fn set_source_size_hint(&mut self, size: u64) {
339 self.source_size_hint = Some(size);
340 }
341
342 pub fn compress(&mut self) {
353 let source_size_hint_known = self.source_size_hint.is_some();
354 let use_dictionary_state =
355 !matches!(self.compression_level, CompressionLevel::Uncompressed)
356 && self.state.matcher.supports_dictionary_priming()
357 && self.dictionary.is_some();
358 if let Some(size_hint) = self.source_size_hint.take() {
359 self.state.matcher.set_source_size_hint(size_hint);
362 }
363 self.state.matcher.reset(self.compression_level);
365 self.state.offset_hist = [1, 4, 8];
366 let cached_entropy = if use_dictionary_state {
367 self.dictionary_entropy_cache.as_ref()
368 } else {
369 None
370 };
371 if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
372 self.state.offset_hist = dict.offset_hist;
375 self.state
376 .matcher
377 .prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
378 }
379 if let Some(cache) = cached_entropy {
380 self.state.last_huff_table.clone_from(&cache.huff);
381 } else {
382 self.state.last_huff_table = None;
383 }
384 if let Some(cache) = cached_entropy {
387 self.state
388 .fse_tables
389 .ll_previous
390 .clone_from(&cache.ll_previous);
391 self.state
392 .fse_tables
393 .ml_previous
394 .clone_from(&cache.ml_previous);
395 self.state
396 .fse_tables
397 .of_previous
398 .clone_from(&cache.of_previous);
399 } else {
400 self.state.fse_tables.ll_previous = None;
401 self.state.fse_tables.ml_previous = None;
402 self.state.fse_tables.of_previous = None;
403 }
404 let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
405 Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
406 _ => None,
407 });
408 let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
409 Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
410 _ => None,
411 });
412 let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
413 Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
414 _ => None,
415 });
416 self.state.matcher.seed_dictionary_entropy(
417 self.state.last_huff_table.as_ref(),
418 ll_entropy,
419 ml_entropy,
420 of_entropy,
421 );
422 #[cfg(feature = "hash")]
423 {
424 self.hasher = XxHash64::with_seed(0);
425 }
426 let source = self.uncompressed_data.as_mut().unwrap();
427 let drain = self.compressed_data.as_mut().unwrap();
428 let window_size = self.state.matcher.window_size();
429 assert!(
430 window_size != 0,
431 "matcher reported window_size == 0, which is invalid"
432 );
433 let mut all_blocks: Vec<u8> = Vec::with_capacity(1024 * 130);
436 let mut total_uncompressed: u64 = 0;
437 let mut pending_input: Vec<u8> = Vec::new();
438 let mut reached_eof = false;
439 let mut savings = 0i64;
440 loop {
442 let block_capacity = MAX_BLOCK_SIZE as usize;
446 let had_pending = !pending_input.is_empty();
447 let mut uncompressed_data = if had_pending {
448 core::mem::take(&mut pending_input)
449 } else {
450 self.state.matcher.get_next_space()
451 };
452 let mut filled = if had_pending {
453 uncompressed_data.len()
454 } else {
455 0
456 };
457 if uncompressed_data.len() < block_capacity {
458 uncompressed_data.resize(block_capacity, 0);
459 }
460 'read_loop: loop {
461 if reached_eof || filled == block_capacity {
462 break 'read_loop;
463 }
464 let new_bytes = source
465 .read(&mut uncompressed_data[filled..block_capacity])
466 .unwrap();
467 if new_bytes == 0 {
468 reached_eof = true;
469 break 'read_loop;
470 }
471 filled += new_bytes;
472 total_uncompressed += new_bytes as u64;
473 }
474 uncompressed_data.truncate(filled);
475 let mut last_block = reached_eof;
476 let remaining_for_split = if reached_eof {
477 uncompressed_data.len()
478 } else {
479 block_capacity
480 };
481 if !matches!(self.compression_level, CompressionLevel::Uncompressed)
482 && uncompressed_data.len() == block_capacity
483 {
484 let block_len = donor_optimal_block_size(
485 self.compression_level,
486 &uncompressed_data,
487 remaining_for_split,
488 block_capacity,
489 savings,
490 );
491 if block_len < uncompressed_data.len() {
492 pending_input = uncompressed_data.split_off(block_len);
493 if pending_input.capacity() < block_capacity {
500 pending_input.reserve_exact(block_capacity - pending_input.len());
501 }
502 last_block = false;
503 }
504 }
505 #[cfg(feature = "hash")]
507 self.hasher.write(&uncompressed_data);
508 if uncompressed_data.is_empty() {
510 let header = BlockHeader {
511 last_block: true,
512 block_type: crate::blocks::block::BlockType::Raw,
513 block_size: 0,
514 };
515 header.serialize(&mut all_blocks);
516 break;
517 }
518
519 match self.compression_level {
520 CompressionLevel::Uncompressed => {
521 let header = BlockHeader {
522 last_block,
523 block_type: crate::blocks::block::BlockType::Raw,
524 block_size: uncompressed_data.len().try_into().unwrap(),
525 };
526 header.serialize(&mut all_blocks);
527 all_blocks.extend_from_slice(&uncompressed_data);
528 savings +=
529 uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
530 }
531 CompressionLevel::Fastest
532 | CompressionLevel::Default
533 | CompressionLevel::Better
534 | CompressionLevel::Best
535 | CompressionLevel::Level(_) => {
536 let before_len = all_blocks.len();
537 let block_len = uncompressed_data.len();
538 compress_block_encoded(
539 &mut self.state,
540 self.compression_level,
541 last_block,
542 uncompressed_data,
543 &mut all_blocks,
544 );
545 savings += block_len as i64 - (all_blocks.len() - before_len) as i64;
546 }
547 }
548 if last_block && pending_input.is_empty() {
549 break;
550 }
551 }
552
553 let single_segment = !use_dictionary_state
557 && source_size_hint_known
558 && total_uncompressed >= 512
559 && total_uncompressed <= window_size;
560 let header = FrameHeader {
561 frame_content_size: Some(total_uncompressed),
562 single_segment,
563 content_checksum: cfg!(feature = "hash"),
564 dictionary_id: if use_dictionary_state {
565 self.dictionary.as_ref().map(|dict| dict.id as u64)
566 } else {
567 None
568 },
569 window_size: if single_segment {
570 None
571 } else {
572 Some(window_size)
573 },
574 };
575 let mut header_buf: Vec<u8> = Vec::with_capacity(14);
578 header.serialize(&mut header_buf);
579 drain.write_all(&header_buf).unwrap();
580 drain.write_all(&all_blocks).unwrap();
581
582 #[cfg(feature = "hash")]
585 {
586 let content_checksum = self.hasher.finish();
589 drain
590 .write_all(&(content_checksum as u32).to_le_bytes())
591 .unwrap();
592 }
593 }
594
595 pub fn source_mut(&mut self) -> Option<&mut R> {
597 self.uncompressed_data.as_mut()
598 }
599
600 pub fn drain_mut(&mut self) -> Option<&mut W> {
602 self.compressed_data.as_mut()
603 }
604
605 pub fn source(&self) -> Option<&R> {
607 self.uncompressed_data.as_ref()
608 }
609
610 pub fn drain(&self) -> Option<&W> {
612 self.compressed_data.as_ref()
613 }
614
615 pub fn take_source(&mut self) -> Option<R> {
617 self.uncompressed_data.take()
618 }
619
620 pub fn take_drain(&mut self) -> Option<W> {
622 self.compressed_data.take()
623 }
624
625 pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
627 core::mem::swap(&mut match_generator, &mut self.state.matcher);
628 match_generator
629 }
630
631 pub fn set_compression_level(
633 &mut self,
634 compression_level: CompressionLevel,
635 ) -> CompressionLevel {
636 let old = self.compression_level;
637 self.compression_level = compression_level;
638 old
639 }
640
641 pub fn compression_level(&self) -> CompressionLevel {
643 self.compression_level
644 }
645
646 pub fn set_dictionary(
653 &mut self,
654 dictionary: crate::decoding::Dictionary,
655 ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
656 {
657 if dictionary.id == 0 {
658 return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
659 }
660 if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
661 return Err(
662 crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
663 index: index as u8,
664 },
665 );
666 }
667 self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
668 huff: dictionary.huf.table.to_encoder_table(),
669 ll_previous: dictionary
670 .fse
671 .literal_lengths
672 .to_encoder_table()
673 .map(|table| PreviousFseTable::Custom(Box::new(table))),
674 ml_previous: dictionary
675 .fse
676 .match_lengths
677 .to_encoder_table()
678 .map(|table| PreviousFseTable::Custom(Box::new(table))),
679 of_previous: dictionary
680 .fse
681 .offsets
682 .to_encoder_table()
683 .map(|table| PreviousFseTable::Custom(Box::new(table))),
684 });
685 Ok(self.dictionary.replace(dictionary))
686 }
687
688 pub fn set_dictionary_from_bytes(
690 &mut self,
691 raw_dictionary: &[u8],
692 ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
693 {
694 let dictionary = crate::decoding::Dictionary::decode_dict(raw_dictionary)?;
695 self.set_dictionary(dictionary)
696 }
697
698 pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
700 self.dictionary_entropy_cache = None;
701 self.dictionary.take()
702 }
703}
704
705#[cfg(test)]
706mod tests {
707 #[cfg(all(feature = "dict_builder", feature = "std"))]
708 use alloc::format;
709 use alloc::vec;
710
711 use super::FrameCompressor;
712 use crate::blocks::block::BlockType;
713 use crate::common::MAGIC_NUM;
714 use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
715 use crate::encoding::{Matcher, Sequence};
716 use alloc::vec::Vec;
717
718 fn generate_data(seed: u64, len: usize) -> Vec<u8> {
719 let mut state = seed;
720 let mut data = Vec::with_capacity(len);
721 for _ in 0..len {
722 state = state
723 .wrapping_mul(6364136223846793005)
724 .wrapping_add(1442695040888963407);
725 data.push((state >> 33) as u8);
726 }
727 data
728 }
729
730 fn first_block_type(frame: &[u8]) -> BlockType {
731 let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
732 let mut decoder = block_decoder::new();
733 let (header, _) = decoder
734 .read_block_header(&frame[header_size as usize..])
735 .expect("block header should parse");
736 header.block_type
737 }
738
739 #[cfg(feature = "std")]
741 #[test]
742 fn fcs_header_written_and_c_zstd_compatible() {
743 let levels = [
744 crate::encoding::CompressionLevel::Uncompressed,
745 crate::encoding::CompressionLevel::Fastest,
746 crate::encoding::CompressionLevel::Default,
747 crate::encoding::CompressionLevel::Better,
748 crate::encoding::CompressionLevel::Best,
749 ];
750 let fcs_2byte = vec![0xCDu8; 300]; let large = vec![0xABu8; 100_000];
752 let inputs: [&[u8]; 5] = [
753 &[],
754 &[0x00],
755 b"abcdefghijklmnopqrstuvwxy\n",
756 &fcs_2byte,
757 &large,
758 ];
759 for level in levels {
760 for data in &inputs {
761 let compressed = crate::encoding::compress_to_vec(*data, level);
762 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
764 .unwrap()
765 .0;
766 assert_eq!(
767 header.frame_content_size(),
768 data.len() as u64,
769 "FCS mismatch for len={} level={:?}",
770 data.len(),
771 level,
772 );
773 assert_ne!(
776 header.descriptor.frame_content_size_bytes().unwrap(),
777 0,
778 "FCS field must be present for len={} level={:?}",
779 data.len(),
780 level,
781 );
782 let mut decoded = Vec::new();
784 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
785 |e| {
786 panic!(
787 "C zstd decode failed for len={} level={level:?}: {e}",
788 data.len()
789 )
790 },
791 );
792 assert_eq!(
793 decoded.as_slice(),
794 *data,
795 "C zstd roundtrip failed for len={}",
796 data.len()
797 );
798 }
799 }
800 }
801
802 #[cfg(feature = "std")]
803 #[test]
804 fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
805 let data = vec![0xAB; 2047];
806 let compressed = {
807 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
808 compressor.set_source_size_hint(data.len() as u64);
809 compressor.set_source(data.as_slice());
810 let mut out = Vec::new();
811 compressor.set_drain(&mut out);
812 compressor.compress();
813 out
814 };
815
816 let mut decoded = Vec::new();
817 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
818 assert_eq!(decoded, data);
819 }
820
821 #[cfg(feature = "std")]
822 #[test]
823 fn small_hinted_default_frame_uses_single_segment_header() {
824 let data = generate_data(0xD15E_A5ED, 1024);
825 let compressed = {
826 let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
827 compressor.set_source_size_hint(data.len() as u64);
828 compressor.set_source(data.as_slice());
829 let mut out = Vec::new();
830 compressor.set_drain(&mut out);
831 compressor.compress();
832 out
833 };
834
835 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
836 assert!(
837 frame_header.descriptor.single_segment_flag(),
838 "small hinted default frames should use single-segment header for Rust/FFI parity"
839 );
840 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
841 let mut decoded = Vec::new();
842 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
843 .expect("ffi decoder must accept single-segment small hinted default frame");
844 assert_eq!(decoded, data);
845 }
846
847 #[cfg(feature = "std")]
848 #[test]
849 fn small_hinted_numeric_default_levels_use_single_segment_header() {
850 let data = generate_data(0xA11C_E003, 1024);
851 for level in [
852 super::CompressionLevel::Level(0),
853 super::CompressionLevel::Level(3),
854 ] {
855 let compressed = {
856 let mut compressor = FrameCompressor::new(level);
857 compressor.set_source_size_hint(data.len() as u64);
858 compressor.set_source(data.as_slice());
859 let mut out = Vec::new();
860 compressor.set_drain(&mut out);
861 compressor.compress();
862 out
863 };
864
865 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
866 assert!(
867 frame_header.descriptor.single_segment_flag(),
868 "small hinted numeric default level frames should use single-segment header (level={level:?})"
869 );
870 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
871 let mut decoded = Vec::new();
872 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
873 panic!(
874 "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
875 )
876 });
877 assert_eq!(decoded, data);
878 }
879 }
880
881 #[cfg(feature = "std")]
882 #[test]
883 fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
884 let levels = [
885 super::CompressionLevel::Fastest,
886 super::CompressionLevel::Default,
887 super::CompressionLevel::Better,
888 super::CompressionLevel::Best,
889 super::CompressionLevel::Level(-1),
890 super::CompressionLevel::Level(2),
891 super::CompressionLevel::Level(3),
892 super::CompressionLevel::Level(4),
893 super::CompressionLevel::Level(11),
894 ];
895 let sizes = [
896 511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
897 ];
898
899 for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
900 for &size in &sizes {
901 let data = generate_data(seed + seed_idx as u64, size);
902 for &level in &levels {
903 let compressed = {
904 let mut compressor = FrameCompressor::new(level);
905 compressor.set_source_size_hint(data.len() as u64);
906 compressor.set_source(data.as_slice());
907 let mut out = Vec::new();
908 compressor.set_drain(&mut out);
909 compressor.compress();
910 out
911 };
912 if matches!(size, 511 | 512) {
913 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
914 assert_eq!(
915 frame_header.descriptor.single_segment_flag(),
916 size == 512,
917 "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
918 );
919 }
920
921 let mut decoded = Vec::new();
922 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
923 |e| {
924 panic!(
925 "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
926 seed + seed_idx as u64
927 )
928 },
929 );
930 assert_eq!(
931 decoded,
932 data,
933 "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
934 seed + seed_idx as u64
935 );
936 }
937 }
938 }
939 }
940
941 #[cfg(feature = "std")]
942 #[test]
943 fn hinted_levels_use_single_segment_header_symmetrically() {
944 let levels = [
945 super::CompressionLevel::Fastest,
946 super::CompressionLevel::Default,
947 super::CompressionLevel::Better,
948 super::CompressionLevel::Best,
949 super::CompressionLevel::Level(0),
950 super::CompressionLevel::Level(2),
951 super::CompressionLevel::Level(3),
952 super::CompressionLevel::Level(4),
953 super::CompressionLevel::Level(11),
954 ];
955 for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
956 let size = 1024 + seed_idx * 97;
957 let data = generate_data(seed, size);
958 for &level in &levels {
959 let compressed = {
960 let mut compressor = FrameCompressor::new(level);
961 compressor.set_source_size_hint(data.len() as u64);
962 compressor.set_source(data.as_slice());
963 let mut out = Vec::new();
964 compressor.set_drain(&mut out);
965 compressor.compress();
966 out
967 };
968 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
969 assert!(
970 frame_header.descriptor.single_segment_flag(),
971 "hinted frame should be single-segment for level={level:?} size={}",
972 data.len()
973 );
974 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
975 let mut decoded = Vec::new();
976 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
977 panic!(
978 "ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
979 data.len()
980 )
981 });
982 assert_eq!(decoded, data);
983 }
984 }
985 }
986
987 #[cfg(feature = "std")]
988 #[test]
989 fn hinted_levels_pin_511_512_single_segment_boundary() {
990 let levels = [
991 super::CompressionLevel::Fastest,
992 super::CompressionLevel::Default,
993 super::CompressionLevel::Better,
994 super::CompressionLevel::Best,
995 super::CompressionLevel::Level(0),
996 super::CompressionLevel::Level(2),
997 super::CompressionLevel::Level(3),
998 super::CompressionLevel::Level(4),
999 super::CompressionLevel::Level(11),
1000 ];
1001 for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
1002 for &size in &[511usize, 512] {
1003 let data = generate_data(seed + seed_idx as u64, size);
1004 for &level in &levels {
1005 let compressed = {
1006 let mut compressor = FrameCompressor::new(level);
1007 compressor.set_source_size_hint(data.len() as u64);
1008 compressor.set_source(data.as_slice());
1009 let mut out = Vec::new();
1010 compressor.set_drain(&mut out);
1011 compressor.compress();
1012 out
1013 };
1014 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1015 assert_eq!(
1016 frame_header.descriptor.single_segment_flag(),
1017 size == 512,
1018 "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
1019 );
1020 let mut decoded = Vec::new();
1021 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1022 |e| {
1023 panic!(
1024 "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
1025 seed + seed_idx as u64
1026 )
1027 },
1028 );
1029 assert_eq!(decoded, data);
1030 }
1031 }
1032 }
1033 }
1034
1035 #[cfg(feature = "std")]
1036 #[test]
1037 fn fastest_random_block_uses_raw_fast_path() {
1038 let data = generate_data(0xC0FF_EE11, 10 * 1024);
1039 let compressed =
1040 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Fastest);
1041
1042 assert_eq!(first_block_type(&compressed), BlockType::Raw);
1043
1044 let mut decoded = Vec::new();
1045 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1046 assert_eq!(decoded, data);
1047 }
1048
1049 #[cfg(feature = "std")]
1050 #[test]
1051 fn default_random_block_uses_raw_fast_path() {
1052 let data = generate_data(0xD15E_A5ED, 10 * 1024);
1053 let compressed =
1054 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Default);
1055
1056 assert_eq!(first_block_type(&compressed), BlockType::Raw);
1057
1058 let mut decoded = Vec::new();
1059 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1060 assert_eq!(decoded, data);
1061 }
1062
1063 #[cfg(feature = "std")]
1064 #[test]
1065 fn best_random_block_uses_raw_fast_path() {
1066 let data = generate_data(0xB35C_AFE1, 10 * 1024);
1067 let compressed =
1068 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Best);
1069
1070 assert_eq!(first_block_type(&compressed), BlockType::Raw);
1071
1072 let mut decoded = Vec::new();
1073 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1074 assert_eq!(decoded, data);
1075 }
1076
1077 #[cfg(feature = "std")]
1078 #[test]
1079 fn level2_random_block_uses_raw_fast_path() {
1080 let data = generate_data(0xA11C_E222, 10 * 1024);
1081 let compressed =
1082 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Level(2));
1083
1084 assert_eq!(first_block_type(&compressed), BlockType::Raw);
1085
1086 let mut decoded = Vec::new();
1087 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1088 assert_eq!(decoded, data);
1089 }
1090
1091 #[cfg(feature = "std")]
1092 #[test]
1093 fn better_random_block_uses_raw_fast_path() {
1094 let data = generate_data(0xBE77_E111, 10 * 1024);
1095 let compressed =
1096 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Better);
1097
1098 assert_eq!(first_block_type(&compressed), BlockType::Raw);
1099
1100 let mut decoded = Vec::new();
1101 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1102 assert_eq!(decoded, data);
1103 }
1104
1105 #[cfg(feature = "std")]
1106 #[test]
1107 fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
1108 let mut data = Vec::with_capacity(16 * 1024);
1109 const LINE: &[u8] =
1110 b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1111 while data.len() < 16 * 1024 {
1112 let remaining = 16 * 1024 - data.len();
1113 data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1114 }
1115
1116 fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
1117 let compressed = crate::encoding::compress_to_vec(data, level);
1118 assert_ne!(first_block_type(&compressed), BlockType::Raw);
1119 assert!(
1120 compressed.len() < data.len(),
1121 "compressible input should remain compressible for level={level:?}"
1122 );
1123 let mut decoded = Vec::new();
1124 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1125 assert_eq!(decoded, data);
1126 }
1127
1128 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Fastest);
1129 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Default);
1130 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Level(3));
1131 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Better);
1132 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Best);
1133 }
1134
1135 #[cfg(feature = "std")]
1136 #[test]
1137 fn hinted_small_compressible_frames_use_single_segment_across_levels() {
1138 let mut data = Vec::with_capacity(4 * 1024);
1139 const LINE: &[u8] =
1140 b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1141 while data.len() < 4 * 1024 {
1142 let remaining = 4 * 1024 - data.len();
1143 data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1144 }
1145
1146 for level in [
1147 super::CompressionLevel::Fastest,
1148 super::CompressionLevel::Default,
1149 super::CompressionLevel::Better,
1150 super::CompressionLevel::Best,
1151 super::CompressionLevel::Level(0),
1152 super::CompressionLevel::Level(3),
1153 super::CompressionLevel::Level(4),
1154 super::CompressionLevel::Level(11),
1155 ] {
1156 let compressed = {
1157 let mut compressor = FrameCompressor::new(level);
1158 compressor.set_source_size_hint(data.len() as u64);
1159 compressor.set_source(data.as_slice());
1160 let mut out = Vec::new();
1161 compressor.set_drain(&mut out);
1162 compressor.compress();
1163 out
1164 };
1165 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1166 assert!(
1167 frame_header.descriptor.single_segment_flag(),
1168 "hinted small compressible frame should use single-segment (level={level:?})"
1169 );
1170 assert_ne!(
1171 first_block_type(&compressed),
1172 BlockType::Raw,
1173 "compressible hinted frame should stay off raw fast path (level={level:?})"
1174 );
1175 assert!(
1176 compressed.len() < data.len(),
1177 "compressible hinted frame should still shrink (level={level:?})"
1178 );
1179 let mut decoded = Vec::new();
1180 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
1181 .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
1182 assert_eq!(decoded, data);
1183 }
1184 }
1185
1186 struct NoDictionaryMatcher {
1187 last_space: Vec<u8>,
1188 window_size: u64,
1189 }
1190
1191 impl NoDictionaryMatcher {
1192 fn new(window_size: u64) -> Self {
1193 Self {
1194 last_space: Vec::new(),
1195 window_size,
1196 }
1197 }
1198 }
1199
1200 impl Matcher for NoDictionaryMatcher {
1201 fn get_next_space(&mut self) -> Vec<u8> {
1202 vec![0; self.window_size as usize]
1203 }
1204
1205 fn get_last_space(&mut self) -> &[u8] {
1206 self.last_space.as_slice()
1207 }
1208
1209 fn commit_space(&mut self, space: Vec<u8>) {
1210 self.last_space = space;
1211 }
1212
1213 fn skip_matching(&mut self) {}
1214
1215 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
1216 handle_sequence(Sequence::Literals {
1217 literals: self.last_space.as_slice(),
1218 });
1219 }
1220
1221 fn reset(&mut self, _level: super::CompressionLevel) {
1222 self.last_space.clear();
1223 }
1224
1225 fn window_size(&self) -> u64 {
1226 self.window_size
1227 }
1228 }
1229
1230 #[test]
1231 fn frame_starts_with_magic_num() {
1232 let mock_data = [1_u8, 2, 3].as_slice();
1233 let mut output: Vec<u8> = Vec::new();
1234 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1235 compressor.set_source(mock_data);
1236 compressor.set_drain(&mut output);
1237
1238 compressor.compress();
1239 assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
1240 }
1241
1242 #[test]
1243 fn very_simple_raw_compress() {
1244 let mock_data = [1_u8, 2, 3].as_slice();
1245 let mut output: Vec<u8> = Vec::new();
1246 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1247 compressor.set_source(mock_data);
1248 compressor.set_drain(&mut output);
1249
1250 compressor.compress();
1251 }
1252
1253 #[test]
1254 fn very_simple_compress() {
1255 let mut mock_data = vec![0; 1 << 17];
1256 mock_data.extend(vec![1; (1 << 17) - 1]);
1257 mock_data.extend(vec![2; (1 << 18) - 1]);
1258 mock_data.extend(vec![2; 1 << 17]);
1259 mock_data.extend(vec![3; (1 << 17) - 1]);
1260 let mut output: Vec<u8> = Vec::new();
1261 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1262 compressor.set_source(mock_data.as_slice());
1263 compressor.set_drain(&mut output);
1264
1265 compressor.compress();
1266
1267 let mut decoder = FrameDecoder::new();
1268 let mut decoded = Vec::with_capacity(mock_data.len());
1269 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1270 assert_eq!(mock_data, decoded);
1271
1272 let mut decoded = Vec::new();
1273 zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1274 assert_eq!(mock_data, decoded);
1275 }
1276
1277 #[test]
1278 fn rle_compress() {
1279 let mock_data = vec![0; 1 << 19];
1280 let mut output: Vec<u8> = Vec::new();
1281 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1282 compressor.set_source(mock_data.as_slice());
1283 compressor.set_drain(&mut output);
1284
1285 compressor.compress();
1286
1287 let mut decoder = FrameDecoder::new();
1288 let mut decoded = Vec::with_capacity(mock_data.len());
1289 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1290 assert_eq!(mock_data, decoded);
1291 }
1292
1293 #[test]
1294 fn aaa_compress() {
1295 let mock_data = vec![0, 1, 3, 4, 5];
1296 let mut output: Vec<u8> = Vec::new();
1297 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1298 compressor.set_source(mock_data.as_slice());
1299 compressor.set_drain(&mut output);
1300
1301 compressor.compress();
1302
1303 let mut decoder = FrameDecoder::new();
1304 let mut decoded = Vec::with_capacity(mock_data.len());
1305 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1306 assert_eq!(mock_data, decoded);
1307
1308 let mut decoded = Vec::new();
1309 zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1310 assert_eq!(mock_data, decoded);
1311 }
1312
1313 #[test]
1314 fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
1315 let dict_raw = include_bytes!("../../dict_tests/dictionary");
1316 let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
1317 let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
1318
1319 let mut data = Vec::new();
1320 for _ in 0..8 {
1321 data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
1322 }
1323
1324 let mut with_dict = Vec::new();
1325 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1326 let previous = compressor
1327 .set_dictionary_from_bytes(dict_raw)
1328 .expect("dictionary bytes should parse");
1329 assert!(
1330 previous.is_none(),
1331 "first dictionary insert should return None"
1332 );
1333 assert_eq!(
1334 compressor
1335 .set_dictionary(dict_for_encoder)
1336 .expect("valid dictionary should attach")
1337 .expect("set_dictionary_from_bytes inserted previous dictionary")
1338 .id,
1339 dict_for_decoder.id
1340 );
1341 compressor.set_source(data.as_slice());
1342 compressor.set_drain(&mut with_dict);
1343 compressor.compress();
1344
1345 let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
1346 .expect("encoded stream should have a frame header");
1347 assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));
1348
1349 let mut decoder = FrameDecoder::new();
1350 let mut missing_dict_target = Vec::with_capacity(data.len());
1351 let err = decoder
1352 .decode_all_to_vec(&with_dict, &mut missing_dict_target)
1353 .unwrap_err();
1354 assert!(
1355 matches!(
1356 &err,
1357 crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
1358 ),
1359 "dict-compressed stream should require dictionary id, got: {err:?}"
1360 );
1361
1362 let mut decoder = FrameDecoder::new();
1363 decoder.add_dict(dict_for_decoder).unwrap();
1364 let mut decoded = Vec::with_capacity(data.len());
1365 decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
1366 assert_eq!(decoded, data);
1367
1368 let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
1369 let mut ffi_decoded = Vec::with_capacity(data.len());
1370 let ffi_written = ffi_decoder
1371 .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
1372 .unwrap();
1373 assert_eq!(ffi_written, data.len());
1374 assert_eq!(ffi_decoded, data);
1375 }
1376
1377 #[cfg(all(feature = "dict_builder", feature = "std"))]
1378 #[test]
1379 fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
1380 use std::io::Cursor;
1381
1382 let mut training = Vec::new();
1383 for idx in 0..256u32 {
1384 training.extend_from_slice(
1385 format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
1386 );
1387 }
1388 let mut raw_dict = Vec::new();
1389 crate::dictionary::create_raw_dict_from_source(
1390 Cursor::new(training.as_slice()),
1391 training.len(),
1392 &mut raw_dict,
1393 4096,
1394 )
1395 .expect("dict_builder training should succeed");
1396 assert!(
1397 !raw_dict.is_empty(),
1398 "dict_builder produced an empty dictionary"
1399 );
1400
1401 let dict_id = 0xD1C7_0008;
1402 let encoder_dict =
1403 crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
1404 let decoder_dict =
1405 crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
1406
1407 let mut payload = Vec::new();
1408 for idx in 0..96u32 {
1409 payload.extend_from_slice(
1410 format!(
1411 "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
1412 )
1413 .as_bytes(),
1414 );
1415 }
1416
1417 let mut without_dict = Vec::new();
1418 let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
1419 baseline.set_source(payload.as_slice());
1420 baseline.set_drain(&mut without_dict);
1421 baseline.compress();
1422
1423 let mut with_dict = Vec::new();
1424 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1425 compressor
1426 .set_dictionary(encoder_dict)
1427 .expect("valid dict_builder dictionary should attach");
1428 compressor.set_source(payload.as_slice());
1429 compressor.set_drain(&mut with_dict);
1430 compressor.compress();
1431
1432 let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
1433 .expect("encoded stream should have a frame header");
1434 assert_eq!(frame_header.dictionary_id(), Some(dict_id));
1435 let mut decoder = FrameDecoder::new();
1436 decoder.add_dict(decoder_dict).unwrap();
1437 let mut decoded = Vec::with_capacity(payload.len());
1438 decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
1439 assert_eq!(decoded, payload);
1440 assert!(
1441 with_dict.len() < without_dict.len(),
1442 "trained dictionary should improve compression for this small payload"
1443 );
1444 }
1445
1446 #[test]
1447 fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
1448 let dict_raw = include_bytes!("../../dict_tests/dictionary");
1449 let mut output = Vec::new();
1450 let input = b"";
1451
1452 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1453 let previous = compressor
1454 .set_dictionary_from_bytes(dict_raw)
1455 .expect("dictionary bytes should parse");
1456 assert!(previous.is_none());
1457
1458 compressor.set_source(input.as_slice());
1459 compressor.set_drain(&mut output);
1460 compressor.compress();
1461
1462 assert!(
1463 compressor.state.last_huff_table.is_some(),
1464 "dictionary entropy should seed previous huffman table before first block"
1465 );
1466 assert!(
1467 compressor.state.fse_tables.ll_previous.is_some(),
1468 "dictionary entropy should seed previous ll table before first block"
1469 );
1470 assert!(
1471 compressor.state.fse_tables.ml_previous.is_some(),
1472 "dictionary entropy should seed previous ml table before first block"
1473 );
1474 assert!(
1475 compressor.state.fse_tables.of_previous.is_some(),
1476 "dictionary entropy should seed previous of table before first block"
1477 );
1478 }
1479
1480 #[test]
1481 fn set_dictionary_rejects_zero_dictionary_id() {
1482 let invalid = crate::decoding::Dictionary {
1483 id: 0,
1484 fse: crate::decoding::scratch::FSEScratch::new(),
1485 huf: crate::decoding::scratch::HuffmanScratch::new(),
1486 dict_content: vec![1, 2, 3],
1487 offset_hist: [1, 4, 8],
1488 };
1489
1490 let mut compressor: FrameCompressor<
1491 &[u8],
1492 Vec<u8>,
1493 crate::encoding::match_generator::MatchGeneratorDriver,
1494 > = FrameCompressor::new(super::CompressionLevel::Fastest);
1495 let result = compressor.set_dictionary(invalid);
1496 assert!(matches!(
1497 result,
1498 Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
1499 ));
1500 }
1501
1502 #[test]
1503 fn set_dictionary_rejects_zero_repeat_offsets() {
1504 let invalid = crate::decoding::Dictionary {
1505 id: 1,
1506 fse: crate::decoding::scratch::FSEScratch::new(),
1507 huf: crate::decoding::scratch::HuffmanScratch::new(),
1508 dict_content: vec![1, 2, 3],
1509 offset_hist: [0, 4, 8],
1510 };
1511
1512 let mut compressor: FrameCompressor<
1513 &[u8],
1514 Vec<u8>,
1515 crate::encoding::match_generator::MatchGeneratorDriver,
1516 > = FrameCompressor::new(super::CompressionLevel::Fastest);
1517 let result = compressor.set_dictionary(invalid);
1518 assert!(matches!(
1519 result,
1520 Err(
1521 crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
1522 index: 0
1523 }
1524 )
1525 ));
1526 }
1527
1528 #[test]
1529 fn uncompressed_mode_does_not_require_dictionary() {
1530 let dict_id = 0xABCD_0001;
1531 let dict =
1532 crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
1533 .expect("raw dictionary should be valid");
1534
1535 let payload = b"plain-bytes-that-should-stay-raw";
1536 let mut output = Vec::new();
1537 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1538 compressor
1539 .set_dictionary(dict)
1540 .expect("dictionary should attach in uncompressed mode");
1541 compressor.set_source(payload.as_slice());
1542 compressor.set_drain(&mut output);
1543 compressor.compress();
1544
1545 let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1546 .expect("encoded frame should have a header");
1547 assert_eq!(
1548 frame_header.dictionary_id(),
1549 None,
1550 "raw/uncompressed frames must not advertise dictionary dependency"
1551 );
1552
1553 let mut decoder = FrameDecoder::new();
1554 let mut decoded = Vec::with_capacity(payload.len());
1555 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1556 assert_eq!(decoded, payload);
1557 }
1558
1559 #[test]
1560 fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
1561 use crate::encoding::match_generator::MatchGeneratorDriver;
1562
1563 let dict_id = 0xABCD_0002;
1564 let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
1565 .expect("raw dictionary should be valid");
1566 let dict_for_decoder =
1567 crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
1568 .expect("raw dictionary should be valid");
1569
1570 let payload = b"abcdefgh".repeat(128 * 1024 / 8 + 64);
1573 let matcher = MatchGeneratorDriver::new(1024, 1);
1574
1575 let mut no_dict_output = Vec::new();
1576 let mut no_dict_compressor =
1577 FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
1578 no_dict_compressor.set_source(payload.as_slice());
1579 no_dict_compressor.set_drain(&mut no_dict_output);
1580 no_dict_compressor.compress();
1581 let (no_dict_frame_header, _) =
1582 crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
1583 .expect("baseline frame should have a header");
1584 let no_dict_window = no_dict_frame_header
1585 .window_size()
1586 .expect("window size should be present");
1587
1588 let mut output = Vec::new();
1589 let matcher = MatchGeneratorDriver::new(1024, 1);
1590 let mut compressor =
1591 FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
1592 compressor
1593 .set_dictionary(dict)
1594 .expect("dictionary should attach");
1595 compressor.set_source(payload.as_slice());
1596 compressor.set_drain(&mut output);
1597 compressor.compress();
1598
1599 let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1600 .expect("encoded frame should have a header");
1601 let advertised_window = frame_header
1602 .window_size()
1603 .expect("window size should be present");
1604 assert_eq!(
1605 advertised_window, no_dict_window,
1606 "dictionary priming must not inflate advertised window size"
1607 );
1608 assert!(
1609 payload.len() > advertised_window as usize,
1610 "test must cross the advertised window boundary"
1611 );
1612
1613 let mut decoder = FrameDecoder::new();
1614 decoder.add_dict(dict_for_decoder).unwrap();
1615 let mut decoded = Vec::with_capacity(payload.len());
1616 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1617 assert_eq!(decoded, payload);
1618 }
1619
1620 #[test]
1621 fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
1622 let dict_id = 0xABCD_0004;
1623 let dict_content = b"abcd".repeat(1024); let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
1625 let dict_for_decoder =
1626 crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
1627 let payload = b"abcdabcdabcdabcd".repeat(128);
1628
1629 let mut hinted_output = Vec::new();
1630 let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
1631 hinted.set_dictionary(dict).unwrap();
1632 hinted.set_source_size_hint(1);
1633 hinted.set_source(payload.as_slice());
1634 hinted.set_drain(&mut hinted_output);
1635 hinted.compress();
1636
1637 let mut no_hint_output = Vec::new();
1638 let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
1639 no_hint
1640 .set_dictionary(
1641 crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
1642 .unwrap(),
1643 )
1644 .unwrap();
1645 no_hint.set_source(payload.as_slice());
1646 no_hint.set_drain(&mut no_hint_output);
1647 no_hint.compress();
1648
1649 let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
1650 .expect("encoded frame should have a header")
1651 .0
1652 .window_size()
1653 .expect("window size should be present");
1654 let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
1655 .expect("encoded frame should have a header")
1656 .0
1657 .window_size()
1658 .expect("window size should be present");
1659 assert!(
1660 hinted_window <= no_hint_window,
1661 "source-size hint should not increase advertised window with dictionary priming",
1662 );
1663
1664 let mut decoder = FrameDecoder::new();
1665 decoder.add_dict(dict_for_decoder).unwrap();
1666 let mut decoded = Vec::with_capacity(payload.len());
1667 decoder
1668 .decode_all_to_vec(&hinted_output, &mut decoded)
1669 .unwrap();
1670 assert_eq!(decoded, payload);
1671 }
1672
1673 #[test]
1674 fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
1675 let dict_id = 0xABCD_0005;
1676 let dict_content = b"abcd".repeat(1024); let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
1678 let dict_for_decoder =
1679 crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
1680 let payload = b"abcd".repeat(1024); let payload_len = payload.len() as u64;
1682
1683 let mut hinted_output = Vec::new();
1684 let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
1685 hinted.set_dictionary(dict).unwrap();
1686 hinted.set_source_size_hint(payload_len);
1687 hinted.set_source(payload.as_slice());
1688 hinted.set_drain(&mut hinted_output);
1689 hinted.compress();
1690
1691 let mut no_hint_output = Vec::new();
1692 let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
1693 no_hint
1694 .set_dictionary(
1695 crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
1696 .unwrap(),
1697 )
1698 .unwrap();
1699 no_hint.set_source(payload.as_slice());
1700 no_hint.set_drain(&mut no_hint_output);
1701 no_hint.compress();
1702
1703 let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
1704 .expect("encoded frame should have a header")
1705 .0
1706 .window_size()
1707 .expect("window size should be present");
1708 let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
1709 .expect("encoded frame should have a header")
1710 .0
1711 .window_size()
1712 .expect("window size should be present");
1713 assert!(
1714 hinted_window <= no_hint_window,
1715 "source-size hint should not increase advertised window with dictionary priming",
1716 );
1717
1718 let mut decoder = FrameDecoder::new();
1719 decoder.add_dict(dict_for_decoder).unwrap();
1720 let mut decoded = Vec::with_capacity(payload.len());
1721 decoder
1722 .decode_all_to_vec(&hinted_output, &mut decoded)
1723 .unwrap();
1724 assert_eq!(decoded, payload);
1725 }
1726
1727 #[test]
1728 fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
1729 let dict_id = 0xABCD_0003;
1730 let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
1731 .expect("raw dictionary should be valid");
1732 let payload = b"abcdefghabcdefgh";
1733
1734 let mut output = Vec::new();
1735 let matcher = NoDictionaryMatcher::new(64);
1736 let mut compressor =
1737 FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
1738 compressor
1739 .set_dictionary(dict)
1740 .expect("dictionary should attach");
1741 compressor.set_source(payload.as_slice());
1742 compressor.set_drain(&mut output);
1743 compressor.compress();
1744
1745 let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1746 .expect("encoded frame should have a header");
1747 assert_eq!(
1748 frame_header.dictionary_id(),
1749 None,
1750 "matchers that do not support dictionary priming must not advertise dictionary dependency"
1751 );
1752
1753 let mut decoder = FrameDecoder::new();
1754 let mut decoded = Vec::with_capacity(payload.len());
1755 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1756 assert_eq!(decoded, payload);
1757 }
1758
1759 #[cfg(feature = "hash")]
1760 #[test]
1761 fn checksum_two_frames_reused_compressor() {
1762 let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();
1768
1769 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1770
1771 let mut compressed1 = Vec::new();
1773 compressor.set_source(data.as_slice());
1774 compressor.set_drain(&mut compressed1);
1775 compressor.compress();
1776
1777 let mut compressed2 = Vec::new();
1779 compressor.set_source(data.as_slice());
1780 compressor.set_drain(&mut compressed2);
1781 compressor.compress();
1782
1783 fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
1784 let mut decoder = FrameDecoder::new();
1785 let mut source = compressed;
1786 decoder.reset(&mut source).unwrap();
1787 while !decoder.is_finished() {
1788 decoder
1789 .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
1790 .unwrap();
1791 }
1792 let mut decoded = Vec::new();
1793 decoder.collect_to_writer(&mut decoded).unwrap();
1794 (
1795 decoded,
1796 decoder.get_checksum_from_data(),
1797 decoder.get_calculated_checksum(),
1798 )
1799 }
1800
1801 let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
1802 assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
1803 assert_eq!(
1804 chksum_from_data1, chksum_calculated1,
1805 "frame 1: checksum mismatch"
1806 );
1807
1808 let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
1809 assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
1810 assert_eq!(
1811 chksum_from_data2, chksum_calculated2,
1812 "frame 2: checksum mismatch"
1813 );
1814
1815 assert_eq!(
1818 chksum_from_data1, chksum_from_data2,
1819 "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
1820 );
1821 }
1822
1823 #[cfg(feature = "std")]
1824 #[test]
1825 fn fuzz_targets() {
1826 use std::io::Read;
1827 fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
1828 let mut decoder = crate::decoding::StreamingDecoder::new(data).unwrap();
1829 let mut result: Vec<u8> = Vec::new();
1830 decoder.read_to_end(&mut result).expect("Decoding failed");
1831 result
1832 }
1833
1834 fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
1835 let mut decoder = crate::decoding::FrameDecoder::new();
1836 decoder.reset(&mut data).unwrap();
1837 let mut result = vec![];
1838 while !decoder.is_finished() || decoder.can_collect() > 0 {
1839 decoder
1840 .decode_blocks(
1841 &mut data,
1842 crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
1843 )
1844 .unwrap();
1845 decoder.collect_to_writer(&mut result).unwrap();
1846 }
1847 result
1848 }
1849
1850 fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
1851 zstd::stream::encode_all(std::io::Cursor::new(data), 3)
1852 }
1853
1854 fn encode_szstd_uncompressed(data: &mut dyn std::io::Read) -> Vec<u8> {
1855 let mut input = Vec::new();
1856 data.read_to_end(&mut input).unwrap();
1857
1858 crate::encoding::compress_to_vec(
1859 input.as_slice(),
1860 crate::encoding::CompressionLevel::Uncompressed,
1861 )
1862 }
1863
1864 fn encode_szstd_compressed(data: &mut dyn std::io::Read) -> Vec<u8> {
1865 let mut input = Vec::new();
1866 data.read_to_end(&mut input).unwrap();
1867
1868 crate::encoding::compress_to_vec(
1869 input.as_slice(),
1870 crate::encoding::CompressionLevel::Fastest,
1871 )
1872 }
1873
1874 fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
1875 let mut output = Vec::new();
1876 zstd::stream::copy_decode(data, &mut output)?;
1877 Ok(output)
1878 }
1879 if std::fs::exists("fuzz/artifacts/interop").unwrap_or(false) {
1880 for file in std::fs::read_dir("fuzz/artifacts/interop").unwrap() {
1881 if file.as_ref().unwrap().file_type().unwrap().is_file() {
1882 let data = std::fs::read(file.unwrap().path()).unwrap();
1883 let data = data.as_slice();
1884 let compressed = encode_zstd(data).unwrap();
1886 let decoded = decode_szstd(&mut compressed.as_slice());
1887 let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
1888 assert!(
1889 decoded == data,
1890 "Decoded data did not match the original input during decompression"
1891 );
1892 assert_eq!(
1893 decoded2, data,
1894 "Decoded data did not match the original input during decompression"
1895 );
1896
1897 let mut input = data;
1900 let compressed = encode_szstd_uncompressed(&mut input);
1901 let decoded = decode_zstd(&compressed).unwrap();
1902 assert_eq!(
1903 decoded, data,
1904 "Decoded data did not match the original input during compression"
1905 );
1906 let mut input = data;
1908 let compressed = encode_szstd_compressed(&mut input);
1909 let decoded = decode_zstd(&compressed).unwrap();
1910 assert_eq!(
1911 decoded, data,
1912 "Decoded data did not match the original input during compression"
1913 );
1914 }
1915 }
1916 }
1917 }
1918}