1use alloc::{boxed::Box, vec::Vec};
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12 CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13 match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
/// Encodes everything read from a source into a single zstd frame written to a drain.
///
/// `R` is the uncompressed input, `W` receives the compressed frame, and `M` is the match
/// finder. Encoded blocks are buffered in memory during `compress` so that the frame header,
/// which records the exact content size, can be written before them.
pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
    /// Input installed with `set_source`; `compress` reads it until EOF.
    uncompressed_data: Option<R>,
    /// Output installed with `set_drain`; `compress` writes the finished frame here.
    compressed_data: Option<W>,
    /// Level used by the next `compress` call.
    compression_level: CompressionLevel,
    /// Dictionary installed with `set_dictionary`, if any.
    dictionary: Option<crate::decoding::Dictionary>,
    /// Encoder entropy tables derived from `dictionary` when it was installed.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// Expected input size for the next frame; consumed (taken) by `compress`.
    source_size_hint: Option<u64>,
    /// Matcher plus per-frame entropy/offset state.
    state: CompressState<M>,
    /// When true the frame header is written without the magic number.
    magicless: bool,
    /// Running XXH64 over all input bytes of the current frame (content checksum).
    #[cfg(feature = "hash")]
    hasher: XxHash64,
    /// Layout of the most recently emitted frame; reset at the start of every `compress`.
    #[cfg(feature = "lsm")]
    frame_emit_info: Option<crate::encoding::frame_emit_info::FrameEmitInfo>,
    /// Whether `compress` should collect per-block checksums.
    #[cfg(all(feature = "lsm", feature = "hash"))]
    per_block_checksums_enabled: bool,
    /// Per-block checksums of the most recent frame (`None` when collection is disabled).
    #[cfg(all(feature = "lsm", feature = "hash"))]
    block_checksums: Option<alloc::vec::Vec<u32>>,
}
74
/// Encoder-side entropy tables converted once from the installed dictionary (in
/// `set_dictionary`) and copied into the compressor state at the start of every frame
/// that uses the dictionary.
#[derive(Clone, Default)]
struct CachedDictionaryEntropy {
    /// Dictionary Huffman table converted for encoding, if the conversion produced one.
    huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// Literal-length table (stored as `PreviousFseTable::Custom`), if available.
    ll_previous: Option<PreviousFseTable>,
    /// Match-length table (stored as `PreviousFseTable::Custom`), if available.
    ml_previous: Option<PreviousFseTable>,
    /// Offset table (stored as `PreviousFseTable::Custom`), if available.
    of_previous: Option<PreviousFseTable>,
}
82
/// Table state carried over for one FSE stream from an earlier block (or from a
/// dictionary), resolved to a concrete table through `as_table`.
#[derive(Clone)]
pub(crate) enum PreviousFseTable {
    /// The predefined default table; `as_table` resolves it to the caller-supplied default.
    Default,
    /// A custom encoder table; dictionaries install their tables as this variant.
    Custom(Box<FSETable>),
    /// Run-length mode: there is no table to reuse, so `as_table` returns `None`.
    /// NOTE(review): the `u8` is presumably the repeated symbol — confirm at the producer.
    Rle(u8),
}
91
92impl PreviousFseTable {
93 pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
94 match self {
95 Self::Default => Some(default),
96 Self::Custom(table) => Some(table),
97 Self::Rle(_) => None,
98 }
99 }
100}
101
/// FSE encoder tables for the three sequence streams (literal lengths, match lengths,
/// offsets): the predefined default for each, plus whatever table the previous block (or a
/// dictionary) left behind for possible reuse.
pub(crate) struct FseTables {
    /// Predefined literal-length table.
    pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
    /// Literal-length table state carried over from the previous block / dictionary.
    pub(crate) ll_previous: Option<PreviousFseTable>,
    /// Predefined match-length table.
    pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
    /// Match-length table state carried over from the previous block / dictionary.
    pub(crate) ml_previous: Option<PreviousFseTable>,
    /// Predefined offset table.
    pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
    /// Offset table state carried over from the previous block / dictionary.
    pub(crate) of_previous: Option<PreviousFseTable>,
}
121
122impl FseTables {
123 pub fn new() -> Self {
124 Self {
125 ll_default: default_ll_table(),
126 ll_previous: None,
127 ml_default: default_ml_table(),
128 ml_previous: None,
129 of_default: default_of_table(),
130 of_previous: None,
131 }
132 }
133
134 #[inline]
141 #[allow(clippy::borrow_deref_ref)]
142 pub(crate) fn ll_default_ref(&self) -> &FSETable {
143 &*self.ll_default
144 }
145
146 #[inline]
148 #[allow(clippy::borrow_deref_ref)]
149 pub(crate) fn ml_default_ref(&self) -> &FSETable {
150 &*self.ml_default
151 }
152
153 #[inline]
155 #[allow(clippy::borrow_deref_ref)]
156 pub(crate) fn of_default_ref(&self) -> &FSETable {
157 &*self.of_default
158 }
159}
160
/// Lower bound applied to any split point returned by the pre-splitter
/// (see `donor_optimal_block_size`).
const PRESPLIT_BLOCK_MIN: usize = 3500;
/// Denominator of the fingerprint-difference threshold ratio.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
/// Base numerator of the threshold ratio: with zero penalty two fingerprints "differ" once
/// their distance reaches 14/16 of `nb_events(reference) * nb_events(new)`.
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
/// Starting penalty added to the threshold numerator by the chunk splitter; it drops by one
/// per merged chunk, so a split right at the start of a block needs stronger evidence.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
/// Size of each chunk compared by the chunk splitter (8 KiB).
const PRESPLIT_CHUNK_SIZE: usize = 8 << 10;
/// Largest hash log used for fingerprints.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
/// Number of buckets in a fingerprint, sized for the largest hash log.
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
/// Multiplier used by `presplit_hash2` (0x9E3779B9, the 32-bit golden-ratio constant of
/// Knuth's multiplicative hashing).
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
/// Length of each byte histogram (head, middle, tail) taken by the border splitter.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
174
/// Histogram of hashed byte pairs (or of single bytes) used by the block pre-splitter.
#[derive(Clone)]
struct PreSplitFingerprint {
    /// Hits per bucket; only the first `1 << hash_log` entries are used for a given hash log.
    events: [u32; PRESPLIT_HASH_TABLE_SIZE],
    /// Event total used to normalise comparisons (see `presplit_record_fingerprint` for how
    /// it is counted when sampling).
    nb_events: usize,
}
180
181impl Default for PreSplitFingerprint {
182 fn default() -> Self {
183 Self {
184 events: [0; PRESPLIT_HASH_TABLE_SIZE],
185 nb_events: 0,
186 }
187 }
188}
189
190fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
191 debug_assert!(hash_log >= 8);
192 if hash_log == 8 {
193 return bytes[0] as usize;
194 }
195 debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
196 let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
197 (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
198}
199
200fn presplit_record_fingerprint(
201 fp: &mut PreSplitFingerprint,
202 src: &[u8],
203 sampling_rate: usize,
204 hash_log: usize,
205) {
206 fp.events.fill(0);
207 fp.nb_events = 0;
208 if src.len() < 2 {
209 return;
210 }
211 let limit = src.len() - 1;
212 let mut n = 0usize;
213 while n < limit {
214 fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
215 n += sampling_rate;
216 }
217 fp.nb_events += limit / sampling_rate;
220}
221
222fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
228 fp.events.fill(0);
229 for &b in src {
230 fp.events[b as usize] += 1;
231 }
232 fp.nb_events = src.len();
235}
236
237fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
238 let slots = 1usize << hash_log;
239 let mut distance = 0u64;
240 for idx in 0..slots {
241 let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
242 let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
243 distance = distance.saturating_add(left.abs_diff(right) as u64);
244 }
245 distance
246}
247
248fn presplit_fingerprints_differ(
249 reference: &PreSplitFingerprint,
250 new_fp: &PreSplitFingerprint,
251 penalty: i32,
252 hash_log: usize,
253) -> bool {
254 debug_assert!(reference.nb_events > 0);
255 debug_assert!(new_fp.nb_events > 0);
256 let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
257 let deviation = presplit_distance(reference, new_fp, hash_log);
258 let threshold = p50.saturating_mul(PRESPLIT_THRESHOLD_BASE + penalty as u64)
259 / PRESPLIT_THRESHOLD_PENALTY_RATE;
260 deviation >= threshold
261}
262
263fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
264 for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
265 acc.events[idx] = acc.events[idx].saturating_add(new_fp.events[idx]);
266 }
267 acc.nb_events = acc.nb_events.saturating_add(new_fp.nb_events);
268}
269
270fn donor_split_block_by_chunks(block: &[u8], level: usize) -> usize {
271 debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
272 debug_assert!((1..=4).contains(&level));
273 let (sampling_rate, hash_log) = match level - 1 {
274 0 => (43, 8),
275 1 => (11, 9),
276 2 => (5, 10),
277 _ => (1, 10),
278 };
279
280 let mut past = PreSplitFingerprint::default();
281 let mut new_events = PreSplitFingerprint::default();
282 let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
283 presplit_record_fingerprint(
284 &mut past,
285 &block[..PRESPLIT_CHUNK_SIZE],
286 sampling_rate,
287 hash_log,
288 );
289 let mut pos = PRESPLIT_CHUNK_SIZE;
290 while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
291 presplit_record_fingerprint(
292 &mut new_events,
293 &block[pos..pos + PRESPLIT_CHUNK_SIZE],
294 sampling_rate,
295 hash_log,
296 );
297 if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
298 return pos;
299 }
300 presplit_merge_events(&mut past, &new_events);
301 if penalty > 0 {
302 penalty -= 1;
303 }
304 pos += PRESPLIT_CHUNK_SIZE;
305 }
306 block.len()
307}
308
309fn donor_split_block_from_borders(block: &[u8]) -> usize {
317 debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
318 let block_size = block.len();
319 let mut past = PreSplitFingerprint::default();
320 let mut new_fp = PreSplitFingerprint::default();
321 presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
322 presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
323 if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
326 return block_size;
327 }
328
329 let mut middle = PreSplitFingerprint::default();
330 let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
331 presplit_record_byte_histogram(
332 &mut middle,
333 &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
334 );
335
336 let dist_from_begin = presplit_distance(&past, &middle, 8);
337 let dist_from_end = presplit_distance(&new_fp, &middle, 8);
338 let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
342 if dist_from_begin.abs_diff(dist_from_end) < min_distance {
343 return 64 * 1024;
344 }
345 if dist_from_begin > dist_from_end {
354 32 * 1024
355 } else {
356 96 * 1024
357 }
358}
359
360fn donor_pre_split_level(level: CompressionLevel) -> Option<usize> {
361 match level {
362 CompressionLevel::Level(11..=15) => Some(0),
368 CompressionLevel::Level(16..=22) => Some(4),
372 _ => None,
373 }
374}
375
/// Hashes one block's bytes with XXH64 (seed 0) and keeps the low 32 bits of the digest.
///
/// Used for the optional per-block checksums collected by `compress`.
#[cfg(all(feature = "lsm", feature = "hash"))]
#[inline]
pub(crate) fn xxh64_block_low32(data: &[u8]) -> u32 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(data);
    let digest = hasher.finish();
    // `as` deliberately truncates to the low 32 bits.
    digest as u32
}
389
/// Benchmark hook: runs the pre-split heuristic for `split_level` on one full block.
///
/// Level 0 uses the border-histogram splitter; levels 1..=4 use the chunk splitter.
///
/// # Panics
///
/// Panics unless `block` is exactly `MAX_BLOCK_SIZE` bytes and `split_level <= 4`.
#[cfg(feature = "bench_internals")]
pub(crate) fn block_splitter_decision_for_bench(block: &[u8], split_level: usize) -> usize {
    assert_eq!(
        block.len(),
        MAX_BLOCK_SIZE as usize,
        "block_splitter_decision_for_bench expects exactly MAX_BLOCK_SIZE bytes"
    );
    assert!(
        split_level <= 4,
        "block_splitter_decision_for_bench: split_level must be in 0..=4, got {split_level}"
    );
    match split_level {
        0 => donor_split_block_from_borders(block),
        chunk_level => donor_split_block_by_chunks(block, chunk_level),
    }
}
414
415pub(crate) fn donor_optimal_block_size(
416 level: CompressionLevel,
417 block: &[u8],
418 remaining_src_size: usize,
419 block_size_max: usize,
420 savings: i64,
421) -> usize {
422 let Some(split_level) = donor_pre_split_level(level) else {
423 return remaining_src_size.min(block_size_max);
424 };
425 if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
426 return remaining_src_size.min(block_size_max);
427 }
428 if savings < 3 {
429 return MAX_BLOCK_SIZE as usize;
430 }
431 if block.len() < MAX_BLOCK_SIZE as usize {
432 return remaining_src_size.min(block_size_max);
433 }
434 let raw_split = if split_level == 0 {
439 donor_split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
440 } else {
441 donor_split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
442 };
443 raw_split
444 .max(PRESPLIT_BLOCK_MIN)
445 .min(MAX_BLOCK_SIZE as usize)
446}
447
/// Per-frame encoder state shared with the block encoder (`compress_block_encoded`).
///
/// `compress` resets it at the start of every frame, seeding it from the dictionary when
/// one is in use.
pub(crate) struct CompressState<M: Matcher> {
    /// Match finder producing sequences for each block.
    pub(crate) matcher: M,
    /// Huffman table carried over for literals: the dictionary's table at frame start when a
    /// dictionary is used, otherwise `None`. NOTE(review): presumably updated by the block
    /// encoder for reuse by later blocks — confirm in `compress_block_encoded`.
    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// Default and previous FSE tables for the three sequence streams.
    pub(crate) fse_tables: FseTables,
    /// Scratch space for encoding compressed blocks (presumably reused across blocks).
    pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
    /// Repeat-offset history; `[1, 4, 8]` at frame start, or the dictionary's history.
    pub(crate) offset_hist: [u32; 3],
    /// Strategy tag derived from the compression level at frame start.
    pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
}
475
476impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
477 pub fn new(compression_level: CompressionLevel) -> Self {
479 Self {
480 uncompressed_data: None,
481 compressed_data: None,
482 compression_level,
483 dictionary: None,
484 dictionary_entropy_cache: None,
485 source_size_hint: None,
486 state: CompressState {
487 matcher: MatchGeneratorDriver::new(1024 * 128, 1),
488 last_huff_table: None,
489 fse_tables: FseTables::new(),
490 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
491 offset_hist: [1, 4, 8],
492 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
493 compression_level,
494 ),
495 },
496 magicless: false,
497 #[cfg(feature = "hash")]
498 hasher: XxHash64::with_seed(0),
499 #[cfg(feature = "lsm")]
500 frame_emit_info: None,
501 #[cfg(all(feature = "lsm", feature = "hash"))]
502 per_block_checksums_enabled: false,
503 #[cfg(all(feature = "lsm", feature = "hash"))]
504 block_checksums: None,
505 }
506 }
507}
508
509impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
510 pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
512 Self {
513 uncompressed_data: None,
514 compressed_data: None,
515 dictionary: None,
516 dictionary_entropy_cache: None,
517 source_size_hint: None,
518 state: CompressState {
519 matcher,
520 last_huff_table: None,
521 fse_tables: FseTables::new(),
522 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
523 offset_hist: [1, 4, 8],
524 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
525 compression_level,
526 ),
527 },
528 compression_level,
529 magicless: false,
530 #[cfg(feature = "hash")]
531 hasher: XxHash64::with_seed(0),
532 #[cfg(feature = "lsm")]
533 frame_emit_info: None,
534 #[cfg(all(feature = "lsm", feature = "hash"))]
535 per_block_checksums_enabled: false,
536 #[cfg(all(feature = "lsm", feature = "hash"))]
537 block_checksums: None,
538 }
539 }
540
/// Chooses whether emitted frames omit the magic number; the flag is copied into the
/// frame header written by [`Self::compress`].
pub fn set_magicless(&mut self, magicless: bool) {
    self.magicless = magicless;
}

