Skip to main content

structured_zstd/encoding/
frame_compressor.rs

1//! Utilities and interfaces for encoding an entire frame. Allows reusing resources
2
3use alloc::{boxed::Box, vec::Vec};
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12    CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13    match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
20/// An interface for compressing arbitrary data with the ZStandard compression algorithm.
21///
22/// `FrameCompressor` will generally be used by:
23/// 1. Initializing a compressor by providing a buffer of data using `FrameCompressor::new()`
24/// 2. Starting compression and writing that compression into a vec using `FrameCompressor::begin`
25///
26/// # Examples
27/// ```
28/// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
29/// let mock_data: &[_] = &[0x1, 0x2, 0x3, 0x4];
30/// let mut output = std::vec::Vec::new();
31/// // Initialize a compressor.
32/// let mut compressor = FrameCompressor::new(CompressionLevel::Uncompressed);
33/// compressor.set_source(mock_data);
34/// compressor.set_drain(&mut output);
35///
36/// // `compress` writes the compressed output into the provided buffer.
37/// compressor.compress();
38/// ```
39pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
40    uncompressed_data: Option<R>,
41    compressed_data: Option<W>,
42    compression_level: CompressionLevel,
43    dictionary: Option<crate::decoding::Dictionary>,
44    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
45    source_size_hint: Option<u64>,
46    state: CompressState<M>,
47    /// When true, emitted frames omit the 4-byte magic number prefix
48    /// (`ZSTD_f_zstd1_magicless`). Default false. The caller is
49    /// responsible for ensuring the decoder is configured for the
50    /// matching format — wire-format only round-trips with a
51    /// magicless-aware decoder.
52    magicless: bool,
53    #[cfg(feature = "hash")]
54    hasher: XxHash64,
55    /// Block-layout introspection populated at the end of every
56    /// successful `compress()`. `None` until the first call.
57    /// Behind the `lsm` feature gate.
58    #[cfg(feature = "lsm")]
59    frame_emit_info: Option<crate::encoding::frame_emit_info::FrameEmitInfo>,
60    /// When `true`, `compress()` XXH64-hashes each block's
61    /// uncompressed bytes and appends the low-32-bit digest to
62    /// `block_checksums`. Default `false` (zero cost). Gated on
63    /// `all(lsm, hash)` because XXH64 lives behind the `hash`
64    /// feature; an `lsm`-only build has no way to compute digests.
65    #[cfg(all(feature = "lsm", feature = "hash"))]
66    per_block_checksums_enabled: bool,
67    /// Per-block XXH64 (low 32 bits) digests captured during
68    /// `compress()` when `per_block_checksums_enabled` is set. Ordered
69    /// by block-emit order. `None` until the first call after enabling.
70    /// Gated on `all(lsm, hash)` (see `per_block_checksums_enabled`).
71    #[cfg(all(feature = "lsm", feature = "hash"))]
72    block_checksums: Option<alloc::vec::Vec<u32>>,
73}
74
/// Entropy tables cached for a dictionary.
///
/// When dictionary state is in use, `FrameCompressor::compress` clones these
/// into the per-frame `CompressState` instead of starting the frame with
/// empty tables.
#[derive(Clone, Default)]
struct CachedDictionaryEntropy {
    /// Huffman table cloned into `CompressState::last_huff_table`.
    huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// LL table cloned into `FseTables::ll_previous`.
    ll_previous: Option<PreviousFseTable>,
    /// ML table cloned into `FseTables::ml_previous`.
    ml_previous: Option<PreviousFseTable>,
    /// OF table cloned into `FseTables::of_previous`.
    of_previous: Option<PreviousFseTable>,
}
82
83#[derive(Clone)]
84pub(crate) enum PreviousFseTable {
85    // Default tables are immutable and already stored alongside the state, so
86    // repeating them only needs a lightweight marker instead of cloning FSETable.
87    Default,
88    Custom(Box<FSETable>),
89    Rle(u8),
90}
91
92impl PreviousFseTable {
93    pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
94        match self {
95            Self::Default => Some(default),
96            Self::Custom(table) => Some(table),
97            Self::Rle(_) => None,
98        }
99    }
100}
101
102pub(crate) struct FseTables {
103    /// The three predefined LL/ML/OF tables are functions of
104    /// compile-time-constant distributions. The
105    /// [`fse_encoder::FseDefaultTable`] type alias resolves to
106    /// `&'static FSETable` when a process-wide cache is available
107    /// (atomic-pointer targets, or no-atomic targets with the
108    /// `critical-section` feature) and to `Box<FSETable>` on the
109    /// cache-less no-atomic path (one per-frame allocation, dropped
110    /// with the compressor — no `Box::leak`, no unbounded growth).
111    /// Both arms `Deref` to `FSETable`, so consumers in
112    /// `encoding/blocks/compressed.rs` borrow through `&` uniformly
113    /// without seeing the per-target divergence.
114    pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
115    pub(crate) ll_previous: Option<PreviousFseTable>,
116    pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
117    pub(crate) ml_previous: Option<PreviousFseTable>,
118    pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
119    pub(crate) of_previous: Option<PreviousFseTable>,
120}
121
122impl FseTables {
123    pub fn new() -> Self {
124        Self {
125            ll_default: default_ll_table(),
126            ll_previous: None,
127            ml_default: default_ml_table(),
128            ml_previous: None,
129            of_default: default_of_table(),
130            of_previous: None,
131        }
132    }
133
134    /// Borrow the LL default table as `&FSETable`. Abstracts the cfg
135    /// split in [`crate::fse::fse_encoder::FseDefaultTable`] —
136    /// `&'static FSETable` (atomic / `critical-section`) auto-derefs
137    /// directly; `Box<FSETable>` (cache-less no-atomic) derefs
138    /// through `Box`. Both arms yield `&FSETable` uniformly so
139    /// downstream consumers can stay cfg-agnostic.
140    #[inline]
141    #[allow(clippy::borrow_deref_ref)]
142    pub(crate) fn ll_default_ref(&self) -> &FSETable {
143        &*self.ll_default
144    }
145
146    /// Borrow the ML default table as `&FSETable`. See [`Self::ll_default_ref`].
147    #[inline]
148    #[allow(clippy::borrow_deref_ref)]
149    pub(crate) fn ml_default_ref(&self) -> &FSETable {
150        &*self.ml_default
151    }
152
153    /// Borrow the OF default table as `&FSETable`. See [`Self::ll_default_ref`].
154    #[inline]
155    #[allow(clippy::borrow_deref_ref)]
156    pub(crate) fn of_default_ref(&self) -> &FSETable {
157        &*self.of_default
158    }
159}
160
/// Lower clamp applied to every split point returned by the pre-splitter.
const PRESPLIT_BLOCK_MIN: usize = 3500;
/// Divisor of the fingerprint-deviation threshold.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
/// Threshold numerator before the per-chunk penalty is added.
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
/// Initial penalty of the chunk-based splitter; it decays by one per merged chunk.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
/// Chunk length (8 KiB) compared by the chunk-based splitter.
const PRESPLIT_CHUNK_SIZE: usize = 8 * 1024;
/// Largest supported `hash_log`; it fixes the fingerprint table size.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
/// Number of slots in a fingerprint's event table.
const PRESPLIT_HASH_TABLE_SIZE: usize = 1usize << PRESPLIT_HASH_LOG_MAX;
/// Multiplicative constant used by the 2-byte hash.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
/// Donor `SEGMENT_SIZE` in `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:201`).
/// Two `SEGMENT_SIZE`-byte fingerprints — one from the start, one from the end —
/// drive the cheap border heuristic; a third one from the middle disambiguates
/// where in the block the transition sits.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
174
175#[derive(Clone)]
176struct PreSplitFingerprint {
177    events: [u32; PRESPLIT_HASH_TABLE_SIZE],
178    nb_events: usize,
179}
180
181impl Default for PreSplitFingerprint {
182    fn default() -> Self {
183        Self {
184            events: [0; PRESPLIT_HASH_TABLE_SIZE],
185            nb_events: 0,
186        }
187    }
188}
189
190fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
191    debug_assert!(hash_log >= 8);
192    if hash_log == 8 {
193        return bytes[0] as usize;
194    }
195    debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
196    let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
197    (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
198}
199
200fn presplit_record_fingerprint(
201    fp: &mut PreSplitFingerprint,
202    src: &[u8],
203    sampling_rate: usize,
204    hash_log: usize,
205) {
206    fp.events.fill(0);
207    fp.nb_events = 0;
208    if src.len() < 2 {
209        return;
210    }
211    let limit = src.len() - 1;
212    let mut n = 0usize;
213    while n < limit {
214        fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
215        n += sampling_rate;
216    }
217    // Donor parity: zstd_preSplit.c records the integer division, not the
218    // rounded-up number of sampled events from the loop above.
219    fp.nb_events += limit / sampling_rate;
220}
221
222/// Single-byte histogram pass — matches donor `HIST_add` over a small
223/// segment with `hashLog == 8` (the `hash2` shortcut at
224/// `zstd_preSplit.c:36` returns the raw byte). The byChunks path uses
225/// 2-byte hashing for `hashLog >= 9`; this helper exists so the borders
226/// heuristic doesn't pay for that wider hash on its 512-byte windows.
227fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
228    fp.events.fill(0);
229    for &b in src {
230        fp.events[b as usize] += 1;
231    }
232    // Donor `HIST_add` returns the maximum symbol; the caller then sets
233    // `nbEvents = SEGMENT_SIZE` explicitly (see `zstd_preSplit.c:213`).
234    fp.nb_events = src.len();
235}
236
237fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
238    let slots = 1usize << hash_log;
239    let mut distance = 0u64;
240    for idx in 0..slots {
241        let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
242        let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
243        distance = distance.saturating_add(left.abs_diff(right) as u64);
244    }
245    distance
246}
247
248fn presplit_fingerprints_differ(
249    reference: &PreSplitFingerprint,
250    new_fp: &PreSplitFingerprint,
251    penalty: i32,
252    hash_log: usize,
253) -> bool {
254    debug_assert!(reference.nb_events > 0);
255    debug_assert!(new_fp.nb_events > 0);
256    let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
257    let deviation = presplit_distance(reference, new_fp, hash_log);
258    let threshold = p50.saturating_mul(PRESPLIT_THRESHOLD_BASE + penalty as u64)
259        / PRESPLIT_THRESHOLD_PENALTY_RATE;
260    deviation >= threshold
261}
262
263fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
264    for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
265        acc.events[idx] = acc.events[idx].saturating_add(new_fp.events[idx]);
266    }
267    acc.nb_events = acc.nb_events.saturating_add(new_fp.nb_events);
268}
269
270fn donor_split_block_by_chunks(block: &[u8], level: usize) -> usize {
271    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
272    debug_assert!((1..=4).contains(&level));
273    let (sampling_rate, hash_log) = match level - 1 {
274        0 => (43, 8),
275        1 => (11, 9),
276        2 => (5, 10),
277        _ => (1, 10),
278    };
279
280    let mut past = PreSplitFingerprint::default();
281    let mut new_events = PreSplitFingerprint::default();
282    let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
283    presplit_record_fingerprint(
284        &mut past,
285        &block[..PRESPLIT_CHUNK_SIZE],
286        sampling_rate,
287        hash_log,
288    );
289    let mut pos = PRESPLIT_CHUNK_SIZE;
290    while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
291        presplit_record_fingerprint(
292            &mut new_events,
293            &block[pos..pos + PRESPLIT_CHUNK_SIZE],
294            sampling_rate,
295            hash_log,
296        );
297        if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
298            return pos;
299        }
300        presplit_merge_events(&mut past, &new_events);
301        if penalty > 0 {
302            penalty -= 1;
303        }
304        pos += PRESPLIT_CHUNK_SIZE;
305    }
306    block.len()
307}
308
309/// Donor port of `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:198`).
310/// Records two 512-byte byte-histograms — one from each end of a 128 KB
311/// block — and a third from the middle as a tie-breaker; returns either
312/// a quantised split point (32 KB / 64 KB / 96 KB) or the full block
313/// size when the two ends look indistinguishable. Cheaper than the
314/// chunk-based path because it touches at most 1.5 KB of input
315/// regardless of block size.
316fn donor_split_block_from_borders(block: &[u8]) -> usize {
317    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
318    let block_size = block.len();
319    let mut past = PreSplitFingerprint::default();
320    let mut new_fp = PreSplitFingerprint::default();
321    presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
322    presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
323    // Donor uses `penalty = 0, hash_log = 8` — i.e. raw byte histogram
324    // distance with no threshold padding (`zstd_preSplit.c:214`).
325    if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
326        return block_size;
327    }
328
329    let mut middle = PreSplitFingerprint::default();
330    let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
331    presplit_record_byte_histogram(
332        &mut middle,
333        &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
334    );
335
336    let dist_from_begin = presplit_distance(&past, &middle, 8);
337    let dist_from_end = presplit_distance(&new_fp, &middle, 8);
338    // Donor `SEGMENT_SIZE * SEGMENT_SIZE / 3` (`zstd_preSplit.c:221`):
339    // if the middle is roughly equidistant from both ends, the change
340    // sits near the centre — split at the midpoint.
341    let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
342    if dist_from_begin.abs_diff(dist_from_end) < min_distance {
343        return 64 * 1024;
344    }
345    // Larger `dist_from_begin` (i.e. `middle` farther from the head
346    // fingerprint, equivalently closer to the tail) means the new
347    // statistics already dominate the centre — the transition
348    // happened EARLY → emit a small 32 KB head and let the 96 KB
349    // tail absorb the rest. Inverse case: `dist_from_end` larger
350    // (middle still resembles the head) means the transition is
351    // LATE → emit a 96 KB head so the trailing 32 KB carries the
352    // new statistics alone.
353    if dist_from_begin > dist_from_end {
354        32 * 1024
355    } else {
356        96 * 1024
357    }
358}
359
360fn donor_pre_split_level(level: CompressionLevel) -> Option<usize> {
361    match level {
362        // Donor `ZSTD_blockSplitter_level` table (`clevels.h`): cheap
363        // borders heuristic for lazy2 / btlazy2 strategies (levels
364        // 11..=15) — the splitter still pays for itself on
365        // heterogeneous payloads but the per-block cost stays bounded
366        // by two 512-byte histograms.
367        CompressionLevel::Level(11..=15) => Some(0),
368        // C zstd's default splitter level for btopt/btultra/btultra2 is 4
369        // (`ZSTD_splitBlock_byChunks` with internal level 3 — sampling
370        // rate 1, `hashLog` 10).
371        CompressionLevel::Level(16..=22) => Some(4),
372        _ => None,
373    }
374}
375
/// XXH64 (low 32 bits, seed 0) over `data`. Shared helper for the
/// per-physical-block checksum sidecar so encoder and decoder hash
/// the exact same byte ranges with the exact same parameters. Gated
/// at `all(lsm, hash)` because the only consumer is the lsm-side
/// `block_checksums` sidecar; non-lsm builds carry no reference to
/// this helper at all.
#[cfg(all(feature = "lsm", feature = "hash"))]
#[inline]
pub(crate) fn xxh64_block_low32(data: &[u8]) -> u32 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(data);
    let digest: u64 = hasher.finish();
    // Deliberate truncation: only the low 32 bits are kept.
    digest as u32
}
389
/// Bench-only entry point for the donor-parity comparator test in
/// `tests/block_splitter_donor_parity.rs`. Dispatches to the same
/// `_from_borders` (split_level == 0) / `_by_chunks` (split_level ∈
/// 1..=4) ports that `donor_optimal_block_size` itself routes
/// through. Caller is responsible for passing exactly
/// `MAX_BLOCK_SIZE` bytes (per donor `ZSTD_splitBlock` contract —
/// "@blockSize must be == 128 KB" in `zstd_preSplit.h`).
///
/// # Panics
/// Panics when `block` is not exactly `MAX_BLOCK_SIZE` bytes long or when
/// `split_level` is greater than 4.
#[cfg(feature = "bench_internals")]
pub(crate) fn block_splitter_decision_for_bench(block: &[u8], split_level: usize) -> usize {
    assert_eq!(
        block.len(),
        MAX_BLOCK_SIZE as usize,
        "block_splitter_decision_for_bench expects exactly MAX_BLOCK_SIZE bytes"
    );
    assert!(
        (0..=4).contains(&split_level),
        "block_splitter_decision_for_bench: split_level must be in 0..=4, got {split_level}"
    );
    match split_level {
        0 => donor_split_block_from_borders(block),
        chunk_level => donor_split_block_by_chunks(block, chunk_level),
    }
}
414
415pub(crate) fn donor_optimal_block_size(
416    level: CompressionLevel,
417    block: &[u8],
418    remaining_src_size: usize,
419    block_size_max: usize,
420    savings: i64,
421) -> usize {
422    let Some(split_level) = donor_pre_split_level(level) else {
423        return remaining_src_size.min(block_size_max);
424    };
425    if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
426        return remaining_src_size.min(block_size_max);
427    }
428    if savings < 3 {
429        return MAX_BLOCK_SIZE as usize;
430    }
431    if block.len() < MAX_BLOCK_SIZE as usize {
432        return remaining_src_size.min(block_size_max);
433    }
434    // Donor `ZSTD_splitBlock` dispatch (`zstd_preSplit.c:234`):
435    // `split_level == 0` → cheap borders heuristic;
436    // `split_level == 1..=4` → byChunks with internal sampling level
437    // `split_level - 1`.
438    let raw_split = if split_level == 0 {
439        donor_split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
440    } else {
441        donor_split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
442    };
443    raw_split
444        .max(PRESPLIT_BLOCK_MIN)
445        .min(MAX_BLOCK_SIZE as usize)
446}
447
/// Mutable encoder state that `FrameCompressor` keeps across blocks and
/// re-initializes at the start of every frame.
pub(crate) struct CompressState<M: Matcher> {
    /// Match finder. Reset with the active compression level at the start of
    /// every `FrameCompressor::compress` call.
    pub(crate) matcher: M,
    /// Last Huffman table. At frame start `compress` sets it to the cached
    /// dictionary table when dictionary state is in use, otherwise to `None`.
    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// Predefined FSE tables plus the per-kind `*_previous` slots.
    pub(crate) fse_tables: FseTables,
    /// Scratch storage for compressed-block encoding (presumably reused
    /// between blocks to avoid reallocation — confirm in `encoding::blocks`).
    pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
    /// Offset history for repeat offset encoding: [rep0, rep1, rep2].
    /// Initialized to [1, 4, 8] per RFC 8878 §3.1.2.5.
    pub(crate) offset_hist: [u32; 3],
    /// Strategy tag resolved from the current `CompressionLevel` at every
    /// `matcher.reset()` call. Used by the literal-compression gates
    /// (`min_literals_to_compress`, `min_gain`) in
    /// `encoding::blocks::compressed` to mirror donor's strategy-aware
    /// thresholds (`zstd_compress_literals.c:114-127, 187-188`).
    ///
    /// **Invariant (required of every construction site):** must be
    /// initialized from the active `CompressionLevel` via
    /// `StrategyTag::for_compression_level`, and re-synced from the
    /// active level alongside every `matcher.reset()` call so the
    /// level-aware gates stay correct after a level change. The two
    /// reset sites that own this sync are `FrameCompressor::compress`
    /// and `StreamingEncoder::ensure_frame_started`. There is no
    /// `Default` impl — production constructors
    /// (`FrameCompressor::new`, `new_with_matcher`, the streaming
    /// encoder constructor) plumb this explicitly. Tests that build
    /// `CompressState` by hand must also supply a value.
    pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
}
475
476impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
477    /// Create a new `FrameCompressor`
478    pub fn new(compression_level: CompressionLevel) -> Self {
479        Self {
480            uncompressed_data: None,
481            compressed_data: None,
482            compression_level,
483            dictionary: None,
484            dictionary_entropy_cache: None,
485            source_size_hint: None,
486            state: CompressState {
487                matcher: MatchGeneratorDriver::new(1024 * 128, 1),
488                last_huff_table: None,
489                fse_tables: FseTables::new(),
490                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
491                offset_hist: [1, 4, 8],
492                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
493                    compression_level,
494                ),
495            },
496            magicless: false,
497            #[cfg(feature = "hash")]
498            hasher: XxHash64::with_seed(0),
499            #[cfg(feature = "lsm")]
500            frame_emit_info: None,
501            #[cfg(all(feature = "lsm", feature = "hash"))]
502            per_block_checksums_enabled: false,
503            #[cfg(all(feature = "lsm", feature = "hash"))]
504            block_checksums: None,
505        }
506    }
507}
508
509impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
510    /// Create a new `FrameCompressor` with a custom matching algorithm implementation
511    pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
512        Self {
513            uncompressed_data: None,
514            compressed_data: None,
515            dictionary: None,
516            dictionary_entropy_cache: None,
517            source_size_hint: None,
518            state: CompressState {
519                matcher,
520                last_huff_table: None,
521                fse_tables: FseTables::new(),
522                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
523                offset_hist: [1, 4, 8],
524                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
525                    compression_level,
526                ),
527            },
528            compression_level,
529            magicless: false,
530            #[cfg(feature = "hash")]
531            hasher: XxHash64::with_seed(0),
532            #[cfg(feature = "lsm")]
533            frame_emit_info: None,
534            #[cfg(all(feature = "lsm", feature = "hash"))]
535            per_block_checksums_enabled: false,
536            #[cfg(all(feature = "lsm", feature = "hash"))]
537            block_checksums: None,
538        }
539    }
540
    /// Enable or disable magicless frame format (`ZSTD_f_zstd1_magicless`).
    ///
    /// When set to `true`, emitted frames omit the 4-byte magic number
    /// prefix. The matching decoder must be configured to expect a
    /// magicless stream — wire-format only round-trips with a
    /// magicless-aware decoder.
    ///
    /// Newly constructed compressors start with this disabled (`false`).
    pub fn set_magicless(&mut self, magicless: bool) {
        self.magicless = magicless;
    }
