1use alloc::vec::Vec;
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12 CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13 match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
20#[derive(Clone)]
33pub struct EncoderDictionary {
34 pub(crate) inner: crate::decoding::Dictionary,
35}
36
37impl EncoderDictionary {
38 pub fn from_dictionary(dictionary: crate::decoding::Dictionary) -> Self {
42 Self { inner: dictionary }
43 }
44
45 pub fn from_bytes(
50 raw_dictionary: &[u8],
51 ) -> Result<Self, crate::decoding::errors::DictionaryDecodeError> {
52 Ok(Self {
53 inner: crate::decoding::Dictionary::decode_dict_for_encoding(raw_dictionary)?,
54 })
55 }
56
57 pub fn id(&self) -> u32 {
65 self.inner.id
66 }
67}
68
/// Frame-level compressor: holds the source/drain handles, the per-frame
/// options, and the reusable [`CompressState`].
///
/// `R` is the input reader, `W` the output writer and `M` the match
/// generator; all three have defaults.
pub struct FrameCompressor<
    R: Read = &'static [u8],
    W: Write = Vec<u8>,
    M: Matcher = MatchGeneratorDriver,
> {
    /// Input installed by `set_source`; `compress` takes it out for the
    /// duration of the call and puts it back afterwards.
    uncompressed_data: Option<R>,
    /// Output installed by `set_drain`.
    compressed_data: Option<W>,
    /// Level used for the next frame; also drives the default strategy tag.
    compression_level: CompressionLevel,
    /// Optional dictionary; `prepare_frame` primes the matcher with it when
    /// the level is not `Uncompressed` and the matcher supports priming.
    dictionary: Option<EncoderDictionary>,
    /// Entropy tables cached for the dictionary; copied into `state` by
    /// `prepare_frame` when dictionary state is in use.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// One-shot source size hint; `prepare_frame` consumes it with `take()`.
    source_size_hint: Option<u64>,
    /// Matcher, entropy tables and scratch reused across blocks.
    state: CompressState<M>,
    /// Value stored by `set_magicless`.
    /// NOTE(review): consumer not visible in this excerpt; presumably
    /// suppresses the frame magic number — confirm.
    magicless: bool,
    /// When true (and the `hash` feature is enabled) block bytes are fed to
    /// `hasher` and the low 32 bits of the digest are appended to the frame.
    content_checksum: bool,
    /// Value stored by `set_content_size_flag`.
    /// NOTE(review): consumer not visible here; presumably controls the
    /// frame-content-size header field — confirm.
    content_size_flag: bool,
    /// Value stored by `set_dictionary_id_flag`.
    /// NOTE(review): consumer not visible here; presumably controls the
    /// dictionary-id header field — confirm.
    dict_id_flag: bool,
    /// Optional per-block size cap; `block_capacity` falls back to
    /// `MAX_BLOCK_SIZE` when this is `None`.
    target_block_size: Option<u32>,
    /// Running content hash; re-seeded with 0 by `prepare_frame`.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
    /// Per-frame emit information; cleared by `prepare_frame`.
    #[cfg(feature = "lsm")]
    frame_emit_info: Option<crate::encoding::frame_emit_info::FrameEmitInfo>,
    /// When true, `prepare_frame` allocates a fresh `block_checksums` vector.
    #[cfg(all(feature = "lsm", feature = "hash"))]
    per_block_checksums_enabled: bool,
    /// Per-block checksums (low 32 bits of XXH64) when enabled, else `None`.
    #[cfg(all(feature = "lsm", feature = "hash"))]
    block_checksums: Option<alloc::vec::Vec<u32>>,
    /// Decompressed size of each emitted block; cleared by `prepare_frame`.
    #[cfg(feature = "lsm")]
    block_decompressed_sizes: alloc::vec::Vec<u32>,
    /// Strategy forced through `set_parameters`; when `None` the strategy is
    /// derived from `compression_level`.
    strategy_override: Option<crate::encoding::strategy::StrategyTag>,
}
169
170#[derive(Clone, Default)]
171pub(crate) struct CachedDictionaryEntropy {
172 pub(crate) huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
173 pub(crate) ll_previous: Option<PreviousFseTable>,
174 pub(crate) ml_previous: Option<PreviousFseTable>,
175 pub(crate) of_previous: Option<PreviousFseTable>,
176}
177
178impl CachedDictionaryEntropy {
179 pub(crate) fn heap_size(&self) -> usize {
183 let mut total = self.huff.as_ref().map_or(0, |h| h.heap_size());
184 for prev in [&self.ll_previous, &self.ml_previous, &self.of_previous] {
185 if let Some(PreviousFseTable::Custom(table)) = prev {
186 total +=
187 core::mem::size_of::<crate::fse::fse_encoder::FSETable>() + table.heap_size();
188 }
189 }
190 total
191 }
192
193 pub(crate) fn from_dictionary(dictionary: &crate::decoding::Dictionary) -> Self {
199 Self {
200 huff: dictionary.huf.table.to_encoder_table(),
201 ll_previous: dictionary
202 .fse
203 .literal_lengths
204 .to_encoder_table()
205 .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
206 ml_previous: dictionary
207 .fse
208 .match_lengths
209 .to_encoder_table()
210 .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
211 of_previous: dictionary
212 .fse
213 .offsets
214 .to_encoder_table()
215 .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
216 }
217 }
218}
219
// Shared, reference-counted FSE table. Targets with pointer-width atomics
// use `Arc`; targets without them fall back to `Rc`.
#[cfg(target_has_atomic = "ptr")]
pub(crate) type SharedFseTable = alloc::sync::Arc<FSETable>;
#[cfg(not(target_has_atomic = "ptr"))]
pub(crate) type SharedFseTable = alloc::rc::Rc<FSETable>;
231
232#[derive(Clone)]
233pub(crate) enum PreviousFseTable {
234 Default,
237 Custom(SharedFseTable),
242 Rle(u8),
243}
244
245impl PreviousFseTable {
246 pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
247 match self {
248 Self::Default => Some(default),
249 Self::Custom(table) => Some(table),
250 Self::Rle(_) => None,
251 }
252 }
253}
254
255pub(crate) struct FseTables {
256 pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
268 pub(crate) ll_previous: Option<PreviousFseTable>,
269 pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
270 pub(crate) ml_previous: Option<PreviousFseTable>,
271 pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
272 pub(crate) of_previous: Option<PreviousFseTable>,
273}
274
275impl FseTables {
276 pub fn new() -> Self {
277 Self {
278 ll_default: default_ll_table(),
279 ll_previous: None,
280 ml_default: default_ml_table(),
281 ml_previous: None,
282 of_default: default_of_table(),
283 of_previous: None,
284 }
285 }
286
287 #[inline]
294 #[allow(clippy::borrow_deref_ref)]
295 pub(crate) fn ll_default_ref(&self) -> &FSETable {
296 &*self.ll_default
297 }
298
299 #[inline]
301 #[allow(clippy::borrow_deref_ref)]
302 pub(crate) fn ml_default_ref(&self) -> &FSETable {
303 &*self.ml_default
304 }
305
306 #[inline]
308 #[allow(clippy::borrow_deref_ref)]
309 pub(crate) fn of_default_ref(&self) -> &FSETable {
310 &*self.of_default
311 }
312}
313
// Tuning constants for the block pre-splitter.
// NOTE(review): the values look like they mirror the reference zstd
// pre-split heuristics — confirm before changing any of them.