/// Installs `uncompressed_data` as the input of the next [`Self::compress`] call and
/// returns the previously installed source, if any.
pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
    self.uncompressed_data.replace(uncompressed_data)
}

/// Installs `compressed_data` as the output of the next [`Self::compress`] call and
/// returns the previously installed drain, if any.
pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
    self.compressed_data.replace(compressed_data)
}

/// Records the expected uncompressed size of the next frame.
///
/// The next [`Self::compress`] call consumes the hint. It is forwarded to the matcher and
/// used to size the in-memory block buffer. It also makes the single-segment frame header
/// possible: the content must be at least 512 bytes, fit in the window, and no dictionary
/// may be in use. The hint does not carry over to later frames.
pub fn set_source_size_hint(&mut self, size: u64) {
    self.source_size_hint = Some(size);
}
577
/// Encodes the whole source into one zstd frame and writes it to the drain.
///
/// Per-frame state is reset first:
/// * any pending source-size hint is handed to the matcher and then cleared;
/// * the matcher is reset for the current level;
/// * repeat offsets return to `[1, 4, 8]`;
/// * previous Huffman/FSE tables are cleared.
///
/// A dictionary is used only when one is set, the level is not `Uncompressed`, and the
/// matcher supports priming. In that case the dictionary's repeat offsets, content and
/// cached entropy tables seed this state, and the frame header carries the dictionary id.
///
/// Input is read until EOF in `MAX_BLOCK_SIZE` chunks. A full chunk may be cut shorter by
/// `donor_optimal_block_size`, and the cut-off remainder starts the next block. Encoded
/// blocks are buffered in memory. Afterwards the drain receives the frame header (with the
/// exact content size), the blocks and, with the `hash` feature, the low 32 bits of the
/// XXH64 content checksum.
///
/// With the `lsm` feature the emitted layout is recorded for `last_frame_emit_info`. With
/// `lsm` + `hash` and per-block checksums enabled, the per-block checksums are collected
/// too.
///
/// # Panics
///
/// Panics in any of these cases:
/// * no source or drain has been set;
/// * reading the source or writing the drain fails;
/// * the matcher reports a window size of 0.
pub fn compress(&mut self) {
    // Forget the layout of any previous frame.
    #[cfg(feature = "lsm")]
    {
        self.frame_emit_info = None;
    }
    // Start a fresh checksum list only when collection is enabled.
    #[cfg(all(feature = "lsm", feature = "hash"))]
    {
        if self.per_block_checksums_enabled {
            self.block_checksums = Some(alloc::vec::Vec::new());
        } else {
            self.block_checksums = None;
        }
    }
    // Remember the hint before it is consumed below; it drives buffer sizing and the
    // single-segment decision.
    let initial_size_hint = self.source_size_hint;
    let source_size_hint_known = initial_size_hint.is_some();
    let use_dictionary_state =
        !matches!(self.compression_level, CompressionLevel::Uncompressed)
            && self.state.matcher.supports_dictionary_priming()
            && self.dictionary.is_some();
    // The hint applies to this frame only.
    if let Some(size_hint) = self.source_size_hint.take() {
        self.state.matcher.set_source_size_hint(size_hint);
    }
    self.state.matcher.reset(self.compression_level);
    self.state.offset_hist = [1, 4, 8];
    self.state.strategy_tag =
        crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level);
    let cached_entropy = if use_dictionary_state {
        self.dictionary_entropy_cache.as_ref()
    } else {
        None
    };
    // Seed repeat offsets and match history from the dictionary.
    if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
        self.state.offset_hist = dict.offset_hist;
        self.state
            .matcher
            .prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
    }
    // Previous Huffman table: the dictionary's, or none.
    if let Some(cache) = cached_entropy {
        self.state.last_huff_table.clone_from(&cache.huff);
    } else {
        self.state.last_huff_table = None;
    }
    // Previous FSE tables: the dictionary's, or none.
    if let Some(cache) = cached_entropy {
        self.state
            .fse_tables
            .ll_previous
            .clone_from(&cache.ll_previous);
        self.state
            .fse_tables
            .ml_previous
            .clone_from(&cache.ml_previous);
        self.state
            .fse_tables
            .of_previous
            .clone_from(&cache.of_previous);
    } else {
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
    }
    // Only custom (dictionary) FSE tables are handed to the matcher.
    let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    self.state.matcher.seed_dictionary_entropy(
        self.state.last_huff_table.as_ref(),
        ll_entropy,
        ml_entropy,
        of_entropy,
    );
    #[cfg(feature = "hash")]
    {
        self.hasher = XxHash64::with_seed(0);
    }
    let source = self.uncompressed_data.as_mut().unwrap();
    let drain = self.compressed_data.as_mut().unwrap();
    let window_size = self.state.matcher.window_size();
    assert!(
        window_size != 0,
        "matcher reported window_size == 0, which is invalid"
    );
    // Initial capacity of the in-memory block buffer, chosen from the size hint so small
    // hinted inputs do not reserve a full block up front.
    const ALL_BLOCKS_TINY_THRESHOLD: u64 = 4 * 1024;
    const ALL_BLOCKS_SMALL_THRESHOLD: u64 = 64 * 1024;
    const ALL_BLOCKS_TINY_CAP: usize = 4 * 1024;
    const ALL_BLOCKS_SMALL_CAP: usize = 16 * 1024;
    const ALL_BLOCKS_DEFAULT_CAP: usize = 130 * 1024;
    let initial_all_blocks_cap = match initial_size_hint {
        Some(h) if h <= ALL_BLOCKS_TINY_THRESHOLD => ALL_BLOCKS_TINY_CAP,
        Some(h) if h <= ALL_BLOCKS_SMALL_THRESHOLD => ALL_BLOCKS_SMALL_CAP,
        _ => ALL_BLOCKS_DEFAULT_CAP,
    };
    let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap);
    // Counts bytes read from the source; becomes the frame content size.
    let mut total_uncompressed: u64 = 0;
    // Tail of the previous chunk left over after a pre-split; it starts the next block.
    let mut pending_input: Vec<u8> = Vec::new();
    let mut reached_eof = false;
    // Running "input bytes - emitted bytes" total, consulted by the pre-splitter.
    let mut savings = 0i64;
    loop {
        let block_capacity = MAX_BLOCK_SIZE as usize;
        // Resume from leftover split input, or take a fresh buffer from the matcher.
        let had_pending = !pending_input.is_empty();
        let mut uncompressed_data = if had_pending {
            core::mem::take(&mut pending_input)
        } else {
            self.state.matcher.get_next_space()
        };
        let mut filled = if had_pending {
            uncompressed_data.len()
        } else {
            0
        };
        if uncompressed_data.len() < block_capacity {
            uncompressed_data.resize(block_capacity, 0);
        }
        // Fill the buffer up to one full block, or until the source is exhausted.
        'read_loop: loop {
            if reached_eof || filled == block_capacity {
                break 'read_loop;
            }
            let new_bytes = source
                .read(&mut uncompressed_data[filled..block_capacity])
                .unwrap();
            if new_bytes == 0 {
                reached_eof = true;
                break 'read_loop;
            }
            filled += new_bytes;
            total_uncompressed += new_bytes as u64;
        }
        uncompressed_data.truncate(filled);
        let mut last_block = reached_eof;
        let remaining_for_split = if reached_eof {
            uncompressed_data.len()
        } else {
            block_capacity
        };
        // Only full blocks of compressed levels are candidates for pre-splitting.
        if !matches!(self.compression_level, CompressionLevel::Uncompressed)
            && uncompressed_data.len() == block_capacity
        {
            let block_len = donor_optimal_block_size(
                self.compression_level,
                &uncompressed_data,
                remaining_for_split,
                block_capacity,
                savings,
            );
            if block_len < uncompressed_data.len() {
                // Keep the tail for the next iteration; make sure it can grow back to a
                // full block without reallocating.
                pending_input = uncompressed_data.split_off(block_len);
                if pending_input.capacity() < block_capacity {
                    pending_input.reserve_exact(block_capacity - pending_input.len());
                }
                last_block = false;
            }
        }
        // The content checksum covers exactly the bytes emitted, in order.
        #[cfg(feature = "hash")]
        self.hasher.write(&uncompressed_data);
        if uncompressed_data.is_empty() {
            // Nothing left to encode (empty input, or EOF right after a full block):
            // close the frame with an empty raw last block.
            let header = BlockHeader {
                last_block: true,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut all_blocks);
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(checksums) = self.block_checksums.as_mut() {
                checksums.push(xxh64_block_low32(&[]));
            }
            break;
        }

        match self.compression_level {
            CompressionLevel::Uncompressed => {
                let header = BlockHeader {
                    last_block,
                    block_type: crate::blocks::block::BlockType::Raw,
                    block_size: uncompressed_data.len().try_into().unwrap(),
                };
                header.serialize(&mut all_blocks);
                #[cfg(all(feature = "lsm", feature = "hash"))]
                if let Some(checksums) = self.block_checksums.as_mut() {
                    checksums.push(xxh64_block_low32(&uncompressed_data));
                }
                all_blocks.extend_from_slice(&uncompressed_data);
                // A raw block costs its 3-byte header on top of the data.
                savings +=
                    uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
            }
            CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best
            | CompressionLevel::Level(_) => {
                let before_len = all_blocks.len();
                let block_len = uncompressed_data.len();
                compress_block_encoded(
                    &mut self.state,
                    self.compression_level,
                    last_block,
                    uncompressed_data,
                    &mut all_blocks,
                    #[cfg(all(feature = "lsm", feature = "hash"))]
                    self.block_checksums.as_mut(),
                );
                // Savings = input bytes minus everything this block appended.
                savings += block_len as i64 - (all_blocks.len() - before_len) as i64;
            }
        }
        if last_block && pending_input.is_empty() {
            break;
        }
    }

    // Single-segment frames omit the window descriptor; they are used only when the size
    // was hinted, no dictionary is in play, and the content is 512..=window_size bytes.
    let single_segment = !use_dictionary_state
        && source_size_hint_known
        && total_uncompressed >= 512
        && total_uncompressed <= window_size;
    let header = FrameHeader {
        frame_content_size: Some(total_uncompressed),
        single_segment,
        content_checksum: cfg!(feature = "hash"),
        dictionary_id: if use_dictionary_state {
            self.dictionary.as_ref().map(|dict| dict.id as u64)
        } else {
            None
        },
        window_size: if single_segment {
            None
        } else {
            Some(window_size)
        },
        magicless: self.magicless,
    };
    // NOTE(review): a frame header can reach 18 bytes (magic + descriptor + window +
    // 4-byte dictionary id + 8-byte content size); 14 is only a capacity hint and the Vec
    // grows if needed.
    let mut header_buf: Vec<u8> = Vec::with_capacity(14);
    header.serialize(&mut header_buf);
    drain.write_all(&header_buf).unwrap();
    drain.write_all(&all_blocks).unwrap();

    // Content checksum: low 32 bits of XXH64 over all input bytes.
    #[cfg(feature = "hash")]
    {
        let content_checksum = self.hasher.finish();
        drain
            .write_all(&(content_checksum as u32).to_le_bytes())
            .unwrap();
    }

    // Record the emitted layout by re-parsing the 3-byte block headers just written. Any
    // offset or size that does not fit in u32 leaves `frame_emit_info` as `None`; the frame
    // itself has already been written at that point.
    #[cfg(feature = "lsm")]
    {
        use crate::blocks::block::BlockType as BT;
        use crate::encoding::frame_emit_info::{FrameBlock, FrameEmitInfo};
        let frame_header_len: u32 = match u32::try_from(header_buf.len()) {
            Ok(v) => v,
            Err(_) => return,
        };
        let all_blocks_len_u32: u32 = match u32::try_from(all_blocks.len()) {
            Ok(v) => v,
            Err(_) => return,
        };
        let mut blocks: Vec<FrameBlock> = Vec::new();
        let mut cursor: usize = 0;
        while cursor + 3 <= all_blocks.len() {
            // Block header, little-endian 24 bits: bit 0 = last block, bits 1-2 = type,
            // bits 3.. = block size.
            let mut header_u32 = [0u8; 4];
            header_u32[..3].copy_from_slice(&all_blocks[cursor..cursor + 3]);
            let raw = u32::from_le_bytes(header_u32);
            let last_block = (raw & 1) != 0;
            let block_type = match (raw >> 1) & 0b11 {
                0 => BT::Raw,
                1 => BT::RLE,
                2 => BT::Compressed,
                _ => BT::Reserved,
            };
            let block_size_field = raw >> 3;
            // An RLE block stores a single byte regardless of its size field; other
            // types store `block_size_field` bytes.
            let physical_body: u32 = match block_type {
                BT::RLE => 1,
                _ => block_size_field,
            };
            let cursor_u32: u32 = match u32::try_from(cursor) {
                Ok(v) => v,
                Err(_) => return,
            };
            let offset_in_frame = match frame_header_len.checked_add(cursor_u32) {
                Some(v) => v,
                None => return,
            };
            blocks.push(FrameBlock {
                offset_in_frame,
                header_size: 3,
                body_size: physical_body,
                block_size_field,
                block_type,
                last_block,
            });
            cursor += 3 + physical_body as usize;
            if last_block {
                break;
            }
        }
        // The 4-byte checksum, when present, directly follows the blocks.
        let checksum_range = if cfg!(feature = "hash") {
            let cs_start = match frame_header_len.checked_add(all_blocks_len_u32) {
                Some(v) => v,
                None => return,
            };
            let cs_end = match cs_start.checked_add(4) {
                Some(v) => v,
                None => return,
            };
            Some(cs_start..cs_end)
        } else {
            None
        };
        let body_total = match frame_header_len.checked_add(all_blocks_len_u32) {
            Some(v) => v,
            None => return,
        };
        let total_size = if checksum_range.is_some() {
            match body_total.checked_add(4) {
                Some(v) => v,
                None => return,
            }
        } else {
            body_total
        };
        self.frame_emit_info = Some(FrameEmitInfo {
            frame_header_range: 0..frame_header_len,
            blocks,
            checksum_range,
            total_size,
        });
    }
}
1007
/// Layout of the most recent frame produced by `compress`: header range, per-block
/// offsets and sizes, checksum range and total size.
///
/// `None` before the first `compress` call. Also `None` when `compress` could not express
/// an offset or size of that frame as `u32`.
#[cfg(feature = "lsm")]
pub fn last_frame_emit_info(&self) -> Option<&crate::encoding::frame_emit_info::FrameEmitInfo> {
    self.frame_emit_info.as_ref()
}

