Skip to main content

structured_zstd/encoding/
frame_compressor.rs

1//! Utilities and interfaces for encoding an entire frame. Allows reusing resources
2
3use alloc::{boxed::Box, vec::Vec};
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12    CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13    match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
20/// An interface for compressing arbitrary data with the ZStandard compression algorithm.
21///
22/// `FrameCompressor` will generally be used by:
23/// 1. Initializing a compressor by providing a buffer of data using `FrameCompressor::new()`
24/// 2. Starting compression and writing that compression into a vec using `FrameCompressor::begin`
25///
26/// # Examples
27/// ```
28/// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
29/// let mock_data: &[_] = &[0x1, 0x2, 0x3, 0x4];
30/// let mut output = std::vec::Vec::new();
31/// // Initialize a compressor.
32/// let mut compressor = FrameCompressor::new(CompressionLevel::Uncompressed);
33/// compressor.set_source(mock_data);
34/// compressor.set_drain(&mut output);
35///
36/// // `compress` writes the compressed output into the provided buffer.
37/// compressor.compress();
38/// ```
39pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
40    uncompressed_data: Option<R>,
41    compressed_data: Option<W>,
42    compression_level: CompressionLevel,
43    dictionary: Option<crate::decoding::Dictionary>,
44    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
45    source_size_hint: Option<u64>,
46    state: CompressState<M>,
47    #[cfg(feature = "hash")]
48    hasher: XxHash64,
49}
50
51#[derive(Clone, Default)]
52struct CachedDictionaryEntropy {
53    huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
54    ll_previous: Option<PreviousFseTable>,
55    ml_previous: Option<PreviousFseTable>,
56    of_previous: Option<PreviousFseTable>,
57}
58
59#[derive(Clone)]
60pub(crate) enum PreviousFseTable {
61    // Default tables are immutable and already stored alongside the state, so
62    // repeating them only needs a lightweight marker instead of cloning FSETable.
63    Default,
64    Custom(Box<FSETable>),
65    Rle(u8),
66}
67
68impl PreviousFseTable {
69    pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
70        match self {
71            Self::Default => Some(default),
72            Self::Custom(table) => Some(table),
73            Self::Rle(_) => None,
74        }
75    }
76}
77
78pub(crate) struct FseTables {
79    /// The three predefined LL/ML/OF tables are functions of
80    /// compile-time-constant distributions. The
81    /// [`fse_encoder::FseDefaultTable`] type alias resolves to
82    /// `&'static FSETable` when a process-wide cache is available
83    /// (atomic-pointer targets, or no-atomic targets with the
84    /// `critical-section` feature) and to `Box<FSETable>` on the
85    /// cache-less no-atomic path (one per-frame allocation, dropped
86    /// with the compressor — no `Box::leak`, no unbounded growth).
87    /// Both arms `Deref` to `FSETable`, so consumers in
88    /// `encoding/blocks/compressed.rs` borrow through `&` uniformly
89    /// without seeing the per-target divergence.
90    pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
91    pub(crate) ll_previous: Option<PreviousFseTable>,
92    pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
93    pub(crate) ml_previous: Option<PreviousFseTable>,
94    pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
95    pub(crate) of_previous: Option<PreviousFseTable>,
96}
97
98impl FseTables {
99    pub fn new() -> Self {
100        Self {
101            ll_default: default_ll_table(),
102            ll_previous: None,
103            ml_default: default_ml_table(),
104            ml_previous: None,
105            of_default: default_of_table(),
106            of_previous: None,
107        }
108    }
109
110    /// Borrow the LL default table as `&FSETable`. Abstracts the cfg
111    /// split in [`crate::fse::fse_encoder::FseDefaultTable`] —
112    /// `&'static FSETable` (atomic / `critical-section`) auto-derefs
113    /// directly; `Box<FSETable>` (cache-less no-atomic) derefs
114    /// through `Box`. Both arms yield `&FSETable` uniformly so
115    /// downstream consumers can stay cfg-agnostic.
116    #[inline]
117    #[allow(clippy::borrow_deref_ref)]
118    pub(crate) fn ll_default_ref(&self) -> &FSETable {
119        &*self.ll_default
120    }
121
122    /// Borrow the ML default table as `&FSETable`. See [`Self::ll_default_ref`].
123    #[inline]
124    #[allow(clippy::borrow_deref_ref)]
125    pub(crate) fn ml_default_ref(&self) -> &FSETable {
126        &*self.ml_default
127    }
128
129    /// Borrow the OF default table as `&FSETable`. See [`Self::ll_default_ref`].
130    #[inline]
131    #[allow(clippy::borrow_deref_ref)]
132    pub(crate) fn of_default_ref(&self) -> &FSETable {
133        &*self.of_default
134    }
135}
136
137const PRESPLIT_BLOCK_MIN: usize = 3500;
138const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
139const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
140const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
141const PRESPLIT_CHUNK_SIZE: usize = 8 << 10;
142const PRESPLIT_HASH_LOG_MAX: usize = 10;
143const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
144const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
145/// Donor `SEGMENT_SIZE` in `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:201`).
146/// Two `SEGMENT_SIZE`-byte fingerprints — one from the start, one from the end —
147/// drive the cheap border heuristic; a third one from the middle disambiguates
148/// where in the block the transition sits.
149const PRESPLIT_BORDERS_SEGMENT: usize = 512;
150
151#[derive(Clone)]
152struct PreSplitFingerprint {
153    events: [u32; PRESPLIT_HASH_TABLE_SIZE],
154    nb_events: usize,
155}
156
157impl Default for PreSplitFingerprint {
158    fn default() -> Self {
159        Self {
160            events: [0; PRESPLIT_HASH_TABLE_SIZE],
161            nb_events: 0,
162        }
163    }
164}
165
166fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
167    debug_assert!(hash_log >= 8);
168    if hash_log == 8 {
169        return bytes[0] as usize;
170    }
171    debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
172    let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
173    (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
174}
175
176fn presplit_record_fingerprint(
177    fp: &mut PreSplitFingerprint,
178    src: &[u8],
179    sampling_rate: usize,
180    hash_log: usize,
181) {
182    fp.events.fill(0);
183    fp.nb_events = 0;
184    if src.len() < 2 {
185        return;
186    }
187    let limit = src.len() - 1;
188    let mut n = 0usize;
189    while n < limit {
190        fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
191        n += sampling_rate;
192    }
193    // Donor parity: zstd_preSplit.c records the integer division, not the
194    // rounded-up number of sampled events from the loop above.
195    fp.nb_events += limit / sampling_rate;
196}
197
198/// Single-byte histogram pass — matches donor `HIST_add` over a small
199/// segment with `hashLog == 8` (the `hash2` shortcut at
200/// `zstd_preSplit.c:36` returns the raw byte). The byChunks path uses
201/// 2-byte hashing for `hashLog >= 9`; this helper exists so the borders
202/// heuristic doesn't pay for that wider hash on its 512-byte windows.
203fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
204    fp.events.fill(0);
205    for &b in src {
206        fp.events[b as usize] += 1;
207    }
208    // Donor `HIST_add` returns the maximum symbol; the caller then sets
209    // `nbEvents = SEGMENT_SIZE` explicitly (see `zstd_preSplit.c:213`).
210    fp.nb_events = src.len();
211}
212
213fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
214    let slots = 1usize << hash_log;
215    let mut distance = 0u64;
216    for idx in 0..slots {
217        let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
218        let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
219        distance = distance.saturating_add(left.abs_diff(right) as u64);
220    }
221    distance
222}
223
224fn presplit_fingerprints_differ(
225    reference: &PreSplitFingerprint,
226    new_fp: &PreSplitFingerprint,
227    penalty: i32,
228    hash_log: usize,
229) -> bool {
230    debug_assert!(reference.nb_events > 0);
231    debug_assert!(new_fp.nb_events > 0);
232    let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
233    let deviation = presplit_distance(reference, new_fp, hash_log);
234    let threshold = p50.saturating_mul(PRESPLIT_THRESHOLD_BASE + penalty as u64)
235        / PRESPLIT_THRESHOLD_PENALTY_RATE;
236    deviation >= threshold
237}
238
239fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
240    for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
241        acc.events[idx] = acc.events[idx].saturating_add(new_fp.events[idx]);
242    }
243    acc.nb_events = acc.nb_events.saturating_add(new_fp.nb_events);
244}
245
246fn donor_split_block_by_chunks(block: &[u8], level: usize) -> usize {
247    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
248    debug_assert!((1..=4).contains(&level));
249    let (sampling_rate, hash_log) = match level - 1 {
250        0 => (43, 8),
251        1 => (11, 9),
252        2 => (5, 10),
253        _ => (1, 10),
254    };
255
256    let mut past = PreSplitFingerprint::default();
257    let mut new_events = PreSplitFingerprint::default();
258    let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
259    presplit_record_fingerprint(
260        &mut past,
261        &block[..PRESPLIT_CHUNK_SIZE],
262        sampling_rate,
263        hash_log,
264    );
265    let mut pos = PRESPLIT_CHUNK_SIZE;
266    while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
267        presplit_record_fingerprint(
268            &mut new_events,
269            &block[pos..pos + PRESPLIT_CHUNK_SIZE],
270            sampling_rate,
271            hash_log,
272        );
273        if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
274            return pos;
275        }
276        presplit_merge_events(&mut past, &new_events);
277        if penalty > 0 {
278            penalty -= 1;
279        }
280        pos += PRESPLIT_CHUNK_SIZE;
281    }
282    block.len()
283}
284
285/// Donor port of `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:198`).
286/// Records two 512-byte byte-histograms — one from each end of a 128 KB
287/// block — and a third from the middle as a tie-breaker; returns either
288/// a quantised split point (32 KB / 64 KB / 96 KB) or the full block
289/// size when the two ends look indistinguishable. Cheaper than the
290/// chunk-based path because it touches at most 1.5 KB of input
291/// regardless of block size.
292fn donor_split_block_from_borders(block: &[u8]) -> usize {
293    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
294    let block_size = block.len();
295    let mut past = PreSplitFingerprint::default();
296    let mut new_fp = PreSplitFingerprint::default();
297    presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
298    presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
299    // Donor uses `penalty = 0, hash_log = 8` — i.e. raw byte histogram
300    // distance with no threshold padding (`zstd_preSplit.c:214`).
301    if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
302        return block_size;
303    }
304
305    let mut middle = PreSplitFingerprint::default();
306    let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
307    presplit_record_byte_histogram(
308        &mut middle,
309        &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
310    );
311
312    let dist_from_begin = presplit_distance(&past, &middle, 8);
313    let dist_from_end = presplit_distance(&new_fp, &middle, 8);
314    // Donor `SEGMENT_SIZE * SEGMENT_SIZE / 3` (`zstd_preSplit.c:221`):
315    // if the middle is roughly equidistant from both ends, the change
316    // sits near the centre — split at the midpoint.
317    let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
318    if dist_from_begin.abs_diff(dist_from_end) < min_distance {
319        return 64 * 1024;
320    }
321    // Larger `dist_from_begin` (i.e. `middle` farther from the head
322    // fingerprint, equivalently closer to the tail) means the new
323    // statistics already dominate the centre — the transition
324    // happened EARLY → emit a small 32 KB head and let the 96 KB
325    // tail absorb the rest. Inverse case: `dist_from_end` larger
326    // (middle still resembles the head) means the transition is
327    // LATE → emit a 96 KB head so the trailing 32 KB carries the
328    // new statistics alone.
329    if dist_from_begin > dist_from_end {
330        32 * 1024
331    } else {
332        96 * 1024
333    }
334}
335
336fn donor_pre_split_level(level: CompressionLevel) -> Option<usize> {
337    match level {
338        // Donor `ZSTD_blockSplitter_level` table (`clevels.h`): cheap
339        // borders heuristic for lazy2 / btlazy2 strategies (levels
340        // 11..=15) — the splitter still pays for itself on
341        // heterogeneous payloads but the per-block cost stays bounded
342        // by two 512-byte histograms.
343        CompressionLevel::Level(11..=15) => Some(0),
344        // C zstd's default splitter level for btopt/btultra/btultra2 is 4
345        // (`ZSTD_splitBlock_byChunks` with internal level 3 — sampling
346        // rate 1, `hashLog` 10).
347        CompressionLevel::Level(16..=22) => Some(4),
348        _ => None,
349    }
350}
351
352pub(crate) fn donor_optimal_block_size(
353    level: CompressionLevel,
354    block: &[u8],
355    remaining_src_size: usize,
356    block_size_max: usize,
357    savings: i64,
358) -> usize {
359    let Some(split_level) = donor_pre_split_level(level) else {
360        return remaining_src_size.min(block_size_max);
361    };
362    if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
363        return remaining_src_size.min(block_size_max);
364    }
365    if savings < 3 {
366        return MAX_BLOCK_SIZE as usize;
367    }
368    if block.len() < MAX_BLOCK_SIZE as usize {
369        return remaining_src_size.min(block_size_max);
370    }
371    // Donor `ZSTD_splitBlock` dispatch (`zstd_preSplit.c:234`):
372    // `split_level == 0` → cheap borders heuristic;
373    // `split_level == 1..=4` → byChunks with internal sampling level
374    // `split_level - 1`.
375    let raw_split = if split_level == 0 {
376        donor_split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
377    } else {
378        donor_split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
379    };
380    raw_split
381        .max(PRESPLIT_BLOCK_MIN)
382        .min(MAX_BLOCK_SIZE as usize)
383}
384
385pub(crate) struct CompressState<M: Matcher> {
386    pub(crate) matcher: M,
387    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
388    pub(crate) fse_tables: FseTables,
389    pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
390    /// Offset history for repeat offset encoding: [rep0, rep1, rep2].
391    /// Initialized to [1, 4, 8] per RFC 8878 §3.1.2.5.
392    pub(crate) offset_hist: [u32; 3],
393}
394
395impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
396    /// Create a new `FrameCompressor`
397    pub fn new(compression_level: CompressionLevel) -> Self {
398        Self {
399            uncompressed_data: None,
400            compressed_data: None,
401            compression_level,
402            dictionary: None,
403            dictionary_entropy_cache: None,
404            source_size_hint: None,
405            state: CompressState {
406                matcher: MatchGeneratorDriver::new(1024 * 128, 1),
407                last_huff_table: None,
408                fse_tables: FseTables::new(),
409                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
410                offset_hist: [1, 4, 8],
411            },
412            #[cfg(feature = "hash")]
413            hasher: XxHash64::with_seed(0),
414        }
415    }
416}
417
418impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
419    /// Create a new `FrameCompressor` with a custom matching algorithm implementation
420    pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
421        Self {
422            uncompressed_data: None,
423            compressed_data: None,
424            dictionary: None,
425            dictionary_entropy_cache: None,
426            source_size_hint: None,
427            state: CompressState {
428                matcher,
429                last_huff_table: None,
430                fse_tables: FseTables::new(),
431                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
432                offset_hist: [1, 4, 8],
433            },
434            compression_level,
435            #[cfg(feature = "hash")]
436            hasher: XxHash64::with_seed(0),
437        }
438    }
439
440    /// Before calling [FrameCompressor::compress] you need to set the source.
441    ///
442    /// This is the data that is compressed and written into the drain.
443    pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
444        self.uncompressed_data.replace(uncompressed_data)
445    }
446
447    /// Before calling [FrameCompressor::compress] you need to set the drain.
448    ///
449    /// As the compressor compresses data, the drain serves as a place for the output to be writte.
450    pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
451        self.compressed_data.replace(compressed_data)
452    }
453
454    /// Provide a hint about the total uncompressed size for the next frame.
455    ///
456    /// When set, the encoder selects smaller hash tables and windows for
457    /// small inputs, matching the C zstd source-size-class behavior.
458    ///
459    /// This hint applies only to frame payload bytes (`size`). Dictionary
460    /// history is primed separately and does not inflate the hinted size or
461    /// advertised frame window.
462    /// Must be called before [`compress`](Self::compress).
463    pub fn set_source_size_hint(&mut self, size: u64) {
464        self.source_size_hint = Some(size);
465    }
466
467    /// Compress the uncompressed data from the provided source as one Zstd frame and write it to the provided drain
468    ///
469    /// This will repeatedly call [Read::read] on the source to fill up blocks until the source returns 0 on the read call.
470    /// All compressed blocks are buffered in memory so that the frame header can include the
471    /// `Frame_Content_Size` field (which requires knowing the total uncompressed size). The
472    /// entire frame — header, blocks, and optional checksum — is then written to the drain
473    /// at the end. This means peak memory usage is O(compressed_size).
474    ///
475    /// To avoid endlessly encoding from a potentially endless source (like a network socket) you can use the
476    /// [Read::take] function
477    pub fn compress(&mut self) {
478        let initial_size_hint = self.source_size_hint;
479        let source_size_hint_known = initial_size_hint.is_some();
480        let use_dictionary_state =
481            !matches!(self.compression_level, CompressionLevel::Uncompressed)
482                && self.state.matcher.supports_dictionary_priming()
483                && self.dictionary.is_some();
484        if let Some(size_hint) = self.source_size_hint.take() {
485            // Keep source-size hint scoped to payload bytes; dictionary priming
486            // is applied separately and should not force larger matcher sizing.
487            self.state.matcher.set_source_size_hint(size_hint);
488        }
489        // Clearing buffers to allow re-using of the compressor
490        self.state.matcher.reset(self.compression_level);
491        self.state.offset_hist = [1, 4, 8];
492        let cached_entropy = if use_dictionary_state {
493            self.dictionary_entropy_cache.as_ref()
494        } else {
495            None
496        };
497        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
498            // This state drives sequence encoding, while matcher priming below updates
499            // the match generator's internal repeat-offset history for match finding.
500            self.state.offset_hist = dict.offset_hist;
501            self.state
502                .matcher
503                .prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
504        }
505        if let Some(cache) = cached_entropy {
506            self.state.last_huff_table.clone_from(&cache.huff);
507        } else {
508            self.state.last_huff_table = None;
509        }
510        // `clone_from` keeps frame-to-frame seeding cheap for reused compressors by
511        // reusing existing allocations where possible instead of reallocating every frame.
512        if let Some(cache) = cached_entropy {
513            self.state
514                .fse_tables
515                .ll_previous
516                .clone_from(&cache.ll_previous);
517            self.state
518                .fse_tables
519                .ml_previous
520                .clone_from(&cache.ml_previous);
521            self.state
522                .fse_tables
523                .of_previous
524                .clone_from(&cache.of_previous);
525        } else {
526            self.state.fse_tables.ll_previous = None;
527            self.state.fse_tables.ml_previous = None;
528            self.state.fse_tables.of_previous = None;
529        }
530        let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
531            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
532            _ => None,
533        });
534        let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
535            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
536            _ => None,
537        });
538        let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
539            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
540            _ => None,
541        });
542        self.state.matcher.seed_dictionary_entropy(
543            self.state.last_huff_table.as_ref(),
544            ll_entropy,
545            ml_entropy,
546            of_entropy,
547        );
548        #[cfg(feature = "hash")]
549        {
550            self.hasher = XxHash64::with_seed(0);
551        }
552        let source = self.uncompressed_data.as_mut().unwrap();
553        let drain = self.compressed_data.as_mut().unwrap();
554        let window_size = self.state.matcher.window_size();
555        assert!(
556            window_size != 0,
557            "matcher reported window_size == 0, which is invalid"
558        );
559        // Accumulate all compressed blocks; the frame header is written
560        // after all input has been read so that Frame_Content_Size is
561        // known. The default seed is one donor block; smaller seeds for
562        // small payloads avoid pinning a full block worth of bytes when
563        // the compressed output fits in a few hundred bytes. For larger
564        // inputs the default seed amortises the first few `Vec::extend`
565        // doublings cheaply and the `peak - default_seed` residue is
566        // dominated by internal `compress_block_encoded` buffers anyway,
567        // so changing it produces no measurable savings.
568        //
569        // Seed-size tiers (mirrors donor `ZSTD_CStreamOutSize` naming):
570        //
571        // * `ALL_BLOCKS_TINY_CAP` — payload ≤ this size, seed equals
572        //   payload bound; ≥ everything compressed output could need
573        //   for a tiny input.
574        // * `ALL_BLOCKS_SMALL_CAP` — small-input seed picked to absorb
575        //   one or two doublings without over-allocating.
576        // * `ALL_BLOCKS_DEFAULT_CAP` — one donor block; the value the
577        //   rest of the encoder is sized around.
578        const ALL_BLOCKS_TINY_THRESHOLD: u64 = 4 * 1024;
579        const ALL_BLOCKS_SMALL_THRESHOLD: u64 = 64 * 1024;
580        const ALL_BLOCKS_TINY_CAP: usize = 4 * 1024;
581        const ALL_BLOCKS_SMALL_CAP: usize = 16 * 1024;
582        const ALL_BLOCKS_DEFAULT_CAP: usize = 130 * 1024;
583        let initial_all_blocks_cap = match initial_size_hint {
584            Some(h) if h <= ALL_BLOCKS_TINY_THRESHOLD => ALL_BLOCKS_TINY_CAP,
585            Some(h) if h <= ALL_BLOCKS_SMALL_THRESHOLD => ALL_BLOCKS_SMALL_CAP,
586            _ => ALL_BLOCKS_DEFAULT_CAP,
587        };
588        let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap);
589        let mut total_uncompressed: u64 = 0;
590        let mut pending_input: Vec<u8> = Vec::new();
591        let mut reached_eof = false;
592        let mut savings = 0i64;
593        // Compress block by block
594        loop {
595            // Read up to one donor block. When the pre-block splitter keeps a
596            // suffix, top it back up before compressing the next block, matching
597            // ZSTD_compress_frameChunk() over a contiguous input buffer.
598            let block_capacity = MAX_BLOCK_SIZE as usize;
599            let had_pending = !pending_input.is_empty();
600            let mut uncompressed_data = if had_pending {
601                core::mem::take(&mut pending_input)
602            } else {
603                self.state.matcher.get_next_space()
604            };
605            let mut filled = if had_pending {
606                uncompressed_data.len()
607            } else {
608                0
609            };
610            if uncompressed_data.len() < block_capacity {
611                uncompressed_data.resize(block_capacity, 0);
612            }
613            'read_loop: loop {
614                if reached_eof || filled == block_capacity {
615                    break 'read_loop;
616                }
617                let new_bytes = source
618                    .read(&mut uncompressed_data[filled..block_capacity])
619                    .unwrap();
620                if new_bytes == 0 {
621                    reached_eof = true;
622                    break 'read_loop;
623                }
624                filled += new_bytes;
625                total_uncompressed += new_bytes as u64;
626            }
627            uncompressed_data.truncate(filled);
628            let mut last_block = reached_eof;
629            let remaining_for_split = if reached_eof {
630                uncompressed_data.len()
631            } else {
632                block_capacity
633            };
634            if !matches!(self.compression_level, CompressionLevel::Uncompressed)
635                && uncompressed_data.len() == block_capacity
636            {
637                let block_len = donor_optimal_block_size(
638                    self.compression_level,
639                    &uncompressed_data,
640                    remaining_for_split,
641                    block_capacity,
642                    savings,
643                );
644                if block_len < uncompressed_data.len() {
645                    pending_input = uncompressed_data.split_off(block_len);
646                    // `split_off` returns a Vec whose capacity is typically
647                    // close to its length. Next iteration's `had_pending`
648                    // branch moves `pending_input` into `uncompressed_data`
649                    // and resizes to `block_capacity`, which would reallocate
650                    // from scratch on every pre-split. Pre-reserve here so
651                    // the resize stays in-place.
652                    if pending_input.capacity() < block_capacity {
653                        pending_input.reserve_exact(block_capacity - pending_input.len());
654                    }
655                    last_block = false;
656                }
657            }
658            // As we read, hash that data too
659            #[cfg(feature = "hash")]
660            self.hasher.write(&uncompressed_data);
661            // Special handling is needed for compression of a totally empty file
662            if uncompressed_data.is_empty() {
663                let header = BlockHeader {
664                    last_block: true,
665                    block_type: crate::blocks::block::BlockType::Raw,
666                    block_size: 0,
667                };
668                header.serialize(&mut all_blocks);
669                break;
670            }
671
672            match self.compression_level {
673                CompressionLevel::Uncompressed => {
674                    let header = BlockHeader {
675                        last_block,
676                        block_type: crate::blocks::block::BlockType::Raw,
677                        block_size: uncompressed_data.len().try_into().unwrap(),
678                    };
679                    header.serialize(&mut all_blocks);
680                    all_blocks.extend_from_slice(&uncompressed_data);
681                    savings +=
682                        uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
683                }
684                CompressionLevel::Fastest
685                | CompressionLevel::Default
686                | CompressionLevel::Better
687                | CompressionLevel::Best
688                | CompressionLevel::Level(_) => {
689                    let before_len = all_blocks.len();
690                    let block_len = uncompressed_data.len();
691                    compress_block_encoded(
692                        &mut self.state,
693                        self.compression_level,
694                        last_block,
695                        uncompressed_data,
696                        &mut all_blocks,
697                    );
698                    savings += block_len as i64 - (all_blocks.len() - before_len) as i64;
699                }
700            }
701            if last_block && pending_input.is_empty() {
702                break;
703            }
704        }
705
706        // Now that total_uncompressed is known, write the frame header with FCS.
707        // Match the donor framing policy for pledged one-shot inputs: use a
708        // single-segment frame whenever the source fits the active window.
709        let single_segment = !use_dictionary_state
710            && source_size_hint_known
711            && total_uncompressed >= 512
712            && total_uncompressed <= window_size;
713        let header = FrameHeader {
714            frame_content_size: Some(total_uncompressed),
715            single_segment,
716            content_checksum: cfg!(feature = "hash"),
717            dictionary_id: if use_dictionary_state {
718                self.dictionary.as_ref().map(|dict| dict.id as u64)
719            } else {
720                None
721            },
722            window_size: if single_segment {
723                None
724            } else {
725                Some(window_size)
726            },
727        };
728        // Write the frame header and compressed blocks separately to avoid
729        // shifting the entire `all_blocks` buffer to prepend the header.
730        let mut header_buf: Vec<u8> = Vec::with_capacity(14);
731        header.serialize(&mut header_buf);
732        drain.write_all(&header_buf).unwrap();
733        drain.write_all(&all_blocks).unwrap();
734
735        // If the `hash` feature is enabled, then `content_checksum` is set to true in the header
736        // and a 32 bit hash is written at the end of the data.
737        #[cfg(feature = "hash")]
738        {
739            // Because we only have the data as a reader, we need to read all of it to calculate the checksum
740            // Possible TODO: create a wrapper around self.uncompressed data that hashes the data as it's read?
741            let content_checksum = self.hasher.finish();
742            drain
743                .write_all(&(content_checksum as u32).to_le_bytes())
744                .unwrap();
745        }
746    }
747
748    /// Get a mutable reference to the source
749    pub fn source_mut(&mut self) -> Option<&mut R> {
750        self.uncompressed_data.as_mut()
751    }
752
753    /// Get a mutable reference to the drain
754    pub fn drain_mut(&mut self) -> Option<&mut W> {
755        self.compressed_data.as_mut()
756    }
757
758    /// Get a reference to the source
759    pub fn source(&self) -> Option<&R> {
760        self.uncompressed_data.as_ref()
761    }
762
763    /// Get a reference to the drain
764    pub fn drain(&self) -> Option<&W> {
765        self.compressed_data.as_ref()
766    }
767
768    /// Retrieve the source
769    pub fn take_source(&mut self) -> Option<R> {
770        self.uncompressed_data.take()
771    }
772
773    /// Retrieve the drain
774    pub fn take_drain(&mut self) -> Option<W> {
775        self.compressed_data.take()
776    }
777
778    /// Before calling [FrameCompressor::compress] you can replace the matcher
779    pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
780        core::mem::swap(&mut match_generator, &mut self.state.matcher);
781        match_generator
782    }
783
784    /// Before calling [FrameCompressor::compress] you can replace the compression level
785    pub fn set_compression_level(
786        &mut self,
787        compression_level: CompressionLevel,
788    ) -> CompressionLevel {
789        let old = self.compression_level;
790        self.compression_level = compression_level;
791        old
792    }
793
794    /// Get the current compression level
795    pub fn compression_level(&self) -> CompressionLevel {
796        self.compression_level
797    }
798
799    /// Attach a pre-parsed dictionary to be used for subsequent compressions.
800    ///
801    /// In compressed modes, the dictionary id is written only when the active
802    /// matcher supports dictionary priming.
803    /// Uncompressed mode and non-priming matchers ignore the attached dictionary
804    /// at encode time.
805    pub fn set_dictionary(
806        &mut self,
807        dictionary: crate::decoding::Dictionary,
808    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
809    {
810        if dictionary.id == 0 {
811            return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
812        }
813        if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
814            return Err(
815                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
816                    index: index as u8,
817                },
818            );
819        }
820        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
821            huff: dictionary.huf.table.to_encoder_table(),
822            ll_previous: dictionary
823                .fse
824                .literal_lengths
825                .to_encoder_table()
826                .map(|table| PreviousFseTable::Custom(Box::new(table))),
827            ml_previous: dictionary
828                .fse
829                .match_lengths
830                .to_encoder_table()
831                .map(|table| PreviousFseTable::Custom(Box::new(table))),
832            of_previous: dictionary
833                .fse
834                .offsets
835                .to_encoder_table()
836                .map(|table| PreviousFseTable::Custom(Box::new(table))),
837        });
838        Ok(self.dictionary.replace(dictionary))
839    }
840
841    /// Parse and attach a serialized dictionary blob.
842    pub fn set_dictionary_from_bytes(
843        &mut self,
844        raw_dictionary: &[u8],
845    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
846    {
847        let dictionary = crate::decoding::Dictionary::decode_dict(raw_dictionary)?;
848        self.set_dictionary(dictionary)
849    }
850
851    /// Remove the attached dictionary.
852    pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
853        self.dictionary_entropy_cache = None;
854        self.dictionary.take()
855    }
856}
857
858#[cfg(test)]
859mod tests {
860    #[cfg(all(feature = "dict_builder", feature = "std"))]
861    use alloc::format;
862    use alloc::vec;
863
864    use super::FrameCompressor;
865    use crate::blocks::block::BlockType;
866    use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
867    use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
868    use crate::encoding::{Matcher, Sequence};
869    use alloc::vec::Vec;
870
871    fn generate_data(seed: u64, len: usize) -> Vec<u8> {
872        let mut state = seed;
873        let mut data = Vec::with_capacity(len);
874        for _ in 0..len {
875            state = state
876                .wrapping_mul(6364136223846793005)
877                .wrapping_add(1442695040888963407);
878            data.push((state >> 33) as u8);
879        }
880        data
881    }
882
883    fn first_block_type(frame: &[u8]) -> BlockType {
884        let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
885        let mut decoder = block_decoder::new();
886        let (header, _) = decoder
887            .read_block_header(&frame[header_size as usize..])
888            .expect("block header should parse");
889        header.block_type
890    }
891
892    /// Frame content size is written correctly and C zstd can decompress the output.
893    #[cfg(feature = "std")]
894    #[test]
895    fn fcs_header_written_and_c_zstd_compatible() {
896        let levels = [
897            crate::encoding::CompressionLevel::Uncompressed,
898            crate::encoding::CompressionLevel::Fastest,
899            crate::encoding::CompressionLevel::Default,
900            crate::encoding::CompressionLevel::Better,
901            crate::encoding::CompressionLevel::Best,
902        ];
903        let fcs_2byte = vec![0xCDu8; 300]; // 300 bytes → 2-byte FCS (256..=65791 range)
904        let large = vec![0xABu8; 100_000];
905        let inputs: [&[u8]; 5] = [
906            &[],
907            &[0x00],
908            b"abcdefghijklmnopqrstuvwxy\n",
909            &fcs_2byte,
910            &large,
911        ];
912        for level in levels {
913            for data in &inputs {
914                let compressed = crate::encoding::compress_to_vec(*data, level);
915                // Verify FCS is present and correct
916                let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
917                    .unwrap()
918                    .0;
919                assert_eq!(
920                    header.frame_content_size(),
921                    data.len() as u64,
922                    "FCS mismatch for len={} level={:?}",
923                    data.len(),
924                    level,
925                );
926                // Confirm the FCS field is actually present in the header
927                // (not just the decoder returning 0 for absent FCS).
928                assert_ne!(
929                    header.descriptor.frame_content_size_bytes().unwrap(),
930                    0,
931                    "FCS field must be present for len={} level={:?}",
932                    data.len(),
933                    level,
934                );
935                // Verify C zstd can decompress
936                let mut decoded = Vec::new();
937                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
938                    |e| {
939                        panic!(
940                            "C zstd decode failed for len={} level={level:?}: {e}",
941                            data.len()
942                        )
943                    },
944                );
945                assert_eq!(
946                    decoded.as_slice(),
947                    *data,
948                    "C zstd roundtrip failed for len={}",
949                    data.len()
950                );
951            }
952        }
953    }
954
955    #[cfg(feature = "std")]
956    #[test]
957    fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
958        let data = vec![0xAB; 2047];
959        let compressed = {
960            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
961            compressor.set_source_size_hint(data.len() as u64);
962            compressor.set_source(data.as_slice());
963            let mut out = Vec::new();
964            compressor.set_drain(&mut out);
965            compressor.compress();
966            out
967        };
968
969        let mut decoded = Vec::new();
970        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
971        assert_eq!(decoded, data);
972    }
973
974    #[cfg(feature = "std")]
975    #[test]
976    fn small_hinted_default_frame_uses_single_segment_header() {
977        let data = generate_data(0xD15E_A5ED, 1024);
978        let compressed = {
979            let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
980            compressor.set_source_size_hint(data.len() as u64);
981            compressor.set_source(data.as_slice());
982            let mut out = Vec::new();
983            compressor.set_drain(&mut out);
984            compressor.compress();
985            out
986        };
987
988        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
989        assert!(
990            frame_header.descriptor.single_segment_flag(),
991            "small hinted default frames should use single-segment header for Rust/FFI parity"
992        );
993        assert_eq!(frame_header.frame_content_size(), data.len() as u64);
994        let mut decoded = Vec::new();
995        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
996            .expect("ffi decoder must accept single-segment small hinted default frame");
997        assert_eq!(decoded, data);
998    }
999
1000    #[cfg(feature = "std")]
1001    #[test]
1002    fn small_hinted_numeric_default_levels_use_single_segment_header() {
1003        let data = generate_data(0xA11C_E003, 1024);
1004        for level in [
1005            super::CompressionLevel::Level(0),
1006            super::CompressionLevel::Level(3),
1007        ] {
1008            let compressed = {
1009                let mut compressor = FrameCompressor::new(level);
1010                compressor.set_source_size_hint(data.len() as u64);
1011                compressor.set_source(data.as_slice());
1012                let mut out = Vec::new();
1013                compressor.set_drain(&mut out);
1014                compressor.compress();
1015                out
1016            };
1017
1018            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1019            assert!(
1020                frame_header.descriptor.single_segment_flag(),
1021                "small hinted numeric default level frames should use single-segment header (level={level:?})"
1022            );
1023            assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1024            let mut decoded = Vec::new();
1025            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
1026                panic!(
1027                    "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
1028                )
1029            });
1030            assert_eq!(decoded, data);
1031        }
1032    }
1033
1034    #[cfg(feature = "std")]
1035    #[test]
1036    fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
1037        let levels = [
1038            super::CompressionLevel::Fastest,
1039            super::CompressionLevel::Default,
1040            super::CompressionLevel::Better,
1041            super::CompressionLevel::Best,
1042            super::CompressionLevel::Level(-1),
1043            super::CompressionLevel::Level(2),
1044            super::CompressionLevel::Level(3),
1045            super::CompressionLevel::Level(4),
1046            super::CompressionLevel::Level(11),
1047        ];
1048        let sizes = [
1049            511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
1050        ];
1051
1052        for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
1053            for &size in &sizes {
1054                let data = generate_data(seed + seed_idx as u64, size);
1055                for &level in &levels {
1056                    let compressed = {
1057                        let mut compressor = FrameCompressor::new(level);
1058                        compressor.set_source_size_hint(data.len() as u64);
1059                        compressor.set_source(data.as_slice());
1060                        let mut out = Vec::new();
1061                        compressor.set_drain(&mut out);
1062                        compressor.compress();
1063                        out
1064                    };
1065                    if matches!(size, 511 | 512) {
1066                        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1067                        assert_eq!(
1068                            frame_header.descriptor.single_segment_flag(),
1069                            size == 512,
1070                            "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
1071                        );
1072                    }
1073
1074                    let mut decoded = Vec::new();
1075                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1076                        |e| {
1077                            panic!(
1078                                "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
1079                                seed + seed_idx as u64
1080                            )
1081                        },
1082                    );
1083                    assert_eq!(
1084                        decoded,
1085                        data,
1086                        "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
1087                        seed + seed_idx as u64
1088                    );
1089                }
1090            }
1091        }
1092    }
1093
1094    #[cfg(feature = "std")]
1095    #[test]
1096    fn hinted_levels_use_single_segment_header_symmetrically() {
1097        let levels = [
1098            super::CompressionLevel::Fastest,
1099            super::CompressionLevel::Default,
1100            super::CompressionLevel::Better,
1101            super::CompressionLevel::Best,
1102            super::CompressionLevel::Level(0),
1103            super::CompressionLevel::Level(2),
1104            super::CompressionLevel::Level(3),
1105            super::CompressionLevel::Level(4),
1106            super::CompressionLevel::Level(11),
1107        ];
1108        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
1109            let size = 1024 + seed_idx * 97;
1110            let data = generate_data(seed, size);
1111            for &level in &levels {
1112                let compressed = {
1113                    let mut compressor = FrameCompressor::new(level);
1114                    compressor.set_source_size_hint(data.len() as u64);
1115                    compressor.set_source(data.as_slice());
1116                    let mut out = Vec::new();
1117                    compressor.set_drain(&mut out);
1118                    compressor.compress();
1119                    out
1120                };
1121                let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1122                assert!(
1123                    frame_header.descriptor.single_segment_flag(),
1124                    "hinted frame should be single-segment for level={level:?} size={}",
1125                    data.len()
1126                );
1127                assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1128                let mut decoded = Vec::new();
1129                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
1130                    panic!(
1131                        "ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
1132                        data.len()
1133                    )
1134                });
1135                assert_eq!(decoded, data);
1136            }
1137        }
1138    }
1139
1140    #[cfg(feature = "std")]
1141    #[test]
1142    fn hinted_levels_pin_511_512_single_segment_boundary() {
1143        let levels = [
1144            super::CompressionLevel::Fastest,
1145            super::CompressionLevel::Default,
1146            super::CompressionLevel::Better,
1147            super::CompressionLevel::Best,
1148            super::CompressionLevel::Level(0),
1149            super::CompressionLevel::Level(2),
1150            super::CompressionLevel::Level(3),
1151            super::CompressionLevel::Level(4),
1152            super::CompressionLevel::Level(11),
1153        ];
1154        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
1155            for &size in &[511usize, 512] {
1156                let data = generate_data(seed + seed_idx as u64, size);
1157                for &level in &levels {
1158                    let compressed = {
1159                        let mut compressor = FrameCompressor::new(level);
1160                        compressor.set_source_size_hint(data.len() as u64);
1161                        compressor.set_source(data.as_slice());
1162                        let mut out = Vec::new();
1163                        compressor.set_drain(&mut out);
1164                        compressor.compress();
1165                        out
1166                    };
1167                    let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1168                    assert_eq!(
1169                        frame_header.descriptor.single_segment_flag(),
1170                        size == 512,
1171                        "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
1172                    );
1173                    let mut decoded = Vec::new();
1174                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1175                        |e| {
1176                            panic!(
1177                                "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
1178                                seed + seed_idx as u64
1179                            )
1180                        },
1181                    );
1182                    assert_eq!(decoded, data);
1183                }
1184            }
1185        }
1186    }
1187
1188    #[cfg(feature = "std")]
1189    #[test]
1190    fn fastest_random_block_uses_raw_fast_path() {
1191        let data = generate_data(0xC0FF_EE11, 10 * 1024);
1192        let compressed =
1193            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Fastest);
1194
1195        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1196
1197        let mut decoded = Vec::new();
1198        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1199        assert_eq!(decoded, data);
1200    }
1201
1202    #[cfg(feature = "std")]
1203    #[test]
1204    fn default_random_block_uses_raw_fast_path() {
1205        let data = generate_data(0xD15E_A5ED, 10 * 1024);
1206        let compressed =
1207            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Default);
1208
1209        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1210
1211        let mut decoded = Vec::new();
1212        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1213        assert_eq!(decoded, data);
1214    }
1215
1216    #[cfg(feature = "std")]
1217    #[test]
1218    fn best_random_block_uses_raw_fast_path() {
1219        let data = generate_data(0xB35C_AFE1, 10 * 1024);
1220        let compressed =
1221            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Best);
1222
1223        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1224
1225        let mut decoded = Vec::new();
1226        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1227        assert_eq!(decoded, data);
1228    }
1229
1230    #[cfg(feature = "std")]
1231    #[test]
1232    fn level2_random_block_uses_raw_fast_path() {
1233        let data = generate_data(0xA11C_E222, 10 * 1024);
1234        let compressed =
1235            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Level(2));
1236
1237        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1238
1239        let mut decoded = Vec::new();
1240        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1241        assert_eq!(decoded, data);
1242    }
1243
1244    #[cfg(feature = "std")]
1245    #[test]
1246    fn better_random_block_uses_raw_fast_path() {
1247        let data = generate_data(0xBE77_E111, 10 * 1024);
1248        let compressed =
1249            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Better);
1250
1251        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1252
1253        let mut decoded = Vec::new();
1254        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1255        assert_eq!(decoded, data);
1256    }
1257
1258    #[cfg(feature = "std")]
1259    #[test]
1260    fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
1261        let mut data = Vec::with_capacity(16 * 1024);
1262        const LINE: &[u8] =
1263            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1264        while data.len() < 16 * 1024 {
1265            let remaining = 16 * 1024 - data.len();
1266            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1267        }
1268
1269        fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
1270            let compressed = crate::encoding::compress_to_vec(data, level);
1271            assert_ne!(first_block_type(&compressed), BlockType::Raw);
1272            assert!(
1273                compressed.len() < data.len(),
1274                "compressible input should remain compressible for level={level:?}"
1275            );
1276            let mut decoded = Vec::new();
1277            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1278            assert_eq!(decoded, data);
1279        }
1280
1281        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Fastest);
1282        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Default);
1283        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Level(3));
1284        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Better);
1285        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Best);
1286    }
1287
1288    #[cfg(feature = "std")]
1289    #[test]
1290    fn hinted_small_compressible_frames_use_single_segment_across_levels() {
1291        let mut data = Vec::with_capacity(4 * 1024);
1292        const LINE: &[u8] =
1293            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1294        while data.len() < 4 * 1024 {
1295            let remaining = 4 * 1024 - data.len();
1296            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1297        }
1298
1299        for level in [
1300            super::CompressionLevel::Fastest,
1301            super::CompressionLevel::Default,
1302            super::CompressionLevel::Better,
1303            super::CompressionLevel::Best,
1304            super::CompressionLevel::Level(0),
1305            super::CompressionLevel::Level(3),
1306            super::CompressionLevel::Level(4),
1307            super::CompressionLevel::Level(11),
1308        ] {
1309            let compressed = {
1310                let mut compressor = FrameCompressor::new(level);
1311                compressor.set_source_size_hint(data.len() as u64);
1312                compressor.set_source(data.as_slice());
1313                let mut out = Vec::new();
1314                compressor.set_drain(&mut out);
1315                compressor.compress();
1316                out
1317            };
1318            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1319            assert!(
1320                frame_header.descriptor.single_segment_flag(),
1321                "hinted small compressible frame should use single-segment (level={level:?})"
1322            );
1323            assert_ne!(
1324                first_block_type(&compressed),
1325                BlockType::Raw,
1326                "compressible hinted frame should stay off raw fast path (level={level:?})"
1327            );
1328            assert!(
1329                compressed.len() < data.len(),
1330                "compressible hinted frame should still shrink (level={level:?})"
1331            );
1332            let mut decoded = Vec::new();
1333            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
1334                .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
1335            assert_eq!(decoded, data);
1336        }
1337    }
1338
1339    struct NoDictionaryMatcher {
1340        last_space: Vec<u8>,
1341        window_size: u64,
1342    }
1343
1344    impl NoDictionaryMatcher {
1345        fn new(window_size: u64) -> Self {
1346            Self {
1347                last_space: Vec::new(),
1348                window_size,
1349            }
1350        }
1351    }
1352
1353    impl Matcher for NoDictionaryMatcher {
1354        fn get_next_space(&mut self) -> Vec<u8> {
1355            vec![0; self.window_size as usize]
1356        }
1357
1358        fn get_last_space(&mut self) -> &[u8] {
1359            self.last_space.as_slice()
1360        }
1361
1362        fn commit_space(&mut self, space: Vec<u8>) {
1363            self.last_space = space;
1364        }
1365
1366        fn skip_matching(&mut self) {}
1367
1368        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
1369            handle_sequence(Sequence::Literals {
1370                literals: self.last_space.as_slice(),
1371            });
1372        }
1373
1374        fn reset(&mut self, _level: super::CompressionLevel) {
1375            self.last_space.clear();
1376        }
1377
1378        fn window_size(&self) -> u64 {
1379            self.window_size
1380        }
1381    }
1382
1383    #[test]
1384    fn frame_starts_with_magic_num() {
1385        let mock_data = [1_u8, 2, 3].as_slice();
1386        let mut output: Vec<u8> = Vec::new();
1387        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1388        compressor.set_source(mock_data);
1389        compressor.set_drain(&mut output);
1390
1391        compressor.compress();
1392        assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
1393    }
1394
1395    #[test]
1396    fn very_simple_raw_compress() {
1397        let mock_data = [1_u8, 2, 3].as_slice();
1398        let mut output: Vec<u8> = Vec::new();
1399        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1400        compressor.set_source(mock_data);
1401        compressor.set_drain(&mut output);
1402
1403        compressor.compress();
1404    }
1405
1406    #[test]
1407    fn very_simple_compress() {
1408        let mut mock_data = vec![0; 1 << 17];
1409        mock_data.extend(vec![1; (1 << 17) - 1]);
1410        mock_data.extend(vec![2; (1 << 18) - 1]);
1411        mock_data.extend(vec![2; 1 << 17]);
1412        mock_data.extend(vec![3; (1 << 17) - 1]);
1413        let mut output: Vec<u8> = Vec::new();
1414        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1415        compressor.set_source(mock_data.as_slice());
1416        compressor.set_drain(&mut output);
1417
1418        compressor.compress();
1419
1420        let mut decoder = FrameDecoder::new();
1421        let mut decoded = Vec::with_capacity(mock_data.len());
1422        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1423        assert_eq!(mock_data, decoded);
1424
1425        let mut decoded = Vec::new();
1426        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1427        assert_eq!(mock_data, decoded);
1428    }
1429
1430    #[test]
1431    fn rle_compress() {
1432        let mock_data = vec![0; 1 << 19];
1433        let mut output: Vec<u8> = Vec::new();
1434        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1435        compressor.set_source(mock_data.as_slice());
1436        compressor.set_drain(&mut output);
1437
1438        compressor.compress();
1439
1440        let mut decoder = FrameDecoder::new();
1441        let mut decoded = Vec::with_capacity(mock_data.len());
1442        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1443        assert_eq!(mock_data, decoded);
1444    }
1445
1446    #[test]
1447    fn aaa_compress() {
1448        let mock_data = vec![0, 1, 3, 4, 5];
1449        let mut output: Vec<u8> = Vec::new();
1450        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1451        compressor.set_source(mock_data.as_slice());
1452        compressor.set_drain(&mut output);
1453
1454        compressor.compress();
1455
1456        let mut decoder = FrameDecoder::new();
1457        let mut decoded = Vec::with_capacity(mock_data.len());
1458        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1459        assert_eq!(mock_data, decoded);
1460
1461        let mut decoded = Vec::new();
1462        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1463        assert_eq!(mock_data, decoded);
1464    }
1465
1466    #[test]
1467    fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
1468        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1469        let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
1470        let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
1471
1472        let mut data = Vec::new();
1473        for _ in 0..8 {
1474            data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
1475        }
1476
1477        let mut with_dict = Vec::new();
1478        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1479        let previous = compressor
1480            .set_dictionary_from_bytes(dict_raw)
1481            .expect("dictionary bytes should parse");
1482        assert!(
1483            previous.is_none(),
1484            "first dictionary insert should return None"
1485        );
1486        assert_eq!(
1487            compressor
1488                .set_dictionary(dict_for_encoder)
1489                .expect("valid dictionary should attach")
1490                .expect("set_dictionary_from_bytes inserted previous dictionary")
1491                .id,
1492            dict_for_decoder.id
1493        );
1494        compressor.set_source(data.as_slice());
1495        compressor.set_drain(&mut with_dict);
1496        compressor.compress();
1497
1498        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
1499            .expect("encoded stream should have a frame header");
1500        assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));
1501
1502        let mut decoder = FrameDecoder::new();
1503        let mut missing_dict_target = Vec::with_capacity(data.len());
1504        let err = decoder
1505            .decode_all_to_vec(&with_dict, &mut missing_dict_target)
1506            .unwrap_err();
1507        assert!(
1508            matches!(
1509                &err,
1510                crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
1511            ),
1512            "dict-compressed stream should require dictionary id, got: {err:?}"
1513        );
1514
1515        let mut decoder = FrameDecoder::new();
1516        decoder.add_dict(dict_for_decoder).unwrap();
1517        let mut decoded = Vec::with_capacity(data.len());
1518        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
1519        assert_eq!(decoded, data);
1520
1521        let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
1522        let mut ffi_decoded = Vec::with_capacity(data.len());
1523        let ffi_written = ffi_decoder
1524            .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
1525            .unwrap();
1526        assert_eq!(ffi_written, data.len());
1527        assert_eq!(ffi_decoded, data);
1528    }
1529
1530    #[cfg(all(feature = "dict_builder", feature = "std"))]
1531    #[test]
1532    fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
1533        use std::io::Cursor;
1534
1535        let mut training = Vec::new();
1536        for idx in 0..256u32 {
1537            training.extend_from_slice(
1538                format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
1539            );
1540        }
1541        let mut raw_dict = Vec::new();
1542        crate::dictionary::create_raw_dict_from_source(
1543            Cursor::new(training.as_slice()),
1544            training.len(),
1545            &mut raw_dict,
1546            4096,
1547        )
1548        .expect("dict_builder training should succeed");
1549        assert!(
1550            !raw_dict.is_empty(),
1551            "dict_builder produced an empty dictionary"
1552        );
1553
1554        let dict_id = 0xD1C7_0008;
1555        let encoder_dict =
1556            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
1557        let decoder_dict =
1558            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
1559
1560        let mut payload = Vec::new();
1561        for idx in 0..96u32 {
1562            payload.extend_from_slice(
1563                format!(
1564                    "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
1565                )
1566                .as_bytes(),
1567            );
1568        }
1569
1570        let mut without_dict = Vec::new();
1571        let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
1572        baseline.set_source(payload.as_slice());
1573        baseline.set_drain(&mut without_dict);
1574        baseline.compress();
1575
1576        let mut with_dict = Vec::new();
1577        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1578        compressor
1579            .set_dictionary(encoder_dict)
1580            .expect("valid dict_builder dictionary should attach");
1581        compressor.set_source(payload.as_slice());
1582        compressor.set_drain(&mut with_dict);
1583        compressor.compress();
1584
1585        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
1586            .expect("encoded stream should have a frame header");
1587        assert_eq!(frame_header.dictionary_id(), Some(dict_id));
1588        let mut decoder = FrameDecoder::new();
1589        decoder.add_dict(decoder_dict).unwrap();
1590        let mut decoded = Vec::with_capacity(payload.len());
1591        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
1592        assert_eq!(decoded, payload);
1593        assert!(
1594            with_dict.len() < without_dict.len(),
1595            "trained dictionary should improve compression for this small payload"
1596        );
1597    }
1598
1599    #[test]
1600    fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
1601        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1602        let mut output = Vec::new();
1603        let input = b"";
1604
1605        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1606        let previous = compressor
1607            .set_dictionary_from_bytes(dict_raw)
1608            .expect("dictionary bytes should parse");
1609        assert!(previous.is_none());
1610
1611        compressor.set_source(input.as_slice());
1612        compressor.set_drain(&mut output);
1613        compressor.compress();
1614
1615        assert!(
1616            compressor.state.last_huff_table.is_some(),
1617            "dictionary entropy should seed previous huffman table before first block"
1618        );
1619        assert!(
1620            compressor.state.fse_tables.ll_previous.is_some(),
1621            "dictionary entropy should seed previous ll table before first block"
1622        );
1623        assert!(
1624            compressor.state.fse_tables.ml_previous.is_some(),
1625            "dictionary entropy should seed previous ml table before first block"
1626        );
1627        assert!(
1628            compressor.state.fse_tables.of_previous.is_some(),
1629            "dictionary entropy should seed previous of table before first block"
1630        );
1631    }
1632
1633    #[test]
1634    fn set_dictionary_rejects_zero_dictionary_id() {
1635        let invalid = crate::decoding::Dictionary {
1636            id: 0,
1637            fse: crate::decoding::scratch::FSEScratch::new(),
1638            huf: crate::decoding::scratch::HuffmanScratch::new(),
1639            dict_content: vec![1, 2, 3],
1640            offset_hist: [1, 4, 8],
1641        };
1642
1643        let mut compressor: FrameCompressor<
1644            &[u8],
1645            Vec<u8>,
1646            crate::encoding::match_generator::MatchGeneratorDriver,
1647        > = FrameCompressor::new(super::CompressionLevel::Fastest);
1648        let result = compressor.set_dictionary(invalid);
1649        assert!(matches!(
1650            result,
1651            Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
1652        ));
1653    }
1654
1655    #[test]
1656    fn set_dictionary_rejects_zero_repeat_offsets() {
1657        let invalid = crate::decoding::Dictionary {
1658            id: 1,
1659            fse: crate::decoding::scratch::FSEScratch::new(),
1660            huf: crate::decoding::scratch::HuffmanScratch::new(),
1661            dict_content: vec![1, 2, 3],
1662            offset_hist: [0, 4, 8],
1663        };
1664
1665        let mut compressor: FrameCompressor<
1666            &[u8],
1667            Vec<u8>,
1668            crate::encoding::match_generator::MatchGeneratorDriver,
1669        > = FrameCompressor::new(super::CompressionLevel::Fastest);
1670        let result = compressor.set_dictionary(invalid);
1671        assert!(matches!(
1672            result,
1673            Err(
1674                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
1675                    index: 0
1676                }
1677            )
1678        ));
1679    }
1680
1681    #[test]
1682    fn uncompressed_mode_does_not_require_dictionary() {
1683        let dict_id = 0xABCD_0001;
1684        let dict =
1685            crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
1686                .expect("raw dictionary should be valid");
1687
1688        let payload = b"plain-bytes-that-should-stay-raw";
1689        let mut output = Vec::new();
1690        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1691        compressor
1692            .set_dictionary(dict)
1693            .expect("dictionary should attach in uncompressed mode");
1694        compressor.set_source(payload.as_slice());
1695        compressor.set_drain(&mut output);
1696        compressor.compress();
1697
1698        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1699            .expect("encoded frame should have a header");
1700        assert_eq!(
1701            frame_header.dictionary_id(),
1702            None,
1703            "raw/uncompressed frames must not advertise dictionary dependency"
1704        );
1705
1706        let mut decoder = FrameDecoder::new();
1707        let mut decoded = Vec::with_capacity(payload.len());
1708        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1709        assert_eq!(decoded, payload);
1710    }
1711
1712    #[test]
1713    fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
1714        use crate::encoding::match_generator::MatchGeneratorDriver;
1715
1716        let dict_id = 0xABCD_0002;
1717        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
1718            .expect("raw dictionary should be valid");
1719        let dict_for_decoder =
1720            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
1721                .expect("raw dictionary should be valid");
1722
1723        // Payload must exceed the encoder's advertised window (128 KiB for
1724        // Fastest) so the test actually exercises cross-window-boundary behavior.
1725        let payload = b"abcdefgh".repeat(128 * 1024 / 8 + 64);
1726        let matcher = MatchGeneratorDriver::new(1024, 1);
1727
1728        let mut no_dict_output = Vec::new();
1729        let mut no_dict_compressor =
1730            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
1731        no_dict_compressor.set_source(payload.as_slice());
1732        no_dict_compressor.set_drain(&mut no_dict_output);
1733        no_dict_compressor.compress();
1734        let (no_dict_frame_header, _) =
1735            crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
1736                .expect("baseline frame should have a header");
1737        let no_dict_window = no_dict_frame_header
1738            .window_size()
1739            .expect("window size should be present");
1740
1741        let mut output = Vec::new();
1742        let matcher = MatchGeneratorDriver::new(1024, 1);
1743        let mut compressor =
1744            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
1745        compressor
1746            .set_dictionary(dict)
1747            .expect("dictionary should attach");
1748        compressor.set_source(payload.as_slice());
1749        compressor.set_drain(&mut output);
1750        compressor.compress();
1751
1752        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1753            .expect("encoded frame should have a header");
1754        let advertised_window = frame_header
1755            .window_size()
1756            .expect("window size should be present");
1757        assert_eq!(
1758            advertised_window, no_dict_window,
1759            "dictionary priming must not inflate advertised window size"
1760        );
1761        assert!(
1762            payload.len() > advertised_window as usize,
1763            "test must cross the advertised window boundary"
1764        );
1765
1766        let mut decoder = FrameDecoder::new();
1767        decoder.add_dict(dict_for_decoder).unwrap();
1768        let mut decoded = Vec::with_capacity(payload.len());
1769        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1770        assert_eq!(decoded, payload);
1771    }
1772
1773    #[test]
1774    fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
1775        let dict_id = 0xABCD_0004;
1776        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
1777        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
1778        let dict_for_decoder =
1779            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
1780        let payload = b"abcdabcdabcdabcd".repeat(128);
1781
1782        let mut hinted_output = Vec::new();
1783        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
1784        hinted.set_dictionary(dict).unwrap();
1785        hinted.set_source_size_hint(1);
1786        hinted.set_source(payload.as_slice());
1787        hinted.set_drain(&mut hinted_output);
1788        hinted.compress();
1789
1790        let mut no_hint_output = Vec::new();
1791        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
1792        no_hint
1793            .set_dictionary(
1794                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
1795                    .unwrap(),
1796            )
1797            .unwrap();
1798        no_hint.set_source(payload.as_slice());
1799        no_hint.set_drain(&mut no_hint_output);
1800        no_hint.compress();
1801
1802        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
1803            .expect("encoded frame should have a header")
1804            .0
1805            .window_size()
1806            .expect("window size should be present");
1807        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
1808            .expect("encoded frame should have a header")
1809            .0
1810            .window_size()
1811            .expect("window size should be present");
1812        assert!(
1813            hinted_window <= no_hint_window,
1814            "source-size hint should not increase advertised window with dictionary priming",
1815        );
1816
1817        let mut decoder = FrameDecoder::new();
1818        decoder.add_dict(dict_for_decoder).unwrap();
1819        let mut decoded = Vec::with_capacity(payload.len());
1820        decoder
1821            .decode_all_to_vec(&hinted_output, &mut decoded)
1822            .unwrap();
1823        assert_eq!(decoded, payload);
1824    }
1825
1826    #[test]
1827    fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
1828        let dict_id = 0xABCD_0005;
1829        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
1830        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
1831        let dict_for_decoder =
1832            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
1833        let payload = b"abcd".repeat(1024); // 4 KiB payload
1834        let payload_len = payload.len() as u64;
1835
1836        let mut hinted_output = Vec::new();
1837        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
1838        hinted.set_dictionary(dict).unwrap();
1839        hinted.set_source_size_hint(payload_len);
1840        hinted.set_source(payload.as_slice());
1841        hinted.set_drain(&mut hinted_output);
1842        hinted.compress();
1843
1844        let mut no_hint_output = Vec::new();
1845        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
1846        no_hint
1847            .set_dictionary(
1848                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
1849                    .unwrap(),
1850            )
1851            .unwrap();
1852        no_hint.set_source(payload.as_slice());
1853        no_hint.set_drain(&mut no_hint_output);
1854        no_hint.compress();
1855
1856        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
1857            .expect("encoded frame should have a header")
1858            .0
1859            .window_size()
1860            .expect("window size should be present");
1861        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
1862            .expect("encoded frame should have a header")
1863            .0
1864            .window_size()
1865            .expect("window size should be present");
1866        assert!(
1867            hinted_window <= no_hint_window,
1868            "source-size hint should not increase advertised window with dictionary priming",
1869        );
1870
1871        let mut decoder = FrameDecoder::new();
1872        decoder.add_dict(dict_for_decoder).unwrap();
1873        let mut decoded = Vec::with_capacity(payload.len());
1874        decoder
1875            .decode_all_to_vec(&hinted_output, &mut decoded)
1876            .unwrap();
1877        assert_eq!(decoded, payload);
1878    }
1879
1880    #[test]
1881    fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
1882        let dict_id = 0xABCD_0003;
1883        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
1884            .expect("raw dictionary should be valid");
1885        let payload = b"abcdefghabcdefgh";
1886
1887        let mut output = Vec::new();
1888        let matcher = NoDictionaryMatcher::new(64);
1889        let mut compressor =
1890            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
1891        compressor
1892            .set_dictionary(dict)
1893            .expect("dictionary should attach");
1894        compressor.set_source(payload.as_slice());
1895        compressor.set_drain(&mut output);
1896        compressor.compress();
1897
1898        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1899            .expect("encoded frame should have a header");
1900        assert_eq!(
1901            frame_header.dictionary_id(),
1902            None,
1903            "matchers that do not support dictionary priming must not advertise dictionary dependency"
1904        );
1905
1906        let mut decoder = FrameDecoder::new();
1907        let mut decoded = Vec::with_capacity(payload.len());
1908        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1909        assert_eq!(decoded, payload);
1910    }
1911
1912    #[cfg(feature = "hash")]
1913    #[test]
1914    fn checksum_two_frames_reused_compressor() {
1915        // Compress the same data twice using the same compressor and verify that:
1916        // 1. The checksum written in each frame matches what the decoder calculates.
1917        // 2. The hasher is correctly reset between frames (no cross-contamination).
1918        //    If the hasher were NOT reset, the second frame's calculated checksum
1919        //    would differ from the one stored in the frame data, causing assert_eq to fail.
1920        let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();
1921
1922        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1923
1924        // --- Frame 1 ---
1925        let mut compressed1 = Vec::new();
1926        compressor.set_source(data.as_slice());
1927        compressor.set_drain(&mut compressed1);
1928        compressor.compress();
1929
1930        // --- Frame 2 (reuse the same compressor) ---
1931        let mut compressed2 = Vec::new();
1932        compressor.set_source(data.as_slice());
1933        compressor.set_drain(&mut compressed2);
1934        compressor.compress();
1935
1936        fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
1937            let mut decoder = FrameDecoder::new();
1938            let mut source = compressed;
1939            decoder.reset(&mut source).unwrap();
1940            while !decoder.is_finished() {
1941                decoder
1942                    .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
1943                    .unwrap();
1944            }
1945            let mut decoded = Vec::new();
1946            decoder.collect_to_writer(&mut decoded).unwrap();
1947            (
1948                decoded,
1949                decoder.get_checksum_from_data(),
1950                decoder.get_calculated_checksum(),
1951            )
1952        }
1953
1954        let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
1955        assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
1956        assert_eq!(
1957            chksum_from_data1, chksum_calculated1,
1958            "frame 1: checksum mismatch"
1959        );
1960
1961        let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
1962        assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
1963        assert_eq!(
1964            chksum_from_data2, chksum_calculated2,
1965            "frame 2: checksum mismatch"
1966        );
1967
1968        // Same data compressed twice must produce the same checksum.
1969        // If state leaked across frames, the second calculated checksum would differ.
1970        assert_eq!(
1971            chksum_from_data1, chksum_from_data2,
1972            "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
1973        );
1974    }
1975
1976    #[cfg(feature = "std")]
1977    #[test]
1978    fn fuzz_targets() {
1979        use std::io::Read;
1980        fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
1981            let mut decoder = crate::decoding::StreamingDecoder::new(data).unwrap();
1982            let mut result: Vec<u8> = Vec::new();
1983            decoder.read_to_end(&mut result).expect("Decoding failed");
1984            result
1985        }
1986
1987        fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
1988            let mut decoder = crate::decoding::FrameDecoder::new();
1989            decoder.reset(&mut data).unwrap();
1990            let mut result = vec![];
1991            while !decoder.is_finished() || decoder.can_collect() > 0 {
1992                decoder
1993                    .decode_blocks(
1994                        &mut data,
1995                        crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
1996                    )
1997                    .unwrap();
1998                decoder.collect_to_writer(&mut result).unwrap();
1999            }
2000            result
2001        }
2002
2003        fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
2004            zstd::stream::encode_all(std::io::Cursor::new(data), 3)
2005        }
2006
2007        fn encode_szstd_uncompressed(data: &mut dyn std::io::Read) -> Vec<u8> {
2008            let mut input = Vec::new();
2009            data.read_to_end(&mut input).unwrap();
2010
2011            crate::encoding::compress_to_vec(
2012                input.as_slice(),
2013                crate::encoding::CompressionLevel::Uncompressed,
2014            )
2015        }
2016
2017        fn encode_szstd_compressed(data: &mut dyn std::io::Read) -> Vec<u8> {
2018            let mut input = Vec::new();
2019            data.read_to_end(&mut input).unwrap();
2020
2021            crate::encoding::compress_to_vec(
2022                input.as_slice(),
2023                crate::encoding::CompressionLevel::Fastest,
2024            )
2025        }
2026
2027        fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
2028            let mut output = Vec::new();
2029            zstd::stream::copy_decode(data, &mut output)?;
2030            Ok(output)
2031        }
2032        if std::fs::exists("fuzz/artifacts/interop").unwrap_or(false) {
2033            for file in std::fs::read_dir("fuzz/artifacts/interop").unwrap() {
2034                if file.as_ref().unwrap().file_type().unwrap().is_file() {
2035                    let data = std::fs::read(file.unwrap().path()).unwrap();
2036                    let data = data.as_slice();
2037                    // Decoding
2038                    let compressed = encode_zstd(data).unwrap();
2039                    let decoded = decode_szstd(&mut compressed.as_slice());
2040                    let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
2041                    assert!(
2042                        decoded == data,
2043                        "Decoded data did not match the original input during decompression"
2044                    );
2045                    assert_eq!(
2046                        decoded2, data,
2047                        "Decoded data did not match the original input during decompression"
2048                    );
2049
2050                    // Encoding
2051                    // Uncompressed encoding
2052                    let mut input = data;
2053                    let compressed = encode_szstd_uncompressed(&mut input);
2054                    let decoded = decode_zstd(&compressed).unwrap();
2055                    assert_eq!(
2056                        decoded, data,
2057                        "Decoded data did not match the original input during compression"
2058                    );
2059                    // Compressed encoding
2060                    let mut input = data;
2061                    let compressed = encode_szstd_compressed(&mut input);
2062                    let decoded = decode_zstd(&compressed).unwrap();
2063                    assert_eq!(
2064                        decoded, data,
2065                        "Decoded data did not match the original input during compression"
2066                    );
2067                }
2068            }
2069        }
2070    }
2071
2072    /// Homogeneous input — every byte the same — must NOT be split:
2073    /// both border histograms are identical (all 512 hits on a single
2074    /// slot), so `presplit_fingerprints_differ` returns `false` and the
2075    /// function takes the early-return path at
2076    /// `zstd_preSplit.c:214` returning `blockSize`.
2077    #[test]
2078    fn donor_split_block_from_borders_keeps_homogeneous_block() {
2079        let block = vec![0xAAu8; MAX_BLOCK_SIZE as usize];
2080        let split = super::donor_split_block_from_borders(&block);
2081        assert_eq!(split, MAX_BLOCK_SIZE as usize);
2082    }
2083
2084    /// Heterogeneous input — first half all zeros, second half a
2085    /// counter sequence — has clearly distinguishable border
2086    /// histograms, so the borders heuristic decides to split.
2087    ///
2088    /// The transition sits at exactly the block midpoint, so the
2089    /// middle 512-byte sample (`block[mid-256..mid+256]`) is half
2090    /// zeros + half counter values. That makes it roughly
2091    /// equidistant from both border fingerprints — the
2092    /// `abs_diff(dist_from_begin, dist_from_end) < min_distance`
2093    /// branch fires and the heuristic returns the midpoint (64 KiB)
2094    /// per `zstd_preSplit.c:222`. The test asserts the exact value
2095    /// rather than just "one of {32K, 64K, 96K}" so a regression
2096    /// to a different quantised arm cannot silently slip through.
2097    #[test]
2098    fn donor_split_block_from_borders_returns_midpoint_for_centred_transition() {
2099        let mut block = vec![0u8; MAX_BLOCK_SIZE as usize];
2100        for (i, byte) in block
2101            .iter_mut()
2102            .enumerate()
2103            .skip(MAX_BLOCK_SIZE as usize / 2)
2104        {
2105            *byte = (i % 251 + 1) as u8;
2106        }
2107        let split = super::donor_split_block_from_borders(&block);
2108        assert_eq!(
2109            split,
2110            64 * 1024,
2111            "centred-transition fixture must take the symmetric \
2112             midpoint arm (`abs_diff < min_distance`), got {split}"
2113        );
2114    }
2115
2116    /// `donor_pre_split_level` maps mid-range levels to the cheap
2117    /// borders heuristic and high levels to the byChunks path. Levels
2118    /// below 11 stay unsplit so the splitter never runs on fast /
2119    /// default presets where its per-block cost would dominate.
2120    #[test]
2121    fn donor_pre_split_level_dispatches_by_compression_level() {
2122        use crate::encoding::CompressionLevel;
2123        assert_eq!(
2124            super::donor_pre_split_level(CompressionLevel::Fastest),
2125            None
2126        );
2127        assert_eq!(
2128            super::donor_pre_split_level(CompressionLevel::Default),
2129            None
2130        );
2131        assert_eq!(super::donor_pre_split_level(CompressionLevel::Better), None);
2132        assert_eq!(
2133            super::donor_pre_split_level(CompressionLevel::Level(7)),
2134            None
2135        );
2136        assert_eq!(
2137            super::donor_pre_split_level(CompressionLevel::Level(11)),
2138            Some(0)
2139        );
2140        assert_eq!(
2141            super::donor_pre_split_level(CompressionLevel::Level(15)),
2142            Some(0)
2143        );
2144        assert_eq!(
2145            super::donor_pre_split_level(CompressionLevel::Level(16)),
2146            Some(4)
2147        );
2148        assert_eq!(
2149            super::donor_pre_split_level(CompressionLevel::Level(22)),
2150            Some(4)
2151        );
2152    }
2153
2154    /// End-to-end: a 256 KB heterogeneous payload compressed at
2155    /// Level(13) (borders heuristic active) round-trips through the
2156    /// crate's own decoder. The pre-split path runs over the first
2157    /// 128 KB block and emits two consecutive sub-blocks; the second
2158    /// 128 KB block goes through the splitter on its own. The test
2159    /// proves the split decisions do not corrupt the frame bitstream.
2160    #[test]
2161    fn level_13_borders_split_roundtrips_through_own_decoder() {
2162        use crate::encoding::CompressionLevel;
2163        let mut data = vec![0u8; 256 * 1024];
2164        // First 128 KB: low-entropy repeating run; second 128 KB:
2165        // counter sequence — clearly distinct border histograms.
2166        for (i, byte) in data.iter_mut().enumerate() {
2167            *byte = if i < 128 * 1024 {
2168                (i & 0x07) as u8
2169            } else {
2170                (i % 251 + 1) as u8
2171            };
2172        }
2173
2174        let mut compressed = Vec::new();
2175        let mut compressor = FrameCompressor::new(CompressionLevel::Level(13));
2176        compressor.set_source(data.as_slice());
2177        compressor.set_drain(&mut compressed);
2178        compressor.compress();
2179
2180        let mut decoder = FrameDecoder::new();
2181        let mut source = compressed.as_slice();
2182        decoder
2183            .reset(&mut source)
2184            .expect("frame header should parse");
2185        while !decoder.is_finished() {
2186            decoder
2187                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
2188                .expect("decode should succeed");
2189        }
2190        let mut decoded = Vec::with_capacity(data.len());
2191        decoder.collect_to_writer(&mut decoded).unwrap();
2192        assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
2193    }
2194}