// Lower clamp applied to a split decision in `optimal_block_size`.
const PRESPLIT_BLOCK_MIN: usize = 3500;
// Denominator of the fingerprint-difference threshold.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
// Base numerator of the threshold; the current penalty is added to it.
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
// Initial penalty in `split_block_by_chunks`; decremented per merged chunk.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
// Size of each chunk fingerprinted by `split_block_by_chunks` (8 KiB).
const PRESPLIT_CHUNK_SIZE: usize = 8 << 10;
// Largest hash log accepted by `presplit_hash2`.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
// Number of event slots in a fingerprint (1 << PRESPLIT_HASH_LOG_MAX).
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
// Multiplier used by `presplit_hash2` to mix the two-byte value.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
// Length of each segment sampled by `split_block_from_borders`.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
327
328#[derive(Clone)]
329struct PreSplitFingerprint {
330 events: [u32; PRESPLIT_HASH_TABLE_SIZE],
331 nb_events: usize,
332}
333
334impl Default for PreSplitFingerprint {
335 fn default() -> Self {
336 Self {
337 events: [0; PRESPLIT_HASH_TABLE_SIZE],
338 nb_events: 0,
339 }
340 }
341}
342
/// Makes sure `out` has room for the next encoded block, growing it with a
/// projection of the remaining output when it does not.
///
/// * `blocks_start` – offset in `out` where this frame's blocks begin.
/// * `consumed` – input bytes already encoded.
/// * `remaining` – input bytes still to encode.
/// * `block_capacity` – maximum input bytes per block.
///
/// Nothing happens when the spare capacity already covers
/// `min(remaining, block_capacity) + 3 + 16` bytes. Otherwise the reserve is
/// the larger of (a) the output/input ratio observed so far scaled to
/// `remaining`, padded by 1/16, three bytes per expected block and 64 bytes,
/// and (b) the next-block bound plus the bytes produced so far.
fn reserve_for_next_block(
    out: &mut Vec<u8>,
    blocks_start: usize,
    consumed: u64,
    remaining: usize,
    block_capacity: usize,
) {
    let next_block_need = remaining.min(block_capacity) + 3 + 16;
    let spare = out.capacity() - out.len();
    if spare >= next_block_need {
        return;
    }

    let produced = (out.len() - blocks_start) as u64;
    let projected = match consumed {
        // No ratio to extrapolate from yet: size for a single block.
        0 => next_block_need,
        _ => {
            // 128-bit intermediate so `remaining * produced` cannot overflow.
            let scaled = ((remaining as u128 * produced as u128) / consumed as u128) as u64;
            let header_bytes = (remaining as u64 / block_capacity.max(1) as u64 + 1) * 3;
            let padded = scaled + scaled / 16 + header_bytes + 64;
            usize::try_from(padded).unwrap_or(usize::MAX)
        }
    };
    let floor = next_block_need + produced as usize;
    out.reserve_exact(projected.max(floor));
}
394
395fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
396 debug_assert!(hash_log >= 8);
397 if hash_log == 8 {
398 return bytes[0] as usize;
399 }
400 debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
401 let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
402 (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
403}
404
405fn presplit_record_fingerprint(
406 fp: &mut PreSplitFingerprint,
407 src: &[u8],
408 sampling_rate: usize,
409 hash_log: usize,
410) {
411 fp.events.fill(0);
412 fp.nb_events = 0;
413 if src.len() < 2 {
414 return;
415 }
416 let limit = src.len() - 1;
417 let mut n = 0usize;
418 while n < limit {
419 fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
420 n += sampling_rate;
421 }
422 fp.nb_events += limit / sampling_rate;
425}
426
427fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
433 fp.events.fill(0);
434 for &b in src {
435 fp.events[b as usize] += 1;
436 }
437 fp.nb_events = src.len();
440}
441
442fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
443 let slots = 1usize << hash_log;
444 let mut distance = 0u64;
445 for idx in 0..slots {
446 let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
447 let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
448 distance += left.abs_diff(right) as u64;
452 }
453 distance
454}
455
456fn presplit_fingerprints_differ(
457 reference: &PreSplitFingerprint,
458 new_fp: &PreSplitFingerprint,
459 penalty: i32,
460 hash_log: usize,
461) -> bool {
462 debug_assert!(reference.nb_events > 0);
463 debug_assert!(new_fp.nb_events > 0);
464 let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
465 let deviation = presplit_distance(reference, new_fp, hash_log);
466 let threshold =
469 p50 * (PRESPLIT_THRESHOLD_BASE + penalty as u64) / PRESPLIT_THRESHOLD_PENALTY_RATE;
470 deviation >= threshold
471}
472
473fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
474 for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
478 acc.events[idx] += new_fp.events[idx];
479 }
480 acc.nb_events += new_fp.nb_events;
481}
482
483fn split_block_by_chunks(block: &[u8], level: usize) -> usize {
484 debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
485 debug_assert!((1..=4).contains(&level));
486 let (sampling_rate, hash_log) = match level - 1 {
487 0 => (43, 8),
488 1 => (11, 9),
489 2 => (5, 10),
490 _ => (1, 10),
491 };
492
493 let mut past = PreSplitFingerprint::default();
494 let mut new_events = PreSplitFingerprint::default();
495 let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
496 presplit_record_fingerprint(
497 &mut past,
498 &block[..PRESPLIT_CHUNK_SIZE],
499 sampling_rate,
500 hash_log,
501 );
502 let mut pos = PRESPLIT_CHUNK_SIZE;
503 while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
504 presplit_record_fingerprint(
505 &mut new_events,
506 &block[pos..pos + PRESPLIT_CHUNK_SIZE],
507 sampling_rate,
508 hash_log,
509 );
510 if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
511 return pos;
512 }
513 presplit_merge_events(&mut past, &new_events);
514 if penalty > 0 {
515 penalty -= 1;
516 }
517 pos += PRESPLIT_CHUNK_SIZE;
518 }
519 block.len()
520}
521
522fn split_block_from_borders(block: &[u8]) -> usize {
530 debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
531 let block_size = block.len();
532 let mut past = PreSplitFingerprint::default();
533 let mut new_fp = PreSplitFingerprint::default();
534 presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
535 presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
536 if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
539 return block_size;
540 }
541
542 let mut middle = PreSplitFingerprint::default();
543 let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
544 presplit_record_byte_histogram(
545 &mut middle,
546 &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
547 );
548
549 let dist_from_begin = presplit_distance(&past, &middle, 8);
550 let dist_from_end = presplit_distance(&new_fp, &middle, 8);
551 let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
555 if dist_from_begin.abs_diff(dist_from_end) < min_distance {
556 return 64 * 1024;
557 }
558 if dist_from_begin > dist_from_end {
567 32 * 1024
568 } else {
569 96 * 1024
570 }
571}
572
/// Returns the low 32 bits of the seed-0 XXH64 digest of `data`.
#[cfg(all(feature = "lsm", feature = "hash"))]
#[inline]
pub(crate) fn xxh64_block_low32(data: &[u8]) -> u32 {
    let mut hasher = XxHash64::with_seed(0);
    Hasher::write(&mut hasher, data);
    let digest = Hasher::finish(&hasher);
    digest as u32
}
586
/// Benchmark entry point that exposes the raw split decision for one block.
///
/// `split_level == 0` uses the border heuristic; levels 1..=4 use the chunk
/// scanner.
///
/// # Panics
///
/// Panics if `block` is not exactly `MAX_BLOCK_SIZE` bytes long or if
/// `split_level` is greater than 4.
#[cfg(feature = "bench_internals")]
pub(crate) fn block_splitter_decision_for_bench(block: &[u8], split_level: usize) -> usize {
    assert_eq!(
        block.len(),
        MAX_BLOCK_SIZE as usize,
        "block_splitter_decision_for_bench expects exactly MAX_BLOCK_SIZE bytes"
    );
    assert!(
        split_level <= 4,
        "block_splitter_decision_for_bench: split_level must be in 0..=4, got {split_level}"
    );
    match split_level {
        0 => split_block_from_borders(block),
        chunk_level => split_block_by_chunks(block, chunk_level),
    }
}
611
/// Touches one byte every 64 bytes of `window` and discards the result.
///
/// The XOR of the sampled bytes is passed through `black_box` so the reads
/// are not optimized away.
/// NOTE(review): the 64-byte stride presumably matches a cache line, making
/// this a cache warm-up — confirm.
#[inline]
fn warm_presplit_window(window: &[u8]) {
    let folded = window
        .iter()
        .step_by(64)
        .fold(0u8, |acc, &byte| acc ^ byte);
    core::hint::black_box(folded);
}
637
638pub(crate) fn optimal_block_size(
639 level: CompressionLevel,
640 block: &[u8],
641 remaining_src_size: usize,
642 block_size_max: usize,
643 savings: i64,
644) -> usize {
645 let Some(split_level) = crate::encoding::match_generator::level_pre_split(level) else {
646 return remaining_src_size.min(block_size_max);
647 };
648 if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
649 return remaining_src_size.min(block_size_max);
650 }
651 if savings < 3 {
652 return MAX_BLOCK_SIZE as usize;
653 }
654 if block.len() < MAX_BLOCK_SIZE as usize {
655 return remaining_src_size.min(block_size_max);
656 }
657 let raw_split = if split_level == 0 {
662 split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
663 } else {
664 split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
665 };
666 raw_split
667 .max(PRESPLIT_BLOCK_MIN)
668 .min(MAX_BLOCK_SIZE as usize)
669}
670
671pub(crate) struct CompressState<M: Matcher> {
672 pub(crate) matcher: M,
673 pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
674 pub(crate) huff_table_spare: Option<crate::huff0::huff0_encoder::HuffmanTable>,
680 pub(crate) fse_tables: FseTables,
681 pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
682 pub(crate) offset_hist: [u32; 3],
685 pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
703}
704
705impl<M: Matcher> CompressState<M> {
706 #[inline]
709 pub(crate) fn clear_huff_table(&mut self) {
710 if let Some(table) = self.last_huff_table.take() {
711 self.huff_table_spare = Some(table);
712 }
713 }
714
715 #[inline]
718 pub(crate) fn replace_huff_table(&mut self, table: crate::huff0::huff0_encoder::HuffmanTable) {
719 if let Some(old) = self.last_huff_table.replace(table) {
720 self.huff_table_spare = Some(old);
721 }
722 }
723}
724
/// Values computed by `prepare_frame` that the rest of the frame encode
/// needs.
struct FramePrep {
    /// Window size reported by the matcher after reset (asserted non-zero).
    window_size: u64,
    /// True when a dictionary is set, the matcher supports priming, and the
    /// level is not `Uncompressed`.
    use_dictionary_state: bool,
    /// Whether a source size hint was present when the frame was prepared.
    source_size_hint_known: bool,
    /// The source size hint as it was before `prepare_frame` consumed it.
    initial_size_hint: Option<u64>,
}
735
/// Picks the initial capacity of the buffer that collects encoded blocks.
///
/// Hints up to 4 KiB start at 4 KiB, hints up to 64 KiB at 16 KiB, and
/// everything else (including no hint) at 130 KiB. The result is never
/// larger than `block_capacity + 3 + 16`.
fn initial_all_blocks_cap(initial_size_hint: Option<u64>, block_capacity: usize) -> usize {
    const TINY_THRESHOLD: u64 = 4 * 1024;
    const SMALL_THRESHOLD: u64 = 64 * 1024;
    const TINY_CAP: usize = 4 * 1024;
    const SMALL_CAP: usize = 16 * 1024;
    const DEFAULT_CAP: usize = 130 * 1024;

    let first_block_cap = block_capacity + 3 + 16;
    let tier_cap = match initial_size_hint {
        Some(hint) if hint <= TINY_THRESHOLD => TINY_CAP,
        Some(hint) if hint <= SMALL_THRESHOLD => SMALL_CAP,
        Some(_) | None => DEFAULT_CAP,
    };
    tier_cap.min(first_block_cap)
}
768
/// A source of input bytes that can top up a block buffer.
pub(crate) trait OwnedBlockSource {
    /// Appends input to `buf` until it holds `block_capacity` bytes or the
    /// source runs dry.
    ///
    /// `size_hint_remaining` is the caller's estimate of how much input is
    /// still to come, if known; implementations may ignore it.
    ///
    /// Returns `(bytes_appended, reached_eof)`.
    fn fill_block(
        &mut self,
        buf: &mut Vec<u8>,
        block_capacity: usize,
        size_hint_remaining: Option<u64>,
    ) -> (usize, bool);
}
792
793impl OwnedBlockSource for &[u8] {
794 fn fill_block(
795 &mut self,
796 buf: &mut Vec<u8>,
797 block_capacity: usize,
798 _size_hint_remaining: Option<u64>,
799 ) -> (usize, bool) {
800 let want = block_capacity - buf.len();
801 let take = want.min(self.len());
802 buf.extend_from_slice(&self[..take]);
803 *self = &self[take..];
804 (take, take < want)
805 }
806}
807
808pub(crate) struct ReaderBlockSource<Rd>(pub(crate) Rd);
813
814impl<Rd: Read> OwnedBlockSource for ReaderBlockSource<Rd> {
815 fn fill_block(
816 &mut self,
817 buf: &mut Vec<u8>,
818 block_capacity: usize,
819 size_hint_remaining: Option<u64>,
820 ) -> (usize, bool) {
821 let start = buf.len();
822 let mut filled = start;
823 let mut reached_eof = false;
824 let initial_target = match size_hint_remaining {
838 Some(remaining) => {
839 let remaining = remaining.min(block_capacity as u64) as usize;
840 filled + remaining.min(block_capacity - filled)
841 }
842 None => block_capacity,
845 };
846 if buf.len() < initial_target {
847 buf.resize(initial_target, 0);
848 }
849 loop {
850 if reached_eof || filled == block_capacity {
851 break;
852 }
853 if filled == buf.len() {
854 let grow_to = (buf.len() * 2).clamp(filled + 1, block_capacity);
861 buf.resize(grow_to, 0);
862 }
863 let read_end = buf.len();
864 let new_bytes = self.0.read(&mut buf[filled..read_end]).unwrap();
865 if new_bytes == 0 {
866 reached_eof = true;
867 break;
868 }
869 filled += new_bytes;
870 }
871 buf.truncate(filled);
872 (filled - start, reached_eof)
873 }
874}
875
876impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
877 pub fn new(compression_level: CompressionLevel) -> Self {
879 Self {
880 uncompressed_data: None,
881 compressed_data: None,
882 compression_level,
883 dictionary: None,
884 dictionary_entropy_cache: None,
885 source_size_hint: None,
886 state: CompressState {
887 matcher: MatchGeneratorDriver::new(1024 * 128, 1),
888 last_huff_table: None,
889 huff_table_spare: None,
890 fse_tables: FseTables::new(),
891 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
892 offset_hist: [1, 4, 8],
893 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
894 compression_level,
895 ),
896 },
897 magicless: false,
898 content_checksum: false,
899 content_size_flag: true,
900 dict_id_flag: true,
901 target_block_size: None,
902 #[cfg(feature = "hash")]
903 hasher: XxHash64::with_seed(0),
904 #[cfg(feature = "lsm")]
905 frame_emit_info: None,
906 #[cfg(all(feature = "lsm", feature = "hash"))]
907 per_block_checksums_enabled: false,
908 #[cfg(all(feature = "lsm", feature = "hash"))]
909 block_checksums: None,
910 #[cfg(feature = "lsm")]
911 block_decompressed_sizes: alloc::vec::Vec::new(),
912 strategy_override: None,
913 }
914 }
915
916 pub fn set_parameters(&mut self, params: &crate::encoding::CompressionParameters) {
939 self.compression_level = params.level();
940 let overrides = params.overrides();
941 self.strategy_override = overrides.strategy.map(|s| s.tag());
942 self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
946 crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
947 });
948 self.state.matcher.set_param_overrides(Some(overrides));
949 }
950
951 fn borrowed_eligible(&self, input_len: usize, prep: &FramePrep) -> bool {
978 if prep.use_dictionary_state
979 || matches!(self.compression_level, CompressionLevel::Uncompressed)
980 || input_len > u32::MAX as usize
981 {
982 return false;
983 }
984 self.state.matcher.borrowed_supported()
992 }
993
994 fn run_one_frame(&mut self, input: &[u8], prep: &FramePrep, out: &mut Vec<u8>) -> u64 {
1001 if self.borrowed_eligible(input.len(), prep) {
1002 self.run_borrowed_block_loop(input, out)
1003 } else {
1004 let mut cursor: &[u8] = input;
1005 self.run_owned_block_loop(&mut cursor, prep.initial_size_hint, true, out)
1006 }
1007 }
1008
1009 pub fn compress_independent_frame_into(&mut self, input: &[u8], out: &mut Vec<u8>) {
1053 self.source_size_hint = Some(input.len() as u64);
1057 let prep = self.prepare_frame();
1058 let total_uncompressed = input.len() as u64;
1064 let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
1065 let checksum_len = if emit_checksum { 4 } else { 0 };
1066 out.clear();
1067 let first_block_bound = input.len().min(self.block_capacity()) + 3;
1078 out.reserve(18 + first_block_bound + checksum_len);
1079 self.append_frame_header(total_uncompressed, &prep, out);
1080 let header_len = out.len();
1081 let _ = self.run_one_frame(input, &prep, out);
1082 #[cfg(feature = "hash")]
1083 if self.content_checksum {
1084 out.extend_from_slice(&(self.hasher.finish() as u32).to_le_bytes());
1085 }
1086 #[cfg(feature = "lsm")]
1087 {
1088 let blocks_end = out.len() - checksum_len;
1089 self.populate_frame_emit_info(header_len, &out[header_len..blocks_end], emit_checksum);
1090 }
1091 #[cfg(not(feature = "lsm"))]
1092 let _ = header_len;
1093 }
1094
1095 pub fn compress_independent_frame(&mut self, input: &[u8]) -> Vec<u8> {
1109 let mut out = Vec::new();
1110 self.compress_independent_frame_into(input, &mut out);
1111 out
1112 }
1113
/// Encodes `input` block by block without copying it into matcher-owned
/// buffers: the matcher is pointed at `input` for the duration of the call.
///
/// Appends the encoded blocks to `out` and returns `input.len()` as `u64`.
/// An empty input produces a single empty raw block marked as last.
fn run_borrowed_block_loop(&mut self, input: &[u8], out: &mut Vec<u8>) -> u64 {
    // Blocks of this frame start here; used to measure produced bytes.
    let blocks_start = out.len();
    let total_uncompressed = input.len() as u64;
    if input.is_empty() {
        // Nothing to match against: emit one empty raw last block.
        let header = BlockHeader {
            last_block: true,
            block_type: crate::blocks::block::BlockType::Raw,
            block_size: 0,
        };
        header.serialize(out);
        #[cfg(feature = "lsm")]
        self.block_decompressed_sizes.push(0);
        #[cfg(all(feature = "lsm", feature = "hash"))]
        if let Some(checksums) = self.block_checksums.as_mut() {
            checksums.push(xxh64_block_low32(&[]));
        }
        return total_uncompressed;
    }
    // SAFETY: NOTE(review): the contract of `set_borrowed_window` is not
    // visible in this excerpt. Presumably the matcher must not use the
    // window after `input` is gone; the drop guard below clears the window
    // on every exit path from this function — confirm against the matcher.
    unsafe {
        self.state.matcher.set_borrowed_window(input);
    }
    // Drop guard holding a raw pointer to the matcher so the window is
    // cleared when this function returns or unwinds.
    struct ClearBorrowedOnDrop(*mut MatchGeneratorDriver);
    impl Drop for ClearBorrowedOnDrop {
        fn drop(&mut self) {
            // SAFETY: NOTE(review): relies on the pointer still being valid,
            // i.e. the guard not outliving `self.state.matcher` — it is a
            // local of this method, which borrows `self` throughout.
            unsafe { (*self.0).clear_borrowed_window() };
        }
    }
    let _clear_guard = ClearBorrowedOnDrop(core::ptr::addr_of_mut!(self.state.matcher));
    let block_capacity = self.block_capacity();
    let mut start = 0usize;
    while start < input.len() {
        reserve_for_next_block(
            out,
            blocks_start,
            start as u64,
            input.len() - start,
            block_capacity,
        );
        // Input consumed so far minus output produced so far.
        let savings = start as i64 - (out.len() - blocks_start) as i64;
        // Same preconditions under which `optimal_block_size` runs the
        // splitter; touch the window first in that case.
        if savings >= 3
            && input.len() - start >= MAX_BLOCK_SIZE as usize
            && block_capacity >= MAX_BLOCK_SIZE as usize
            && crate::encoding::match_generator::level_pre_split(self.compression_level)
                .is_some()
        {
            warm_presplit_window(&input[start..start + MAX_BLOCK_SIZE as usize]);
        }
        let block_len = optimal_block_size(
            self.compression_level,
            &input[start..],
            input.len() - start,
            block_capacity,
            savings,
        );
        let end = (start + block_len).min(input.len());
        let block = &input[start..end];
        let last_block = end == input.len();
        // The content checksum covers the uncompressed bytes of every block.
        #[cfg(feature = "hash")]
        if self.content_checksum {
            self.hasher.write(block);
        }
        crate::encoding::levels::compress_block_encoded_borrowed(
            &mut self.state,
            self.compression_level,
            last_block,
            block,
            start,
            end,
            out,
            #[cfg(feature = "lsm")]
            Some(&mut self.block_decompressed_sizes),
            #[cfg(all(feature = "lsm", feature = "hash"))]
            self.block_checksums.as_mut(),
        );
        start = end;
    }
    total_uncompressed
}
1238}
1239
1240impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
1241 pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
1243 Self {
1244 uncompressed_data: None,
1245 compressed_data: None,
1246 dictionary: None,
1247 dictionary_entropy_cache: None,
1248 source_size_hint: None,
1249 state: CompressState {
1250 matcher,
1251 last_huff_table: None,
1252 huff_table_spare: None,
1253 fse_tables: FseTables::new(),
1254 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
1255 offset_hist: [1, 4, 8],
1256 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
1257 compression_level,
1258 ),
1259 },
1260 compression_level,
1261 magicless: false,
1262 content_checksum: false,
1263 content_size_flag: true,
1264 dict_id_flag: true,
1265 target_block_size: None,
1266 #[cfg(feature = "hash")]
1267 hasher: XxHash64::with_seed(0),
1268 #[cfg(feature = "lsm")]
1269 frame_emit_info: None,
1270 #[cfg(all(feature = "lsm", feature = "hash"))]
1271 per_block_checksums_enabled: false,
1272 #[cfg(all(feature = "lsm", feature = "hash"))]
1273 block_checksums: None,
1274 #[cfg(feature = "lsm")]
1275 block_decompressed_sizes: alloc::vec::Vec::new(),
1276 strategy_override: None,
1277 }
1278 }
1279
/// Stores the `magicless` frame option.
///
/// NOTE(review): the code that reads this flag is outside this excerpt;
/// presumably it controls whether the frame magic number is written —
/// confirm.
pub fn set_magicless(&mut self, magicless: bool) {
    self.magicless = magicless;
}
1289
/// Enables or disables the content checksum.
///
/// When enabled and the crate is built with the `hash` feature, the
/// uncompressed bytes of every block are fed to the frame hasher.
pub fn set_content_checksum(&mut self, emit: bool) {
    self.content_checksum = emit;
}
1304
/// Stores the content-size flag (initially `true`).
///
/// NOTE(review): the code that reads this flag is outside this excerpt;
/// presumably it controls whether the frame header carries the content
/// size — confirm.
pub fn set_content_size_flag(&mut self, emit: bool) {
    self.content_size_flag = emit;
}
1313
/// Stores the dictionary-id flag (initially `true`).
///
/// NOTE(review): the code that reads this flag is outside this excerpt;
/// presumably it controls whether the frame header carries the dictionary
/// id — confirm.
pub fn set_dictionary_id_flag(&mut self, emit: bool) {
    self.dict_id_flag = emit;
}
1322
1323 pub fn set_target_block_size(&mut self, target: Option<u32>) {
1330 self.target_block_size = target.map(|t| {
1331 t.clamp(
1332 crate::common::MIN_TARGET_BLOCK_SIZE,
1333 crate::common::MAX_BLOCK_SIZE,
1334 )
1335 });
1336 }
1337
1338 fn block_capacity(&self) -> usize {
1341 self.target_block_size
1342 .map_or(crate::common::MAX_BLOCK_SIZE as usize, |t| t as usize)
1343 }
1344
1345 pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
1349 self.uncompressed_data.replace(uncompressed_data)
1350 }
1351
1352 pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
1356 self.compressed_data.replace(compressed_data)
1357 }
1358
/// Records the expected size of the next source in bytes.
///
/// The hint is one-shot: `prepare_frame` takes it out when the next frame
/// starts, so it must be set again for each frame.
pub fn set_source_size_hint(&mut self, size: u64) {
    self.source_size_hint = Some(size);
}
1371
1372 pub fn heap_size(&self) -> usize {
1380 let mut total = self.state.matcher.heap_size();
1381 total += self
1382 .state
1383 .last_huff_table
1384 .as_ref()
1385 .map_or(0, |table| table.heap_size());
1386 total += self
1387 .state
1388 .huff_table_spare
1389 .as_ref()
1390 .map_or(0, |table| table.heap_size());
1391 total += self
1392 .dictionary
1393 .as_ref()
1394 .map_or(0, |d| d.inner.dict_content.capacity());
1395 total += self
1396 .dictionary_entropy_cache
1397 .as_ref()
1398 .map_or(0, CachedDictionaryEntropy::heap_size);
1399 #[cfg(all(feature = "lsm", feature = "hash"))]
1400 {
1401 total += self
1402 .block_checksums
1403 .as_ref()
1404 .map_or(0, |v| v.capacity() * core::mem::size_of::<u32>());
1405 }
1406 #[cfg(feature = "lsm")]
1407 {
1408 total += self.block_decompressed_sizes.capacity() * core::mem::size_of::<u32>();
1409 }
1410 total
1411 }
1412
1413 pub fn compress(&mut self) {
1428 let prep = self.prepare_frame();
1429 let mut source = self
1443 .uncompressed_data
1444 .take()
1445 .expect("source must be set via set_source before compress()");
1446 let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap(
1450 prep.initial_size_hint,
1451 self.block_capacity(),
1452 ));
1453 let mut block_source = ReaderBlockSource(&mut source);
1454 let total_uncompressed = self.run_owned_block_loop(
1455 &mut block_source,
1456 prep.initial_size_hint,
1457 false,
1458 &mut all_blocks,
1459 );
1460 self.uncompressed_data = Some(source);
1461 self.finish_frame(all_blocks, total_uncompressed, &prep);
1462 }
1463
/// Resets all per-frame state and returns the values the frame encode needs.
///
/// In order: clears per-frame bookkeeping, forwards the size hints to the
/// matcher, resets the matcher, offset history and strategy tag, primes the
/// matcher with the dictionary (when applicable), installs the cached
/// dictionary entropy tables (or clears them), re-seeds the content hasher,
/// and finally queries the matcher's window size.
///
/// # Panics
///
/// Panics if the matcher reports a window size of zero.
fn prepare_frame(&mut self) -> FramePrep {
    // Drop bookkeeping left over from the previous frame.
    #[cfg(feature = "lsm")]
    {
        self.frame_emit_info = None;
        self.block_decompressed_sizes.clear();
    }
    #[cfg(all(feature = "lsm", feature = "hash"))]
    {
        if self.per_block_checksums_enabled {
            self.block_checksums = Some(alloc::vec::Vec::new());
        } else {
            self.block_checksums = None;
        }
    }
    // Remember the hint before it is consumed below.
    let initial_size_hint = self.source_size_hint;
    let source_size_hint_known = initial_size_hint.is_some();
    let use_dictionary_state =
        !matches!(self.compression_level, CompressionLevel::Uncompressed)
            && self.state.matcher.supports_dictionary_priming()
            && self.dictionary.is_some();
    // The hints are handed to the matcher before `reset`.
    if let Some(size_hint) = self.source_size_hint.take() {
        self.state.matcher.set_source_size_hint(size_hint);
    }
    if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
        self.state
            .matcher
            .set_dictionary_size_hint(dict.inner.dict_content.len());
    }
    self.state.matcher.reset(self.compression_level);
    self.state.offset_hist = [1, 4, 8];
    // An explicit strategy override wins over the level's default.
    self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
        crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
    });
    let cached_entropy = if use_dictionary_state {
        self.dictionary_entropy_cache.as_ref()
    } else {
        None
    };
    if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
        self.state.offset_hist = dict.inner.offset_hist;
        // Per-strategy log2 size cutoff: restoring a captured snapshot of
        // the primed matcher is only attempted for hinted sources whose
        // ceil-log size exceeds it.
        let cutoff_log = match self.state.strategy_tag {
            crate::encoding::strategy::StrategyTag::Fast
            | crate::encoding::strategy::StrategyTag::BtUltra
            | crate::encoding::strategy::StrategyTag::BtUltra2 => 13,
            crate::encoding::strategy::StrategyTag::Dfast => 14,
            crate::encoding::strategy::StrategyTag::Greedy
            | crate::encoding::strategy::StrategyTag::Lazy
            | crate::encoding::strategy::StrategyTag::Btlazy2
            | crate::encoding::strategy::StrategyTag::BtOpt => 15,
        };
        let prefer_copy_snapshot = initial_size_hint.is_some_and(|s| {
            crate::encoding::match_generator::source_size_ceil_log(s) > cutoff_log
        });
        let restored = prefer_copy_snapshot
            && self
                .state
                .matcher
                .restore_primed_dictionary(self.compression_level);
        if !restored {
            // No usable snapshot: prime from the dictionary content, then
            // capture a snapshot for later frames if snapshots are preferred.
            self.state.matcher.prime_with_dictionary(
                dict.inner.dict_content.as_slice(),
                dict.inner.offset_hist,
            );
            if prefer_copy_snapshot {
                self.state
                    .matcher
                    .capture_primed_dictionary(self.compression_level);
            }
        }
    }
    // Huffman table: copy the cached dictionary table into the state,
    // reusing the live or spare table's storage when one exists.
    if let Some(cache) = cached_entropy {
        match &cache.huff {
            Some(src) => {
                if self.state.last_huff_table.is_none() {
                    self.state.last_huff_table = self.state.huff_table_spare.take();
                }
                match &mut self.state.last_huff_table {
                    Some(dst) => dst.clone_from(src),
                    slot => *slot = Some(src.clone()),
                }
            }
            None => self.state.clear_huff_table(),
        }
    } else {
        self.state.clear_huff_table();
    }
    // FSE tables: start from the cached dictionary tables, or from nothing.
    if let Some(cache) = cached_entropy {
        self.state
            .fse_tables
            .ll_previous
            .clone_from(&cache.ll_previous);
        self.state
            .fse_tables
            .ml_previous
            .clone_from(&cache.ml_previous);
        self.state
            .fse_tables
            .of_previous
            .clone_from(&cache.of_previous);
    } else {
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
    }
    // Only `Custom` cached tables are passed on to the matcher as seeds.
    let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    self.state.matcher.seed_dictionary_entropy(
        self.state.last_huff_table.as_ref(),
        ll_entropy,
        ml_entropy,
        of_entropy,
    );
    // Each frame's content checksum starts from a fresh hasher.
    #[cfg(feature = "hash")]
    {
        self.hasher = XxHash64::with_seed(0);
    }
    let window_size = self.state.matcher.window_size();
    assert!(
        window_size != 0,
        "matcher reported window_size == 0, which is invalid"
    );
    FramePrep {
        window_size,
        use_dictionary_state,
        source_size_hint_known,
        initial_size_hint,
    }
}
1656
    /// Drives the block-emission loop for one frame: pulls input from `source`
    /// one block at a time, encodes each block according to
    /// `self.compression_level`, and appends the serialized blocks to `out`.
    ///
    /// * `source` - supplier of input bytes, queried through `fill_block`.
    /// * `initial_size_hint` - optional total input size; used to derive the
    ///   remaining-size hint handed to `source` and to size output reservations.
    /// * `hint_is_exact` - when `false`, the reservation estimate is capped at
    ///   the currently buffered bytes plus a 64 KiB lookahead instead of
    ///   trusting the hint outright.
    /// * `out` - buffer that receives the block headers and bodies.
    ///
    /// Returns the total number of input bytes obtained from `source`.
    ///
    /// # Panics
    ///
    /// At `CompressionLevel::Uncompressed`, panics if a block length cannot be
    /// converted into the header's `block_size` field.
    fn run_owned_block_loop<S: OwnedBlockSource>(
        &mut self,
        source: &mut S,
        initial_size_hint: Option<u64>,
        hint_is_exact: bool,
        out: &mut Vec<u8>,
    ) -> u64 {
        // Offset in `out` where this frame's blocks begin; forwarded to
        // `reserve_for_next_block`.
        let blocks_start = out.len();
        // Every byte read from `source` so far.
        let mut total_uncompressed: u64 = 0;
        // Tail of a split block, carried over into the next iteration.
        let mut pending_input: Vec<u8> = Vec::new();
        let mut reached_eof = false;
        // Running (input bytes - emitted bytes) balance fed to `optimal_block_size`.
        let mut savings = 0i64;
        loop {
            let block_capacity = self.block_capacity();
            let mut uncompressed_data = self.state.matcher.get_next_space();
            // Start the block with whatever the previous iteration carried over.
            uncompressed_data.clear();
            uncompressed_data.extend_from_slice(&pending_input);
            pending_input.clear();
            if !reached_eof {
                // Only pass a remaining-size hint while the hint still exceeds
                // what has already been read.
                let size_hint_remaining = match initial_size_hint {
                    Some(hint) if hint > total_uncompressed => Some(hint - total_uncompressed),
                    _ => None,
                };
                let (appended, eof) =
                    source.fill_block(&mut uncompressed_data, block_capacity, size_hint_remaining);
                total_uncompressed += appended as u64;
                reached_eof = eof;
            }
            let mut last_block = reached_eof;
            let remaining_for_split = if reached_eof {
                uncompressed_data.len()
            } else {
                block_capacity
            };
            // Only full blocks at a compressing level are candidates for splitting.
            if !matches!(self.compression_level, CompressionLevel::Uncompressed)
                && uncompressed_data.len() == block_capacity
            {
                let block_len = optimal_block_size(
                    self.compression_level,
                    &uncompressed_data,
                    remaining_for_split,
                    block_capacity,
                    savings,
                );
                if block_len < uncompressed_data.len() {
                    // Keep the tail for the next iteration; since more data
                    // follows, this block can no longer be the last one.
                    pending_input.clear();
                    pending_input.extend_from_slice(&uncompressed_data[block_len..]);
                    uncompressed_data.truncate(block_len);
                    last_block = false;
                }
            }
            // Feed the content checksum with exactly the bytes of this block.
            #[cfg(feature = "hash")]
            if self.content_checksum {
                self.hasher.write(&uncompressed_data);
            }
            // Bytes already written as earlier blocks: everything read so far
            // minus the current block and the carried-over tail.
            let emitted =
                total_uncompressed - uncompressed_data.len() as u64 - pending_input.len() as u64;
            match initial_size_hint {
                // The hint still covers everything read: size the reservation from it.
                Some(hint) if hint >= total_uncompressed => {
                    let hint_remaining = hint - emitted;
                    let remaining = if hint_is_exact {
                        hint_remaining
                    } else {
                        // Inexact hint: never look further ahead than the
                        // buffered bytes plus a fixed lookahead.
                        let buffered = total_uncompressed - emitted;
                        const HINT_LOOKAHEAD: u64 = 64 * 1024;
                        hint_remaining.min(buffered + HINT_LOOKAHEAD)
                    };
                    reserve_for_next_block(
                        out,
                        blocks_start,
                        emitted,
                        remaining as usize,
                        self.block_capacity(),
                    );
                }
                // No usable hint: reserve this block's length plus 3 bytes
                // (the block header size used elsewhere in this loop) plus 16.
                // NOTE(review): the 16 looks like slack for encoder overhead — confirm.
                _ => {
                    out.reserve(uncompressed_data.len() + 3 + 16);
                }
            }
            if uncompressed_data.is_empty() {
                // Nothing to encode: terminate the frame with a zero-length
                // raw block flagged as last.
                let header = BlockHeader {
                    last_block: true,
                    block_type: crate::blocks::block::BlockType::Raw,
                    block_size: 0,
                };
                header.serialize(out);
                #[cfg(feature = "lsm")]
                self.block_decompressed_sizes.push(0);
                #[cfg(all(feature = "lsm", feature = "hash"))]
                if let Some(checksums) = self.block_checksums.as_mut() {
                    checksums.push(xxh64_block_low32(&[]));
                }
                break;
            }

            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    // Raw block: header followed by the input bytes verbatim.
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: uncompressed_data.len().try_into().unwrap(),
                    };
                    header.serialize(out);
                    #[cfg(feature = "lsm")]
                    self.block_decompressed_sizes
                        .push(uncompressed_data.len() as u32);
                    #[cfg(all(feature = "lsm", feature = "hash"))]
                    if let Some(checksums) = self.block_checksums.as_mut() {
                        checksums.push(xxh64_block_low32(&uncompressed_data));
                    }
                    out.extend_from_slice(&uncompressed_data);
                    // A raw block costs its payload plus the 3-byte header, so
                    // the balance drops by 3.
                    savings +=
                        uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    let before_len = out.len();
                    let block_len = uncompressed_data.len();
                    // A dictionary counts as active only when one is attached
                    // and the matcher reports support for dictionary priming.
                    let dict_active = self.dictionary.is_some()
                        && self.state.matcher.supports_dictionary_priming();
                    compress_block_encoded(
                        &mut self.state,
                        self.compression_level,
                        last_block,
                        uncompressed_data,
                        out,
                        dict_active,
                        #[cfg(feature = "lsm")]
                        Some(&mut self.block_decompressed_sizes),
                        #[cfg(all(feature = "lsm", feature = "hash"))]
                        self.block_checksums.as_mut(),
                    );
                    // Input length minus the bytes this block added to `out`.
                    savings += block_len as i64 - (out.len() - before_len) as i64;
                }
            }
            // Stop after the final block once no carried-over input remains.
            if last_block && pending_input.is_empty() {
                break;
            }
        }
        total_uncompressed
    }