/// Enables collection of per-block checksums for subsequent `compress` calls.
///
/// Raw and empty blocks are hashed with `xxh64_block_low32` inside `compress`.
/// NOTE(review): compressed blocks are recorded by `compress_block_encoded` — confirm
/// which bytes it hashes.
#[cfg(all(feature = "lsm", feature = "hash"))]
pub fn enable_per_block_checksums(&mut self) {
    self.per_block_checksums_enabled = true;
}

/// Per-block checksums collected by the most recent `compress` call, in emission order.
/// `None` if collection was not enabled for that call.
#[cfg(all(feature = "lsm", feature = "hash"))]
pub fn last_frame_block_checksums(&self) -> Option<&[u32]> {
    self.block_checksums.as_deref()
}
1056
/// Mutably borrows the installed source, if any.
pub fn source_mut(&mut self) -> Option<&mut R> {
    self.uncompressed_data.as_mut()
}

/// Mutably borrows the installed drain, if any.
pub fn drain_mut(&mut self) -> Option<&mut W> {
    self.compressed_data.as_mut()
}

/// Borrows the installed source, if any.
pub fn source(&self) -> Option<&R> {
    self.uncompressed_data.as_ref()
}

/// Borrows the installed drain, if any.
pub fn drain(&self) -> Option<&W> {
    self.compressed_data.as_ref()
}