550
551    /// Before calling [FrameCompressor::compress] you need to set the source.
552    ///
553    /// This is the data that is compressed and written into the drain.
554    pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
555        self.uncompressed_data.replace(uncompressed_data)
556    }
557
558    /// Before calling [FrameCompressor::compress] you need to set the drain.
559    ///
560    /// As the compressor compresses data, the drain serves as a place for the output to be writte.
561    pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
562        self.compressed_data.replace(compressed_data)
563    }
564
    /// Provide a hint about the total uncompressed size for the next frame.
    ///
    /// When set, the encoder selects smaller hash tables and windows for
    /// small inputs, matching the C zstd source-size-class behavior.
    ///
    /// This hint applies only to frame payload bytes (`size`). Dictionary
    /// history is primed separately and does not inflate the hinted size or
    /// advertised frame window.
    ///
    /// Must be called before [`compress`](Self::compress). The hint is
    /// consumed by that call, so it has to be set again for each frame it
    /// should apply to.
    pub fn set_source_size_hint(&mut self, size: u64) {
        self.source_size_hint = Some(size);
    }
577
578    /// Compress the uncompressed data from the provided source as one Zstd frame and write it to the provided drain
579    ///
580    /// This will repeatedly call [Read::read] on the source to fill up blocks until the source returns 0 on the read call.
581    /// All compressed blocks are buffered in memory so that the frame header can include the
582    /// `Frame_Content_Size` field (which requires knowing the total uncompressed size). The
583    /// entire frame — header, blocks, and optional checksum — is then written to the drain
584    /// at the end. This means peak memory usage is O(compressed_size).
585    ///
586    /// To avoid endlessly encoding from a potentially endless source (like a network socket) you can use the
587    /// [Read::take] function
588    pub fn compress(&mut self) {
589        // Reset per-frame introspection state so a re-used compressor
590        // doesn't carry over the previous frame's layout/checksums.
591        #[cfg(feature = "lsm")]
592        {
593            self.frame_emit_info = None;
594        }
595        #[cfg(all(feature = "lsm", feature = "hash"))]
596        {
597            if self.per_block_checksums_enabled {
598                self.block_checksums = Some(alloc::vec::Vec::new());
599            } else {
600                self.block_checksums = None;
601            }
602        }
603        let initial_size_hint = self.source_size_hint;
604        let source_size_hint_known = initial_size_hint.is_some();
605        let use_dictionary_state =
606            !matches!(self.compression_level, CompressionLevel::Uncompressed)
607                && self.state.matcher.supports_dictionary_priming()
608                && self.dictionary.is_some();
609        if let Some(size_hint) = self.source_size_hint.take() {
610            // Keep source-size hint scoped to payload bytes; dictionary priming
611            // is applied separately and should not force larger matcher sizing.
612            self.state.matcher.set_source_size_hint(size_hint);
613        }
614        // Clearing buffers to allow re-using of the compressor
615        self.state.matcher.reset(self.compression_level);
616        self.state.offset_hist = [1, 4, 8];
617        // Sync `state.strategy_tag` to the level resolved at this reset so
618        // the literal-compression gates (`min_literals_to_compress` /
619        // `min_gain` in `encoding::blocks::compressed`) see the correct
620        // strategy for the next frame. Frame-by-frame level changes go
621        // through this same `compress()` entry point, so re-syncing here
622        // covers level switches without touching the matcher dispatch.
623        self.state.strategy_tag =
624            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level);
625        let cached_entropy = if use_dictionary_state {
626            self.dictionary_entropy_cache.as_ref()
627        } else {
628            None
629        };
630        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
631            // This state drives sequence encoding, while matcher priming below updates
632            // the match generator's internal repeat-offset history for match finding.
633            self.state.offset_hist = dict.offset_hist;
634            self.state
635                .matcher
636                .prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
637        }
638        if let Some(cache) = cached_entropy {
639            self.state.last_huff_table.clone_from(&cache.huff);
640        } else {
641            self.state.last_huff_table = None;
642        }
643        // `clone_from` keeps frame-to-frame seeding cheap for reused compressors by
644        // reusing existing allocations where possible instead of reallocating every frame.
645        if let Some(cache) = cached_entropy {
646            self.state
647                .fse_tables
648                .ll_previous
649                .clone_from(&cache.ll_previous);
650            self.state
651                .fse_tables
652                .ml_previous
653                .clone_from(&cache.ml_previous);
654            self.state
655                .fse_tables
656                .of_previous
657                .clone_from(&cache.of_previous);
658        } else {
659            self.state.fse_tables.ll_previous = None;
660            self.state.fse_tables.ml_previous = None;
661            self.state.fse_tables.of_previous = None;
662        }
663        let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
664            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
665            _ => None,
666        });
667        let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
668            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
669            _ => None,
670        });
671        let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
672            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
673            _ => None,
674        });
675        self.state.matcher.seed_dictionary_entropy(
676            self.state.last_huff_table.as_ref(),
677            ll_entropy,
678            ml_entropy,
679            of_entropy,
680        );
681        #[cfg(feature = "hash")]
682        {
683            self.hasher = XxHash64::with_seed(0);
684        }
685        let source = self.uncompressed_data.as_mut().unwrap();
686        let drain = self.compressed_data.as_mut().unwrap();
687        let window_size = self.state.matcher.window_size();
688        assert!(
689            window_size != 0,
690            "matcher reported window_size == 0, which is invalid"
691        );
692        // Accumulate all compressed blocks; the frame header is written
693        // after all input has been read so that Frame_Content_Size is
694        // known. The default seed is one donor block; smaller seeds for
695        // small payloads avoid pinning a full block worth of bytes when
696        // the compressed output fits in a few hundred bytes. For larger
697        // inputs the default seed amortises the first few `Vec::extend`
698        // doublings cheaply and the `peak - default_seed` residue is
699        // dominated by internal `compress_block_encoded` buffers anyway,
700        // so changing it produces no measurable savings.
701        //
702        // Seed-size tiers (mirrors donor `ZSTD_CStreamOutSize` naming):
703        //
704        // * `ALL_BLOCKS_TINY_CAP` — payload ≤ this size, seed equals
705        //   payload bound; ≥ everything compressed output could need
706        //   for a tiny input.
707        // * `ALL_BLOCKS_SMALL_CAP` — small-input seed picked to absorb
708        //   one or two doublings without over-allocating.
709        // * `ALL_BLOCKS_DEFAULT_CAP` — one donor block; the value the
710        //   rest of the encoder is sized around.
711        const ALL_BLOCKS_TINY_THRESHOLD: u64 = 4 * 1024;
712        const ALL_BLOCKS_SMALL_THRESHOLD: u64 = 64 * 1024;
713        const ALL_BLOCKS_TINY_CAP: usize = 4 * 1024;
714        const ALL_BLOCKS_SMALL_CAP: usize = 16 * 1024;
715        const ALL_BLOCKS_DEFAULT_CAP: usize = 130 * 1024;
716        let initial_all_blocks_cap = match initial_size_hint {
717            Some(h) if h <= ALL_BLOCKS_TINY_THRESHOLD => ALL_BLOCKS_TINY_CAP,
718            Some(h) if h <= ALL_BLOCKS_SMALL_THRESHOLD => ALL_BLOCKS_SMALL_CAP,
719            _ => ALL_BLOCKS_DEFAULT_CAP,
720        };
721        let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap);
722        let mut total_uncompressed: u64 = 0;
723        let mut pending_input: Vec<u8> = Vec::new();
724        let mut reached_eof = false;
725        let mut savings = 0i64;
726        // Compress block by block
727        loop {
728            // Read up to one donor block. When the pre-block splitter keeps a
729            // suffix, top it back up before compressing the next block, matching
730            // ZSTD_compress_frameChunk() over a contiguous input buffer.
731            let block_capacity = MAX_BLOCK_SIZE as usize;
732            let had_pending = !pending_input.is_empty();
733            let mut uncompressed_data = if had_pending {
734                core::mem::take(&mut pending_input)
735            } else {
736                self.state.matcher.get_next_space()
737            };
738            let mut filled = if had_pending {
739                uncompressed_data.len()
740            } else {
741                0
742            };
743            if uncompressed_data.len() < block_capacity {
744                uncompressed_data.resize(block_capacity, 0);
745            }
746            'read_loop: loop {
747                if reached_eof || filled == block_capacity {
748                    break 'read_loop;
749                }
750                let new_bytes = source
751                    .read(&mut uncompressed_data[filled..block_capacity])
752                    .unwrap();
753                if new_bytes == 0 {
754                    reached_eof = true;
755                    break 'read_loop;
756                }
757                filled += new_bytes;
758                total_uncompressed += new_bytes as u64;
759            }
760            uncompressed_data.truncate(filled);
761            let mut last_block = reached_eof;
762            let remaining_for_split = if reached_eof {
763                uncompressed_data.len()
764            } else {
765                block_capacity
766            };
767            if !matches!(self.compression_level, CompressionLevel::Uncompressed)
768                && uncompressed_data.len() == block_capacity
769            {
770                let block_len = donor_optimal_block_size(
771                    self.compression_level,
772                    &uncompressed_data,
773                    remaining_for_split,
774                    block_capacity,
775                    savings,
776                );
777                if block_len < uncompressed_data.len() {
778                    pending_input = uncompressed_data.split_off(block_len);
779                    // `split_off` returns a Vec whose capacity is typically
780                    // close to its length. Next iteration's `had_pending`
781                    // branch moves `pending_input` into `uncompressed_data`
782                    // and resizes to `block_capacity`, which would reallocate
783                    // from scratch on every pre-split. Pre-reserve here so
784                    // the resize stays in-place.
785                    if pending_input.capacity() < block_capacity {
786                        pending_input.reserve_exact(block_capacity - pending_input.len());
787                    }
788                    last_block = false;
789                }
790            }
791            // As we read, hash that data too
792            #[cfg(feature = "hash")]
793            self.hasher.write(&uncompressed_data);
794            // Per-physical-block XXH64 (low 32 bits) for the optional
795            // per-block checksum sidecar. Hashing happens INSIDE the
796            // block emitters (RLE / Raw fast-path / Compressed /
797            // post-split partitions), so the digests vector has
798            // exactly one entry per physical Block_Header written to
799            // `all_blocks` — 1:1 with `FrameEmitInfo.blocks`. See
800            // `enable_per_block_checksums` rustdoc.
801            // Special handling is needed for compression of a totally empty file
802            if uncompressed_data.is_empty() {
803                let header = BlockHeader {
804                    last_block: true,
805                    block_type: crate::blocks::block::BlockType::Raw,
806                    block_size: 0,
807                };
808                header.serialize(&mut all_blocks);
809                #[cfg(all(feature = "lsm", feature = "hash"))]
810                if let Some(checksums) = self.block_checksums.as_mut() {
811                    checksums.push(xxh64_block_low32(&[]));
812                }
813                break;
814            }
815
816            match self.compression_level {
817                CompressionLevel::Uncompressed => {
818                    let header = BlockHeader {
819                        last_block,
820                        block_type: crate::blocks::block::BlockType::Raw,
821                        block_size: uncompressed_data.len().try_into().unwrap(),
822                    };
823                    header.serialize(&mut all_blocks);
824                    #[cfg(all(feature = "lsm", feature = "hash"))]
825                    if let Some(checksums) = self.block_checksums.as_mut() {
826                        checksums.push(xxh64_block_low32(&uncompressed_data));
827                    }
828                    all_blocks.extend_from_slice(&uncompressed_data);
829                    savings +=
830                        uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
831                }
832                CompressionLevel::Fastest
833                | CompressionLevel::Default
834                | CompressionLevel::Better
835                | CompressionLevel::Best
836                | CompressionLevel::Level(_) => {
837                    let before_len = all_blocks.len();
838                    let block_len = uncompressed_data.len();
839                    compress_block_encoded(
840                        &mut self.state,
841                        self.compression_level,
842                        last_block,
843                        uncompressed_data,
844                        &mut all_blocks,
845                        #[cfg(all(feature = "lsm", feature = "hash"))]
846                        self.block_checksums.as_mut(),
847                    );
848                    savings += block_len as i64 - (all_blocks.len() - before_len) as i64;
849                }
850            }
851            if last_block && pending_input.is_empty() {
852                break;
853            }
854        }
855
856        // Now that total_uncompressed is known, write the frame header with FCS.
857        // Match the donor framing policy for pledged one-shot inputs: use a
858        // single-segment frame whenever the source fits the active window.
859        let single_segment = !use_dictionary_state
860            && source_size_hint_known
861            && total_uncompressed >= 512
862            && total_uncompressed <= window_size;
863        let header = FrameHeader {
864            frame_content_size: Some(total_uncompressed),
865            single_segment,
866            content_checksum: cfg!(feature = "hash"),
867            dictionary_id: if use_dictionary_state {
868                self.dictionary.as_ref().map(|dict| dict.id as u64)
869            } else {
870                None
871            },
872            window_size: if single_segment {
873                None
874            } else {
875                Some(window_size)
876            },
877            magicless: self.magicless,
878        };
879        // Write the frame header and compressed blocks separately to avoid
880        // shifting the entire `all_blocks` buffer to prepend the header.
881        let mut header_buf: Vec<u8> = Vec::with_capacity(14);
882        header.serialize(&mut header_buf);
883        drain.write_all(&header_buf).unwrap();
884        drain.write_all(&all_blocks).unwrap();
885
886        // If the `hash` feature is enabled, then `content_checksum` is set to true in the header
887        // and a 32 bit hash is written at the end of the data.
888        #[cfg(feature = "hash")]
889        {
890            // Because we only have the data as a reader, we need to read all of it to calculate the checksum
891            // Possible TODO: create a wrapper around self.uncompressed data that hashes the data as it's read?
892            let content_checksum = self.hasher.finish();
893            drain
894                .write_all(&(content_checksum as u32).to_le_bytes())
895                .unwrap();
896        }
897
898        // FrameEmitInfo population (lsm feature): walk all_blocks to
899        // recover per-block layout. Each Block_Header is 3 bytes LE
900        // packing `(block_size << 3) | (block_type << 1) | last_block`.
901        // Physical body size differs by type: RLE bodies are always 1
902        // byte (the repeated byte), Raw/Compressed bodies span
903        // `block_size` bytes.
904        #[cfg(feature = "lsm")]
905        {
906            use crate::blocks::block::BlockType as BT;
907            use crate::encoding::frame_emit_info::{FrameBlock, FrameEmitInfo};
908            // All frame-offset arithmetic below is bounded by u32 on
909            // the wire (Block_Size is a 21-bit field, frames bounded
910            // by MAX_BLOCK_SIZE * #blocks). A pathologically large
911            // frame whose total emitted size exceeds u32::MAX would
912            // overflow the cast — bail out by leaving
913            // `frame_emit_info` at `None` rather than handing the
914            // caller a silently-truncated layout. Checked once for
915            // header / all_blocks / cursor up front + once per push;
916            // the overflow path is statically unreachable on every
917            // realistic frame so the predictor amortises the branch
918            // to zero cost on the hot path.
919            let frame_header_len: u32 = match u32::try_from(header_buf.len()) {
920                Ok(v) => v,
921                Err(_) => return,
922            };
923            let all_blocks_len_u32: u32 = match u32::try_from(all_blocks.len()) {
924                Ok(v) => v,
925                Err(_) => return,
926            };
927            let mut blocks: Vec<FrameBlock> = Vec::new();
928            let mut cursor: usize = 0;
929            while cursor + 3 <= all_blocks.len() {
930                let mut header_u32 = [0u8; 4];
931                header_u32[..3].copy_from_slice(&all_blocks[cursor..cursor + 3]);
932                let raw = u32::from_le_bytes(header_u32);
933                let last_block = (raw & 1) != 0;
934                let block_type = match (raw >> 1) & 0b11 {
935                    0 => BT::Raw,
936                    1 => BT::RLE,
937                    2 => BT::Compressed,
938                    _ => BT::Reserved,
939                };
940                let block_size_field = raw >> 3;
941                // RLE bodies are always 1 byte physical on the wire
942                // (the single repeated byte); the spec's Block_Size
943                // field carries the logical repeat count. Raw and
944                // Compressed bodies physically span block_size_field
945                // bytes. Store the physical length in body_size so the
946                // 'offset + header + body_size' arithmetic always
947                // lands on the next block boundary, and surface the
948                // raw spec field separately as block_size_field.
949                let physical_body: u32 = match block_type {
950                    BT::RLE => 1,
951                    _ => block_size_field,
952                };
953                let cursor_u32: u32 = match u32::try_from(cursor) {
954                    Ok(v) => v,
955                    Err(_) => return,
956                };
957                let offset_in_frame = match frame_header_len.checked_add(cursor_u32) {
958                    Some(v) => v,
959                    None => return,
960                };
961                blocks.push(FrameBlock {
962                    offset_in_frame,
963                    header_size: 3,
964                    body_size: physical_body,
965                    block_size_field,
966                    block_type,
967                    last_block,
968                });
969                cursor += 3 + physical_body as usize;
970                if last_block {
971                    break;
972                }
973            }
974            let checksum_range = if cfg!(feature = "hash") {
975                let cs_start = match frame_header_len.checked_add(all_blocks_len_u32) {
976                    Some(v) => v,
977                    None => return,
978                };
979                let cs_end = match cs_start.checked_add(4) {
980                    Some(v) => v,
981                    None => return,
982                };
983                Some(cs_start..cs_end)
984            } else {
985                None
986            };
987            let body_total = match frame_header_len.checked_add(all_blocks_len_u32) {
988                Some(v) => v,
989                None => return,
990            };
991            let total_size = if checksum_range.is_some() {
992                match body_total.checked_add(4) {
993                    Some(v) => v,
994                    None => return,
995                }
996            } else {
997                body_total
998            };
999            self.frame_emit_info = Some(FrameEmitInfo {
1000                frame_header_range: 0..frame_header_len,
1001                blocks,
1002                checksum_range,
1003                total_size,
1004            });
1005        }
1006    }
1007
1008    /// Layout of the most recently emitted frame.
1009    ///
1010    /// Returns `None` if [`compress`](Self::compress) has not been
1011    /// called yet on this compressor. After a successful `compress()`
1012    /// the returned `FrameEmitInfo` describes the frame header range,
1013    /// every emitted block's offset / size / type, and the optional
1014    /// trailing content-checksum range — all in frame-absolute byte
1015    /// offsets matching the bytes written to the drain.
1016    ///
1017    /// Behind the `lsm` Cargo feature.
1018    #[cfg(feature = "lsm")]
1019    pub fn last_frame_emit_info(&self) -> Option<&crate::encoding::frame_emit_info::FrameEmitInfo> {
1020        self.frame_emit_info.as_ref()
1021    }
1022
1023    /// Opt in to per-block XXH64 checksum computation during
1024    /// [`compress`](Self::compress). Default off; zero cost when
1025    /// disabled. The captured digests are accessible via
1026    /// [`last_frame_block_checksums`](Self::last_frame_block_checksums).
1027    ///
1028    /// One checksum is emitted per physical FrameBlock written to
1029    /// the drain: 1:1 cardinality with
1030    /// [`last_frame_emit_info`](Self::last_frame_emit_info)'s
1031    /// `blocks` vector. On the post-split optimization path
1032    /// (Level 16-22 with large window) the per-partition decompressed
1033    /// range is hashed inside the partition loop so the digest count
1034    /// still matches the emitted block count. The decoder collects
1035    /// per-physical-block digests on the same granularity, so
1036    /// element-wise equality holds round-trip.
1037    ///
1038    /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
1039    /// primitive lives behind the `hash` feature, so this method only
1040    /// compiles when both are enabled.
1041    #[cfg(all(feature = "lsm", feature = "hash"))]
1042    pub fn enable_per_block_checksums(&mut self) {
1043        self.per_block_checksums_enabled = true;
1044    }
1045
1046    /// Per-block XXH64 (low 32 bits) digests captured during the most
1047    /// recent `compress()` call. `None` unless
1048    /// [`enable_per_block_checksums`](Self::enable_per_block_checksums)
1049    /// was called before `compress()`.
1050    ///
1051    /// Behind `all(feature = "lsm", feature = "hash")`.
1052    #[cfg(all(feature = "lsm", feature = "hash"))]
1053    pub fn last_frame_block_checksums(&self) -> Option<&[u32]> {
1054        self.block_checksums.as_deref()
1055    }
1056
1057    /// Get a mutable reference to the source
1058    pub fn source_mut(&mut self) -> Option<&mut R> {
1059        self.uncompressed_data.as_mut()
1060    }
1061
1062    /// Get a mutable reference to the drain
1063    pub fn drain_mut(&mut self) -> Option<&mut W> {
1064        self.compressed_data.as_mut()
1065    }
1066
1067    /// Get a reference to the source
1068    pub fn source(&self) -> Option<&R> {
1069        self.uncompressed_data.as_ref()
1070    }
1071
1072    /// Get a reference to the drain
1073    pub fn drain(&self) -> Option<&W> {
1074        self.compressed_data.as_ref()
1075    }
1076
1077    /// Retrieve the source
1078    pub fn take_source(&mut self) -> Option<R> {
1079        self.uncompressed_data.take()
1080    }
1081
1082    /// Retrieve the drain
1083    pub fn take_drain(&mut self) -> Option<W> {
1084        self.compressed_data.take()
1085    }
1086
1087    /// Before calling [FrameCompressor::compress] you can replace the matcher
1088    pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
1089        core::mem::swap(&mut match_generator, &mut self.state.matcher);
1090        match_generator
1091    }
1092
1093    /// Before calling [FrameCompressor::compress] you can replace the compression level
1094    pub fn set_compression_level(
1095        &mut self,
1096        compression_level: CompressionLevel,
1097    ) -> CompressionLevel {
1098        let old = self.compression_level;
1099        self.compression_level = compression_level;
1100        old
1101    }
1102
1103    /// Get the current compression level
1104    pub fn compression_level(&self) -> CompressionLevel {
1105        self.compression_level
1106    }
1107
1108    /// Attach a pre-parsed dictionary to be used for subsequent compressions.
1109    ///
1110    /// In compressed modes, the dictionary id is written only when the active
1111    /// matcher supports dictionary priming.
1112    /// Uncompressed mode and non-priming matchers ignore the attached dictionary
1113    /// at encode time.
1114    pub fn set_dictionary(
1115        &mut self,
1116        dictionary: crate::decoding::Dictionary,
1117    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
1118    {
1119        if dictionary.id == 0 {
1120            return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
1121        }
1122        if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
1123            return Err(
1124                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
1125                    index: index as u8,
1126                },
1127            );
1128        }
1129        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
1130            huff: dictionary.huf.table.to_encoder_table(),
1131            ll_previous: dictionary
1132                .fse
1133                .literal_lengths
1134                .to_encoder_table()
1135                .map(|table| PreviousFseTable::Custom(Box::new(table))),
1136            ml_previous: dictionary
1137                .fse
1138                .match_lengths
1139                .to_encoder_table()
1140                .map(|table| PreviousFseTable::Custom(Box::new(table))),
1141            of_previous: dictionary
1142                .fse
1143                .offsets
1144                .to_encoder_table()
1145                .map(|table| PreviousFseTable::Custom(Box::new(table))),
1146        });
1147        Ok(self.dictionary.replace(dictionary))
1148    }
1149
1150    /// Parse and attach a serialized dictionary blob.
1151    pub fn set_dictionary_from_bytes(
1152        &mut self,
1153        raw_dictionary: &[u8],
1154    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
1155    {
1156        let dictionary = crate::decoding::Dictionary::decode_dict(raw_dictionary)?;
1157        self.set_dictionary(dictionary)
1158    }
1159
1160    /// Remove the attached dictionary.
1161    pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
1162        self.dictionary_entropy_cache = None;
1163        self.dictionary.take()
1164    }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169    #[cfg(all(feature = "dict_builder", feature = "std"))]
1170    use alloc::format;
1171    use alloc::vec;
1172
1173    use super::FrameCompressor;
1174    use crate::blocks::block::BlockType;
1175    use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
1176    use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
1177    use crate::encoding::{Matcher, Sequence};
1178    use alloc::vec::Vec;
1179
1180    fn generate_data(seed: u64, len: usize) -> Vec<u8> {
1181        let mut state = seed;
1182        let mut data = Vec::with_capacity(len);
1183        for _ in 0..len {
1184            state = state
1185                .wrapping_mul(6364136223846793005)
1186                .wrapping_add(1442695040888963407);
1187            data.push((state >> 33) as u8);
1188        }
1189        data
1190    }
1191
1192    fn first_block_type(frame: &[u8]) -> BlockType {
1193        let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
1194        let mut decoder = block_decoder::new();
1195        let (header, _) = decoder
1196            .read_block_header(&frame[header_size as usize..])
1197            .expect("block header should parse");
1198        header.block_type
1199    }
1200
1201    /// Frame content size is written correctly and C zstd can decompress the output.
1202    #[cfg(feature = "std")]
1203    #[test]
1204    fn fcs_header_written_and_c_zstd_compatible() {
1205        let levels = [
1206            crate::encoding::CompressionLevel::Uncompressed,
1207            crate::encoding::CompressionLevel::Fastest,
1208            crate::encoding::CompressionLevel::Default,
1209            crate::encoding::CompressionLevel::Better,
1210            crate::encoding::CompressionLevel::Best,
1211        ];
1212        let fcs_2byte = vec![0xCDu8; 300]; // 300 bytes → 2-byte FCS (256..=65791 range)
1213        let large = vec![0xABu8; 100_000];
1214        let inputs: [&[u8]; 5] = [
1215            &[],
1216            &[0x00],
1217            b"abcdefghijklmnopqrstuvwxy\n",
1218            &fcs_2byte,
1219            &large,
1220        ];
1221        for level in levels {
1222            for data in &inputs {
1223                let compressed = crate::encoding::compress_to_vec(*data, level);
1224                // Verify FCS is present and correct
1225                let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1226                    .unwrap()
1227                    .0;
1228                assert_eq!(
1229                    header.frame_content_size(),
1230                    data.len() as u64,
1231                    "FCS mismatch for len={} level={:?}",
1232                    data.len(),
1233                    level,
1234                );
1235                // Confirm the FCS field is actually present in the header
1236                // (not just the decoder returning 0 for absent FCS).
1237                assert_ne!(
1238                    header.descriptor.frame_content_size_bytes().unwrap(),
1239                    0,
1240                    "FCS field must be present for len={} level={:?}",
1241                    data.len(),
1242                    level,
1243                );
1244                // Verify C zstd can decompress
1245                let mut decoded = Vec::new();
1246                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1247                    |e| {
1248                        panic!(
1249                            "C zstd decode failed for len={} level={level:?}: {e}",
1250                            data.len()
1251                        )
1252                    },
1253                );
1254                assert_eq!(
1255                    decoded.as_slice(),
1256                    *data,
1257                    "C zstd roundtrip failed for len={}",
1258                    data.len()
1259                );
1260            }
1261        }
1262    }
1263
1264    #[cfg(feature = "std")]
1265    #[test]
1266    fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
1267        let data = vec![0xAB; 2047];
1268        let compressed = {
1269            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1270            compressor.set_source_size_hint(data.len() as u64);
1271            compressor.set_source(data.as_slice());
1272            let mut out = Vec::new();
1273            compressor.set_drain(&mut out);
1274            compressor.compress();
1275            out
1276        };
1277
1278        let mut decoded = Vec::new();
1279        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1280        assert_eq!(decoded, data);
1281    }
1282
1283    #[cfg(feature = "std")]
1284    #[test]
1285    fn small_hinted_default_frame_uses_single_segment_header() {
1286        let data = generate_data(0xD15E_A5ED, 1024);
1287        let compressed = {
1288            let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
1289            compressor.set_source_size_hint(data.len() as u64);
1290            compressor.set_source(data.as_slice());
1291            let mut out = Vec::new();
1292            compressor.set_drain(&mut out);
1293            compressor.compress();
1294            out
1295        };
1296
1297        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1298        assert!(
1299            frame_header.descriptor.single_segment_flag(),
1300            "small hinted default frames should use single-segment header for Rust/FFI parity"
1301        );
1302        assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1303        let mut decoded = Vec::new();
1304        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
1305            .expect("ffi decoder must accept single-segment small hinted default frame");
1306        assert_eq!(decoded, data);
1307    }
1308
1309    #[cfg(feature = "std")]
1310    #[test]
1311    fn small_hinted_numeric_default_levels_use_single_segment_header() {
1312        let data = generate_data(0xA11C_E003, 1024);
1313        for level in [
1314            super::CompressionLevel::Level(0),
1315            super::CompressionLevel::Level(3),
1316        ] {
1317            let compressed = {
1318                let mut compressor = FrameCompressor::new(level);
1319                compressor.set_source_size_hint(data.len() as u64);
1320                compressor.set_source(data.as_slice());
1321                let mut out = Vec::new();
1322                compressor.set_drain(&mut out);
1323                compressor.compress();
1324                out
1325            };
1326
1327            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1328            assert!(
1329                frame_header.descriptor.single_segment_flag(),
1330                "small hinted numeric default level frames should use single-segment header (level={level:?})"
1331            );
1332            assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1333            let mut decoded = Vec::new();
1334            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
1335                panic!(
1336                    "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
1337                )
1338            });
1339            assert_eq!(decoded, data);
1340        }
1341    }
1342
1343    #[cfg(feature = "std")]
1344    #[test]
1345    fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
1346        let levels = [
1347            super::CompressionLevel::Fastest,
1348            super::CompressionLevel::Default,
1349            super::CompressionLevel::Better,
1350            super::CompressionLevel::Best,
1351            super::CompressionLevel::Level(-1),
1352            super::CompressionLevel::Level(2),
1353            super::CompressionLevel::Level(3),
1354            super::CompressionLevel::Level(4),
1355            super::CompressionLevel::Level(11),
1356        ];
1357        let sizes = [
1358            511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
1359        ];
1360
1361        for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
1362            for &size in &sizes {
1363                let data = generate_data(seed + seed_idx as u64, size);
1364                for &level in &levels {
1365                    let compressed = {
1366                        let mut compressor = FrameCompressor::new(level);
1367                        compressor.set_source_size_hint(data.len() as u64);
1368                        compressor.set_source(data.as_slice());
1369                        let mut out = Vec::new();
1370                        compressor.set_drain(&mut out);
1371                        compressor.compress();
1372                        out
1373                    };
1374                    if matches!(size, 511 | 512) {
1375                        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1376                        assert_eq!(
1377                            frame_header.descriptor.single_segment_flag(),
1378                            size == 512,
1379                            "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
1380                        );
1381                    }
1382
1383                    let mut decoded = Vec::new();
1384                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1385                        |e| {
1386                            panic!(
1387                                "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
1388                                seed + seed_idx as u64
1389                            )
1390                        },
1391                    );
1392                    assert_eq!(
1393                        decoded,
1394                        data,
1395                        "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
1396                        seed + seed_idx as u64
1397                    );
1398                }
1399            }
1400        }
1401    }
1402
1403    #[cfg(feature = "std")]
1404    #[test]
1405    fn hinted_levels_use_single_segment_header_symmetrically() {
1406        let levels = [
1407            super::CompressionLevel::Fastest,
1408            super::CompressionLevel::Default,
1409            super::CompressionLevel::Better,
1410            super::CompressionLevel::Best,
1411            super::CompressionLevel::Level(0),
1412            super::CompressionLevel::Level(2),
1413            super::CompressionLevel::Level(3),
1414            super::CompressionLevel::Level(4),
1415            super::CompressionLevel::Level(11),
1416        ];
1417        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
1418            let size = 1024 + seed_idx * 97;
1419            let data = generate_data(seed, size);
1420            for &level in &levels {
1421                let compressed = {
1422                    let mut compressor = FrameCompressor::new(level);
1423                    compressor.set_source_size_hint(data.len() as u64);
1424                    compressor.set_source(data.as_slice());
1425                    let mut out = Vec::new();
1426                    compressor.set_drain(&mut out);
1427                    compressor.compress();
1428                    out
1429                };
1430                let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1431                assert!(
1432                    frame_header.descriptor.single_segment_flag(),
1433                    "hinted frame should be single-segment for level={level:?} size={}",
1434                    data.len()
1435                );
1436                assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1437                let mut decoded = Vec::new();
1438                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
1439                    panic!(
1440                        "ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
1441                        data.len()
1442                    )
1443                });
1444                assert_eq!(decoded, data);
1445            }
1446        }
1447    }
1448
1449    #[cfg(feature = "std")]
1450    #[test]
1451    fn hinted_levels_pin_511_512_single_segment_boundary() {
1452        let levels = [
1453            super::CompressionLevel::Fastest,
1454            super::CompressionLevel::Default,
1455            super::CompressionLevel::Better,
1456            super::CompressionLevel::Best,
1457            super::CompressionLevel::Level(0),
1458            super::CompressionLevel::Level(2),
1459            super::CompressionLevel::Level(3),
1460            super::CompressionLevel::Level(4),
1461            super::CompressionLevel::Level(11),
1462        ];
1463        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
1464            for &size in &[511usize, 512] {
1465                let data = generate_data(seed + seed_idx as u64, size);
1466                for &level in &levels {
1467                    let compressed = {
1468                        let mut compressor = FrameCompressor::new(level);
1469                        compressor.set_source_size_hint(data.len() as u64);
1470                        compressor.set_source(data.as_slice());
1471                        let mut out = Vec::new();
1472                        compressor.set_drain(&mut out);
1473                        compressor.compress();
1474                        out
1475                    };
1476                    let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1477                    assert_eq!(
1478                        frame_header.descriptor.single_segment_flag(),
1479                        size == 512,
1480                        "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
1481                    );
1482                    let mut decoded = Vec::new();
1483                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1484                        |e| {
1485                            panic!(
1486                                "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
1487                                seed + seed_idx as u64
1488                            )
1489                        },
1490                    );
1491                    assert_eq!(decoded, data);
1492                }
1493            }
1494        }
1495    }
1496
1497    #[cfg(feature = "std")]
1498    #[test]
1499    fn fastest_random_block_uses_raw_fast_path() {
1500        let data = generate_data(0xC0FF_EE11, 10 * 1024);
1501        let compressed =
1502            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Fastest);
1503
1504        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1505
1506        let mut decoded = Vec::new();
1507        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1508        assert_eq!(decoded, data);
1509    }
1510
1511    #[cfg(feature = "std")]
1512    #[test]
1513    fn default_random_block_uses_raw_fast_path() {
1514        let data = generate_data(0xD15E_A5ED, 10 * 1024);
1515        let compressed =
1516            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Default);
1517
1518        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1519
1520        let mut decoded = Vec::new();
1521        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1522        assert_eq!(decoded, data);
1523    }
1524
1525    #[cfg(feature = "std")]
1526    #[test]
1527    fn best_random_block_uses_raw_fast_path() {
1528        let data = generate_data(0xB35C_AFE1, 10 * 1024);
1529        let compressed =
1530            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Best);
1531
1532        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1533
1534        let mut decoded = Vec::new();
1535        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1536        assert_eq!(decoded, data);
1537    }
1538
1539    #[cfg(feature = "std")]
1540    #[test]
1541    fn level2_random_block_uses_raw_fast_path() {
1542        let data = generate_data(0xA11C_E222, 10 * 1024);
1543        let compressed =
1544            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Level(2));
1545
1546        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1547
1548        let mut decoded = Vec::new();
1549        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1550        assert_eq!(decoded, data);
1551    }
1552
1553    #[cfg(feature = "std")]
1554    #[test]
1555    fn better_random_block_uses_raw_fast_path() {
1556        let data = generate_data(0xBE77_E111, 10 * 1024);
1557        let compressed =
1558            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Better);
1559
1560        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1561
1562        let mut decoded = Vec::new();
1563        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1564        assert_eq!(decoded, data);
1565    }
1566
1567    #[cfg(feature = "std")]
1568    #[test]
1569    fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
1570        let mut data = Vec::with_capacity(16 * 1024);
1571        const LINE: &[u8] =
1572            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1573        while data.len() < 16 * 1024 {
1574            let remaining = 16 * 1024 - data.len();
1575            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1576        }
1577
1578        fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
1579            let compressed = crate::encoding::compress_to_vec(data, level);
1580            assert_ne!(first_block_type(&compressed), BlockType::Raw);
1581            assert!(
1582                compressed.len() < data.len(),
1583                "compressible input should remain compressible for level={level:?}"
1584            );
1585            let mut decoded = Vec::new();
1586            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1587            assert_eq!(decoded, data);
1588        }
1589
1590        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Fastest);
1591        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Default);
1592        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Level(3));
1593        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Better);
1594        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Best);
1595    }
1596
1597    #[cfg(feature = "std")]
1598    #[test]
1599    fn hinted_small_compressible_frames_use_single_segment_across_levels() {
1600        let mut data = Vec::with_capacity(4 * 1024);
1601        const LINE: &[u8] =
1602            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1603        while data.len() < 4 * 1024 {
1604            let remaining = 4 * 1024 - data.len();
1605            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1606        }
1607
1608        for level in [
1609            super::CompressionLevel::Fastest,
1610            super::CompressionLevel::Default,
1611            super::CompressionLevel::Better,
1612            super::CompressionLevel::Best,
1613            super::CompressionLevel::Level(0),
1614            super::CompressionLevel::Level(3),
1615            super::CompressionLevel::Level(4),
1616            super::CompressionLevel::Level(11),
1617        ] {
1618            let compressed = {
1619                let mut compressor = FrameCompressor::new(level);
1620                compressor.set_source_size_hint(data.len() as u64);
1621                compressor.set_source(data.as_slice());
1622                let mut out = Vec::new();
1623                compressor.set_drain(&mut out);
1624                compressor.compress();
1625                out
1626            };
1627            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1628            assert!(
1629                frame_header.descriptor.single_segment_flag(),
1630                "hinted small compressible frame should use single-segment (level={level:?})"
1631            );
1632            assert_ne!(
1633                first_block_type(&compressed),
1634                BlockType::Raw,
1635                "compressible hinted frame should stay off raw fast path (level={level:?})"
1636            );
1637            assert!(
1638                compressed.len() < data.len(),
1639                "compressible hinted frame should still shrink (level={level:?})"
1640            );
1641            let mut decoded = Vec::new();
1642            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
1643                .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
1644            assert_eq!(decoded, data);
1645        }
1646    }
1647
1648    struct NoDictionaryMatcher {
1649        last_space: Vec<u8>,
1650        window_size: u64,
1651    }
1652
1653    impl NoDictionaryMatcher {
1654        fn new(window_size: u64) -> Self {
1655            Self {
1656                last_space: Vec::new(),
1657                window_size,
1658            }
1659        }
1660    }
1661
1662    impl Matcher for NoDictionaryMatcher {
1663        fn get_next_space(&mut self) -> Vec<u8> {
1664            vec![0; self.window_size as usize]
1665        }
1666
1667        fn get_last_space(&mut self) -> &[u8] {
1668            self.last_space.as_slice()
1669        }
1670
1671        fn commit_space(&mut self, space: Vec<u8>) {
1672            self.last_space = space;
1673        }
1674
1675        fn skip_matching(&mut self) {}
1676
1677        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
1678            handle_sequence(Sequence::Literals {
1679                literals: self.last_space.as_slice(),
1680            });
1681        }
1682
1683        fn reset(&mut self, _level: super::CompressionLevel) {
1684            self.last_space.clear();
1685        }
1686
1687        fn window_size(&self) -> u64 {
1688            self.window_size
1689        }
1690    }
1691
1692    #[test]
1693    fn frame_starts_with_magic_num() {
1694        let mock_data = [1_u8, 2, 3].as_slice();
1695        let mut output: Vec<u8> = Vec::new();
1696        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1697        compressor.set_source(mock_data);
1698        compressor.set_drain(&mut output);
1699
1700        compressor.compress();
1701        assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
1702    }
1703
1704    #[test]
1705    fn very_simple_raw_compress() {
1706        let mock_data = [1_u8, 2, 3].as_slice();
1707        let mut output: Vec<u8> = Vec::new();
1708        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1709        compressor.set_source(mock_data);
1710        compressor.set_drain(&mut output);
1711
1712        compressor.compress();
1713    }
1714
1715    #[test]
1716    fn very_simple_compress() {
1717        let mut mock_data = vec![0; 1 << 17];
1718        mock_data.extend(vec![1; (1 << 17) - 1]);
1719        mock_data.extend(vec![2; (1 << 18) - 1]);
1720        mock_data.extend(vec![2; 1 << 17]);
1721        mock_data.extend(vec![3; (1 << 17) - 1]);
1722        let mut output: Vec<u8> = Vec::new();
1723        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1724        compressor.set_source(mock_data.as_slice());
1725        compressor.set_drain(&mut output);
1726
1727        compressor.compress();
1728
1729        let mut decoder = FrameDecoder::new();
1730        let mut decoded = Vec::with_capacity(mock_data.len());
1731        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1732        assert_eq!(mock_data, decoded);
1733
1734        let mut decoded = Vec::new();
1735        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1736        assert_eq!(mock_data, decoded);
1737    }
1738
1739    #[test]
1740    fn rle_compress() {
1741        let mock_data = vec![0; 1 << 19];
1742        let mut output: Vec<u8> = Vec::new();
1743        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1744        compressor.set_source(mock_data.as_slice());
1745        compressor.set_drain(&mut output);
1746
1747        compressor.compress();
1748
1749        let mut decoder = FrameDecoder::new();
1750        let mut decoded = Vec::with_capacity(mock_data.len());
1751        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1752        assert_eq!(mock_data, decoded);
1753    }
1754
1755    #[test]
1756    fn aaa_compress() {
1757        let mock_data = vec![0, 1, 3, 4, 5];
1758        let mut output: Vec<u8> = Vec::new();
1759        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1760        compressor.set_source(mock_data.as_slice());
1761        compressor.set_drain(&mut output);
1762
1763        compressor.compress();
1764
1765        let mut decoder = FrameDecoder::new();
1766        let mut decoded = Vec::with_capacity(mock_data.len());
1767        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1768        assert_eq!(mock_data, decoded);
1769
1770        let mut decoded = Vec::new();
1771        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1772        assert_eq!(mock_data, decoded);
1773    }
1774
1775    #[test]
1776    fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
1777        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1778        let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
1779        let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
1780
1781        let mut data = Vec::new();
1782        for _ in 0..8 {
1783            data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
1784        }
1785
1786        let mut with_dict = Vec::new();
1787        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1788        let previous = compressor
1789            .set_dictionary_from_bytes(dict_raw)
1790            .expect("dictionary bytes should parse");
1791        assert!(
1792            previous.is_none(),
1793            "first dictionary insert should return None"
1794        );
1795        assert_eq!(
1796            compressor
1797                .set_dictionary(dict_for_encoder)
1798                .expect("valid dictionary should attach")
1799                .expect("set_dictionary_from_bytes inserted previous dictionary")
1800                .id,
1801            dict_for_decoder.id
1802        );
1803        compressor.set_source(data.as_slice());
1804        compressor.set_drain(&mut with_dict);
1805        compressor.compress();
1806
1807        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
1808            .expect("encoded stream should have a frame header");
1809        assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));
1810
1811        let mut decoder = FrameDecoder::new();
1812        let mut missing_dict_target = Vec::with_capacity(data.len());
1813        let err = decoder
1814            .decode_all_to_vec(&with_dict, &mut missing_dict_target)
1815            .unwrap_err();
1816        assert!(
1817            matches!(
1818                &err,
1819                crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
1820            ),
1821            "dict-compressed stream should require dictionary id, got: {err:?}"
1822        );
1823
1824        let mut decoder = FrameDecoder::new();
1825        decoder.add_dict(dict_for_decoder).unwrap();
1826        let mut decoded = Vec::with_capacity(data.len());
1827        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
1828        assert_eq!(decoded, data);
1829
1830        let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
1831        let mut ffi_decoded = Vec::with_capacity(data.len());
1832        let ffi_written = ffi_decoder
1833            .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
1834            .unwrap();
1835        assert_eq!(ffi_written, data.len());
1836        assert_eq!(ffi_decoded, data);
1837    }
1838
1839    #[cfg(all(feature = "dict_builder", feature = "std"))]
1840    #[test]
1841    fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
1842        use std::io::Cursor;
1843
1844        let mut training = Vec::new();
1845        for idx in 0..256u32 {
1846            training.extend_from_slice(
1847                format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
1848            );
1849        }
1850        let mut raw_dict = Vec::new();
1851        crate::dictionary::create_raw_dict_from_source(
1852            Cursor::new(training.as_slice()),
1853            training.len(),
1854            &mut raw_dict,
1855            4096,
1856        )
1857        .expect("dict_builder training should succeed");
1858        assert!(
1859            !raw_dict.is_empty(),
1860            "dict_builder produced an empty dictionary"
1861        );
1862
1863        let dict_id = 0xD1C7_0008;
1864        let encoder_dict =
1865            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
1866        let decoder_dict =
1867            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
1868
1869        let mut payload = Vec::new();
1870        for idx in 0..96u32 {
1871            payload.extend_from_slice(
1872                format!(
1873                    "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
1874                )
1875                .as_bytes(),
1876            );
1877        }
1878
1879        let mut without_dict = Vec::new();
1880        let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
1881        baseline.set_source(payload.as_slice());
1882        baseline.set_drain(&mut without_dict);
1883        baseline.compress();
1884
1885        let mut with_dict = Vec::new();
1886        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1887        compressor
1888            .set_dictionary(encoder_dict)
1889            .expect("valid dict_builder dictionary should attach");
1890        compressor.set_source(payload.as_slice());
1891        compressor.set_drain(&mut with_dict);
1892        compressor.compress();
1893
1894        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
1895            .expect("encoded stream should have a frame header");
1896        assert_eq!(frame_header.dictionary_id(), Some(dict_id));
1897        let mut decoder = FrameDecoder::new();
1898        decoder.add_dict(decoder_dict).unwrap();
1899        let mut decoded = Vec::with_capacity(payload.len());
1900        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
1901        assert_eq!(decoded, payload);
1902        assert!(
1903            with_dict.len() < without_dict.len(),
1904            "trained dictionary should improve compression for this small payload"
1905        );
1906    }
1907
1908    #[test]
1909    fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
1910        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1911        let mut output = Vec::new();
1912        let input = b"";
1913
1914        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1915        let previous = compressor
1916            .set_dictionary_from_bytes(dict_raw)
1917            .expect("dictionary bytes should parse");
1918        assert!(previous.is_none());
1919
1920        compressor.set_source(input.as_slice());
1921        compressor.set_drain(&mut output);
1922        compressor.compress();
1923
1924        assert!(
1925            compressor.state.last_huff_table.is_some(),
1926            "dictionary entropy should seed previous huffman table before first block"
1927        );
1928        assert!(
1929            compressor.state.fse_tables.ll_previous.is_some(),
1930            "dictionary entropy should seed previous ll table before first block"
1931        );
1932        assert!(
1933            compressor.state.fse_tables.ml_previous.is_some(),
1934            "dictionary entropy should seed previous ml table before first block"
1935        );
1936        assert!(
1937            compressor.state.fse_tables.of_previous.is_some(),
1938            "dictionary entropy should seed previous of table before first block"
1939        );
1940    }
1941
1942    #[test]
1943    fn set_dictionary_rejects_zero_dictionary_id() {
1944        let invalid = crate::decoding::Dictionary {
1945            id: 0,
1946            fse: crate::decoding::scratch::FSEScratch::new(),
1947            huf: crate::decoding::scratch::HuffmanScratch::new(),
1948            dict_content: vec![1, 2, 3],
1949            offset_hist: [1, 4, 8],
1950        };
1951
1952        let mut compressor: FrameCompressor<
1953            &[u8],
1954            Vec<u8>,
1955            crate::encoding::match_generator::MatchGeneratorDriver,
1956        > = FrameCompressor::new(super::CompressionLevel::Fastest);
1957        let result = compressor.set_dictionary(invalid);
1958        assert!(matches!(
1959            result,
1960            Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
1961        ));
1962    }
1963
1964    #[test]
1965    fn set_dictionary_rejects_zero_repeat_offsets() {
1966        let invalid = crate::decoding::Dictionary {
1967            id: 1,
1968            fse: crate::decoding::scratch::FSEScratch::new(),
1969            huf: crate::decoding::scratch::HuffmanScratch::new(),
1970            dict_content: vec![1, 2, 3],
1971            offset_hist: [0, 4, 8],
1972        };
1973
1974        let mut compressor: FrameCompressor<
1975            &[u8],
1976            Vec<u8>,
1977            crate::encoding::match_generator::MatchGeneratorDriver,
1978        > = FrameCompressor::new(super::CompressionLevel::Fastest);
1979        let result = compressor.set_dictionary(invalid);
1980        assert!(matches!(
1981            result,
1982            Err(
1983                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
1984                    index: 0
1985                }
1986            )
1987        ));
1988    }
1989
1990    #[test]
1991    fn uncompressed_mode_does_not_require_dictionary() {
1992        let dict_id = 0xABCD_0001;
1993        let dict =
1994            crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
1995                .expect("raw dictionary should be valid");
1996
1997        let payload = b"plain-bytes-that-should-stay-raw";
1998        let mut output = Vec::new();
1999        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2000        compressor
2001            .set_dictionary(dict)
2002            .expect("dictionary should attach in uncompressed mode");
2003        compressor.set_source(payload.as_slice());
2004        compressor.set_drain(&mut output);
2005        compressor.compress();
2006
2007        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
2008            .expect("encoded frame should have a header");
2009        assert_eq!(
2010            frame_header.dictionary_id(),
2011            None,
2012            "raw/uncompressed frames must not advertise dictionary dependency"
2013        );
2014
2015        let mut decoder = FrameDecoder::new();
2016        let mut decoded = Vec::with_capacity(payload.len());
2017        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2018        assert_eq!(decoded, payload);
2019    }
2020
2021    #[test]
2022    fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
2023        use crate::encoding::match_generator::MatchGeneratorDriver;
2024
2025        let dict_id = 0xABCD_0002;
2026        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
2027            .expect("raw dictionary should be valid");
2028        let dict_for_decoder =
2029            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
2030                .expect("raw dictionary should be valid");
2031
2032        // Payload must exceed the encoder's advertised window (512 KiB
2033        // for Fastest after `window_log = 19` alignment with donor's
2034        // L1 fast row in `clevels.h`) so the test actually exercises
2035        // cross-window-boundary behavior.
2036        let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);
2037        let matcher = MatchGeneratorDriver::new(1024, 1);
2038
2039        let mut no_dict_output = Vec::new();
2040        let mut no_dict_compressor =
2041            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
2042        no_dict_compressor.set_source(payload.as_slice());
2043        no_dict_compressor.set_drain(&mut no_dict_output);
2044        no_dict_compressor.compress();
2045        let (no_dict_frame_header, _) =
2046            crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
2047                .expect("baseline frame should have a header");
2048        let no_dict_window = no_dict_frame_header
2049            .window_size()
2050            .expect("window size should be present");
2051
2052        let mut output = Vec::new();
2053        let matcher = MatchGeneratorDriver::new(1024, 1);
2054        let mut compressor =
2055            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
2056        compressor
2057            .set_dictionary(dict)
2058            .expect("dictionary should attach");
2059        compressor.set_source(payload.as_slice());
2060        compressor.set_drain(&mut output);
2061        compressor.compress();
2062
2063        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
2064            .expect("encoded frame should have a header");
2065        let advertised_window = frame_header
2066            .window_size()
2067            .expect("window size should be present");
2068        assert_eq!(
2069            advertised_window, no_dict_window,
2070            "dictionary priming must not inflate advertised window size"
2071        );
2072        assert!(
2073            payload.len() > advertised_window as usize,
2074            "test must cross the advertised window boundary"
2075        );
2076
2077        let mut decoder = FrameDecoder::new();
2078        decoder.add_dict(dict_for_decoder).unwrap();
2079        let mut decoded = Vec::with_capacity(payload.len());
2080        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2081        assert_eq!(decoded, payload);
2082    }
2083
2084    #[test]
2085    fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
2086        let dict_id = 0xABCD_0004;
2087        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
2088        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
2089        let dict_for_decoder =
2090            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
2091        let payload = b"abcdabcdabcdabcd".repeat(128);
2092
2093        let mut hinted_output = Vec::new();
2094        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
2095        hinted.set_dictionary(dict).unwrap();
2096        hinted.set_source_size_hint(1);
2097        hinted.set_source(payload.as_slice());
2098        hinted.set_drain(&mut hinted_output);
2099        hinted.compress();
2100
2101        let mut no_hint_output = Vec::new();
2102        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
2103        no_hint
2104            .set_dictionary(
2105                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
2106                    .unwrap(),
2107            )
2108            .unwrap();
2109        no_hint.set_source(payload.as_slice());
2110        no_hint.set_drain(&mut no_hint_output);
2111        no_hint.compress();
2112
2113        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
2114            .expect("encoded frame should have a header")
2115            .0
2116            .window_size()
2117            .expect("window size should be present");
2118        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
2119            .expect("encoded frame should have a header")
2120            .0
2121            .window_size()
2122            .expect("window size should be present");
2123        assert!(
2124            hinted_window <= no_hint_window,
2125            "source-size hint should not increase advertised window with dictionary priming",
2126        );
2127
2128        let mut decoder = FrameDecoder::new();
2129        decoder.add_dict(dict_for_decoder).unwrap();
2130        let mut decoded = Vec::with_capacity(payload.len());
2131        decoder
2132            .decode_all_to_vec(&hinted_output, &mut decoded)
2133            .unwrap();
2134        assert_eq!(decoded, payload);
2135    }
2136
/// Dictionary compression with and without a source-size hint.
///
/// The same 4 KiB payload is compressed twice against an identical 4 KiB
/// raw-content dictionary: once after calling `set_source_size_hint`, once
/// without. The hinted frame must not advertise a larger window than the
/// unhinted one, and it must decode back to the payload through a decoder
/// that was given an equivalent dictionary.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
    let dict_id = 0xABCD_0005;
    // Dictionaries are handed over by value below, so every consumer gets
    // its own freshly built copy (4 KiB of history each).
    let build_dict = || {
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap()
    };
    // Reads the window size advertised in an encoded frame's header.
    let advertised_window = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };
    let payload = b"abcd".repeat(1024); // 4 KiB payload

    // Frame 1: dictionary plus an exact source-size hint.
    let mut hinted_output = Vec::new();
    {
        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
        hinted.set_dictionary(build_dict()).unwrap();
        hinted.set_source_size_hint(payload.len() as u64);
        hinted.set_source(payload.as_slice());
        hinted.set_drain(&mut hinted_output);
        hinted.compress();
    }

    // Frame 2: same dictionary, no hint.
    let mut no_hint_output = Vec::new();
    {
        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
        no_hint.set_dictionary(build_dict()).unwrap();
        no_hint.set_source(payload.as_slice());
        no_hint.set_drain(&mut no_hint_output);
        no_hint.compress();
    }

    let hinted_window = advertised_window(hinted_output.as_slice());
    let no_hint_window = advertised_window(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    // Only the hinted frame is decoded; that is the one under test.
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(build_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
2190
/// A dictionary attached to a compressor driven by `NoDictionaryMatcher`
/// must not leak into the frame header: the encoded frame carries no
/// dictionary id and decodes with a decoder that has no dictionary loaded.
#[test]
fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
    let payload = b"abcdefghabcdefgh";
    let dict = crate::decoding::Dictionary::from_raw_content(0xABCD_0003, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");

    let mut output = Vec::new();
    let mut compressor = FrameCompressor::new_with_matcher(
        NoDictionaryMatcher::new(64),
        super::CompressionLevel::Fastest,
    );
    compressor
        .set_dictionary(dict)
        .expect("dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    // Only the header half of the parse result matters here.
    let frame_header = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header")
        .0;
    assert_eq!(
        frame_header.dictionary_id(),
        None,
        "matchers that do not support dictionary priming must not advertise dictionary dependency"
    );

    // No `add_dict` call: the frame has to be self-contained.
    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
2222
/// Reusing one compressor for two frames must not carry hasher state over.
///
/// The same input is compressed twice with a single `FrameCompressor`.
/// For each frame the checksum read from the frame must equal the one the
/// decoder calculates, and both frames must store the same checksum since
/// they encode the same bytes.
#[cfg(feature = "hash")]
#[test]
fn checksum_two_frames_reused_compressor() {
    /// Fully decodes one frame and returns
    /// `(decoded bytes, checksum stored in the frame, checksum calculated by the decoder)`.
    fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
        let mut remaining = compressed;
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut remaining).unwrap();
        loop {
            if decoder.is_finished() {
                break;
            }
            decoder
                .decode_blocks(&mut remaining, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut payload = Vec::new();
        decoder.collect_to_writer(&mut payload).unwrap();
        let stored = decoder.get_checksum_from_data();
        let calculated = decoder.get_calculated_checksum();
        (payload, stored, calculated)
    }

    // 1024 bytes cycling through every byte value.
    let data: Vec<u8> = (0..1024usize).map(|i| (i % 256) as u8).collect();

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);

    // First frame.
    let mut compressed1 = Vec::new();
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed1);
    compressor.compress();

    // Second frame, produced by the very same compressor instance.
    let mut compressed2 = Vec::new();
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed2);
    compressor.compress();

    let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
    assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
    assert_eq!(
        chksum_from_data1, chksum_calculated1,
        "frame 1: checksum mismatch"
    );

    let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
    assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
    assert_eq!(
        chksum_from_data2, chksum_calculated2,
        "frame 2: checksum mismatch"
    );

    // Identical input means identical checksum; a hasher that kept state
    // from frame 1 would write a different value into frame 2.
    assert_eq!(
        chksum_from_data1, chksum_from_data2,
        "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
    );
}
2286
/// Replays saved fuzz artifacts through both interop directions.
///
/// For every regular file in `fuzz/artifacts/interop` (the test is a no-op
/// when that directory is absent):
/// * the `zstd` crate compresses the file and both of this crate's decoders
///   (`StreamingDecoder` and `FrameDecoder`) must reproduce it;
/// * this crate compresses the file at `Uncompressed` and `Fastest`, and
///   the `zstd` crate must decode each result back to the original.
#[cfg(feature = "std")]
#[test]
fn fuzz_targets() {
    use std::io::Read;

    const ARTIFACT_DIR: &str = "fuzz/artifacts/interop";

    /// Decodes through the streaming `Read` adapter.
    fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
        let mut result: Vec<u8> = Vec::new();
        crate::decoding::StreamingDecoder::new(data)
            .unwrap()
            .read_to_end(&mut result)
            .expect("Decoding failed");
        result
    }

    /// Decodes with `FrameDecoder`, draining in steps of up to 1 MiB.
    fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
        let mut decoder = crate::decoding::FrameDecoder::new();
        decoder.reset(&mut data).unwrap();
        let mut result = vec![];
        loop {
            // Stop only once the frame is finished and nothing is left to collect.
            if decoder.is_finished() && decoder.can_collect() == 0 {
                break;
            }
            decoder
                .decode_blocks(
                    &mut data,
                    crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
                )
                .unwrap();
            decoder.collect_to_writer(&mut result).unwrap();
        }
        result
    }

    /// Reference encoder: the `zstd` crate at level 3.
    fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        zstd::stream::encode_all(std::io::Cursor::new(data), 3)
    }

    /// Reference decoder: the `zstd` crate.
    fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        let mut output = Vec::new();
        zstd::stream::copy_decode(data, &mut output)?;
        Ok(output)
    }

    /// Reads `data` to the end and compresses it with this crate at `level`.
    fn encode_szstd(
        data: &mut dyn std::io::Read,
        level: crate::encoding::CompressionLevel,
    ) -> Vec<u8> {
        let mut input = Vec::new();
        data.read_to_end(&mut input).unwrap();
        crate::encoding::compress_to_vec(input.as_slice(), level)
    }

    if !std::fs::exists(ARTIFACT_DIR).unwrap_or(false) {
        return;
    }

    for entry in std::fs::read_dir(ARTIFACT_DIR).unwrap() {
        let entry = entry.unwrap();
        if !entry.file_type().unwrap().is_file() {
            continue;
        }
        let bytes = std::fs::read(entry.path()).unwrap();
        let data = bytes.as_slice();

        // Decoding: reference encoder -> both of our decoders.
        let compressed = encode_zstd(data).unwrap();
        let decoded = decode_szstd(&mut compressed.as_slice());
        let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
        assert!(
            decoded == data,
            "Decoded data did not match the original input during decompression"
        );
        assert_eq!(
            decoded2, data,
            "Decoded data did not match the original input during decompression"
        );

        // Encoding: our encoder (uncompressed, then compressed) -> reference decoder.
        for level in [
            crate::encoding::CompressionLevel::Uncompressed,
            crate::encoding::CompressionLevel::Fastest,
        ] {
            let mut input = data;
            let compressed = encode_szstd(&mut input, level);
            let decoded = decode_zstd(&compressed).unwrap();
            assert_eq!(
                decoded, data,
                "Decoded data did not match the original input during compression"
            );
        }
    }
}
2382
/// A block filled with one repeated byte value must be left whole.
///
/// Both border histograms put all 512 hits on the same slot, so
/// `presplit_fingerprints_differ` reports `false` and the splitter
/// returns the full block size — the early-return path at
/// `zstd_preSplit.c:214` (`blockSize`).
#[test]
fn donor_split_block_from_borders_keeps_homogeneous_block() {
    let block_len = MAX_BLOCK_SIZE as usize;
    let uniform = vec![0xAAu8; block_len];
    assert_eq!(super::donor_split_block_from_borders(&uniform), block_len);
}
2394
/// A block whose first half is all zeros and whose second half is a
/// counter sequence has clearly different border histograms, so the
/// borders heuristic chooses to split.
///
/// Because the content changes exactly at the block midpoint, the
/// middle 512-byte sample (`block[mid-256..mid+256]`) is half zeros
/// and half counter values, i.e. roughly equidistant from both border
/// fingerprints. That triggers the
/// `abs_diff(dist_from_begin, dist_from_end) < min_distance` branch,
/// which returns the midpoint (64 KiB) per `zstd_preSplit.c:222`.
/// The exact value is asserted — not merely membership in
/// {32K, 64K, 96K} — so a drift to another quantised arm is caught.
#[test]
fn donor_split_block_from_borders_returns_midpoint_for_centred_transition() {
    let half = MAX_BLOCK_SIZE as usize / 2;
    let mut block = vec![0u8; MAX_BLOCK_SIZE as usize];
    // Second half: values 1..=251 derived from the absolute byte index.
    for (offset, byte) in block[half..].iter_mut().enumerate() {
        *byte = ((half + offset) % 251 + 1) as u8;
    }

    let split = super::donor_split_block_from_borders(&block);
    assert_eq!(
        split,
        64 * 1024,
        "centred-transition fixture must take the symmetric \
         midpoint arm (`abs_diff < min_distance`), got {split}"
    );
}
2426
/// Checks the level → pre-split mapping of `donor_pre_split_level`.
///
/// Mid-range levels select the cheap borders heuristic (`Some(0)`),
/// high levels select the byChunks path (`Some(4)`), and everything
/// below level 11 stays unsplit (`None`) so the splitter never runs on
/// fast / default presets where its per-block cost would dominate.
#[test]
fn donor_pre_split_level_dispatches_by_compression_level() {
    use crate::encoding::CompressionLevel;

    // (input level, expected pre-split level), checked in this order.
    let expectations = [
        (CompressionLevel::Fastest, None),
        (CompressionLevel::Default, None),
        (CompressionLevel::Better, None),
        (CompressionLevel::Level(7), None),
        (CompressionLevel::Level(11), Some(0)),
        (CompressionLevel::Level(15), Some(0)),
        (CompressionLevel::Level(16), Some(4)),
        (CompressionLevel::Level(22), Some(4)),
    ];

    for (level, expected) in expectations {
        assert_eq!(super::donor_pre_split_level(level), expected);
    }
}
2464
/// End-to-end round trip of a 256 KB heterogeneous payload at
/// `Level(13)`, where the borders heuristic is active, through the
/// crate's own decoder.
///
/// The pre-split path runs over the first 128 KB block and emits two
/// consecutive sub-blocks; the second 128 KB block goes through the
/// splitter on its own. A byte-exact round trip shows that the split
/// decisions do not corrupt the frame bitstream.
#[test]
fn level_13_borders_split_roundtrips_through_own_decoder() {
    use crate::decoding::BlockDecodingStrategy;
    use crate::encoding::CompressionLevel;

    const HALF: usize = 128 * 1024;

    // First 128 KB: low-entropy repeating run; second 128 KB: counter
    // sequence — clearly distinct border histograms.
    let data: Vec<u8> = (0..2 * HALF)
        .map(|i| {
            if i < HALF {
                (i & 0x07) as u8
            } else {
                (i % 251 + 1) as u8
            }
        })
        .collect();

    let mut compressed = Vec::new();
    let mut compressor = FrameCompressor::new(CompressionLevel::Level(13));
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut compressed);
    compressor.compress();

    let mut source = compressed.as_slice();
    let mut decoder = FrameDecoder::new();
    decoder
        .reset(&mut source)
        .expect("frame header should parse");
    loop {
        if decoder.is_finished() {
            break;
        }
        decoder
            .decode_blocks(&mut source, BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }

    let mut decoded = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut decoded).unwrap();
    assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
}
2505
/// Regression: changing the level with `set_compression_level` and then
/// calling `compress()` must update `state.strategy_tag` via the
/// reset-time sync, so the literal-compression gates
/// (`min_literals_to_compress`, `min_gain`) follow the NEW level's
/// strategy.
///
/// The level pair is chosen to cross strategy bands: `Fastest` resolves
/// to `Fast` and `Level(20)` resolves to `BtUltra2`, so a missed sync
/// leaves the construction-time tag in place and fails the assertion.
/// `CompressionLevel::Best` would type-check too, but it resolves to
/// `Lazy` today, which shares the `shift=3 → 64-byte`
/// `min_literals_to_compress` band with `Fast` and would weaken the
/// evidence that the gate floor actually moved.
#[cfg(feature = "std")]
#[test]
fn set_compression_level_then_compress_refreshes_strategy_tag() {
    use super::CompressionLevel;
    use crate::encoding::strategy::StrategyTag;

    let payload = vec![0xABu8; 256];
    let mut sink = Vec::new();

    let mut compressor = FrameCompressor::new(CompressionLevel::Fastest);
    let initial_tag = compressor.state.strategy_tag;
    assert_eq!(
        initial_tag,
        StrategyTag::for_compression_level(CompressionLevel::Fastest),
        "construction-time strategy_tag must reflect initial level",
    );

    // Move to a level in a different strategy band and run one full
    // compress cycle; the matcher.reset() inside `compress` is the only
    // place where the tag can be refreshed.
    let new_level = CompressionLevel::Level(20);
    compressor.set_compression_level(new_level);
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut sink);
    compressor.compress();

    let new_tag = compressor.state.strategy_tag;
    let expected = StrategyTag::for_compression_level(new_level);

    // The tag observed after compress must be the new level's tag.
    assert_eq!(
        new_tag, expected,
        "strategy_tag must follow set_compression_level → compress, \
         got {new_tag:?} expected {expected:?}",
    );
    // Guard the fixture itself: the two levels must map to different tags.
    assert_eq!(
        expected,
        StrategyTag::BtUltra2,
        "test fixture invariant: Level(20) must resolve to BtUltra2 \
         so the post-switch tag visibly crosses the band boundary",
    );
    assert_ne!(
        new_tag, initial_tag,
        "test fixture invariant: chosen levels must resolve to \
         different StrategyTag variants",
    );
}
2561
/// Magicless mode (`ZSTD_f_zstd1_magicless`).
///
/// A frame encoded with `set_magicless(true)` must (1) not start with
/// the 4-byte magic prefix, (2) round-trip through a decoder that was
/// also switched to magicless mode, and (3) be rejected at header-read
/// time by a decoder left in its default mode.
#[test]
fn magicless_frame_omits_magic_and_roundtrips() {
    use crate::common::MAGIC_NUM;
    use crate::decoding::errors::{FrameDecoderError, ReadFrameHeaderError};

    let input: alloc::vec::Vec<u8> = (0..512u32).map(|i| (i ^ 0xA5) as u8).collect();

    // Encode with magicless = true.
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
    compressor.set_magicless(true);
    compressor.set_source(input.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    // 1. The output must NOT begin with the zstd magic number.
    let magic = MAGIC_NUM.to_le_bytes();
    assert!(
        !output.starts_with(&magic),
        "magicless frame must omit the 4-byte magic prefix",
    );

    // 2. A magicless-aware decoder must reproduce the payload.
    let mut cursor: &[u8] = output.as_slice();
    let mut decoder = crate::decoding::FrameDecoder::new();
    decoder.set_magicless(true);
    decoder.init(&mut cursor).expect("magicless init");
    decoder
        .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
        .expect("decode_blocks");
    let mut decoded: Vec<u8> = Vec::new();
    decoder
        .collect_to_writer(&mut decoded)
        .expect("collect_to_writer");
    assert_eq!(decoded, input, "magicless roundtrip must preserve bytes");

    // 3. A standard (magicful) decoder MUST fail while reading the
    //    header: the first 4 bytes are the frame-header descriptor plus
    //    window / dictionary / FCS metadata, not the magic. Two errors
    //    are accepted: `BadMagicNumber` (typical — the bytes neither
    //    equal `MAGIC_NUM` nor fall in the skippable-frame magic range)
    //    and `SkipFrame` (rare — the bytes happen to land in
    //    `0x184D2A50..=0x184D2A5F`). Either one shows the decoder did
    //    not accept the bytes as a real magicful frame.
    let mut std_decoder = crate::decoding::FrameDecoder::new();
    let other = std_decoder.init(output.as_slice());
    let rejected_at_header = matches!(
        other,
        Err(FrameDecoderError::ReadFrameHeaderError(
            ReadFrameHeaderError::BadMagicNumber(_) | ReadFrameHeaderError::SkipFrame { .. },
        ))
    );
    assert!(
        rejected_at_header,
        "standard decoder must reject a magicless frame with \
         ReadFrameHeaderError::BadMagicNumber or SkipFrame, got {other:?}",
    );
}
2621}