1869
1870 fn append_frame_header(&self, total_uncompressed: u64, prep: &FramePrep, out: &mut Vec<u8>) {
1875 let single_segment = self.content_size_flag
1886 && prep.source_size_hint_known
1887 && total_uncompressed >= 512
1888 && total_uncompressed <= prep.window_size;
1889 let header = FrameHeader {
1890 frame_content_size: self.content_size_flag.then_some(total_uncompressed),
1891 single_segment,
1892 content_checksum: cfg!(feature = "hash") && self.content_checksum,
1893 dictionary_id: if prep.use_dictionary_state && self.dict_id_flag {
1894 self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
1895 } else {
1896 None
1897 },
1898 window_size: if single_segment {
1899 None
1900 } else {
1901 Some(prep.window_size)
1902 },
1903 magicless: self.magicless,
1904 };
1905 header.serialize(out);
1906 }
1907
1908 fn finish_frame(&mut self, all_blocks: Vec<u8>, total_uncompressed: u64, prep: &FramePrep) {
1914 let mut header_buf: Vec<u8> = Vec::with_capacity(18);
1915 self.append_frame_header(total_uncompressed, prep, &mut header_buf);
1916 #[cfg(feature = "hash")]
1920 let checksum_bytes = self
1921 .content_checksum
1922 .then(|| (self.hasher.finish() as u32).to_le_bytes());
1923 let drain = self.compressed_data.as_mut().unwrap();
1924 drain.write_all(&header_buf).unwrap();
1925 drain.write_all(&all_blocks).unwrap();
1926 #[cfg(feature = "hash")]
1930 if let Some(checksum_bytes) = checksum_bytes {
1931 drain.write_all(&checksum_bytes).unwrap();
1932 }
1933 #[cfg(feature = "lsm")]
1934 {
1935 let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
1936 self.populate_frame_emit_info(header_buf.len(), &all_blocks, emit_checksum);
1937 }
1938 }
1939
1940 #[cfg(feature = "lsm")]
1956 fn populate_frame_emit_info(
1957 &mut self,
1958 header_len: usize,
1959 all_blocks: &[u8],
1960 emit_checksum: bool,
1961 ) {
1962 use crate::blocks::block::BlockType as BT;
1963 use crate::encoding::frame_emit_info::{FrameBlock, FrameEmitInfo};
1964 let frame_header_len: u32 = match u32::try_from(header_len) {
1973 Ok(v) => v,
1974 Err(_) => return,
1975 };
1976 let all_blocks_len_u32: u32 = match u32::try_from(all_blocks.len()) {
1977 Ok(v) => v,
1978 Err(_) => return,
1979 };
1980 let mut blocks: Vec<FrameBlock> = Vec::new();
1981 let mut cursor: usize = 0;
1982 while cursor + 3 <= all_blocks.len() {
1983 let mut header_u32 = [0u8; 4];
1984 header_u32[..3].copy_from_slice(&all_blocks[cursor..cursor + 3]);
1985 let raw = u32::from_le_bytes(header_u32);
1986 let last_block = (raw & 1) != 0;
1987 let block_type = match (raw >> 1) & 0b11 {
1988 0 => BT::Raw,
1989 1 => BT::RLE,
1990 2 => BT::Compressed,
1991 _ => BT::Reserved,
1992 };
1993 let block_size_field = raw >> 3;
1994 let physical_body: u32 = match block_type {
2002 BT::RLE => 1,
2003 _ => block_size_field,
2004 };
2005 let cursor_u32: u32 = match u32::try_from(cursor) {
2006 Ok(v) => v,
2007 Err(_) => return,
2008 };
2009 let offset_in_frame = match frame_header_len.checked_add(cursor_u32) {
2010 Some(v) => v,
2011 None => return,
2012 };
2013 let decompressed_size = match self.block_decompressed_sizes.get(blocks.len()).copied() {
2024 Some(size) => size,
2025 None if matches!(block_type, BT::Raw | BT::RLE) => block_size_field,
2026 None => {
2027 debug_assert!(
2028 false,
2029 "missing decompressed-size sidecar entry for compressed block {}",
2030 blocks.len()
2031 );
2032 return;
2033 }
2034 };
2035 blocks.push(FrameBlock {
2036 offset_in_frame,
2037 header_size: 3,
2038 body_size: physical_body,
2039 block_size_field,
2040 block_type,
2041 last_block,
2042 decompressed_size,
2043 });
2044 cursor += 3 + physical_body as usize;
2045 if last_block {
2046 break;
2047 }
2048 }
2049 if cursor != all_blocks.len() || !blocks.last().is_some_and(|b| b.last_block) {
2057 debug_assert!(
2058 false,
2059 "incomplete block scan in populate_frame_emit_info: cursor={} len={} last_block={:?}",
2060 cursor,
2061 all_blocks.len(),
2062 blocks.last().map(|b| b.last_block)
2063 );
2064 return;
2065 }
2066 let checksum_range = if emit_checksum {
2067 let cs_start = match frame_header_len.checked_add(all_blocks_len_u32) {
2068 Some(v) => v,
2069 None => return,
2070 };
2071 let cs_end = match cs_start.checked_add(4) {
2072 Some(v) => v,
2073 None => return,
2074 };
2075 Some(cs_start..cs_end)
2076 } else {
2077 None
2078 };
2079 let body_total = match frame_header_len.checked_add(all_blocks_len_u32) {
2080 Some(v) => v,
2081 None => return,
2082 };
2083 let total_size = if checksum_range.is_some() {
2084 match body_total.checked_add(4) {
2085 Some(v) => v,
2086 None => return,
2087 }
2088 } else {
2089 body_total
2090 };
2091 self.frame_emit_info = Some(FrameEmitInfo {
2092 frame_header_range: 0..frame_header_len,
2093 blocks,
2094 checksum_range,
2095 total_size,
2096 });
2097 }
2098
    /// Returns the frame layout information currently stored in
    /// `frame_emit_info`, or `None` when nothing is stored.
    #[cfg(feature = "lsm")]
    pub fn last_frame_emit_info(&self) -> Option<&crate::encoding::frame_emit_info::FrameEmitInfo> {
        self.frame_emit_info.as_ref()
    }
2113
    /// Sets the `per_block_checksums_enabled` flag.
    ///
    /// NOTE(review): where the flag is consumed is outside this view;
    /// presumably it takes effect for frames compressed afterwards — confirm.
    #[cfg(all(feature = "lsm", feature = "hash"))]
    pub fn enable_per_block_checksums(&mut self) {
        self.per_block_checksums_enabled = true;
    }
2136
    /// Returns the collected per-block checksums as a slice, or `None` when
    /// `block_checksums` is unset.
    #[cfg(all(feature = "lsm", feature = "hash"))]
    pub fn last_frame_block_checksums(&self) -> Option<&[u32]> {
        self.block_checksums.as_deref()
    }
2147
    /// Returns a mutable reference to the configured source, or `None` if no
    /// source is set.
    pub fn source_mut(&mut self) -> Option<&mut R> {
        self.uncompressed_data.as_mut()
    }
2152
    /// Returns a mutable reference to the configured drain, or `None` if no
    /// drain is set.
    pub fn drain_mut(&mut self) -> Option<&mut W> {
        self.compressed_data.as_mut()
    }
2157
    /// Returns a shared reference to the configured source, or `None` if no
    /// source is set.
    pub fn source(&self) -> Option<&R> {
        self.uncompressed_data.as_ref()
    }
2162
    /// Returns a shared reference to the configured drain, or `None` if no
    /// drain is set.
    pub fn drain(&self) -> Option<&W> {
        self.compressed_data.as_ref()
    }
2167
    /// Removes the configured source from the compressor and returns it,
    /// leaving `None` in its place.
    pub fn take_source(&mut self) -> Option<R> {
        self.uncompressed_data.take()
    }
2172
    /// Removes the configured drain from the compressor and returns it,
    /// leaving `None` in its place.
    pub fn take_drain(&mut self) -> Option<W> {
        self.compressed_data.take()
    }
2177
2178 pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
2180 core::mem::swap(&mut match_generator, &mut self.state.matcher);
2181 match_generator
2182 }
2183
2184 pub fn set_compression_level(
2192 &mut self,
2193 compression_level: CompressionLevel,
2194 ) -> CompressionLevel {
2195 let old = self.compression_level;
2196 self.compression_level = compression_level;
2197 self.strategy_override = None;
2199 self.state.matcher.clear_param_overrides();
2200 old
2201 }
2202
    /// Returns the compression level currently configured on this compressor.
    pub fn compression_level(&self) -> CompressionLevel {
        self.compression_level
    }