/// Removes and returns the installed source, leaving none installed.
pub fn take_source(&mut self) -> Option<R> {
    self.uncompressed_data.take()
}

/// Removes and returns the installed drain, leaving none installed.
pub fn take_drain(&mut self) -> Option<W> {
    self.compressed_data.take()
}
1086
1087 pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
1089 core::mem::swap(&mut match_generator, &mut self.state.matcher);
1090 match_generator
1091 }
1092
1093 pub fn set_compression_level(
1095 &mut self,
1096 compression_level: CompressionLevel,
1097 ) -> CompressionLevel {
1098 let old = self.compression_level;
1099 self.compression_level = compression_level;
1100 old
1101 }
1102
1103 pub fn compression_level(&self) -> CompressionLevel {
1105 self.compression_level
1106 }
1107
1108 pub fn set_dictionary(
1115 &mut self,
1116 dictionary: crate::decoding::Dictionary,
1117 ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
1118 {
1119 if dictionary.id == 0 {
1120 return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
1121 }
1122 if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
1123 return Err(
1124 crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
1125 index: index as u8,
1126 },
1127 );
1128 }
1129 self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
1130 huff: dictionary.huf.table.to_encoder_table(),
1131 ll_previous: dictionary
1132 .fse
1133 .literal_lengths
1134 .to_encoder_table()
1135 .map(|table| PreviousFseTable::Custom(Box::new(table))),
1136 ml_previous: dictionary
1137 .fse
1138 .match_lengths
1139 .to_encoder_table()
1140 .map(|table| PreviousFseTable::Custom(Box::new(table))),
1141 of_previous: dictionary
1142 .fse
1143 .offsets
1144 .to_encoder_table()
1145 .map(|table| PreviousFseTable::Custom(Box::new(table))),
1146 });
1147 Ok(self.dictionary.replace(dictionary))
1148 }
1149
/// Decodes a serialized dictionary and installs it like [`Self::set_dictionary`],
/// returning the previously installed dictionary.
///
/// # Errors
///
/// Returns the decode error from `Dictionary::decode_dict`, or any validation error from
/// `set_dictionary`.
pub fn set_dictionary_from_bytes(
    &mut self,
    raw_dictionary: &[u8],
) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
{
    let dictionary = crate::decoding::Dictionary::decode_dict(raw_dictionary)?;
    self.set_dictionary(dictionary)
}
1159
1160 pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
1162 self.dictionary_entropy_cache = None;
1163 self.dictionary.take()
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 #[cfg(all(feature = "dict_builder", feature = "std"))]
1170 use alloc::format;
1171 use alloc::vec;
1172
1173 use super::FrameCompressor;
1174 use crate::blocks::block::BlockType;
1175 use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
1176 use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
1177 use crate::encoding::{Matcher, Sequence};
1178 use alloc::vec::Vec;
1179
1180 fn generate_data(seed: u64, len: usize) -> Vec<u8> {
1181 let mut state = seed;
1182 let mut data = Vec::with_capacity(len);
1183 for _ in 0..len {
1184 state = state
1185 .wrapping_mul(6364136223846793005)
1186 .wrapping_add(1442695040888963407);
1187 data.push((state >> 33) as u8);
1188 }
1189 data
1190 }
1191
1192 fn first_block_type(frame: &[u8]) -> BlockType {
1193 let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
1194 let mut decoder = block_decoder::new();
1195 let (header, _) = decoder
1196 .read_block_header(&frame[header_size as usize..])
1197 .expect("block header should parse");
1198 header.block_type
1199 }
1200
1201 #[cfg(feature = "std")]
1203 #[test]
1204 fn fcs_header_written_and_c_zstd_compatible() {
1205 let levels = [
1206 crate::encoding::CompressionLevel::Uncompressed,
1207 crate::encoding::CompressionLevel::Fastest,
1208 crate::encoding::CompressionLevel::Default,
1209 crate::encoding::CompressionLevel::Better,
1210 crate::encoding::CompressionLevel::Best,
1211 ];
1212 let fcs_2byte = vec![0xCDu8; 300]; let large = vec![0xABu8; 100_000];
1214 let inputs: [&[u8]; 5] = [
1215 &[],
1216 &[0x00],
1217 b"abcdefghijklmnopqrstuvwxy\n",
1218 &fcs_2byte,
1219 &large,
1220 ];
1221 for level in levels {
1222 for data in &inputs {
1223 let compressed = crate::encoding::compress_to_vec(*data, level);
1224 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1226 .unwrap()
1227 .0;
1228 assert_eq!(
1229 header.frame_content_size(),
1230 data.len() as u64,
1231 "FCS mismatch for len={} level={:?}",
1232 data.len(),
1233 level,
1234 );
1235 assert_ne!(
1238 header.descriptor.frame_content_size_bytes().unwrap(),
1239 0,
1240 "FCS field must be present for len={} level={:?}",
1241 data.len(),
1242 level,
1243 );
1244 let mut decoded = Vec::new();
1246 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1247 |e| {
1248 panic!(
1249 "C zstd decode failed for len={} level={level:?}: {e}",
1250 data.len()
1251 )
1252 },
1253 );
1254 assert_eq!(
1255 decoded.as_slice(),
1256 *data,
1257 "C zstd roundtrip failed for len={}",
1258 data.len()
1259 );
1260 }
1261 }
1262 }
1263
1264 #[cfg(feature = "std")]
1265 #[test]
1266 fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
1267 let data = vec![0xAB; 2047];
1268 let compressed = {
1269 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1270 compressor.set_source_size_hint(data.len() as u64);
1271 compressor.set_source(data.as_slice());
1272 let mut out = Vec::new();
1273 compressor.set_drain(&mut out);
1274 compressor.compress();
1275 out
1276 };
1277
1278 let mut decoded = Vec::new();
1279 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1280 assert_eq!(decoded, data);
1281 }
1282
1283 #[cfg(feature = "std")]
1284 #[test]
1285 fn small_hinted_default_frame_uses_single_segment_header() {
1286 let data = generate_data(0xD15E_A5ED, 1024);
1287 let compressed = {
1288 let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
1289 compressor.set_source_size_hint(data.len() as u64);
1290 compressor.set_source(data.as_slice());
1291 let mut out = Vec::new();
1292 compressor.set_drain(&mut out);
1293 compressor.compress();
1294 out
1295 };
1296
1297 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1298 assert!(
1299 frame_header.descriptor.single_segment_flag(),
1300 "small hinted default frames should use single-segment header for Rust/FFI parity"
1301 );
1302 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1303 let mut decoded = Vec::new();
1304 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
1305 .expect("ffi decoder must accept single-segment small hinted default frame");
1306 assert_eq!(decoded, data);
1307 }
1308
1309 #[cfg(feature = "std")]
1310 #[test]
1311 fn small_hinted_numeric_default_levels_use_single_segment_header() {
1312 let data = generate_data(0xA11C_E003, 1024);
1313 for level in [
1314 super::CompressionLevel::Level(0),
1315 super::CompressionLevel::Level(3),
1316 ] {
1317 let compressed = {
1318 let mut compressor = FrameCompressor::new(level);
1319 compressor.set_source_size_hint(data.len() as u64);
1320 compressor.set_source(data.as_slice());
1321 let mut out = Vec::new();
1322 compressor.set_drain(&mut out);
1323 compressor.compress();
1324 out
1325 };
1326
1327 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1328 assert!(
1329 frame_header.descriptor.single_segment_flag(),
1330 "small hinted numeric default level frames should use single-segment header (level={level:?})"
1331 );
1332 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1333 let mut decoded = Vec::new();
1334 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
1335 panic!(
1336 "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
1337 )
1338 });
1339 assert_eq!(decoded, data);
1340 }
1341 }
1342
/// Matrix check over seeds x sizes x levels. Every frame compressed with an exact
/// source-size hint must decode byte-for-byte with the reference (FFI) decoder.
/// At the 511/512-byte boundary, only the 512-byte input may use a
/// single-segment header.
#[cfg(feature = "std")]
#[test]
fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
    let levels = [
        super::CompressionLevel::Fastest,
        super::CompressionLevel::Default,
        super::CompressionLevel::Better,
        super::CompressionLevel::Best,
        super::CompressionLevel::Level(-1),
        super::CompressionLevel::Level(2),
        super::CompressionLevel::Level(3),
        super::CompressionLevel::Level(4),
        super::CompressionLevel::Level(11),
    ];
    let sizes = [
        511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
    ];
    // Effective seeds: each base seed offset by its position in the list.
    let seeds = [11u64, 23, 41]
        .into_iter()
        .enumerate()
        .map(|(idx, base)| base + idx as u64);

    for seed in seeds {
        for &size in &sizes {
            let data = generate_data(seed, size);
            for &level in &levels {
                let mut frame: Vec<u8> = Vec::new();
                {
                    let mut compressor = FrameCompressor::new(level);
                    compressor.set_source_size_hint(data.len() as u64);
                    compressor.set_source(data.as_slice());
                    compressor.set_drain(&mut frame);
                    compressor.compress();
                }

                // Pin the single-segment decision exactly at the boundary.
                if size == 511 || size == 512 {
                    let (frame_header, _) = read_frame_header(frame.as_slice()).unwrap();
                    assert_eq!(
                        frame_header.descriptor.single_segment_flag(),
                        size == 512,
                        "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
                    );
                }

                let mut decoded = Vec::new();
                if let Err(e) = zstd::stream::copy_decode(frame.as_slice(), &mut decoded) {
                    panic!(
                        "ffi decode failed with source-size hint: level={level:?} size={size} seed={seed} err={e}"
                    );
                }
                assert_eq!(
                    decoded, data,
                    "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={seed}"
                );
            }
        }
    }
}
1402
/// Covers a spread of levels and three inputs of roughly 1 KiB. An exact
/// source-size hint must always yield a single-segment header whose content size
/// equals the input length, and each frame must decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn hinted_levels_use_single_segment_header_symmetrically() {
    let levels = [
        super::CompressionLevel::Fastest,
        super::CompressionLevel::Default,
        super::CompressionLevel::Better,
        super::CompressionLevel::Best,
        super::CompressionLevel::Level(0),
        super::CompressionLevel::Level(2),
        super::CompressionLevel::Level(3),
        super::CompressionLevel::Level(4),
        super::CompressionLevel::Level(11),
    ];

    for (round, seed) in [7u64, 23, 41].into_iter().enumerate() {
        let data = generate_data(seed, 1024 + round * 97);
        let size = data.len();

        for &level in &levels {
            let mut frame: Vec<u8> = Vec::new();
            {
                let mut compressor = FrameCompressor::new(level);
                compressor.set_source_size_hint(size as u64);
                compressor.set_source(data.as_slice());
                compressor.set_drain(&mut frame);
                compressor.compress();
            }

            let (frame_header, _) = read_frame_header(frame.as_slice()).unwrap();
            assert!(
                frame_header.descriptor.single_segment_flag(),
                "hinted frame should be single-segment for level={level:?} size={size}"
            );
            assert_eq!(frame_header.frame_content_size(), size as u64);

            let mut decoded = Vec::new();
            if let Err(e) = zstd::stream::copy_decode(frame.as_slice(), &mut decoded) {
                panic!(
                    "ffi decode failed for hinted single-segment parity: level={level:?} size={size} err={e}"
                );
            }
            assert_eq!(decoded, data);
        }
    }
}
1448
/// With an exact source-size hint, a 511-byte input must NOT use a single-segment
/// header while a 512-byte input must. This is checked for every listed level and
/// seed, and every frame must decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn hinted_levels_pin_511_512_single_segment_boundary() {
    let levels = [
        super::CompressionLevel::Fastest,
        super::CompressionLevel::Default,
        super::CompressionLevel::Better,
        super::CompressionLevel::Best,
        super::CompressionLevel::Level(0),
        super::CompressionLevel::Level(2),
        super::CompressionLevel::Level(3),
        super::CompressionLevel::Level(4),
        super::CompressionLevel::Level(11),
    ];
    // Effective seeds: each base seed offset by its position in the list.
    let seeds = [7u64, 23, 41]
        .into_iter()
        .enumerate()
        .map(|(idx, base)| base + idx as u64);

    for seed in seeds {
        for size in [511usize, 512] {
            let data = generate_data(seed, size);
            for &level in &levels {
                let mut frame: Vec<u8> = Vec::new();
                {
                    let mut compressor = FrameCompressor::new(level);
                    compressor.set_source_size_hint(data.len() as u64);
                    compressor.set_source(data.as_slice());
                    compressor.set_drain(&mut frame);
                    compressor.compress();
                }

                let (frame_header, _) = read_frame_header(frame.as_slice()).unwrap();
                let expect_single_segment = size == 512;
                assert_eq!(
                    frame_header.descriptor.single_segment_flag(),
                    expect_single_segment,
                    "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
                );

                let mut decoded = Vec::new();
                if let Err(e) = zstd::stream::copy_decode(frame.as_slice(), &mut decoded) {
                    panic!(
                        "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={seed} err={e}"
                    );
                }
                assert_eq!(decoded, data);
            }
        }
    }
}
1496
/// Random data from `generate_data` compressed at `Fastest` must be stored as a
/// raw first block and still decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn fastest_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Fastest;
    let input = generate_data(0xC0FF_EE11, 10 * 1024);

    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);

    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