2207
2208 pub fn set_dictionary(
2215 &mut self,
2216 dictionary: crate::decoding::Dictionary,
2217 ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2218 self.attach_dictionary(EncoderDictionary::from_dictionary(dictionary))
2219 }
2220
2221 pub fn set_dictionary_from_bytes(
2227 &mut self,
2228 raw_dictionary: &[u8],
2229 ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2230 self.attach_dictionary(EncoderDictionary::from_bytes(raw_dictionary)?)
2231 }
2232
    /// Attaches an already-built [`EncoderDictionary`] to the compressor.
    ///
    /// Returns the previously attached dictionary, if any.
    ///
    /// # Errors
    ///
    /// Propagates the validation errors of `attach_dictionary`.
    pub fn set_encoder_dictionary(
        &mut self,
        dictionary: EncoderDictionary,
    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
        self.attach_dictionary(dictionary)
    }
2248
2249 pub fn clear_dictionary(&mut self) -> Option<EncoderDictionary> {
2251 self.dictionary_entropy_cache = None;
2252 self.state.matcher.invalidate_primed_dictionary();
2256 self.dictionary.take()
2257 }
2258
2259 fn attach_dictionary(
2264 &mut self,
2265 enc: EncoderDictionary,
2266 ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2267 let dictionary = &enc.inner;
2268 if dictionary.id == 0 {
2269 return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
2270 }
2271 if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
2272 return Err(
2273 crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
2274 index: index as u8,
2275 },
2276 );
2277 }
2278 self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(dictionary));
2279 self.state.matcher.invalidate_primed_dictionary();
2283 Ok(self.dictionary.replace(enc))
2284 }
2285}
2286
2287#[cfg(test)]
2288mod tests {
2289 use alloc::format;
2294 use alloc::vec;
2295
2296 use super::FrameCompressor;
2297 use crate::blocks::block::BlockType;
2298 use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
2299 use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
2300 use crate::encoding::{Matcher, Sequence};
2301 use alloc::vec::Vec;
2302
2303 fn generate_data(seed: u64, len: usize) -> Vec<u8> {
2304 let mut state = seed;
2305 let mut data = Vec::with_capacity(len);
2306 for _ in 0..len {
2307 state = state
2308 .wrapping_mul(6364136223846793005)
2309 .wrapping_add(1442695040888963407);
2310 data.push((state >> 33) as u8);
2311 }
2312 data
2313 }
2314
2315 fn first_block_type(frame: &[u8]) -> BlockType {
2316 let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
2317 let mut decoder = block_decoder::new();
2318 let (header, _) = decoder
2319 .read_block_header(&frame[header_size as usize..])
2320 .expect("block header should parse");
2321 header.block_type
2322 }
2323
2324 #[cfg(feature = "std")]
2326 #[test]
2327 fn fcs_header_written_and_c_zstd_compatible() {
2328 let levels = [
2329 crate::encoding::CompressionLevel::Uncompressed,
2330 crate::encoding::CompressionLevel::Fastest,
2331 crate::encoding::CompressionLevel::Default,
2332 crate::encoding::CompressionLevel::Better,
2333 crate::encoding::CompressionLevel::Best,
2334 ];
2335 let fcs_2byte = vec![0xCDu8; 300]; let large = vec![0xABu8; 100_000];
2337 let inputs: [&[u8]; 5] = [
2338 &[],
2339 &[0x00],
2340 b"abcdefghijklmnopqrstuvwxy\n",
2341 &fcs_2byte,
2342 &large,
2343 ];
2344 for level in levels {
2345 for data in &inputs {
2346 let compressed = crate::encoding::compress_to_vec(*data, level);
2347 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
2349 .unwrap()
2350 .0;
2351 assert_eq!(
2352 header.frame_content_size(),
2353 data.len() as u64,
2354 "FCS mismatch for len={} level={:?}",
2355 data.len(),
2356 level,
2357 );
2358 assert_ne!(
2361 header.descriptor.frame_content_size_bytes().unwrap(),
2362 0,
2363 "FCS field must be present for len={} level={:?}",
2364 data.len(),
2365 level,
2366 );
2367 let mut decoded = Vec::new();
2369 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
2370 |e| {
2371 panic!(
2372 "C zstd decode failed for len={} level={level:?}: {e}",
2373 data.len()
2374 )
2375 },
2376 );
2377 assert_eq!(
2378 decoded.as_slice(),
2379 *data,
2380 "C zstd roundtrip failed for len={}",
2381 data.len()
2382 );
2383 }
2384 }
2385 }
2386
2387 #[cfg(feature = "std")]
2388 #[test]
2389 fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
2390 let data = vec![0xAB; 2047];
2391 let compressed = {
2392 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2393 compressor.set_source_size_hint(data.len() as u64);
2394 compressor.set_source(data.as_slice());
2395 let mut out = Vec::new();
2396 compressor.set_drain(&mut out);
2397 compressor.compress();
2398 out
2399 };
2400
2401 let mut decoded = Vec::new();
2402 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2403 assert_eq!(decoded, data);
2404 }
2405
2406 #[cfg(feature = "std")]
2407 #[test]
2408 fn small_hinted_default_frame_uses_single_segment_header() {
2409 let data = generate_data(0xD15E_A5ED, 1024);
2410 let compressed = {
2411 let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
2412 compressor.set_source_size_hint(data.len() as u64);
2413 compressor.set_source(data.as_slice());
2414 let mut out = Vec::new();
2415 compressor.set_drain(&mut out);
2416 compressor.compress();
2417 out
2418 };
2419
2420 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2421 assert!(
2422 frame_header.descriptor.single_segment_flag(),
2423 "small hinted default frames should use single-segment header for Rust/FFI parity"
2424 );
2425 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
2426 let mut decoded = Vec::new();
2427 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
2428 .expect("ffi decoder must accept single-segment small hinted default frame");
2429 assert_eq!(decoded, data);
2430 }
2431
2432 #[cfg(feature = "std")]
2433 #[test]
2434 fn small_hinted_numeric_default_levels_use_single_segment_header() {
2435 let data = generate_data(0xA11C_E003, 1024);
2436 for level in [
2437 super::CompressionLevel::Level(0),
2438 super::CompressionLevel::Level(3),
2439 ] {
2440 let compressed = {
2441 let mut compressor = FrameCompressor::new(level);
2442 compressor.set_source_size_hint(data.len() as u64);
2443 compressor.set_source(data.as_slice());
2444 let mut out = Vec::new();
2445 compressor.set_drain(&mut out);
2446 compressor.compress();
2447 out
2448 };
2449
2450 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2451 assert!(
2452 frame_header.descriptor.single_segment_flag(),
2453 "small hinted numeric default level frames should use single-segment header (level={level:?})"
2454 );
2455 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
2456 let mut decoded = Vec::new();
2457 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
2458 panic!(
2459 "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
2460 )
2461 });
2462 assert_eq!(decoded, data);
2463 }
2464 }
2465
2466 #[cfg(feature = "std")]
2467 #[test]
2468 fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
2469 let levels = [
2470 super::CompressionLevel::Fastest,
2471 super::CompressionLevel::Default,
2472 super::CompressionLevel::Better,
2473 super::CompressionLevel::Best,
2474 super::CompressionLevel::Level(-1),
2475 super::CompressionLevel::Level(2),
2476 super::CompressionLevel::Level(3),
2477 super::CompressionLevel::Level(4),
2478 super::CompressionLevel::Level(11),
2479 ];
2480 let sizes = [
2481 511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
2482 ];
2483
2484 for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
2485 for &size in &sizes {
2486 let data = generate_data(seed + seed_idx as u64, size);
2487 for &level in &levels {
2488 let compressed = {
2489 let mut compressor = FrameCompressor::new(level);
2490 compressor.set_source_size_hint(data.len() as u64);
2491 compressor.set_source(data.as_slice());
2492 let mut out = Vec::new();
2493 compressor.set_drain(&mut out);
2494 compressor.compress();
2495 out
2496 };
2497 if matches!(size, 511 | 512) {
2498 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2499 assert_eq!(
2500 frame_header.descriptor.single_segment_flag(),
2501 size == 512,
2502 "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
2503 );
2504 }
2505
2506 let mut decoded = Vec::new();
2507 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
2508 |e| {
2509 panic!(
2510 "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
2511 seed + seed_idx as u64
2512 )
2513 },
2514 );
2515 assert_eq!(
2516 decoded,
2517 data,
2518 "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
2519 seed + seed_idx as u64
2520 );
2521 }
2522 }
2523 }
2524 }
2525
2526 #[cfg(feature = "std")]
2527 #[test]
2528 fn hinted_levels_use_single_segment_header_symmetrically() {
2529 let levels = [
2530 super::CompressionLevel::Fastest,
2531 super::CompressionLevel::Default,
2532 super::CompressionLevel::Better,
2533 super::CompressionLevel::Best,
2534 super::CompressionLevel::Level(0),
2535 super::CompressionLevel::Level(2),
2536 super::CompressionLevel::Level(3),
2537 super::CompressionLevel::Level(4),
2538 super::CompressionLevel::Level(11),
2539 ];
2540 for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
2541 let size = 1024 + seed_idx * 97;
2542 let data = generate_data(seed, size);
2543 for &level in &levels {
2544 let compressed = {
2545 let mut compressor = FrameCompressor::new(level);
2546 compressor.set_source_size_hint(data.len() as u64);
2547 compressor.set_source(data.as_slice());
2548 let mut out = Vec::new();
2549 compressor.set_drain(&mut out);
2550 compressor.compress();
2551 out
2552 };
2553 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2554 assert!(
2555 frame_header.descriptor.single_segment_flag(),
2556 "hinted frame should be single-segment for level={level:?} size={}",
2557 data.len()
2558 );
2559 assert_eq!(frame_header.frame_content_size(), data.len() as u64);
2560 let mut decoded = Vec::new();
2561 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
2562 panic!(
2563 "ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
2564 data.len()
2565 )
2566 });
2567 assert_eq!(decoded, data);
2568 }
2569 }
2570 }
2571
2572 #[cfg(feature = "std")]
2573 #[test]
2574 fn hinted_levels_pin_511_512_single_segment_boundary() {
2575 let levels = [
2576 super::CompressionLevel::Fastest,
2577 super::CompressionLevel::Default,
2578 super::CompressionLevel::Better,
2579 super::CompressionLevel::Best,
2580 super::CompressionLevel::Level(0),
2581 super::CompressionLevel::Level(2),
2582 super::CompressionLevel::Level(3),
2583 super::CompressionLevel::Level(4),
2584 super::CompressionLevel::Level(11),
2585 ];
2586 for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
2587 for &size in &[511usize, 512] {
2588 let data = generate_data(seed + seed_idx as u64, size);
2589 for &level in &levels {
2590 let compressed = {
2591 let mut compressor = FrameCompressor::new(level);
2592 compressor.set_source_size_hint(data.len() as u64);
2593 compressor.set_source(data.as_slice());
2594 let mut out = Vec::new();
2595 compressor.set_drain(&mut out);
2596 compressor.compress();
2597 out
2598 };
2599 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2600 assert_eq!(
2601 frame_header.descriptor.single_segment_flag(),
2602 size == 512,
2603 "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
2604 );
2605 let mut decoded = Vec::new();
2606 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
2607 |e| {
2608 panic!(
2609 "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
2610 seed + seed_idx as u64
2611 )
2612 },
2613 );
2614 assert_eq!(decoded, data);
2615 }
2616 }
2617 }
2618 }
2619
2620 #[cfg(feature = "std")]
2621 #[test]
2622 fn fastest_random_block_uses_raw_fast_path() {
2623 let data = generate_data(0xC0FF_EE11, 10 * 1024);
2624 let compressed =
2625 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Fastest);
2626
2627 assert_eq!(first_block_type(&compressed), BlockType::Raw);
2628
2629 let mut decoded = Vec::new();
2630 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2631 assert_eq!(decoded, data);
2632 }
2633
2634 #[cfg(feature = "std")]
2635 #[test]
2636 fn default_random_block_uses_raw_fast_path() {
2637 let data = generate_data(0xD15E_A5ED, 10 * 1024);
2638 let compressed =
2639 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Default);
2640
2641 assert_eq!(first_block_type(&compressed), BlockType::Raw);
2642
2643 let mut decoded = Vec::new();
2644 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2645 assert_eq!(decoded, data);
2646 }
2647
2648 #[cfg(feature = "std")]
2649 #[test]
2650 fn best_random_block_uses_raw_fast_path() {
2651 let data = generate_data(0xB35C_AFE1, 10 * 1024);
2652 let compressed =
2653 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Best);
2654
2655 assert_eq!(first_block_type(&compressed), BlockType::Raw);
2656
2657 let mut decoded = Vec::new();
2658 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2659 assert_eq!(decoded, data);
2660 }
2661
2662 #[cfg(feature = "std")]
2663 #[test]
2664 fn level2_random_block_uses_raw_fast_path() {
2665 let data = generate_data(0xA11C_E222, 10 * 1024);
2666 let compressed =
2667 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Level(2));
2668
2669 assert_eq!(first_block_type(&compressed), BlockType::Raw);
2670
2671 let mut decoded = Vec::new();
2672 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2673 assert_eq!(decoded, data);
2674 }
2675
2676 #[cfg(feature = "std")]
2677 #[test]
2678 fn better_random_block_uses_raw_fast_path() {
2679 let data = generate_data(0xBE77_E111, 10 * 1024);
2680 let compressed =
2681 crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Better);
2682
2683 assert_eq!(first_block_type(&compressed), BlockType::Raw);
2684
2685 let mut decoded = Vec::new();
2686 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2687 assert_eq!(decoded, data);
2688 }
2689
2690 #[cfg(feature = "std")]
2691 #[test]
2692 fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
2693 let mut data = Vec::with_capacity(16 * 1024);
2694 const LINE: &[u8] =
2695 b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
2696 while data.len() < 16 * 1024 {
2697 let remaining = 16 * 1024 - data.len();
2698 data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
2699 }
2700
2701 fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
2702 let compressed = crate::encoding::compress_to_vec(data, level);
2703 assert_ne!(first_block_type(&compressed), BlockType::Raw);
2704 assert!(
2705 compressed.len() < data.len(),
2706 "compressible input should remain compressible for level={level:?}"
2707 );
2708 let mut decoded = Vec::new();
2709 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2710 assert_eq!(decoded, data);
2711 }
2712
2713 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Fastest);
2714 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Default);
2715 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Level(3));
2716 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Better);
2717 assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Best);
2718 }
2719
2720 #[cfg(feature = "std")]
2721 #[test]
2722 fn hinted_small_compressible_frames_use_single_segment_across_levels() {
2723 let mut data = Vec::with_capacity(4 * 1024);
2724 const LINE: &[u8] =
2725 b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
2726 while data.len() < 4 * 1024 {
2727 let remaining = 4 * 1024 - data.len();
2728 data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
2729 }
2730
2731 for level in [
2732 super::CompressionLevel::Fastest,
2733 super::CompressionLevel::Default,
2734 super::CompressionLevel::Better,
2735 super::CompressionLevel::Best,
2736 super::CompressionLevel::Level(0),
2737 super::CompressionLevel::Level(3),
2738 super::CompressionLevel::Level(4),
2739 super::CompressionLevel::Level(11),
2740 ] {
2741 let compressed = {
2742 let mut compressor = FrameCompressor::new(level);
2743 compressor.set_source_size_hint(data.len() as u64);
2744 compressor.set_source(data.as_slice());
2745 let mut out = Vec::new();
2746 compressor.set_drain(&mut out);
2747 compressor.compress();
2748 out
2749 };
2750 let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2751 assert!(
2752 frame_header.descriptor.single_segment_flag(),
2753 "hinted small compressible frame should use single-segment (level={level:?})"
2754 );
2755 assert_ne!(
2756 first_block_type(&compressed),
2757 BlockType::Raw,
2758 "compressible hinted frame should stay off raw fast path (level={level:?})"
2759 );
2760 assert!(
2761 compressed.len() < data.len(),
2762 "compressible hinted frame should still shrink (level={level:?})"
2763 );
2764 let mut decoded = Vec::new();
2765 zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
2766 .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
2767 assert_eq!(decoded, data);
2768 }
2769 }
2770
    /// Minimal test matcher that performs no match finding: it reports the
    /// whole committed buffer as literals.
    struct NoDictionaryMatcher {
        /// Buffer most recently handed over through `commit_space`.
        last_space: Vec<u8>,
        /// Value reported by `window_size()`; also the length of the buffers
        /// returned by `get_next_space()`.
        window_size: u64,
    }