1510
/// Random data from `generate_data` compressed at `Default` must be stored as a
/// raw first block and still decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn default_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Default;
    let input = generate_data(0xD15E_A5ED, 10 * 1024);

    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);

    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
1524
/// Random data from `generate_data` compressed at `Best` must be stored as a raw
/// first block and still decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn best_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Best;
    let input = generate_data(0xB35C_AFE1, 10 * 1024);

    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);

    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
1538
/// Random data from `generate_data` compressed at numeric level 2 must be stored
/// as a raw first block and still decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn level2_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Level(2);
    let input = generate_data(0xA11C_E222, 10 * 1024);

    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);

    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
1552
/// Random data from `generate_data` compressed at `Better` must be stored as a
/// raw first block and still decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn better_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Better;
    let input = generate_data(0xBE77_E111, 10 * 1024);

    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);

    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
1566
/// Highly repetitive log-line input must never take the raw-block fast path.
/// For each listed level:
/// - the first block must not be raw,
/// - the frame must be smaller than the input, and
/// - the reference (FFI) decoder must reproduce the input exactly.
#[cfg(feature = "std")]
#[test]
fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
    const LINE: &[u8] =
        b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
    const TOTAL: usize = 16 * 1024;
    // LINE repeated up to exactly TOTAL bytes; the final copy is truncated.
    let data: Vec<u8> = LINE.iter().copied().cycle().take(TOTAL).collect();

    /// Compresses `data` at `level` and checks all per-level expectations.
    /// Every failure message names the level, so a failing run is attributable.
    fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
        let compressed = crate::encoding::compress_to_vec(data, level);
        assert_ne!(
            first_block_type(&compressed),
            BlockType::Raw,
            "compressible input must not use the raw fast path (level={level:?})"
        );
        assert!(
            compressed.len() < data.len(),
            "compressible input should remain compressible for level={level:?}"
        );
        let mut decoded = Vec::new();
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
            .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
        assert_eq!(decoded, data, "ffi roundtrip mismatch (level={level:?})");
    }

    for level in [
        super::CompressionLevel::Fastest,
        super::CompressionLevel::Default,
        super::CompressionLevel::Level(3),
        super::CompressionLevel::Better,
        super::CompressionLevel::Best,
    ] {
        assert_not_raw_for_level(data.as_slice(), level);
    }
}
1596
/// Small, highly compressible log input with an exact source-size hint. At every
/// listed level the frame must:
/// - use a single-segment header,
/// - avoid the raw fast path,
/// - shrink below the input size, and
/// - decode with the reference decoder.
#[cfg(feature = "std")]
#[test]
fn hinted_small_compressible_frames_use_single_segment_across_levels() {
    const LINE: &[u8] =
        b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
    // LINE repeated up to exactly 4 KiB; the final copy is truncated.
    let data: Vec<u8> = LINE.iter().copied().cycle().take(4 * 1024).collect();

    // Compresses `data` at `level` with an exact source-size hint.
    let encode_hinted = |level: super::CompressionLevel| -> Vec<u8> {
        let mut compressor = FrameCompressor::new(level);
        compressor.set_source_size_hint(data.len() as u64);
        compressor.set_source(data.as_slice());
        let mut frame = Vec::new();
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };

    let levels = [
        super::CompressionLevel::Fastest,
        super::CompressionLevel::Default,
        super::CompressionLevel::Better,
        super::CompressionLevel::Best,
        super::CompressionLevel::Level(0),
        super::CompressionLevel::Level(3),
        super::CompressionLevel::Level(4),
        super::CompressionLevel::Level(11),
    ];
    for level in levels {
        let compressed = encode_hinted(level);

        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
        assert!(
            frame_header.descriptor.single_segment_flag(),
            "hinted small compressible frame should use single-segment (level={level:?})"
        );
        assert_ne!(
            first_block_type(&compressed),
            BlockType::Raw,
            "compressible hinted frame should stay off raw fast path (level={level:?})"
        );
        assert!(
            compressed.len() < data.len(),
            "compressible hinted frame should still shrink (level={level:?})"
        );

        let mut decoded = Vec::new();
        if let Err(e) = zstd::stream::copy_decode(compressed.as_slice(), &mut decoded) {
            panic!("ffi decode failed (level={level:?}): {e}");
        }
        assert_eq!(decoded, data);
    }
}
1647
/// Test-only matcher that keeps just the last committed buffer and a fixed window
/// size. Its `Matcher` impl reports each committed buffer as a single
/// literals-only sequence.
struct NoDictionaryMatcher {
    /// Most recently committed buffer; cleared on reset.
    last_space: Vec<u8>,
    /// Value reported by `window_size()` and used as the length of every
    /// buffer handed out by `get_next_space()`.
    window_size: u64,
}