2775
2776 impl NoDictionaryMatcher {
2777 fn new(window_size: u64) -> Self {
2778 Self {
2779 last_space: Vec::new(),
2780 window_size,
2781 }
2782 }
2783 }
2784
2785 impl Matcher for NoDictionaryMatcher {
2786 fn get_next_space(&mut self) -> Vec<u8> {
2787 vec![0; self.window_size as usize]
2788 }
2789
2790 fn get_last_space(&mut self) -> &[u8] {
2791 self.last_space.as_slice()
2792 }
2793
2794 fn commit_space(&mut self, space: Vec<u8>) {
2795 self.last_space = space;
2796 }
2797
2798 fn skip_matching(&mut self) {}
2799
2800 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
2801 handle_sequence(Sequence::Literals {
2802 literals: self.last_space.as_slice(),
2803 });
2804 }
2805
2806 fn reset(&mut self, _level: super::CompressionLevel) {
2807 self.last_space.clear();
2808 }
2809
2810 fn window_size(&self) -> u64 {
2811 self.window_size
2812 }
2813 }
2814
2815 #[test]
2816 fn frame_starts_with_magic_num() {
2817 let mock_data = [1_u8, 2, 3].as_slice();
2818 let mut output: Vec<u8> = Vec::new();
2819 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2820 compressor.set_source(mock_data);
2821 compressor.set_drain(&mut output);
2822
2823 compressor.compress();
2824 assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
2825 }
2826
2827 #[test]
2828 fn very_simple_raw_compress() {
2829 let mock_data = [1_u8, 2, 3].as_slice();
2830 let mut output: Vec<u8> = Vec::new();
2831 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2832 compressor.set_source(mock_data);
2833 compressor.set_drain(&mut output);
2834
2835 compressor.compress();
2836 }
2837
2838 #[test]
2839 fn very_simple_compress() {
2840 let mut mock_data = vec![0; 1 << 17];
2841 mock_data.extend(vec![1; (1 << 17) - 1]);
2842 mock_data.extend(vec![2; (1 << 18) - 1]);
2843 mock_data.extend(vec![2; 1 << 17]);
2844 mock_data.extend(vec![3; (1 << 17) - 1]);
2845 let mut output: Vec<u8> = Vec::new();
2846 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2847 compressor.set_source(mock_data.as_slice());
2848 compressor.set_drain(&mut output);
2849
2850 compressor.compress();
2851
2852 let mut decoder = FrameDecoder::new();
2853 let mut decoded = Vec::with_capacity(mock_data.len());
2854 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2855 assert_eq!(mock_data, decoded);
2856
2857 let mut decoded = Vec::new();
2858 zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
2859 assert_eq!(mock_data, decoded);
2860 }
2861
2862 #[test]
2863 fn rle_compress() {
2864 let mock_data = vec![0; 1 << 19];
2865 let mut output: Vec<u8> = Vec::new();
2866 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2867 compressor.set_source(mock_data.as_slice());
2868 compressor.set_drain(&mut output);
2869
2870 compressor.compress();
2871
2872 let mut decoder = FrameDecoder::new();
2873 let mut decoded = Vec::with_capacity(mock_data.len());
2874 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2875 assert_eq!(mock_data, decoded);
2876 }
2877
2878 #[test]
2879 fn aaa_compress() {
2880 let mock_data = vec![0, 1, 3, 4, 5];
2881 let mut output: Vec<u8> = Vec::new();
2882 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2883 compressor.set_source(mock_data.as_slice());
2884 compressor.set_drain(&mut output);
2885
2886 compressor.compress();
2887
2888 let mut decoder = FrameDecoder::new();
2889 let mut decoded = Vec::with_capacity(mock_data.len());
2890 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2891 assert_eq!(mock_data, decoded);
2892
2893 let mut decoded = Vec::new();
2894 zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
2895 assert_eq!(mock_data, decoded);
2896 }
2897
#[test]
/// Compresses dictionary-derived data with a dictionary attached and checks:
/// the frame header carries the dictionary id, decoding without the
/// dictionary fails with `DictNotProvided`, and decoding with it round-trips
/// through both the crate decoder and the `zstd` crate.
fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let expected_id = dict_for_decoder.id;

    // Payload: the first 2048 bytes of the dictionary content, eight times over.
    let data = dict_for_decoder.dict_content[..2048].repeat(8);

    let mut with_dict = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);

    // First attach: nothing was installed before, so nothing is handed back.
    let previous = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(
        previous.is_none(),
        "first dictionary insert should return None"
    );

    // Second attach: the dictionary installed above must be returned.
    let replaced = compressor
        .set_dictionary(dict_for_encoder)
        .expect("valid dictionary should attach")
        .expect("set_dictionary_from_bytes inserted previous dictionary");
    assert_eq!(replaced.id(), expected_id);

    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut with_dict);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(expected_id));

    // A decoder that was never given the dictionary must refuse the frame.
    let mut missing_dict_target = Vec::with_capacity(data.len());
    let err = FrameDecoder::new()
        .decode_all_to_vec(&with_dict, &mut missing_dict_target)
        .unwrap_err();
    assert!(
        matches!(
            &err,
            crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
        ),
        "dict-compressed stream should require dictionary id, got: {err:?}"
    );

    // With the dictionary registered the frame decodes back to the input.
    let mut dict_decoder = FrameDecoder::new();
    dict_decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(data.len());
    dict_decoder
        .decode_all_to_vec(&with_dict, &mut decoded)
        .unwrap();
    assert_eq!(decoded, data);

    // Cross-check against the `zstd` crate using the same raw dictionary.
    let mut ffi_decoded = Vec::with_capacity(data.len());
    let ffi_written = zstd::bulk::Decompressor::with_dictionary(dict_raw)
        .unwrap()
        .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
        .unwrap();
    assert_eq!(ffi_written, data.len());
    assert_eq!(ffi_decoded, data);
}
2961
#[cfg(all(feature = "dict_builder", feature = "std"))]
#[test]
/// Trains a raw dictionary with `create_raw_dict_from_source`, attaches it to
/// the encoder, and checks that the frame advertises the dictionary id,
/// round-trips through `FrameDecoder`, and is smaller than the same payload
/// compressed without a dictionary.
fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
    use std::io::Cursor;

    // Training corpus: 256 similar log-style lines.
    let mut training = Vec::new();
    for idx in 0..256u32 {
        training.extend_from_slice(
            format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
        );
    }
    let mut raw_dict = Vec::new();
    crate::dictionary::create_raw_dict_from_source(
        Cursor::new(training.as_slice()),
        training.len(),
        &mut raw_dict,
        4096,
    )
    .expect("dict_builder training should succeed");
    assert!(
        !raw_dict.is_empty(),
        "dict_builder produced an empty dictionary"
    );

    let dict_id = 0xD1C7_0008;
    let encoder_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
    // Last use of `raw_dict`: move it rather than cloning a second time.
    let decoder_dict = crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict).unwrap();

    // Payload shares its vocabulary with the training corpus.
    let mut payload = Vec::new();
    for idx in 0..96u32 {
        payload.extend_from_slice(
            format!(
                "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
            )
            .as_bytes(),
        );
    }

    // Baseline frame without any dictionary, used for the size comparison.
    let mut without_dict = Vec::new();
    let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
    baseline.set_source(payload.as_slice());
    baseline.set_drain(&mut without_dict);
    baseline.compress();

    let mut with_dict = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    compressor
        .set_dictionary(encoder_dict)
        .expect("valid dict_builder dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut with_dict);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_id));
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
    assert!(
        with_dict.len() < without_dict.len(),
        "trained dictionary should improve compression for this small payload"
    );
}
3030
#[test]
/// After attaching a dictionary from raw bytes and compressing an empty
/// input, the compressor state must hold a previous Huffman table and
/// previous LL/ML/OF FSE tables.
fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let input: &[u8] = b"";
    let mut output: Vec<u8> = Vec::new();

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let previous = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(previous.is_none());

    compressor.set_source(input);
    compressor.set_drain(&mut output);
    compressor.compress();

    // Inspect the entropy state left behind by the compression run.
    let state = &compressor.state;
    let fse = &state.fse_tables;
    assert!(
        state.last_huff_table.is_some(),
        "dictionary entropy should seed previous huffman table before first block"
    );
    assert!(
        fse.ll_previous.is_some(),
        "dictionary entropy should seed previous ll table before first block"
    );
    assert!(
        fse.ml_previous.is_some(),
        "dictionary entropy should seed previous ml table before first block"
    );
    assert!(
        fse.of_previous.is_some(),
        "dictionary entropy should seed previous of table before first block"
    );
}
3064
#[test]
/// Compresses the same payload with the content-size flag at its initial
/// setting and then switched off. The first header must report 4096; the
/// second must carry no frame-content-size field and still decode.
fn content_size_flag_off_omits_fcs_and_roundtrips() {
    let payload = alloc::vec![0x42u8; 4096];
    let mut compressor: FrameCompressor =
        FrameCompressor::new(super::CompressionLevel::Fastest);

    // Frame 1: flag untouched.
    let mut with_fcs = Vec::new();
    compressor.compress_independent_frame_into(&payload, &mut with_fcs);

    // Frame 2: flag explicitly disabled on the same compressor.
    compressor.set_content_size_flag(false);
    let mut without_fcs = Vec::new();
    compressor.compress_independent_frame_into(&payload, &mut without_fcs);

    let (parsed_with, _) = crate::decoding::frame::read_frame_header(with_fcs.as_slice())
        .expect("flag-on frame header must parse");
    assert_eq!(parsed_with.frame_content_size(), 4096);

    let (parsed_without, _) = crate::decoding::frame::read_frame_header(without_fcs.as_slice())
        .expect("flag-off frame header must parse");
    assert_eq!(
        parsed_without.frame_content_size(),
        0,
        "FCS must be omitted with the content-size flag off"
    );
    // A zero-width field shows the FCS is absent, not merely zero-valued.
    let fcs_field_width = parsed_without
        .descriptor
        .frame_content_size_bytes()
        .expect("descriptor must parse");
    assert_eq!(
        fcs_field_width, 0,
        "the FCS field itself must be omitted, not written as zero"
    );

    let mut decoded = Vec::with_capacity(payload.len() + 64);
    crate::decoding::FrameDecoder::new()
        .decode_all_to_vec(&without_fcs, &mut decoded)
        .expect("flag-off frame must decode");
    assert_eq!(decoded, payload);
}
3115
#[test]
/// With the dictionary-id flag switched off, the frame header must not carry
/// a dictionary id, and the frame must still decode when the dictionary is
/// passed to a `StreamingDecoder` directly.
fn dict_id_flag_off_omits_dictionary_id_and_roundtrips() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(8);

    let mut encoder: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    encoder
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    encoder.set_dictionary_id_flag(false);

    let mut frame = Vec::new();
    encoder.compress_independent_frame_into(&payload, &mut frame);

    let (parsed, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("frame header must parse");
    assert_eq!(
        parsed.dictionary_id(),
        None,
        "dictionary id must be omitted with the dict-id flag off"
    );

    // The header names no dictionary, so the decoder is given the bytes directly.
    let mut sd = crate::decoding::StreamingDecoder::new_with_dictionary_bytes(
        frame.as_slice(),
        dict_raw,
    )
    .expect("decoder must accept the dictionary");
    let mut dec = Vec::new();
    std::io::Read::read_to_end(&mut sd, &mut dec)
        .expect("frame must decode with the dictionary handed explicitly");
    assert_eq!(dec, payload);
}
3154
#[test]
/// Compresses 4 MiB of a repeating log line into an empty `Vec` and checks
/// that the output vector's capacity stays below a quarter of the input
/// length, then verifies the frame round-trips.
fn compressible_stream_output_capacity_stays_at_output_scale() {
    const INPUT_LEN: usize = 4 << 20;
    let line = b"ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo\n";

    // Repeat the line end to end and cut at exactly `INPUT_LEN` bytes.
    let input: Vec<u8> = line.iter().copied().cycle().take(INPUT_LEN).collect();

    let mut compressor: FrameCompressor =
        FrameCompressor::new(super::CompressionLevel::Fastest);
    let mut out = Vec::new();
    compressor.compress_independent_frame_into(&input, &mut out);

    assert!(!out.is_empty());
    let capacity_limit = input.len() / 4;
    assert!(
        out.capacity() < capacity_limit,
        "capacity {} must stay at output scale (input {}, output {})",
        out.capacity(),
        input.len(),
        out.len()
    );

    let mut decoded = Vec::with_capacity(input.len() + 64);
    crate::decoding::FrameDecoder::new()
        .decode_all_to_vec(&out, &mut decoded)
        .expect("frame must decode");
    assert_eq!(decoded, input);
}
3192
#[test]
/// A dictionary frame produced by `compress_independent_frame_into` must set
/// the single-segment flag, carry a dictionary id and the exact content size,
/// and decode with both the crate decoder and the `zstd` crate.
fn dict_frame_with_known_size_is_single_segment() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(64);

    let mut encoder: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    encoder
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    let mut frame = Vec::new();
    encoder.compress_independent_frame_into(&payload, &mut frame);

    // Header expectations.
    let (parsed, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("frame header must parse");
    assert!(
        parsed.descriptor.single_segment_flag(),
        "dict frame with known size <= window must be single-segment"
    );
    assert!(parsed.dictionary_id().is_some());
    assert_eq!(parsed.frame_content_size(), payload.len() as u64);

    // Round-trip through this crate's decoder.
    let mut own_decoder = crate::decoding::FrameDecoder::new();
    own_decoder
        .add_dict_from_bytes(dict_raw)
        .expect("decoder must accept the dictionary");
    let mut decoded = Vec::with_capacity(payload.len() + 64);
    own_decoder
        .decode_all_to_vec(&frame, &mut decoded)
        .expect("single-segment dict frame must decode");
    assert_eq!(decoded, payload);

    // Round-trip through the `zstd` crate.
    let mut reference_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw)
        .expect("reference decompressor must accept the dictionary");
    let ffi_decoded = reference_decoder
        .decompress(&frame, payload.len() + 64)
        .expect("reference zstd must accept the single-segment dict frame");
    assert_eq!(ffi_decoded, payload);
}
3241
#[test]
/// `FrameCompressor::heap_size` must grow by exactly the heap size of a
/// Huffman table stored in `state.last_huff_table`, and again by the size of
/// one stored in `state.huff_table_spare`.
fn heap_size_counts_active_and_spare_huffman_tables() {
    use crate::huff0::huff0_encoder::HuffmanTable;

    let mut compressor: FrameCompressor =
        FrameCompressor::new(super::CompressionLevel::Fastest);
    // Measured before any table is installed; both checks are relative to it.
    let baseline = compressor.heap_size();

    let active_table = HuffmanTable::build_from_data(b"abacabadabacabaeabacabadabacaba");
    let active_bytes = active_table.heap_size();
    assert!(active_bytes > 0, "built table must own heap buffers");
    compressor.state.last_huff_table = Some(active_table);
    let with_active = compressor.heap_size();
    assert_eq!(
        with_active,
        baseline + active_bytes,
        "heap_size must include the active last_huff_table"
    );

    let spare_table =
        HuffmanTable::build_from_data(b"the quick brown fox jumps over the lazy dog");
    let spare_bytes = spare_table.heap_size();
    assert!(spare_bytes > 0, "built table must own heap buffers");
    compressor.state.huff_table_spare = Some(spare_table);
    let with_spare = compressor.heap_size();
    assert_eq!(
        with_spare,
        baseline + active_bytes + spare_bytes,
        "heap_size must include the parked huff_table_spare"
    );
}
3276
#[test]
/// A prepared `EncoderDictionary` is attached, used for one frame, taken back
/// with `clear_dictionary`, and attached to a second compressor. Both frames
/// must be identical, and the first must round-trip.
fn set_encoder_dictionary_reattaches_prepared_dict_without_reparse() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
        tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";

    let prepared =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    let dict_id = prepared.id();

    // First compressor: attach, compress, then take the dictionary back.
    let mut first_frame = Vec::new();
    let mut first = FrameCompressor::new(super::CompressionLevel::Fastest);
    let previous = first
        .set_encoder_dictionary(prepared)
        .expect("prepared dictionary should attach");
    assert!(previous.is_none());
    first.set_source(payload.as_slice());
    first.set_drain(&mut first_frame);
    first.compress();
    let returned = first.clear_dictionary().expect("dictionary was attached");
    assert_eq!(returned.id(), dict_id);

    // The first frame names the dictionary and decodes with it.
    let (frame_header, _) = crate::decoding::frame::read_frame_header(first_frame.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_id));
    let mut decoder = FrameDecoder::new();
    decoder
        .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
        .unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&first_frame, &mut decoded)
        .unwrap();
    assert_eq!(decoded.as_slice(), payload.as_slice());

    // Second compressor: reuse the returned dictionary as-is.
    let mut second_frame = Vec::new();
    let mut second = FrameCompressor::new(super::CompressionLevel::Fastest);
    second
        .set_encoder_dictionary(returned)
        .expect("returned dictionary should reattach");
    second.set_source(payload.as_slice());
    second.set_drain(&mut second_frame);
    second.compress();
    assert_eq!(
        second_frame, first_frame,
        "reattached prepared dict must produce an identical frame"
    );
}
3333
#[test]
/// Compresses the same payload of at least 16 KiB twice on one
/// dictionary-primed compressor. The two frames must be byte-identical, and
/// each must name the dictionary and round-trip.
fn dict_primed_matcher_snapshot_reused_across_frames_is_byte_identical() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");

    // Whole lines only: the smallest repeat count that reaches the target size.
    let line: &[u8] = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n";
    let target = 16 * 1024;
    let repeats = target / line.len() + usize::from(target % line.len() != 0);
    let payload = line.repeat(repeats);

    let prepared =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    let dict_id = prepared.id();
    let mut encoder: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    encoder
        .set_encoder_dictionary(prepared)
        .expect("prepared dictionary should attach");

    // Same compressor, same input, two consecutive frames.
    let frame1 = encoder.compress_independent_frame(payload.as_slice());
    let frame2 = encoder.compress_independent_frame(payload.as_slice());
    assert_eq!(
        frame1, frame2,
        "restored prime snapshot must reproduce the freshly-primed frame byte-for-byte"
    );

    for frame in [&frame1, &frame2] {
        let (hdr, _) =
            crate::decoding::frame::read_frame_header(frame.as_slice()).expect("frame header");
        assert_eq!(hdr.dictionary_id(), Some(dict_id));

        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(decoder_dict).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
        assert_eq!(decoded.as_slice(), payload.as_slice());
    }
}
3386
#[test]
/// Compresses a payload of at least 2 KiB twice on one dictionary-primed
/// compressor and once on a brand-new one. All three frames must be
/// byte-identical, and the first two must round-trip.
fn dict_primed_matcher_cache_reused_across_small_attach_frames_is_byte_identical() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");

    // Whole lines only: the smallest repeat count that reaches the target size.
    let line: &[u8] = b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n";
    let target = 2 * 1024;
    let repeats = target / line.len() + usize::from(target % line.len() != 0);
    let payload = line.repeat(repeats);

    let prepared =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    let dict_id = prepared.id();
    let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    warm.set_encoder_dictionary(prepared)
        .expect("prepared dictionary should attach");

    // Two consecutive frames from the same compressor.
    let frame1 = warm.compress_independent_frame(payload.as_slice());
    let frame2 = warm.compress_independent_frame(payload.as_slice());
    assert_eq!(
        frame1, frame2,
        "reused dict table (attach path) must reproduce the freshly-built frame byte-for-byte"
    );

    // A compressor that has never compressed anything must agree as well.
    let mut fresh: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let fresh_prepared =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    fresh
        .set_encoder_dictionary(fresh_prepared)
        .expect("prepared dictionary should attach");
    let fresh_frame = fresh.compress_independent_frame(payload.as_slice());
    assert_eq!(
        fresh_frame, frame1,
        "cold-cache compressor must match the warm-cache frame byte-for-byte"
    );

    for frame in [&frame1, &frame2] {
        let (hdr, _) =
            crate::decoding::frame::read_frame_header(frame.as_slice()).expect("frame header");
        assert_eq!(hdr.dictionary_id(), Some(dict_id));

        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(decoder_dict).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
        assert_eq!(decoded.as_slice(), payload.as_slice());
    }
}
3448
#[test]
/// Reuses one dictionary-primed compressor for 32 small frames and then four
/// large/small alternations. Every frame must equal the frame a brand-new
/// compressor produces for the same payload.
fn dict_fast_epoch_reset_many_frames_and_attach_copy_alternation_byte_identical() {
    /// Repeats `line` the smallest whole number of times that reaches `target` bytes.
    fn fill(line: &[u8], target: usize) -> Vec<u8> {
        let repeats = target / line.len() + usize::from(target % line.len() != 0);
        line.repeat(repeats)
    }

    /// Compresses `payload` on a new compressor primed from `dict`.
    fn reference(dict: &[u8], payload: &[u8]) -> alloc::vec::Vec<u8> {
        let mut fresh: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        let prepared =
            super::EncoderDictionary::from_bytes(dict).expect("dict bytes should parse");
        fresh
            .set_encoder_dictionary(prepared)
            .expect("prepared dictionary should attach");
        fresh.compress_independent_frame(payload)
    }

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let small = fill(
        b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n",
        2 * 1024,
    );
    let large = fill(
        b"tenant=demo op=scan range=[k0,k9) limit=500 order=asc\n",
        64 * 1024,
    );

    let mut reused: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    reused
        .set_encoder_dictionary(
            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse"),
        )
        .expect("prepared dictionary should attach");

    let small_expected = reference(dict_raw, &small);
    let large_expected = reference(dict_raw, &large);

    // Phase 1: many consecutive small frames.
    for i in 0..32 {
        let produced = reused.compress_independent_frame(small.as_slice());
        assert_eq!(
            produced, small_expected,
            "attach frame {i} diverged from the fresh-compressor reference"
        );
    }

    // Phase 2: a large frame followed by a small one, four times.
    for i in 0..4 {
        let produced_large = reused.compress_independent_frame(large.as_slice());
        assert_eq!(
            produced_large, large_expected,
            "copy frame {i} diverged from the fresh-compressor reference"
        );
        let produced_small = reused.compress_independent_frame(small.as_slice());
        assert_eq!(
            produced_small, small_expected,
            "attach frame after copy {i} diverged from the fresh-compressor reference"
        );
    }
}
3513
#[test]
/// At `Level(15)`, for 16 KiB and 64 KiB targets: two frames from one
/// dictionary-primed compressor and one from a brand-new compressor must be
/// byte-identical, and the first two must round-trip.
fn dict_primed_btlazy2_reused_across_attach_and_copy_boundary_is_byte_identical() {
    /// Appends numbered log lines until at least `target` bytes are produced.
    fn make_payload(target: usize) -> Vec<u8> {
        let mut p = Vec::with_capacity(target);
        for i in 0u64.. {
            if p.len() >= target {
                break;
            }
            let entry = format!(
                "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
                i % 97
            );
            p.extend_from_slice(entry.as_bytes());
        }
        p
    }

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
        .expect("dict bytes should parse")
        .id();

    for target in [16 * 1024usize, 64 * 1024usize] {
        let payload = make_payload(target);

        // Warm compressor: two consecutive frames.
        let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(15));
        let warm_dict = super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse");
        warm.set_encoder_dictionary(warm_dict).expect("dict attach");
        let frame1 = warm.compress_independent_frame(payload.as_slice());
        let frame2 = warm.compress_independent_frame(payload.as_slice());
        assert_eq!(
            frame1, frame2,
            "reused dict cache must reproduce the freshly-primed frame byte-for-byte \
             (Level 15, target={target})"
        );

        // Cold compressor: a single frame on a new instance.
        let mut cold: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(15));
        let cold_dict = super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse");
        cold.set_encoder_dictionary(cold_dict).expect("dict attach");
        let cold_frame = cold.compress_independent_frame(payload.as_slice());
        assert_eq!(
            cold_frame, frame1,
            "cold-cache compressor must match warm-cache frame (Level 15, target={target})"
        );

        for frame in [&frame1, &frame2] {
            let (hdr, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
                .expect("frame header");
            assert_eq!(hdr.dictionary_id(), Some(dict_id));

            let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
            let mut decoder = FrameDecoder::new();
            decoder.add_dict(decoder_dict).unwrap();
            let mut decoded = Vec::with_capacity(payload.len());
            decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
            assert_eq!(decoded.as_slice(), payload.as_slice());
        }
    }
}
3588
#[test]
/// At `Level(22)`: a compressor that has already compressed frame A must
/// produce the same bytes for frame B as a brand-new compressor, and that
/// frame must round-trip.
fn dict_primed_btultra2_restore_is_floor_safe_and_byte_identical() {
    /// Builds exactly `target` bytes of numbered log lines, counting from `seed`.
    fn make_payload(seed: u64, target: usize) -> Vec<u8> {
        let mut p = Vec::with_capacity(target);
        let mut i = seed;
        loop {
            if p.len() >= target {
                break;
            }
            let entry = format!(
                "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
                i % 89
            );
            p.extend_from_slice(entry.as_bytes());
            i = i.wrapping_add(1);
        }
        p.truncate(target);
        p
    }

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
        .expect("dict bytes should parse")
        .id();

    let size = 48 * 1024usize;
    let frame_a = make_payload(0, size);
    let frame_b = make_payload(1_000_000, size);

    // Warm path: frame A first, then frame B on the same compressor.
    let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
    let warm_dict = super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse");
    warm.set_encoder_dictionary(warm_dict).expect("dict attach");
    let _wa = warm.compress_independent_frame(frame_a.as_slice());
    let warm_b = warm.compress_independent_frame(frame_b.as_slice());

    // Cold path: frame B only, on a new compressor.
    let mut cold: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
    let cold_dict = super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse");
    cold.set_encoder_dictionary(cold_dict).expect("dict attach");
    let cold_b = cold.compress_independent_frame(frame_b.as_slice());

    assert_eq!(
        warm_b, cold_b,
        "frame B via snapshot restore must be byte-identical to a cold compress \
         (a restore that leaks frame-A live-table entries would diverge here)"
    );

    let (hdr, _) =
        crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
    assert_eq!(hdr.dictionary_id(), Some(dict_id));

    let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(frame_b.len());
    decoder
        .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
        .unwrap();
    assert_eq!(decoded.as_slice(), frame_b.as_slice());
}
3671
#[test]
/// Same warm-versus-cold comparison as the `Level(22)` restore test, with
/// long-distance matching enabled through `CompressionParameters`.
fn dict_primed_btultra2_ldm_restore_is_byte_identical() {
    /// Builds exactly `target` bytes of numbered log lines, counting from `seed`.
    fn make_payload(seed: u64, target: usize) -> Vec<u8> {
        let mut p = Vec::with_capacity(target);
        let mut i = seed;
        loop {
            if p.len() >= target {
                break;
            }
            let entry = format!(
                "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
                i % 89
            );
            p.extend_from_slice(entry.as_bytes());
            i = i.wrapping_add(1);
        }
        p.truncate(target);
        p
    }

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
        .expect("dict bytes should parse")
        .id();

    let ldm_params =
        crate::encoding::CompressionParameters::builder(super::CompressionLevel::Level(22))
            .enable_long_distance_matching(true)
            .build()
            .expect("LDM-only params build");

    let size = 48 * 1024usize;
    let frame_a = make_payload(0, size);
    let frame_b = make_payload(1_000_000, size);

    // Warm path: parameters, dictionary, frame A, then frame B.
    let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
    warm.set_parameters(&ldm_params);
    let warm_dict = super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse");
    warm.set_encoder_dictionary(warm_dict).expect("dict attach");
    let _wa = warm.compress_independent_frame(frame_a.as_slice());
    let warm_b = warm.compress_independent_frame(frame_b.as_slice());

    // Cold path: same configuration, frame B only.
    let mut cold: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
    cold.set_parameters(&ldm_params);
    let cold_dict = super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse");
    cold.set_encoder_dictionary(cold_dict).expect("dict attach");
    let cold_b = cold.compress_independent_frame(frame_b.as_slice());

    assert_eq!(
        warm_b, cold_b,
        "LDM-on frame B via snapshot restore must be byte-identical to a cold compress"
    );

    let (hdr, _) =
        crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
    assert_eq!(hdr.dictionary_id(), Some(dict_id));

    let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(frame_b.len());
    decoder
        .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
        .unwrap();
    assert_eq!(decoded.as_slice(), frame_b.as_slice());
}
3745
#[test]
/// Attaching a dictionary with `set_dictionary_from_bytes` must yield the same
/// compressed bytes as attaching one produced by `Dictionary::decode_dict`.
fn set_dictionary_from_bytes_matches_full_decode_byte_for_byte() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
        tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";

    // Path 1: dictionary parsed by the compressor from raw bytes.
    let mut from_bytes_out = Vec::new();
    let mut bytes_compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    bytes_compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    bytes_compressor.set_source(payload.as_slice());
    bytes_compressor.set_drain(&mut from_bytes_out);
    bytes_compressor.compress();

    // Path 2: dictionary decoded up front, then handed to the compressor.
    let full_decode = crate::decoding::Dictionary::decode_dict(dict_raw)
        .expect("dictionary bytes should fully decode");
    let mut full_decode_out = Vec::new();
    let mut decoded_compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    decoded_compressor
        .set_dictionary(full_decode)
        .expect("full-decode dictionary should attach");
    decoded_compressor.set_source(payload.as_slice());
    decoded_compressor.set_drain(&mut full_decode_out);
    decoded_compressor.compress();

    assert_eq!(
        from_bytes_out, full_decode_out,
        "encoder-only dict parse must produce byte-identical output to the full decode"
    );
}
3792
#[test]
/// `set_dictionary` must fail with `ZeroDictionaryId` for a dictionary whose
/// `id` is 0.
fn set_dictionary_rejects_zero_dictionary_id() {
    use crate::decoding::scratch::{FSEScratch, HuffmanScratch};

    // Hand-built dictionary: only the zero id is under test.
    let invalid = crate::decoding::Dictionary {
        id: 0,
        fse: FSEScratch::new(),
        huf: HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [1, 4, 8],
    };

    let mut compressor = FrameCompressor::<
        &[u8],
        Vec<u8>,
        crate::encoding::match_generator::MatchGeneratorDriver,
    >::new(super::CompressionLevel::Fastest);

    let result = compressor.set_dictionary(invalid);
    assert!(matches!(
        result,
        Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
    ));
}
3814
#[test]
/// `set_dictionary` must fail with `ZeroRepeatOffsetInDictionary { index: 0 }`
/// for a dictionary whose first `offset_hist` entry is 0.
fn set_dictionary_rejects_zero_repeat_offsets() {
    use crate::decoding::scratch::{FSEScratch, HuffmanScratch};

    // Hand-built dictionary: non-zero id, zero in the first repeat-offset slot.
    let invalid = crate::decoding::Dictionary {
        id: 1,
        fse: FSEScratch::new(),
        huf: HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [0, 4, 8],
    };

    let mut compressor = FrameCompressor::<
        &[u8],
        Vec<u8>,
        crate::encoding::match_generator::MatchGeneratorDriver,
    >::new(super::CompressionLevel::Fastest);

    let result = compressor.set_dictionary(invalid);
    assert!(matches!(
        result,
        Err(
            crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
                index: 0
            }
        )
    ));
}
3840
#[test]
/// With a dictionary attached at the `Uncompressed` level, the frame header
/// must not carry a dictionary id, and the frame must decode without one.
fn uncompressed_mode_does_not_require_dictionary() {
    let payload = b"plain-bytes-that-should-stay-raw";

    let dict_id = 0xABCD_0001;
    let raw_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
            .expect("raw dictionary should be valid");

    let mut output = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder
        .set_dictionary(raw_dict)
        .expect("dictionary should attach in uncompressed mode");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut output);
    encoder.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    let advertised_id = frame_header.dictionary_id();
    assert_eq!(
        advertised_id, None,
        "raw/uncompressed frames must not advertise dictionary dependency"
    );

    // No dictionary is registered on the decoder.
    let mut decoded = Vec::with_capacity(payload.len());
    FrameDecoder::new()
        .decode_all_to_vec(&output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
3871
#[test]
/// Compressing at the `Default` level with a three-byte raw-content
/// dictionary must produce a frame that carries the dictionary id and
/// round-trips through a decoder holding the same dictionary.
fn default_level_tiny_raw_dict_compresses_cleanly() {
    let dict_id = 0xABCD_0009;
    let payload = b"the quick brown fox jumps over the lazy dog, repeatedly and at length";

    let encode_dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
        .expect("raw dictionary should be valid");

    let mut output = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Default);
    encoder
        .set_dictionary(encode_dict)
        .expect("tiny raw dictionary should attach");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut output);
    encoder.compress();
    assert!(!output.is_empty(), "compression should produce a frame");

    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a readable header");
    let advertised_id = frame_header.dictionary_id();
    assert_eq!(
        advertised_id,
        Some(dict_id),
        "tiny raw dict frame should still advertise its dictionary id",
    );

    // The decoder gets its own dictionary built from the same three bytes.
    let decode_dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
        .expect("raw dictionary should be valid");
    let mut decoder = FrameDecoder::new();
    decoder
        .add_dict(decode_dict)
        .expect("decoder dict should attach");
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&output, &mut decoded)
        .expect("dict roundtrip should decode");
    assert_eq!(decoded, payload, "tiny-dict roundtrip mismatch");
}
3924
/// For several compression levels, checks that a raw dictionary built from
/// the same log-line family as the payload (a) makes the frame strictly
/// smaller than the dictionary-less frame and (b) still round-trips through
/// the decoder.
#[test]
fn dict_attach_roundtrips_across_backends_with_matching_payload() {
    use crate::decoding::Dictionary;

    let dict_id = 0xD1C7_0001;
    // One synthetic log record, parameterised by its index.
    let line = |i: u32| {
        alloc::format!(
            "ts=2026-03-26T21:{:02}:{:02}Z level=INFO msg=\"event {i:05}\" tenant=t{i} region=eu\n",
            i / 60 % 60,
            i % 60,
        )
        .into_bytes()
    };

    // Dictionary: records 0..256 in order.
    let dict_content: Vec<u8> = (0..256u32).flat_map(&line).collect();
    // Payload: 256 records visited with stride 97 (mod 256), starting at 0.
    let payload: Vec<u8> = (0..256u32)
        .map(|step| step * 97 % 256)
        .flat_map(&line)
        .collect();

    // Compresses `payload` at `level`, optionally priming with a raw dictionary.
    let compress_at = |level, dict: Option<Vec<u8>>| -> Vec<u8> {
        let mut compressor = FrameCompressor::new(level);
        if let Some(bytes) = dict {
            let primed = Dictionary::from_raw_content(dict_id, bytes)
                .expect("raw dictionary should be valid");
            compressor
                .set_dictionary(primed)
                .expect("dictionary should attach");
        }
        let mut frame = Vec::new();
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };

    let levels = [
        super::CompressionLevel::Level(-5),
        super::CompressionLevel::Level(1),
        super::CompressionLevel::Default,
        super::CompressionLevel::Level(8),
    ];
    for level in levels {
        let out = compress_at(level, Some(dict_content.clone()));
        let no_dict = compress_at(level, None);
        assert!(
            out.len() < no_dict.len(),
            "level {level:?}: dict-primed output ({}) must beat no-dict ({}) — dict probe did not fire",
            out.len(),
            no_dict.len(),
        );

        let decoder_dict = Dictionary::from_raw_content(dict_id, dict_content.clone())
            .expect("raw dictionary should be valid");
        let mut decoder = FrameDecoder::new();
        decoder
            .add_dict(decoder_dict)
            .expect("decoder dict should attach");
        let mut decoded = Vec::with_capacity(payload.len());
        decoder
            .decode_all_to_vec(&out, &mut decoded)
            .unwrap_or_else(|e| panic!("level {level:?}: dict roundtrip decode failed: {e:?}"));
        assert_eq!(decoded, payload, "level {level:?}: dict roundtrip mismatch");
    }
}
4009
/// Reuses one compressor for two frames, attaching a different dictionary
/// before each, and checks that every frame beats its dictionary-less
/// baseline and decodes back to its own payload.
#[test]
fn dict_swap_across_reused_compressor_roundtrips() {
    use crate::decoding::Dictionary;

    // Builds a (dictionary, payload) pair of tagged records: the dictionary
    // holds records 0..256 in order, the payload visits them with stride 97.
    fn corpus(tag: &str) -> (Vec<u8>, Vec<u8>) {
        let line =
            |i: u32| alloc::format!("{tag} record {i:05} field=value{i} end\n").into_bytes();
        let dict: Vec<u8> = (0..256u32).flat_map(&line).collect();
        let payload: Vec<u8> = (0..256u32)
            .map(|step| step * 97 % 256)
            .flat_map(&line)
            .collect();
        (dict, payload)
    }

    let dict_id = 0xD1C7_0002;
    let (dict_a, payload_a) = corpus("alpha");
    let (dict_b, payload_b) = corpus("bravo");

    for level in [
        super::CompressionLevel::Default,
        super::CompressionLevel::Level(8),
    ] {
        // Baseline sizes come from a fresh, dictionary-less compressor.
        let baseline_len = |payload: &[u8]| -> usize {
            let mut fresh: FrameCompressor = FrameCompressor::new(level);
            fresh.compress_independent_frame(payload).len()
        };
        let no_dict_a = baseline_len(&payload_a);
        let no_dict_b = baseline_len(&payload_b);

        // The same compressor instance handles both dictionaries in turn.
        let mut compressor: FrameCompressor = FrameCompressor::new(level);
        let rounds = [
            (&dict_a, &payload_a, no_dict_a),
            (&dict_b, &payload_b, no_dict_b),
        ];
        for (dict_bytes, payload, no_dict_len) in rounds {
            let encoder_dict = Dictionary::from_raw_content(dict_id, dict_bytes.clone())
                .expect("raw dictionary should be valid");
            compressor
                .set_dictionary(encoder_dict)
                .expect("dictionary should attach");
            let out = compressor.compress_independent_frame(payload.as_slice());
            assert!(
                out.len() < no_dict_len,
                "level {level:?}: dict frame ({}) must beat no-dict ({}) — dict probe did not fire",
                out.len(),
                no_dict_len,
            );

            let decoder_dict = Dictionary::from_raw_content(dict_id, dict_bytes.clone())
                .expect("raw dictionary should be valid");
            let mut decoder = FrameDecoder::new();
            decoder
                .add_dict(decoder_dict)
                .expect("decoder dict should attach");
            let mut decoded = Vec::with_capacity(payload.len());
            decoder
                .decode_all_to_vec(&out, &mut decoded)
                .unwrap_or_else(|e| panic!("level {level:?}: dict-swap decode failed: {e:?}"));
            assert_eq!(
                decoded, *payload,
                "level {level:?}: dict-swap roundtrip mismatch (stale dict rows?)"
            );
        }
    }
}
4088
/// Compresses a payload longer than the advertised window with and without
/// a small raw dictionary, and checks that the dictionary does not change
/// the advertised window size and that the dictionary frame still decodes.
#[test]
fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
    use crate::decoding::Dictionary;
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let dict_id = 0xABCD_0002;
    let make_dict = || {
        Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
            .expect("raw dictionary should be valid")
    };

    let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);

    // Both runs use an identically configured matcher and level; only the
    // presence of a dictionary differs.
    let encode = |dict: Option<Dictionary>| -> Vec<u8> {
        let matcher = MatchGeneratorDriver::new(1024, 1);
        let mut compressor =
            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
        if let Some(dict) = dict {
            compressor
                .set_dictionary(dict)
                .expect("dictionary should attach");
        }
        let mut frame = Vec::new();
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };

    let no_dict_output = encode(None);
    let (no_dict_frame_header, _) =
        crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
            .expect("baseline frame should have a header");
    let no_dict_window = no_dict_frame_header
        .window_size()
        .expect("window size should be present");

    let output = encode(Some(make_dict()));
    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    let advertised_window = frame_header
        .window_size()
        .expect("window size should be present");

    assert_eq!(
        advertised_window, no_dict_window,
        "dictionary priming must not inflate advertised window size"
    );
    assert!(
        payload.len() > advertised_window as usize,
        "test must cross the advertised window boundary"
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(make_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
4151
/// With a dictionary attached, a source-size hint of 1 must not produce a
/// larger advertised window than the unhinted frame, and the hinted frame
/// must still decode to the payload.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
    use crate::decoding::Dictionary;

    let dict_id = 0xABCD_0004;
    let make_dict = || Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    let payload = b"abcdabcdabcdabcd".repeat(128);

    // Compresses `payload` with the dictionary, applying `hint` when given.
    let compress = |hint: Option<u64>| -> Vec<u8> {
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        compressor.set_dictionary(make_dict()).unwrap();
        if let Some(size) = hint {
            compressor.set_source_size_hint(size);
        }
        let mut frame = Vec::new();
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };
    // Reads the window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let hinted_output = compress(Some(1));
    let no_hint_output = compress(None);

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(make_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
4204
/// Embeds a 512-byte pseudo-random segment inside pseudo-random filler and
/// checks, at several levels, that a dictionary equal to that segment shrinks
/// the frame by more than 300 bytes, that the frame round-trips, and that an
/// unrelated dictionary costs at most 16 extra bytes.
#[test]
fn dictionary_segment_in_incompressible_input_is_matched() {
    use crate::decoding::Dictionary;

    // Deterministic byte stream: the top byte of a 64-bit linear
    // congruential generator's state after each step.
    fn lcg(seed: u64, n: usize) -> alloc::vec::Vec<u8> {
        let mut state = seed;
        let mut bytes = alloc::vec::Vec::with_capacity(n);
        for _ in 0..n {
            state = state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            bytes.push((state >> 56) as u8);
        }
        bytes
    }

    let dict_id = 0x00DC_7777;
    // `r` is both the dictionary content and the middle segment of the payload.
    let r = lcg(1, 512);
    let mut payload = lcg(2, 2000);
    payload.extend_from_slice(&r);
    payload.extend_from_slice(&lcg(3, 1500));
    assert!(
        crate::encoding::incompressible::block_looks_incompressible(&payload),
        "test payload must look incompressible to exercise the raw-fast-path",
    );
    // A dictionary that shares nothing with the payload; identical for every level.
    let unrelated = lcg(99, 512);

    let compress =
        |level: super::CompressionLevel, dict: Option<&[u8]>| -> alloc::vec::Vec<u8> {
            let mut frame = alloc::vec::Vec::new();
            let mut compressor = FrameCompressor::new(level);
            if let Some(content) = dict {
                let attached = Dictionary::from_raw_content(dict_id, content.to_vec()).unwrap();
                compressor.set_dictionary(attached).unwrap();
            }
            compressor.set_source(payload.as_slice());
            compressor.set_drain(&mut frame);
            compressor.compress();
            frame
        };

    for lvl in [
        super::CompressionLevel::Level(2),
        super::CompressionLevel::Level(6),
        super::CompressionLevel::Level(19),
    ] {
        let with_dict = compress(lvl, Some(&r));
        let no_dict = compress(lvl, None);
        assert!(
            with_dict.len() + 300 < no_dict.len(),
            "{lvl:?}: dict segment not matched (with_dict={}, no_dict={})",
            with_dict.len(),
            no_dict.len(),
        );

        let mut decoder = FrameDecoder::new();
        decoder
            .add_dict(Dictionary::from_raw_content(dict_id, r.clone()).unwrap())
            .unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
        assert_eq!(decoded, payload, "{lvl:?}: dict round-trip mismatch");

        let with_bad_dict = compress(lvl, Some(&unrelated));
        assert!(
            with_bad_dict.len() <= no_dict.len() + 16,
            "{lvl:?}: unhelpful dict expanded output (with={}, no_dict={})",
            with_bad_dict.len(),
            no_dict.len(),
        );
    }
}
4296
/// With a dictionary attached, hinting the exact payload length must not
/// produce a larger advertised window than the unhinted frame, and the
/// hinted frame must still decode to the payload.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
    use crate::decoding::Dictionary;

    let dict_id = 0xABCD_0005;
    let make_dict = || Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    let payload = b"abcd".repeat(1024);
    let payload_len = payload.len() as u64;

    // Compresses `payload` with the dictionary, applying `hint` when given.
    let compress = |hint: Option<u64>| -> Vec<u8> {
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        compressor.set_dictionary(make_dict()).unwrap();
        if let Some(size) = hint {
            compressor.set_source_size_hint(size);
        }
        let mut frame = Vec::new();
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };
    // Reads the window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let hinted_output = compress(Some(payload_len));
    let no_hint_output = compress(None);

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(make_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
4350
/// Attaches a dictionary to a compressor driven by `NoDictionaryMatcher` and
/// checks that the frame header carries no dictionary id and that the frame
/// decodes without any dictionary registered on the decoder.
#[test]
fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
    let payload = b"abcdefghabcdefgh";
    let dict_id = 0xABCD_0003;
    let attached = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");

    let mut output = Vec::new();
    let mut compressor = FrameCompressor::new_with_matcher(
        NoDictionaryMatcher::new(64),
        super::CompressionLevel::Fastest,
    );
    compressor
        .set_dictionary(attached)
        .expect("dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    assert_eq!(
        frame_header.dictionary_id(),
        None,
        "matchers that do not support dictionary priming must not advertise dictionary dependency"
    );

    // No `add_dict` call here: the frame has to be decodable on its own.
    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
4382
/// Compresses the same data twice through one reused compressor and checks
/// that each frame's stored checksum equals the decoder's computed checksum
/// and that both frames store the same checksum.
///
/// NOTE(review): the checksums are compared as `Option<u32>`; if a frame
/// carried no checksum both sides would be `None` and the comparisons would
/// pass vacuously. Confirm that checksums are emitted by default.
#[cfg(feature = "hash")]
#[test]
fn checksum_two_frames_reused_compressor() {
    // Fully decodes one frame and returns
    // (decoded bytes, checksum read from the frame, checksum the decoder computed).
    fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
        let mut source = compressed;
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut source).unwrap();
        while !decoder.is_finished() {
            decoder
                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut decoded = Vec::new();
        decoder.collect_to_writer(&mut decoded).unwrap();
        let stored = decoder.get_checksum_from_data();
        let computed = decoder.get_calculated_checksum();
        (decoded, stored, computed)
    }

    let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();

    let mut compressed1 = Vec::new();
    let mut compressed2 = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);

    // First frame.
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed1);
    compressor.compress();

    // Second frame, same compressor instance and same input.
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed2);
    compressor.compress();

    let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
    assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
    assert_eq!(
        chksum_from_data1, chksum_calculated1,
        "frame 1: checksum mismatch"
    );

    let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
    assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
    assert_eq!(
        chksum_from_data2, chksum_calculated2,
        "frame 2: checksum mismatch"
    );

    assert_eq!(
        chksum_from_data1, chksum_from_data2,
        "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
    );
}
4446
/// Checks that the per-block decompressed byte ranges reported by
/// `last_frame_emit_info` are contiguous, cover the whole decoded output, and
/// match what a partial decode of each individual block produces.
#[cfg(feature = "lsm")]
#[test]
fn frame_emit_info_decompressed_ranges_match_decoded_output() {
    let data = emit_info_fixture_data();

    for level in [
        super::CompressionLevel::Default,
        super::CompressionLevel::Level(22),
    ] {
        let mut compressed = Vec::new();
        let mut compressor = FrameCompressor::new(level);
        compressor.set_source_size_hint(data.len() as u64);
        compressor.set_source(data.as_slice());
        compressor.set_drain(&mut compressed);
        compressor.compress();

        // Clone the emit info so it no longer borrows the compressor.
        let info = compressor
            .last_frame_emit_info()
            .expect("emit info populated after compress")
            .clone();

        // Reference output: decode the whole frame.
        let mut source = compressed.as_slice();
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut source).unwrap();
        while !decoder.is_finished() {
            decoder
                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut decoded = Vec::new();
        decoder.collect_to_writer(&mut decoded).unwrap();
        assert_eq!(decoded, data, "sanity: frame must round-trip ({level:?})");

        let block_count = info.blocks.len();
        assert!(
            block_count >= 2,
            "fixture must span multiple blocks to exercise the mapping ({level:?}, got {})",
            info.blocks.len()
        );
        assert!(
            info.blocks.last().unwrap().last_block,
            "final block must carry last_block ({level:?})"
        );

        if matches!(level, super::CompressionLevel::Level(22)) {
            // More emitted blocks than MAX_BLOCK_SIZE-sized input chunks.
            let max_block = crate::common::MAX_BLOCK_SIZE as usize;
            let n_chunks = data.len().div_ceil(max_block);
            assert!(
                block_count > n_chunks,
                "Level(22) must exercise post-split: {} blocks for {} input chunks",
                info.blocks.len(),
                n_chunks
            );
        }

        // Walk the blocks in order, tracking where the next range must begin.
        let mut expected_start = 0u64;
        for (i, block) in info.blocks.iter().enumerate() {
            let range = info
                .decompressed_byte_range(i)
                .expect("in-bounds block has a range");
            assert_eq!(
                range.start, expected_start,
                "block {i} range must start where the previous ended ({level:?})"
            );
            assert_eq!(
                u64::from(block.decompressed_size),
                range.end - range.start,
                "block {i} decompressed_size must equal its range width ({level:?})"
            );

            // Decode only block `i` with a fresh decoder and compare it to
            // the matching slice of the full decode.
            let mut psrc = compressed.as_slice();
            let mut pdec = FrameDecoder::new();
            pdec.reset(&mut psrc).unwrap();
            let pd = pdec
                .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
                .unwrap();
            assert!(
                pd.stopped_at.is_none(),
                "block {i} must decode cleanly ({level:?})"
            );
            assert_eq!(
                pd.data.as_slice(),
                &decoded[range.start as usize..range.end as usize],
                "block {i} partial-decode bytes must equal the full-decode slice ({level:?})"
            );
            expected_start = range.end;
        }
        assert_eq!(
            expected_start,
            decoded.len() as u64,
            "block decompressed sizes must sum to the full decoded length ({level:?})"
        );
        assert_eq!(
            info.decompressed_byte_range(block_count),
            None,
            "out-of-range index yields None ({level:?})"
        );
    }
}
4571
4572 #[cfg(feature = "lsm")]
/// Builds the deterministic fixture used by the emit-info tests.
///
/// Repeats two steps until at least 400 KiB have been produced: a run of
/// 16 to 63 copies of one pseudo-random byte, then a fixed line of text. The
/// run length and fill byte come from a xorshift32 state, so every call
/// returns the same bytes.
fn emit_info_fixture_data() -> Vec<u8> {
    const TARGET_LEN: usize = 400 * 1024;
    const TEXT_LINE: &[u8] = b"the quick brown fox jumps over the lazy dog\n";

    let mut fixture: Vec<u8> = Vec::with_capacity(TARGET_LEN);
    let mut state = 0x9E37_79B9u32;
    while fixture.len() < TARGET_LEN {
        // Advance the xorshift32 state (shifts 13 / 17 / 5).
        state ^= state << 13;
        state ^= state >> 17;
        state ^= state << 5;

        let run_len = 16 + (state as usize % 48);
        let fill = (state >> 24) as u8;
        // Append `run_len` copies of `fill`, then the text line.
        fixture.resize(fixture.len() + run_len, fill);
        fixture.extend_from_slice(TEXT_LINE);
    }
    fixture
}
4591
/// Same range-mapping check as the streaming variant, but for a frame
/// produced by `compress_independent_frame` at the fastest level.
#[cfg(feature = "lsm")]
#[test]
fn frame_emit_info_decompressed_ranges_match_on_borrowed_oneshot_path() {
    let data = emit_info_fixture_data();

    let mut compressor: FrameCompressor =
        FrameCompressor::new(super::CompressionLevel::Fastest);
    let compressed = compressor.compress_independent_frame(data.as_slice());
    let info = compressor
        .last_frame_emit_info()
        .expect("emit info populated after compress_independent_frame")
        .clone();

    // Fixture sanity: a compressed block exists, there are several blocks,
    // and the final one is flagged as last.
    let has_compressed_block = info
        .blocks
        .iter()
        .any(|b| matches!(b.block_type, crate::blocks::block::BlockType::Compressed));
    assert!(
        has_compressed_block,
        "borrowed-path fixture must emit at least one compressed block"
    );
    assert!(
        info.blocks.len() >= 2,
        "borrowed fixture must span multiple blocks (got {})",
        info.blocks.len()
    );
    assert!(info.blocks.last().unwrap().last_block);

    // Reference output: decode the whole frame.
    let mut source = compressed.as_slice();
    let mut decoder = FrameDecoder::new();
    decoder.reset(&mut source).unwrap();
    while !decoder.is_finished() {
        decoder
            .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
            .unwrap();
    }
    let mut decoded = Vec::new();
    decoder.collect_to_writer(&mut decoded).unwrap();
    assert_eq!(decoded, data, "borrowed one-shot frame must round-trip");

    // Each block's range must follow the previous one and match a partial
    // decode of just that block.
    let block_count = info.blocks.len();
    let mut expected_start = 0u64;
    for i in 0..block_count {
        let range = info.decompressed_byte_range(i).unwrap();
        assert_eq!(range.start, expected_start, "block {i} range contiguity");

        let mut psrc = compressed.as_slice();
        let mut pdec = FrameDecoder::new();
        pdec.reset(&mut psrc).unwrap();
        let pd = pdec
            .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
            .unwrap();
        assert!(pd.stopped_at.is_none(), "block {i} must decode cleanly");
        assert_eq!(
            pd.data.as_slice(),
            &decoded[range.start as usize..range.end as usize],
            "borrowed block {i} partial-decode bytes must equal the full-decode slice"
        );
        expected_start = range.end;
    }
    assert_eq!(
        expected_start,
        decoded.len() as u64,
        "ranges sum to full length"
    );
}
4665
/// Replays every regular file under `fuzz/artifacts/interop` (when that
/// directory exists) through three interop checks: `zstd`-encoded data must
/// decode with both of this crate's decoders, and data encoded by this crate
/// at the uncompressed and fastest levels must decode with `zstd`.
#[cfg(feature = "std")]
#[test]
fn fuzz_targets() {
    use std::io::Read;

    // Decodes with this crate's streaming decoder.
    fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
        let mut decoder = crate::decoding::StreamingDecoder::new(data).unwrap();
        let mut result: Vec<u8> = Vec::new();
        decoder.read_to_end(&mut result).expect("Decoding failed");
        result
    }

    // Decodes with this crate's frame decoder, draining output after every
    // `decode_blocks` call (each call is asked for up to 1 MiB).
    fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
        let mut decoder = crate::decoding::FrameDecoder::new();
        decoder.reset(&mut data).unwrap();
        let mut result = Vec::new();
        while !decoder.is_finished() || decoder.can_collect() > 0 {
            decoder
                .decode_blocks(
                    &mut data,
                    crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
                )
                .unwrap();
            decoder.collect_to_writer(&mut result).unwrap();
        }
        result
    }

    // Encodes with this crate at the requested level.
    fn encode_szstd(
        data: &mut dyn std::io::Read,
        level: crate::encoding::CompressionLevel,
    ) -> Vec<u8> {
        let mut input = Vec::new();
        data.read_to_end(&mut input).unwrap();
        crate::encoding::compress_to_vec(input.as_slice(), level)
    }

    // Encodes with the `zstd` crate at level 3.
    fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        zstd::stream::encode_all(std::io::Cursor::new(data), 3)
    }

    // Decodes with the `zstd` crate.
    fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        let mut output = Vec::new();
        zstd::stream::copy_decode(data, &mut output)?;
        Ok(output)
    }

    // A missing (or unreadable) artifact directory means there is nothing to replay.
    if !std::fs::exists("fuzz/artifacts/interop").unwrap_or(false) {
        return;
    }

    for file in std::fs::read_dir("fuzz/artifacts/interop").unwrap() {
        let entry = file.unwrap();
        if !entry.file_type().unwrap().is_file() {
            continue;
        }
        let contents = std::fs::read(entry.path()).unwrap();
        let data = contents.as_slice();

        // zstd encoder -> this crate's decoders.
        let compressed = encode_zstd(data).unwrap();
        let decoded = decode_szstd(&mut compressed.as_slice());
        let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
        assert!(
            decoded == data,
            "Decoded data did not match the original input during decompression"
        );
        assert_eq!(
            decoded2, data,
            "Decoded data did not match the original input during decompression"
        );

        // This crate's encoder (uncompressed, then fastest) -> zstd decoder.
        for level in [
            crate::encoding::CompressionLevel::Uncompressed,
            crate::encoding::CompressionLevel::Fastest,
        ] {
            let mut input = data;
            let compressed = encode_szstd(&mut input, level);
            let decoded = decode_zstd(&compressed).unwrap();
            assert_eq!(
                decoded, data,
                "Decoded data did not match the original input during compression"
            );
        }
    }
}
4761
/// A maximum-size block filled with a single byte value must come back with
/// a split point equal to its full length.
#[test]
fn split_block_from_borders_keeps_homogeneous_block() {
    let full_len = MAX_BLOCK_SIZE as usize;
    let uniform = vec![0xAAu8; full_len];
    assert_eq!(super::split_block_from_borders(&uniform), full_len);
}
4773
/// A maximum-size block whose first half is zeros and whose second half is a
/// non-zero cycling pattern must be split at 64 KiB.
#[test]
fn split_block_from_borders_returns_midpoint_for_centred_transition() {
    let len = MAX_BLOCK_SIZE as usize;
    let half = len / 2;
    // Zeros up to the midpoint, then bytes cycling through 1..=251.
    let block: Vec<u8> = (0..len)
        .map(|i| if i < half { 0 } else { (i % 251 + 1) as u8 })
        .collect();

    let split = super::split_block_from_borders(&block);
    assert_eq!(
        split,
        64 * 1024,
        "centred-transition fixture must take the symmetric \
         midpoint arm (`abs_diff < min_distance`), got {split}"
    );
}
4805
/// Pins the value `level_pre_split` returns for the named levels and for a
/// spread of numeric levels.
#[test]
fn pre_split_level_dispatches_by_compression_level() {
    use crate::encoding::CompressionLevel;
    use crate::encoding::match_generator::level_pre_split;

    // Named levels with a fixed expectation.
    assert_eq!(level_pre_split(CompressionLevel::Uncompressed), None);
    assert_eq!(level_pre_split(CompressionLevel::Fastest), Some(0));
    assert_eq!(level_pre_split(CompressionLevel::Default), Some(1));

    // Named levels that must agree with a specific numeric level.
    assert_eq!(
        level_pre_split(CompressionLevel::Better),
        level_pre_split(CompressionLevel::Level(7)),
    );
    assert_eq!(
        level_pre_split(CompressionLevel::Best),
        level_pre_split(CompressionLevel::Level(13)),
    );

    // Numeric levels: (level, expected pre-split value).
    let numeric_cases = [
        (2, 0),
        (4, 1),
        (5, 2),
        (7, 2),
        (8, 4),
        (11, 4),
        (12, 4),
        (13, 4),
        (15, 4),
        (16, 4),
        (22, 4),
    ];
    for (level, expected) in numeric_cases {
        assert_eq!(
            level_pre_split(CompressionLevel::Level(level)),
            Some(expected),
            "level {level}"
        );
    }
}
4848
/// Compresses 512 KiB of four cyclically repeated log lines at levels 7, 8
/// and 15, requires the level 8 and level 15 frames to stay under twice the
/// level 7 size, and round-trips all three frames.
#[test]
fn periodic_stream_not_oversplit() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};
    const LINES: &[&str] = &[
        "ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo table=orders region=eu-west\n",
        "ts=2026-03-26T21:39:29Z level=INFO msg=\"rotate segment\" tenant=demo table=orders region=eu-west\n",
        "ts=2026-03-26T21:39:30Z level=INFO msg=\"compact level\" tenant=demo table=orders region=eu-west\n",
        "ts=2026-03-26T21:39:31Z level=INFO msg=\"write block\" tenant=demo table=orders region=eu-west\n",
    ];
    let target = 512 * 1024usize;

    // Cycle through the lines, truncating the last one so the buffer ends at
    // exactly `target` bytes.
    let mut data = Vec::with_capacity(target);
    for text in LINES.iter().cycle() {
        if data.len() >= target {
            break;
        }
        let bytes = text.as_bytes();
        let take = bytes.len().min(target - data.len());
        data.extend_from_slice(&bytes[..take]);
    }

    let l7 = compress_slice_to_vec(&data, CompressionLevel::Level(7));
    let l8 = compress_slice_to_vec(&data, CompressionLevel::Level(8));
    let l15 = compress_slice_to_vec(&data, CompressionLevel::Level(15));

    assert!(
        l8.len() < l7.len() * 2,
        "lazy2 over-split periodic stream: l7={} l8={}",
        l7.len(),
        l8.len()
    );
    assert!(
        l15.len() < l7.len() * 2,
        "btlazy2 over-split periodic stream: l7={} l15={}",
        l7.len(),
        l15.len()
    );

    for out in [&l7, &l8, &l15] {
        let mut round = Vec::with_capacity(data.len());
        let mut decoder = FrameDecoder::new();
        decoder
            .decode_all_to_vec(out, &mut round)
            .expect("decode periodic stream");
        assert_eq!(round, data, "periodic stream roundtrip mismatch");
    }
}
4901
/// Builds a 256 KiB input whose byte pattern changes at 192 KiB, checks that
/// `optimal_block_size` at level 5 picks a sub-block split for the second
/// 128 KiB, and round-trips the level 5 frame through the decoder.
#[test]
fn greedy_chunk_split_roundtrips_through_own_decoder() {
    use crate::encoding::CompressionLevel;

    const TOTAL: usize = 256 * 1024;
    const TRANSITION: usize = 192 * 1024;
    // Before the transition: values 0..=7 repeating. After it: a cycle
    // through 1..=251.
    let data: Vec<u8> = (0..TOTAL)
        .map(|i| {
            if i < TRANSITION {
                (i & 0x07) as u8
            } else {
                (i % 251 + 1) as u8
            }
        })
        .collect();

    // The transition sits inside the second 128 KiB of the input.
    let second_block = &data[128 * 1024..];
    let split = super::optimal_block_size(
        CompressionLevel::Level(5),
        second_block,
        second_block.len(),
        MAX_BLOCK_SIZE as usize,
        100,
    );
    assert!(
        split < MAX_BLOCK_SIZE as usize,
        "second donor block must chunk-split at its intra-block transition, got {split}",
    );

    let mut compressed = Vec::new();
    let mut compressor = FrameCompressor::new(CompressionLevel::Level(5));
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed);
    compressor.compress();

    let mut source = compressed.as_slice();
    let mut decoder = FrameDecoder::new();
    decoder
        .reset(&mut source)
        .expect("frame header should parse");
    while !decoder.is_finished() {
        decoder
            .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }
    let mut decoded = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut decoded).unwrap();
    assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
}
4969
/// Builds a 256 KiB input that is zeros up to 192 KiB and a cycling pattern
/// after, checks that `optimal_block_size` at the fastest level picks a
/// sub-block split for the second 128 KiB, and verifies that the frame from
/// `compress_independent_frame` round-trips and decodes as at least three blocks.
#[test]
fn fast_oneshot_borrowed_split_emits_subblock() {
    use crate::encoding::CompressionLevel;

    const TRANSITION: usize = 192 * 1024;
    let mut data = vec![0u8; 256 * 1024];
    // Only the tail past the transition is overwritten; the head stays zero.
    for (i, byte) in data.iter_mut().enumerate().skip(TRANSITION) {
        *byte = (i % 251 + 1) as u8;
    }

    let second_block = &data[128 * 1024..];
    let chosen = super::optimal_block_size(
        CompressionLevel::Fastest,
        second_block,
        second_block.len(),
        MAX_BLOCK_SIZE as usize,
        100,
    );
    assert!(
        chosen < MAX_BLOCK_SIZE as usize,
        "fixture must resolve to a sub-block split in the second donor block",
    );

    let mut compressor: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
    let frame = compressor.compress_independent_frame(&data);

    let mut source = frame.as_slice();
    let mut decoder = FrameDecoder::new();
    decoder
        .reset(&mut source)
        .expect("frame header should parse");
    while !decoder.is_finished() {
        decoder
            .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }
    let mut decoded = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut decoded).unwrap();
    assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
    assert!(
        decoder.blocks_decoded() >= 3,
        "fast one-shot borrowed path must split the second donor block \
         (256 KiB unsplit = 2 blocks), got {} blocks",
        decoder.blocks_decoded(),
    );
}
5032
/// After `set_compression_level` followed by `compress`, the compressor's
/// `state.strategy_tag` must match the tag derived from the new level rather
/// than the one chosen at construction time.
#[cfg(feature = "std")]
#[test]
fn set_compression_level_then_compress_refreshes_strategy_tag() {
    use super::CompressionLevel;
    use crate::encoding::strategy::StrategyTag;

    let initial_level = CompressionLevel::Fastest;
    let switched_level = CompressionLevel::Level(20);

    let payload = vec![0xABu8; 256];
    let mut sink = Vec::new();
    let mut compressor = FrameCompressor::new(initial_level);

    // Baseline: the tag set by the constructor corresponds to the level the
    // compressor was created with.
    let tag_at_construction = compressor.state.strategy_tag;
    assert_eq!(
        tag_at_construction,
        StrategyTag::for_compression_level(initial_level),
        "construction-time strategy_tag must reflect initial level",
    );

    // Switch levels, then run a full compress so the tag is re-read.
    compressor.set_compression_level(switched_level);
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut sink);
    compressor.compress();

    let tag_after_compress = compressor.state.strategy_tag;
    let expected = StrategyTag::for_compression_level(switched_level);
    assert_eq!(
        tag_after_compress, expected,
        "strategy_tag must follow set_compression_level → compress, \
         got {tag_after_compress:?} expected {expected:?}",
    );
    // The remaining two assertions guard the fixture itself: if both levels
    // mapped to the same tag, the check above could pass without any refresh.
    assert_eq!(
        expected,
        StrategyTag::BtUltra2,
        "test fixture invariant: Level(20) must resolve to BtUltra2 \
         so the post-switch tag visibly crosses the band boundary",
    );
    assert_ne!(
        tag_after_compress, tag_at_construction,
        "test fixture invariant: chosen levels must resolve to \
         different StrategyTag variants",
    );
}
5088
/// A frame produced with `set_magicless(true)` must not start with the magic
/// number, must roundtrip through a decoder that is also set to magicless,
/// and must be rejected at `init` by a decoder left in its default mode.
#[test]
fn magicless_frame_omits_magic_and_roundtrips() {
    use crate::common::MAGIC_NUM;
    use crate::decoding::errors::{FrameDecoderError, ReadFrameHeaderError};

    let input: Vec<u8> = (0u32..512).map(|n| (n ^ 0xA5) as u8).collect();

    // Encode without the magic prefix.
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
    compressor.set_magicless(true);
    compressor.set_source(input.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    let magic_prefix = MAGIC_NUM.to_le_bytes();
    assert!(
        !output.starts_with(&magic_prefix),
        "magicless frame must omit the 4-byte magic prefix",
    );

    // A decoder configured for magicless input must recover the payload.
    let mut magicless_decoder = crate::decoding::FrameDecoder::new();
    magicless_decoder.set_magicless(true);
    let mut unread: &[u8] = output.as_slice();
    magicless_decoder.init(&mut unread).expect("magicless init");
    magicless_decoder
        .decode_blocks(&mut unread, crate::decoding::BlockDecodingStrategy::All)
        .expect("decode_blocks");
    let mut decoded: Vec<u8> = Vec::new();
    magicless_decoder
        .collect_to_writer(&mut decoded)
        .expect("collect_to_writer");
    assert_eq!(decoded, input, "magicless roundtrip must preserve bytes");

    // A decoder left in its default configuration must refuse the same bytes
    // while reading the frame header.
    let mut strict_decoder = crate::decoding::FrameDecoder::new();
    let other = strict_decoder.init(output.as_slice());
    let rejected_on_header = matches!(
        other,
        Err(FrameDecoderError::ReadFrameHeaderError(
            ReadFrameHeaderError::BadMagicNumber(_) | ReadFrameHeaderError::SkipFrame { .. },
        ))
    );
    if !rejected_on_header {
        panic!(
            "standard decoder must reject a magicless frame with \
             ReadFrameHeaderError::BadMagicNumber or SkipFrame, got {other:?}",
        );
    }
}
5148
/// For each tested level, one `FrameCompressor` is reused across several
/// inputs via `compress_independent_frame`; every frame must equal the output
/// of `compress_slice_to_vec` for the same input and level, and must decode
/// back to the input.
#[test]
fn compress_independent_frame_reuse_matches_fresh_and_roundtrips() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};

    // Inputs range from empty through single-byte, short text and a constant
    // run up to two larger generated buffers.
    let inputs: [Vec<u8>; 6] = [
        Vec::new(),
        vec![0x00],
        b"the quick brown fox jumps over the lazy dog\n".to_vec(),
        vec![0x7Eu8; 50_000],
        generate_data(0xABCD, 70_000),
        generate_data(0x1234, 200_000),
    ];
    let levels = [
        CompressionLevel::Uncompressed,
        CompressionLevel::Fastest,
        CompressionLevel::Default,
        CompressionLevel::Better,
        CompressionLevel::Best,
        CompressionLevel::Level(5),
    ];

    for level in levels {
        // One compressor per level, deliberately shared by all inputs below.
        let mut shared_encoder: FrameCompressor = FrameCompressor::new(level);
        for data in &inputs {
            let reused = shared_encoder.compress_independent_frame(data);
            let fresh = compress_slice_to_vec(data, level);
            assert_eq!(
                reused,
                fresh,
                "reused frame != fresh frame for len={} level={:?}",
                data.len(),
                level,
            );

            // NOTE(review): the capacity reservation looks deliberate;
            // `decode_all_to_vec` presumably fills spare capacity. Confirm
            // before swapping this for `Vec::new()`.
            let mut roundtripped = Vec::with_capacity(data.len());
            let mut decoder = FrameDecoder::new();
            decoder
                .decode_all_to_vec(&reused, &mut roundtripped)
                .unwrap();
            assert_eq!(
                roundtripped,
                *data,
                "roundtrip failed for len={} level={:?}",
                data.len(),
                level,
            );
        }
    }
}
5200
/// Calling `compress_independent_frame_into` twice with the same output
/// buffer must leave only the second frame in it, and the first frame
/// (snapshotted before the second call) must still decode to its input.
#[test]
fn compress_independent_frame_into_replaces_buffer_contents() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};

    let big_input = vec![0x11u8; 40_000];
    let tiny_input = b"short payload".to_vec();

    let mut encoder: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
    let mut shared_out = Vec::new();

    // First frame: snapshot it, because the buffer is about to be reused.
    encoder.compress_independent_frame_into(&big_input, &mut shared_out);
    let first_frame = shared_out.clone();

    // Second frame goes into the very same buffer.
    encoder.compress_independent_frame_into(&tiny_input, &mut shared_out);
    let expected_second = compress_slice_to_vec(&tiny_input, CompressionLevel::Default);
    assert_eq!(
        shared_out,
        expected_second,
        "reused buffer must hold exactly the second frame",
    );

    // The snapshot of the first frame must still be a valid frame.
    let mut roundtripped = Vec::with_capacity(big_input.len());
    let mut decoder = FrameDecoder::new();
    decoder
        .decode_all_to_vec(&first_frame, &mut roundtripped)
        .unwrap();
    assert_eq!(roundtripped, big_input);
}
5229
/// A dictionary attached once with `set_dictionary_from_bytes` must stay in
/// effect across repeated `compress_independent_frame` calls: each frame from
/// the reused compressor must equal the frame from a freshly built,
/// dictionary-attached compressor and must decode with the same dictionary.
#[test]
fn compress_independent_frame_reuses_sticky_dictionary() {
    use crate::encoding::CompressionLevel;

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let parsed_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();

    // First payload: the leading 2048 bytes of the dictionary content,
    // repeated eight times. Second payload: a short unrelated text.
    let dict_prefix: &[u8] = &parsed_dict.dict_content[..2048];
    let repeated_prefix = dict_prefix.repeat(8);
    let short_text = b"a different second frame payload, still dict-attached".to_vec();
    let inputs = [repeated_prefix, short_text];

    let mut sticky_encoder: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
    sticky_encoder
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");

    for data in &inputs {
        let reused = sticky_encoder.compress_independent_frame(data);

        // Reference output from a compressor that has never been used.
        let mut reference_encoder: FrameCompressor =
            FrameCompressor::new(CompressionLevel::Fastest);
        reference_encoder
            .set_dictionary_from_bytes(dict_raw)
            .expect("dictionary bytes should parse");
        let fresh = reference_encoder.compress_independent_frame(data);
        assert_eq!(
            reused,
            fresh,
            "reused dict frame != fresh dict frame, len={}",
            data.len(),
        );

        // Each iteration builds its own decoder, and `add_dict` takes the
        // dictionary by value, so it is decoded again here.
        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(decoder_dict).unwrap();
        let mut roundtripped = Vec::with_capacity(data.len());
        decoder
            .decode_all_to_vec(&reused, &mut roundtripped)
            .unwrap();
        assert_eq!(&roundtripped, data, "dict roundtrip failed, len={}", data.len());
    }
}
5275}