impl NoDictionaryMatcher {
    /// Creates a matcher with an empty history and the given window size.
    fn new(window_size: u64) -> Self {
        let last_space = Vec::default();
        Self {
            window_size,
            last_space,
        }
    }
}
1661
1662 impl Matcher for NoDictionaryMatcher {
1663 fn get_next_space(&mut self) -> Vec<u8> {
1664 vec![0; self.window_size as usize]
1665 }
1666
1667 fn get_last_space(&mut self) -> &[u8] {
1668 self.last_space.as_slice()
1669 }
1670
1671 fn commit_space(&mut self, space: Vec<u8>) {
1672 self.last_space = space;
1673 }
1674
1675 fn skip_matching(&mut self) {}
1676
1677 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
1678 handle_sequence(Sequence::Literals {
1679 literals: self.last_space.as_slice(),
1680 });
1681 }
1682
1683 fn reset(&mut self, _level: super::CompressionLevel) {
1684 self.last_space.clear();
1685 }
1686
1687 fn window_size(&self) -> u64 {
1688 self.window_size
1689 }
1690 }
1691
/// Even at the `Uncompressed` level, the produced frame must begin with the
/// little-endian encoding of `MAGIC_NUM`.
#[test]
fn frame_starts_with_magic_num() {
    let input: &[u8] = &[1, 2, 3];
    let mut frame: Vec<u8> = Vec::new();

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(input);
    compressor.set_drain(&mut frame);
    compressor.compress();

    let magic = MAGIC_NUM.to_le_bytes();
    assert!(frame.starts_with(&magic));
}
1703
/// A tiny input compressed at the `Uncompressed` level must produce a frame that
/// both the crate decoder and the reference (FFI) decoder turn back into the
/// original bytes.
#[test]
fn very_simple_raw_compress() {
    let mock_data = [1_u8, 2, 3].as_slice();
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data);
    compressor.set_drain(&mut output);

    compressor.compress();

    // Verify the output is a real, decodable frame rather than just checking
    // that `compress()` does not panic.
    let mut decoder = FrameDecoder::new();
    let mut decoded = Vec::with_capacity(mock_data.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, mock_data);

    let mut decoded = Vec::new();
    zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
    assert_eq!(decoded, mock_data);
}
1714
/// An input made of several long single-byte runs, compressed at the
/// `Uncompressed` level, must round-trip through both the crate decoder and the
/// reference (FFI) decoder.
#[test]
fn very_simple_compress() {
    // (byte value, run length) pairs, concatenated in order.
    let runs: [(u8, usize); 5] = [
        (0, 1 << 17),
        (1, (1 << 17) - 1),
        (2, (1 << 18) - 1),
        (2, 1 << 17),
        (3, (1 << 17) - 1),
    ];
    let mut mock_data = Vec::new();
    for &(byte, len) in &runs {
        mock_data.resize(mock_data.len() + len, byte);
    }

    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    let mut native = Vec::with_capacity(mock_data.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&output, &mut native).unwrap();
    assert_eq!(mock_data, native);

    let mut reference = Vec::new();
    zstd::stream::copy_decode(output.as_slice(), &mut reference).unwrap();
    assert_eq!(mock_data, reference);
}
1738
/// A 512 KiB all-zero input compressed at the `Uncompressed` level must
/// round-trip through the crate decoder and through the reference (FFI) decoder.
#[test]
fn rle_compress() {
    let mock_data = vec![0; 1 << 19];
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data.as_slice());
    compressor.set_drain(&mut output);

    compressor.compress();

    let mut decoder = FrameDecoder::new();
    let mut decoded = Vec::with_capacity(mock_data.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(mock_data, decoded);

    // Cross-check with the reference implementation so the frame is shown to be
    // accepted by libzstd, not merely self-consistent with our own decoder.
    let mut decoded = Vec::new();
    zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
    assert_eq!(mock_data, decoded);
}
1754
/// A five-byte input compressed at the `Uncompressed` level must round-trip
/// through both the crate decoder and the reference (FFI) decoder.
#[test]
fn aaa_compress() {
    let input: Vec<u8> = vec![0, 1, 3, 4, 5];
    let mut frame: Vec<u8> = Vec::new();
    {
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
        compressor.set_source(input.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
    }

    // Crate decoder.
    let mut native = Vec::with_capacity(input.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&frame, &mut native).unwrap();
    assert_eq!(input, native);

    // Reference decoder.
    let mut reference = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut reference).unwrap();
    assert_eq!(input, reference);
}
1774
/// Compressing with a dictionary must:
/// - record the dictionary id in the frame header,
/// - be rejected by a decoder that lacks the dictionary (`DictNotProvided`), and
/// - round-trip with both the crate decoder and the reference (FFI) decoder once
///   the dictionary is supplied.
///
/// Also pins the return values of `set_dictionary_from_bytes` (None on first
/// insert) and `set_dictionary` (the previously attached dictionary).
#[test]
fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let expected_id = dict_for_decoder.id;

    // Eight copies of the first 2 KiB of dictionary content.
    let data = dict_for_decoder.dict_content[..2048].repeat(8);

    let mut with_dict = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let first = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(first.is_none(), "first dictionary insert should return None");
    let replaced = compressor
        .set_dictionary(dict_for_encoder)
        .expect("valid dictionary should attach")
        .expect("set_dictionary_from_bytes inserted previous dictionary");
    assert_eq!(replaced.id, expected_id);

    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut with_dict);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(expected_id));

    // A decoder without the dictionary must refuse the frame.
    let mut bare_decoder = FrameDecoder::new();
    let mut missing_dict_target = Vec::with_capacity(data.len());
    let err = bare_decoder
        .decode_all_to_vec(&with_dict, &mut missing_dict_target)
        .unwrap_err();
    assert!(
        matches!(
            &err,
            crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
        ),
        "dict-compressed stream should require dictionary id, got: {err:?}"
    );

    // With the dictionary, the crate decoder reproduces the input.
    let mut dict_decoder = FrameDecoder::new();
    dict_decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(data.len());
    dict_decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, data);

    // So does libzstd, given the same raw dictionary bytes.
    let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
    let mut ffi_decoded = Vec::with_capacity(data.len());
    let ffi_written = ffi_decoder
        .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
        .unwrap();
    assert_eq!(ffi_written, data.len());
    assert_eq!(ffi_decoded, data);
}
1838
/// A raw dictionary trained by `dict_builder` on similar records must:
/// - attach to the compressor,
/// - be advertised in the frame header,
/// - round-trip with the crate decoder, and
/// - produce a smaller frame than compressing the same payload without it.
#[cfg(all(feature = "dict_builder", feature = "std"))]
#[test]
fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
    use std::io::Cursor;

    let training: Vec<u8> = (0..256u32)
        .flat_map(|idx| format!("tenant=demo table=orders key={idx} region=eu\n").into_bytes())
        .collect();

    let mut raw_dict: Vec<u8> = Vec::new();
    crate::dictionary::create_raw_dict_from_source(
        Cursor::new(training.as_slice()),
        training.len(),
        &mut raw_dict,
        4096,
    )
    .expect("dict_builder training should succeed");
    assert!(
        !raw_dict.is_empty(),
        "dict_builder produced an empty dictionary"
    );

    let dict_id = 0xD1C7_0008;
    let encoder_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
    let decoder_dict = crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict).unwrap();

    let payload: Vec<u8> = (0..96u32)
        .flat_map(|idx| {
            format!("tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n")
                .into_bytes()
        })
        .collect();

    // Baseline: same level, no dictionary.
    let mut without_dict = Vec::new();
    {
        let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
        baseline.set_source(payload.as_slice());
        baseline.set_drain(&mut without_dict);
        baseline.compress();
    }

    let mut with_dict = Vec::new();
    {
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        compressor
            .set_dictionary(encoder_dict)
            .expect("valid dict_builder dictionary should attach");
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut with_dict);
        compressor.compress();
    }

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_id));

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, payload);

    assert!(
        with_dict.len() < without_dict.len(),
        "trained dictionary should improve compression for this small payload"
    );
}
1907
/// Attaching a dictionary via `set_dictionary_from_bytes` must leave the previous
/// Huffman table and the previous LL/ML/OF FSE tables populated in the compressor
/// state. An empty input is enough to observe this after `compress()`.
#[test]
fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let mut output: Vec<u8> = Vec::new();

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let replaced = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(replaced.is_none());

    let empty: &[u8] = b"";
    compressor.set_source(empty);
    compressor.set_drain(&mut output);
    compressor.compress();

    // (table present?, failure message) for each seeded entropy table.
    let seeded = [
        (
            compressor.state.last_huff_table.is_some(),
            "dictionary entropy should seed previous huffman table before first block",
        ),
        (
            compressor.state.fse_tables.ll_previous.is_some(),
            "dictionary entropy should seed previous ll table before first block",
        ),
        (
            compressor.state.fse_tables.ml_previous.is_some(),
            "dictionary entropy should seed previous ml table before first block",
        ),
        (
            compressor.state.fse_tables.of_previous.is_some(),
            "dictionary entropy should seed previous of table before first block",
        ),
    ];
    for &(present, message) in &seeded {
        assert!(present, "{}", message);
    }
}
1941
/// `set_dictionary` must reject a dictionary whose id is 0 with
/// `DictionaryDecodeError::ZeroDictionaryId`.
#[test]
fn set_dictionary_rejects_zero_dictionary_id() {
    let mut compressor: FrameCompressor<
        &[u8],
        Vec<u8>,
        crate::encoding::match_generator::MatchGeneratorDriver,
    > = FrameCompressor::new(super::CompressionLevel::Fastest);

    // Only the id is zero; the repeat offsets are all non-zero.
    let invalid = crate::decoding::Dictionary {
        id: 0,
        dict_content: vec![1, 2, 3],
        offset_hist: [1, 4, 8],
        fse: crate::decoding::scratch::FSEScratch::new(),
        huf: crate::decoding::scratch::HuffmanScratch::new(),
    };

    assert!(matches!(
        compressor.set_dictionary(invalid),
        Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
    ));
}
1963
/// `set_dictionary` must reject a dictionary with a zero repeat offset, reporting
/// the index of the first offending entry (here index 0).
#[test]
fn set_dictionary_rejects_zero_repeat_offsets() {
    let mut compressor: FrameCompressor<
        &[u8],
        Vec<u8>,
        crate::encoding::match_generator::MatchGeneratorDriver,
    > = FrameCompressor::new(super::CompressionLevel::Fastest);

    // Valid non-zero id; the first repeat offset is zero.
    let invalid = crate::decoding::Dictionary {
        id: 1,
        dict_content: vec![1, 2, 3],
        offset_hist: [0, 4, 8],
        fse: crate::decoding::scratch::FSEScratch::new(),
        huf: crate::decoding::scratch::HuffmanScratch::new(),
    };

    assert!(matches!(
        compressor.set_dictionary(invalid),
        Err(
            crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
                index: 0
            }
        )
    ));
}
1989
/// At the `Uncompressed` level an attached dictionary must not be advertised in
/// the frame header, so the frame decodes without supplying that dictionary.
#[test]
fn uncompressed_mode_does_not_require_dictionary() {
    let payload = b"plain-bytes-that-should-stay-raw";
    let dict = crate::decoding::Dictionary::from_raw_content(
        0xABCD_0001,
        b"shared-history".to_vec(),
    )
    .expect("raw dictionary should be valid");

    let mut output = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor
        .set_dictionary(dict)
        .expect("dictionary should attach in uncompressed mode");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    let header = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header")
        .0;
    assert_eq!(
        header.dictionary_id(),
        None,
        "raw/uncompressed frames must not advertise dictionary dependency"
    );

    // No dictionary is registered with this decoder.
    let mut decoder = FrameDecoder::new();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
2020
/// Dictionary priming must not change the advertised window. With identical
/// matcher settings, the window must equal the no-dictionary baseline, and a
/// payload larger than that window must still round-trip with the dictionary.
#[test]
fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let dict_id = 0xABCD_0002;
    let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");
    let dict_for_decoder =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
            .expect("raw dictionary should be valid");

    // 512 KiB plus 512 bytes of repeated "abcdefgh".
    let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);

    // Baseline frame: same matcher configuration, no dictionary.
    let mut no_dict_output = Vec::new();
    let mut baseline = FrameCompressor::new_with_matcher(
        MatchGeneratorDriver::new(1024, 1),
        super::CompressionLevel::Fastest,
    );
    baseline.set_source(payload.as_slice());
    baseline.set_drain(&mut no_dict_output);
    baseline.compress();
    let no_dict_window = crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
        .expect("baseline frame should have a header")
        .0
        .window_size()
        .expect("window size should be present");

    // Same configuration, dictionary attached.
    let mut output = Vec::new();
    let mut compressor = FrameCompressor::new_with_matcher(
        MatchGeneratorDriver::new(1024, 1),
        super::CompressionLevel::Fastest,
    );
    compressor
        .set_dictionary(dict)
        .expect("dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();
    let advertised_window = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header")
        .0
        .window_size()
        .expect("window size should be present");

    assert_eq!(
        advertised_window, no_dict_window,
        "dictionary priming must not inflate advertised window size"
    );
    assert!(
        payload.len() > advertised_window as usize,
        "test must cross the advertised window boundary"
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
2083
/// With a dictionary attached, a source-size hint far smaller than the payload
/// (1 byte vs 2 KiB) must not advertise a larger window than compressing without
/// a hint. The hinted frame must still round-trip with the dictionary.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
    let dict_id = 0xABCD_0004;
    // Each call builds an identical dictionary: "abcd" repeated 1024 times.
    let make_dict = || {
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap()
    };
    // Reads the advertised window size from an encoded frame.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };
    let payload = b"abcdabcdabcdabcd".repeat(128);

    let mut hinted_output = Vec::new();
    let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
    hinted.set_dictionary(make_dict()).unwrap();
    hinted.set_source_size_hint(1);
    hinted.set_source(payload.as_slice());
    hinted.set_drain(&mut hinted_output);
    hinted.compress();

    let mut no_hint_output = Vec::new();
    let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
    no_hint.set_dictionary(make_dict()).unwrap();
    no_hint.set_source(payload.as_slice());
    no_hint.set_drain(&mut no_hint_output);
    no_hint.compress();

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(make_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
2136
/// With a dictionary attached and an exact source-size hint for a 4 KiB payload,
/// the advertised window must not exceed the unhinted one. The hinted frame must
/// still round-trip with the dictionary.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
    let dict_id = 0xABCD_0005;
    // Each call builds an identical dictionary: "abcd" repeated 1024 times.
    let make_dict = || {
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap()
    };
    // Reads the advertised window size from an encoded frame.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };
    let payload = b"abcd".repeat(1024);
    let payload_len = payload.len() as u64;

    let mut hinted_output = Vec::new();
    let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
    hinted.set_dictionary(make_dict()).unwrap();
    hinted.set_source_size_hint(payload_len);
    hinted.set_source(payload.as_slice());
    hinted.set_drain(&mut hinted_output);
    hinted.compress();

    let mut no_hint_output = Vec::new();
    let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
    no_hint.set_dictionary(make_dict()).unwrap();
    no_hint.set_source(payload.as_slice());
    no_hint.set_drain(&mut no_hint_output);
    no_hint.compress();

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(make_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
2190
/// When a dictionary is attached but the compressor uses `NoDictionaryMatcher`,
/// the frame header must not advertise a dictionary id. The frame must therefore
/// decode with no dictionary registered.
#[test]
fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
    let payload = b"abcdefghabcdefgh";
    let dict = crate::decoding::Dictionary::from_raw_content(0xABCD_0003, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");

    let mut output = Vec::new();
    let mut compressor = FrameCompressor::new_with_matcher(
        NoDictionaryMatcher::new(64),
        super::CompressionLevel::Fastest,
    );
    compressor
        .set_dictionary(dict)
        .expect("dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    let (header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    assert_eq!(
        header.dictionary_id(),
        None,
        "matchers that do not support dictionary priming must not advertise dictionary dependency"
    );

    // No dictionary is registered with this decoder.
    let mut decoder = FrameDecoder::new();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
2222
/// Reusing one compressor for two frames of identical data must produce:
/// - a content checksum in each frame that matches the decoder's own
///   computation, and
/// - the same checksum in both frames.
///
/// Together these show the hasher is reset between frames.
#[cfg(feature = "hash")]
#[test]
fn checksum_two_frames_reused_compressor() {
    let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);

    let mut compressed1 = Vec::new();
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed1);
    compressor.compress();

    let mut compressed2 = Vec::new();
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed2);
    compressor.compress();

    /// Decodes a single frame block by block and returns:
    /// - the decoded bytes,
    /// - the checksum stored in the frame, and
    /// - the checksum the decoder computed.
    fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
        let mut decoder = FrameDecoder::new();
        let mut source = compressed;
        decoder.reset(&mut source).unwrap();
        while !decoder.is_finished() {
            decoder
                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut decoded = Vec::new();
        decoder.collect_to_writer(&mut decoded).unwrap();
        (
            decoded,
            decoder.get_checksum_from_data(),
            decoder.get_calculated_checksum(),
        )
    }

    let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
    assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
    // Guard against a vacuous pass: the equality checks below would also
    // succeed if no checksum were written at all (`None == None`).
    assert!(
        chksum_from_data1.is_some(),
        "frame 1: no content checksum present in frame"
    );
    assert_eq!(
        chksum_from_data1, chksum_calculated1,
        "frame 1: checksum mismatch"
    );

    let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
    assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
    assert!(
        chksum_from_data2.is_some(),
        "frame 2: no content checksum present in frame"
    );
    assert_eq!(
        chksum_from_data2, chksum_calculated2,
        "frame 2: checksum mismatch"
    );

    assert_eq!(
        chksum_from_data1, chksum_from_data2,
        "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
    );
}
2286
2287 #[cfg(feature = "std")]
2288 #[test]
2289 fn fuzz_targets() {
2290 use std::io::Read;
2291 fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
2292 let mut decoder = crate::decoding::StreamingDecoder::new(data).unwrap();
2293 let mut result: Vec<u8> = Vec::new();
2294 decoder.read_to_end(&mut result).expect("Decoding failed");
2295 result
2296 }
2297
2298 fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
2299 let mut decoder = crate::decoding::FrameDecoder::new();
2300 decoder.reset(&mut data).unwrap();
2301 let mut result = vec![];
2302 while !decoder.is_finished() || decoder.can_collect() > 0 {
2303 decoder
2304 .decode_blocks(
2305 &mut data,
2306 crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
2307 )
2308 .unwrap();
2309 decoder.collect_to_writer(&mut result).unwrap();
2310 }
2311 result
2312 }
2313
2314 fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
2315 zstd::stream::encode_all(std::io::Cursor::new(data), 3)
2316 }
2317
2318 fn encode_szstd_uncompressed(data: &mut dyn std::io::Read) -> Vec<u8> {
2319 let mut input = Vec::new();
2320 data.read_to_end(&mut input).unwrap();
2321
2322 crate::encoding::compress_to_vec(
2323 input.as_slice(),
2324 crate::encoding::CompressionLevel::Uncompressed,
2325 )
2326 }
2327
2328 fn encode_szstd_compressed(data: &mut dyn std::io::Read) -> Vec<u8> {
2329 let mut input = Vec::new();
2330 data.read_to_end(&mut input).unwrap();
2331
2332 crate::encoding::compress_to_vec(
2333 input.as_slice(),
2334 crate::encoding::CompressionLevel::Fastest,
2335 )
2336 }
2337
2338 fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
2339 let mut output = Vec::new();
2340 zstd::stream::copy_decode(data, &mut output)?;
2341 Ok(output)
2342 }
2343 if std::fs::exists("fuzz/artifacts/interop").unwrap_or(false) {
2344 for file in std::fs::read_dir("fuzz/artifacts/interop").unwrap() {
2345 if file.as_ref().unwrap().file_type().unwrap().is_file() {
2346 let data = std::fs::read(file.unwrap().path()).unwrap();
2347 let data = data.as_slice();
2348 let compressed = encode_zstd(data).unwrap();
2350 let decoded = decode_szstd(&mut compressed.as_slice());
2351 let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
2352 assert!(
2353 decoded == data,
2354 "Decoded data did not match the original input during decompression"
2355 );
2356 assert_eq!(
2357 decoded2, data,
2358 "Decoded data did not match the original input during decompression"
2359 );
2360
2361 let mut input = data;
2364 let compressed = encode_szstd_uncompressed(&mut input);
2365 let decoded = decode_zstd(&compressed).unwrap();
2366 assert_eq!(
2367 decoded, data,
2368 "Decoded data did not match the original input during compression"
2369 );
2370 let mut input = data;
2372 let compressed = encode_szstd_compressed(&mut input);
2373 let decoded = decode_zstd(&compressed).unwrap();
2374 assert_eq!(
2375 decoded, data,
2376 "Decoded data did not match the original input during compression"
2377 );
2378 }
2379 }
2380 }
2381 }
2382
2383 #[test]
2389 fn donor_split_block_from_borders_keeps_homogeneous_block() {
2390 let block = vec![0xAAu8; MAX_BLOCK_SIZE as usize];
2391 let split = super::donor_split_block_from_borders(&block);
2392 assert_eq!(split, MAX_BLOCK_SIZE as usize);
2393 }
2394
2395 #[test]
2409 fn donor_split_block_from_borders_returns_midpoint_for_centred_transition() {
2410 let mut block = vec![0u8; MAX_BLOCK_SIZE as usize];
2411 for (i, byte) in block
2412 .iter_mut()
2413 .enumerate()
2414 .skip(MAX_BLOCK_SIZE as usize / 2)
2415 {
2416 *byte = (i % 251 + 1) as u8;
2417 }
2418 let split = super::donor_split_block_from_borders(&block);
2419 assert_eq!(
2420 split,
2421 64 * 1024,
2422 "centred-transition fixture must take the symmetric \
2423 midpoint arm (`abs_diff < min_distance`), got {split}"
2424 );
2425 }
2426
2427 #[test]
2432 fn donor_pre_split_level_dispatches_by_compression_level() {
2433 use crate::encoding::CompressionLevel;
2434 assert_eq!(
2435 super::donor_pre_split_level(CompressionLevel::Fastest),
2436 None
2437 );
2438 assert_eq!(
2439 super::donor_pre_split_level(CompressionLevel::Default),
2440 None
2441 );
2442 assert_eq!(super::donor_pre_split_level(CompressionLevel::Better), None);
2443 assert_eq!(
2444 super::donor_pre_split_level(CompressionLevel::Level(7)),
2445 None
2446 );
2447 assert_eq!(
2448 super::donor_pre_split_level(CompressionLevel::Level(11)),
2449 Some(0)
2450 );
2451 assert_eq!(
2452 super::donor_pre_split_level(CompressionLevel::Level(15)),
2453 Some(0)
2454 );
2455 assert_eq!(
2456 super::donor_pre_split_level(CompressionLevel::Level(16)),
2457 Some(4)
2458 );
2459 assert_eq!(
2460 super::donor_pre_split_level(CompressionLevel::Level(22)),
2461 Some(4)
2462 );
2463 }
2464
2465 #[test]
2472 fn level_13_borders_split_roundtrips_through_own_decoder() {
2473 use crate::encoding::CompressionLevel;
2474 let mut data = vec![0u8; 256 * 1024];
2475 for (i, byte) in data.iter_mut().enumerate() {
2478 *byte = if i < 128 * 1024 {
2479 (i & 0x07) as u8
2480 } else {
2481 (i % 251 + 1) as u8
2482 };
2483 }
2484
2485 let mut compressed = Vec::new();
2486 let mut compressor = FrameCompressor::new(CompressionLevel::Level(13));
2487 compressor.set_source(data.as_slice());
2488 compressor.set_drain(&mut compressed);
2489 compressor.compress();
2490
2491 let mut decoder = FrameDecoder::new();
2492 let mut source = compressed.as_slice();
2493 decoder
2494 .reset(&mut source)
2495 .expect("frame header should parse");
2496 while !decoder.is_finished() {
2497 decoder
2498 .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
2499 .expect("decode should succeed");
2500 }
2501 let mut decoded = Vec::with_capacity(data.len());
2502 decoder.collect_to_writer(&mut decoded).unwrap();
2503 assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
2504 }
2505
2506 #[cfg(feature = "std")]
2518 #[test]
2519 fn set_compression_level_then_compress_refreshes_strategy_tag() {
2520 use super::CompressionLevel;
2521 use crate::encoding::strategy::StrategyTag;
2522
2523 let data = vec![0xABu8; 256];
2524 let mut out = Vec::new();
2525 let mut compressor = FrameCompressor::new(CompressionLevel::Fastest);
2526 let initial_tag = compressor.state.strategy_tag;
2527 assert_eq!(
2528 initial_tag,
2529 StrategyTag::for_compression_level(CompressionLevel::Fastest),
2530 "construction-time strategy_tag must reflect initial level",
2531 );
2532
2533 let new_level = CompressionLevel::Level(20);
2537 compressor.set_compression_level(new_level);
2538 compressor.set_source(data.as_slice());
2539 compressor.set_drain(&mut out);
2540 compressor.compress();
2541
2542 let new_tag = compressor.state.strategy_tag;
2543 let expected = StrategyTag::for_compression_level(new_level);
2544 assert_eq!(
2545 new_tag, expected,
2546 "strategy_tag must follow set_compression_level → compress, \
2547 got {new_tag:?} expected {expected:?}",
2548 );
2549 assert_eq!(
2550 expected,
2551 StrategyTag::BtUltra2,
2552 "test fixture invariant: Level(20) must resolve to BtUltra2 \
2553 so the post-switch tag visibly crosses the band boundary",
2554 );
2555 assert_ne!(
2556 new_tag, initial_tag,
2557 "test fixture invariant: chosen levels must resolve to \
2558 different StrategyTag variants",
2559 );
2560 }
2561
2562 #[test]
2566 fn magicless_frame_omits_magic_and_roundtrips() {
2567 use crate::common::MAGIC_NUM;
2568 let input: alloc::vec::Vec<u8> = (0..512u32).map(|i| (i ^ 0xA5) as u8).collect();
2569
2570 let mut output: Vec<u8> = Vec::new();
2572 let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
2573 compressor.set_magicless(true);
2574 compressor.set_source(input.as_slice());
2575 compressor.set_drain(&mut output);
2576 compressor.compress();
2577
2578 assert!(
2580 !output.starts_with(&MAGIC_NUM.to_le_bytes()),
2581 "magicless frame must omit the 4-byte magic prefix",
2582 );
2583
2584 let mut decoder = crate::decoding::FrameDecoder::new();
2586 decoder.set_magicless(true);
2587 let mut cursor: &[u8] = output.as_slice();
2588 decoder.init(&mut cursor).expect("magicless init");
2589 decoder
2590 .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
2591 .expect("decode_blocks");
2592 let mut decoded: Vec<u8> = Vec::new();
2593 decoder
2594 .collect_to_writer(&mut decoded)
2595 .expect("collect_to_writer");
2596 assert_eq!(decoded, input, "magicless roundtrip must preserve bytes");
2597
2598 use crate::decoding::errors::{FrameDecoderError, ReadFrameHeaderError};
2609 let mut std_decoder = crate::decoding::FrameDecoder::new();
2610 let std_init = std_decoder.init(output.as_slice());
2611 match std_init {
2612 Err(FrameDecoderError::ReadFrameHeaderError(
2613 ReadFrameHeaderError::BadMagicNumber(_) | ReadFrameHeaderError::SkipFrame { .. },
2614 )) => {}
2615 other => panic!(
2616 "standard decoder must reject a magicless frame with \
2617 ReadFrameHeaderError::BadMagicNumber or SkipFrame, got {other:?}",
2618 ),
2619 }
2620 }
2621}