Skip to main content

structured_zstd/encoding/
frame_compressor.rs

1//! Utilities and interfaces for encoding an entire frame. Allows reusing resources
2
3use alloc::{boxed::Box, vec::Vec};
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12    CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13    match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
20/// An interface for compressing arbitrary data with the ZStandard compression algorithm.
21///
22/// `FrameCompressor` will generally be used by:
23/// 1. Initializing a compressor by providing a buffer of data using `FrameCompressor::new()`
24/// 2. Starting compression and writing that compression into a vec using `FrameCompressor::begin`
25///
26/// # Examples
27/// ```
28/// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
29/// let mock_data: &[_] = &[0x1, 0x2, 0x3, 0x4];
30/// let mut output = std::vec::Vec::new();
31/// // Initialize a compressor.
32/// let mut compressor = FrameCompressor::new(CompressionLevel::Uncompressed);
33/// compressor.set_source(mock_data);
34/// compressor.set_drain(&mut output);
35///
36/// // `compress` writes the compressed output into the provided buffer.
37/// compressor.compress();
38/// ```
39pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
40    uncompressed_data: Option<R>,
41    compressed_data: Option<W>,
42    compression_level: CompressionLevel,
43    dictionary: Option<crate::decoding::Dictionary>,
44    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
45    source_size_hint: Option<u64>,
46    state: CompressState<M>,
47    #[cfg(feature = "hash")]
48    hasher: XxHash64,
49}
50
51#[derive(Clone, Default)]
52struct CachedDictionaryEntropy {
53    huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
54    ll_previous: Option<PreviousFseTable>,
55    ml_previous: Option<PreviousFseTable>,
56    of_previous: Option<PreviousFseTable>,
57}
58
59#[derive(Clone)]
60pub(crate) enum PreviousFseTable {
61    // Default tables are immutable and already stored alongside the state, so
62    // repeating them only needs a lightweight marker instead of cloning FSETable.
63    Default,
64    Custom(Box<FSETable>),
65    Rle(u8),
66}
67
68impl PreviousFseTable {
69    pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
70        match self {
71            Self::Default => Some(default),
72            Self::Custom(table) => Some(table),
73            Self::Rle(_) => None,
74        }
75    }
76}
77
78pub(crate) struct FseTables {
79    /// The three predefined LL/ML/OF tables are functions of
80    /// compile-time-constant distributions. The
81    /// [`fse_encoder::FseDefaultTable`] type alias resolves to
82    /// `&'static FSETable` when a process-wide cache is available
83    /// (atomic-pointer targets, or no-atomic targets with the
84    /// `critical-section` feature) and to `Box<FSETable>` on the
85    /// cache-less no-atomic path (one per-frame allocation, dropped
86    /// with the compressor — no `Box::leak`, no unbounded growth).
87    /// Both arms `Deref` to `FSETable`, so consumers in
88    /// `encoding/blocks/compressed.rs` borrow through `&` uniformly
89    /// without seeing the per-target divergence.
90    pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
91    pub(crate) ll_previous: Option<PreviousFseTable>,
92    pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
93    pub(crate) ml_previous: Option<PreviousFseTable>,
94    pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
95    pub(crate) of_previous: Option<PreviousFseTable>,
96}
97
98impl FseTables {
99    pub fn new() -> Self {
100        Self {
101            ll_default: default_ll_table(),
102            ll_previous: None,
103            ml_default: default_ml_table(),
104            ml_previous: None,
105            of_default: default_of_table(),
106            of_previous: None,
107        }
108    }
109
110    /// Borrow the LL default table as `&FSETable`. Abstracts the cfg
111    /// split in [`crate::fse::fse_encoder::FseDefaultTable`] —
112    /// `&'static FSETable` (atomic / `critical-section`) auto-derefs
113    /// directly; `Box<FSETable>` (cache-less no-atomic) derefs
114    /// through `Box`. Both arms yield `&FSETable` uniformly so
115    /// downstream consumers can stay cfg-agnostic.
116    #[inline]
117    #[allow(clippy::borrow_deref_ref)]
118    pub(crate) fn ll_default_ref(&self) -> &FSETable {
119        &*self.ll_default
120    }
121
122    /// Borrow the ML default table as `&FSETable`. See [`Self::ll_default_ref`].
123    #[inline]
124    #[allow(clippy::borrow_deref_ref)]
125    pub(crate) fn ml_default_ref(&self) -> &FSETable {
126        &*self.ml_default
127    }
128
129    /// Borrow the OF default table as `&FSETable`. See [`Self::ll_default_ref`].
130    #[inline]
131    #[allow(clippy::borrow_deref_ref)]
132    pub(crate) fn of_default_ref(&self) -> &FSETable {
133        &*self.of_default
134    }
135}
136
/// Smallest block length `donor_optimal_block_size` will return after a split.
const PRESPLIT_BLOCK_MIN: usize = 3500;
/// Denominator of the split threshold in `presplit_fingerprints_differ`.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
/// Numerator of the split threshold before the penalty is added.
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
/// Penalty the chunk splitter starts with; it is lowered by one per merged chunk.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
/// Length in bytes of the chunks compared by `donor_split_block_by_chunks`.
const PRESPLIT_CHUNK_SIZE: usize = 8 * 1024;
/// Largest hash width accepted by `presplit_hash2`.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
/// Number of event counters in a `PreSplitFingerprint`.
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
/// Multiplier used by the two-byte multiplicative hash in `presplit_hash2`.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
/// Donor `SEGMENT_SIZE` in `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:201`).
/// Two `SEGMENT_SIZE`-byte fingerprints — one from the start, one from the end —
/// drive the cheap border heuristic; a third one from the middle disambiguates
/// where in the block the transition sits.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
150
151#[derive(Clone)]
152struct PreSplitFingerprint {
153    events: [u32; PRESPLIT_HASH_TABLE_SIZE],
154    nb_events: usize,
155}
156
157impl Default for PreSplitFingerprint {
158    fn default() -> Self {
159        Self {
160            events: [0; PRESPLIT_HASH_TABLE_SIZE],
161            nb_events: 0,
162        }
163    }
164}
165
166fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
167    debug_assert!(hash_log >= 8);
168    if hash_log == 8 {
169        return bytes[0] as usize;
170    }
171    debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
172    let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
173    (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
174}
175
176fn presplit_record_fingerprint(
177    fp: &mut PreSplitFingerprint,
178    src: &[u8],
179    sampling_rate: usize,
180    hash_log: usize,
181) {
182    fp.events.fill(0);
183    fp.nb_events = 0;
184    if src.len() < 2 {
185        return;
186    }
187    let limit = src.len() - 1;
188    let mut n = 0usize;
189    while n < limit {
190        fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
191        n += sampling_rate;
192    }
193    // Donor parity: zstd_preSplit.c records the integer division, not the
194    // rounded-up number of sampled events from the loop above.
195    fp.nb_events += limit / sampling_rate;
196}
197
198/// Single-byte histogram pass — matches donor `HIST_add` over a small
199/// segment with `hashLog == 8` (the `hash2` shortcut at
200/// `zstd_preSplit.c:36` returns the raw byte). The byChunks path uses
201/// 2-byte hashing for `hashLog >= 9`; this helper exists so the borders
202/// heuristic doesn't pay for that wider hash on its 512-byte windows.
203fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
204    fp.events.fill(0);
205    for &b in src {
206        fp.events[b as usize] += 1;
207    }
208    // Donor `HIST_add` returns the maximum symbol; the caller then sets
209    // `nbEvents = SEGMENT_SIZE` explicitly (see `zstd_preSplit.c:213`).
210    fp.nb_events = src.len();
211}
212
213fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
214    let slots = 1usize << hash_log;
215    let mut distance = 0u64;
216    for idx in 0..slots {
217        let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
218        let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
219        distance = distance.saturating_add(left.abs_diff(right) as u64);
220    }
221    distance
222}
223
224fn presplit_fingerprints_differ(
225    reference: &PreSplitFingerprint,
226    new_fp: &PreSplitFingerprint,
227    penalty: i32,
228    hash_log: usize,
229) -> bool {
230    debug_assert!(reference.nb_events > 0);
231    debug_assert!(new_fp.nb_events > 0);
232    let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
233    let deviation = presplit_distance(reference, new_fp, hash_log);
234    let threshold = p50.saturating_mul(PRESPLIT_THRESHOLD_BASE + penalty as u64)
235        / PRESPLIT_THRESHOLD_PENALTY_RATE;
236    deviation >= threshold
237}
238
239fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
240    for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
241        acc.events[idx] = acc.events[idx].saturating_add(new_fp.events[idx]);
242    }
243    acc.nb_events = acc.nb_events.saturating_add(new_fp.nb_events);
244}
245
246fn donor_split_block_by_chunks(block: &[u8], level: usize) -> usize {
247    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
248    debug_assert!((1..=4).contains(&level));
249    let (sampling_rate, hash_log) = match level - 1 {
250        0 => (43, 8),
251        1 => (11, 9),
252        2 => (5, 10),
253        _ => (1, 10),
254    };
255
256    let mut past = PreSplitFingerprint::default();
257    let mut new_events = PreSplitFingerprint::default();
258    let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
259    presplit_record_fingerprint(
260        &mut past,
261        &block[..PRESPLIT_CHUNK_SIZE],
262        sampling_rate,
263        hash_log,
264    );
265    let mut pos = PRESPLIT_CHUNK_SIZE;
266    while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
267        presplit_record_fingerprint(
268            &mut new_events,
269            &block[pos..pos + PRESPLIT_CHUNK_SIZE],
270            sampling_rate,
271            hash_log,
272        );
273        if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
274            return pos;
275        }
276        presplit_merge_events(&mut past, &new_events);
277        if penalty > 0 {
278            penalty -= 1;
279        }
280        pos += PRESPLIT_CHUNK_SIZE;
281    }
282    block.len()
283}
284
285/// Donor port of `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:198`).
286/// Records two 512-byte byte-histograms — one from each end of a 128 KB
287/// block — and a third from the middle as a tie-breaker; returns either
288/// a quantised split point (32 KB / 64 KB / 96 KB) or the full block
289/// size when the two ends look indistinguishable. Cheaper than the
290/// chunk-based path because it touches at most 1.5 KB of input
291/// regardless of block size.
292fn donor_split_block_from_borders(block: &[u8]) -> usize {
293    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
294    let block_size = block.len();
295    let mut past = PreSplitFingerprint::default();
296    let mut new_fp = PreSplitFingerprint::default();
297    presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
298    presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
299    // Donor uses `penalty = 0, hash_log = 8` — i.e. raw byte histogram
300    // distance with no threshold padding (`zstd_preSplit.c:214`).
301    if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
302        return block_size;
303    }
304
305    let mut middle = PreSplitFingerprint::default();
306    let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
307    presplit_record_byte_histogram(
308        &mut middle,
309        &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
310    );
311
312    let dist_from_begin = presplit_distance(&past, &middle, 8);
313    let dist_from_end = presplit_distance(&new_fp, &middle, 8);
314    // Donor `SEGMENT_SIZE * SEGMENT_SIZE / 3` (`zstd_preSplit.c:221`):
315    // if the middle is roughly equidistant from both ends, the change
316    // sits near the centre — split at the midpoint.
317    let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
318    if dist_from_begin.abs_diff(dist_from_end) < min_distance {
319        return 64 * 1024;
320    }
321    // Larger `dist_from_begin` (i.e. `middle` farther from the head
322    // fingerprint, equivalently closer to the tail) means the new
323    // statistics already dominate the centre — the transition
324    // happened EARLY → emit a small 32 KB head and let the 96 KB
325    // tail absorb the rest. Inverse case: `dist_from_end` larger
326    // (middle still resembles the head) means the transition is
327    // LATE → emit a 96 KB head so the trailing 32 KB carries the
328    // new statistics alone.
329    if dist_from_begin > dist_from_end {
330        32 * 1024
331    } else {
332        96 * 1024
333    }
334}
335
336fn donor_pre_split_level(level: CompressionLevel) -> Option<usize> {
337    match level {
338        // Donor `ZSTD_blockSplitter_level` table (`clevels.h`): cheap
339        // borders heuristic for lazy2 / btlazy2 strategies (levels
340        // 11..=15) — the splitter still pays for itself on
341        // heterogeneous payloads but the per-block cost stays bounded
342        // by two 512-byte histograms.
343        CompressionLevel::Level(11..=15) => Some(0),
344        // C zstd's default splitter level for btopt/btultra/btultra2 is 4
345        // (`ZSTD_splitBlock_byChunks` with internal level 3 — sampling
346        // rate 1, `hashLog` 10).
347        CompressionLevel::Level(16..=22) => Some(4),
348        _ => None,
349    }
350}
351
352pub(crate) fn donor_optimal_block_size(
353    level: CompressionLevel,
354    block: &[u8],
355    remaining_src_size: usize,
356    block_size_max: usize,
357    savings: i64,
358) -> usize {
359    let Some(split_level) = donor_pre_split_level(level) else {
360        return remaining_src_size.min(block_size_max);
361    };
362    if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
363        return remaining_src_size.min(block_size_max);
364    }
365    if savings < 3 {
366        return MAX_BLOCK_SIZE as usize;
367    }
368    if block.len() < MAX_BLOCK_SIZE as usize {
369        return remaining_src_size.min(block_size_max);
370    }
371    // Donor `ZSTD_splitBlock` dispatch (`zstd_preSplit.c:234`):
372    // `split_level == 0` → cheap borders heuristic;
373    // `split_level == 1..=4` → byChunks with internal sampling level
374    // `split_level - 1`.
375    let raw_split = if split_level == 0 {
376        donor_split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
377    } else {
378        donor_split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
379    };
380    raw_split
381        .max(PRESPLIT_BLOCK_MIN)
382        .min(MAX_BLOCK_SIZE as usize)
383}
384
385pub(crate) struct CompressState<M: Matcher> {
386    pub(crate) matcher: M,
387    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
388    pub(crate) fse_tables: FseTables,
389    pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
390    /// Offset history for repeat offset encoding: [rep0, rep1, rep2].
391    /// Initialized to [1, 4, 8] per RFC 8878 §3.1.2.5.
392    pub(crate) offset_hist: [u32; 3],
393    /// Strategy tag resolved from the current `CompressionLevel` at every
394    /// `matcher.reset()` call. Used by the literal-compression gates
395    /// (`min_literals_to_compress`, `min_gain`) in
396    /// `encoding::blocks::compressed` to mirror donor's strategy-aware
397    /// thresholds (`zstd_compress_literals.c:114-127, 187-188`).
398    ///
399    /// **Invariant (required of every construction site):** must be
400    /// initialized from the active `CompressionLevel` via
401    /// `StrategyTag::for_compression_level`, and re-synced from the
402    /// active level alongside every `matcher.reset()` call so the
403    /// level-aware gates stay correct after a level change. The two
404    /// reset sites that own this sync are `FrameCompressor::compress`
405    /// and `StreamingEncoder::ensure_frame_started`. There is no
406    /// `Default` impl — production constructors
407    /// (`FrameCompressor::new`, `new_with_matcher`, the streaming
408    /// encoder constructor) plumb this explicitly. Tests that build
409    /// `CompressState` by hand must also supply a value.
410    pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
411}
412
413impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
414    /// Create a new `FrameCompressor`
415    pub fn new(compression_level: CompressionLevel) -> Self {
416        Self {
417            uncompressed_data: None,
418            compressed_data: None,
419            compression_level,
420            dictionary: None,
421            dictionary_entropy_cache: None,
422            source_size_hint: None,
423            state: CompressState {
424                matcher: MatchGeneratorDriver::new(1024 * 128, 1),
425                last_huff_table: None,
426                fse_tables: FseTables::new(),
427                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
428                offset_hist: [1, 4, 8],
429                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
430                    compression_level,
431                ),
432            },
433            #[cfg(feature = "hash")]
434            hasher: XxHash64::with_seed(0),
435        }
436    }
437}
438
439impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
440    /// Create a new `FrameCompressor` with a custom matching algorithm implementation
441    pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
442        Self {
443            uncompressed_data: None,
444            compressed_data: None,
445            dictionary: None,
446            dictionary_entropy_cache: None,
447            source_size_hint: None,
448            state: CompressState {
449                matcher,
450                last_huff_table: None,
451                fse_tables: FseTables::new(),
452                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
453                offset_hist: [1, 4, 8],
454                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
455                    compression_level,
456                ),
457            },
458            compression_level,
459            #[cfg(feature = "hash")]
460            hasher: XxHash64::with_seed(0),
461        }
462    }
463
464    /// Before calling [FrameCompressor::compress] you need to set the source.
465    ///
466    /// This is the data that is compressed and written into the drain.
467    pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
468        self.uncompressed_data.replace(uncompressed_data)
469    }
470
471    /// Before calling [FrameCompressor::compress] you need to set the drain.
472    ///
473    /// As the compressor compresses data, the drain serves as a place for the output to be writte.
474    pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
475        self.compressed_data.replace(compressed_data)
476    }
477
478    /// Provide a hint about the total uncompressed size for the next frame.
479    ///
480    /// When set, the encoder selects smaller hash tables and windows for
481    /// small inputs, matching the C zstd source-size-class behavior.
482    ///
483    /// This hint applies only to frame payload bytes (`size`). Dictionary
484    /// history is primed separately and does not inflate the hinted size or
485    /// advertised frame window.
486    /// Must be called before [`compress`](Self::compress).
487    pub fn set_source_size_hint(&mut self, size: u64) {
488        self.source_size_hint = Some(size);
489    }
490
491    /// Compress the uncompressed data from the provided source as one Zstd frame and write it to the provided drain
492    ///
493    /// This will repeatedly call [Read::read] on the source to fill up blocks until the source returns 0 on the read call.
494    /// All compressed blocks are buffered in memory so that the frame header can include the
495    /// `Frame_Content_Size` field (which requires knowing the total uncompressed size). The
496    /// entire frame — header, blocks, and optional checksum — is then written to the drain
497    /// at the end. This means peak memory usage is O(compressed_size).
498    ///
499    /// To avoid endlessly encoding from a potentially endless source (like a network socket) you can use the
500    /// [Read::take] function
501    pub fn compress(&mut self) {
502        let initial_size_hint = self.source_size_hint;
503        let source_size_hint_known = initial_size_hint.is_some();
504        let use_dictionary_state =
505            !matches!(self.compression_level, CompressionLevel::Uncompressed)
506                && self.state.matcher.supports_dictionary_priming()
507                && self.dictionary.is_some();
508        if let Some(size_hint) = self.source_size_hint.take() {
509            // Keep source-size hint scoped to payload bytes; dictionary priming
510            // is applied separately and should not force larger matcher sizing.
511            self.state.matcher.set_source_size_hint(size_hint);
512        }
513        // Clearing buffers to allow re-using of the compressor
514        self.state.matcher.reset(self.compression_level);
515        self.state.offset_hist = [1, 4, 8];
516        // Sync `state.strategy_tag` to the level resolved at this reset so
517        // the literal-compression gates (`min_literals_to_compress` /
518        // `min_gain` in `encoding::blocks::compressed`) see the correct
519        // strategy for the next frame. Frame-by-frame level changes go
520        // through this same `compress()` entry point, so re-syncing here
521        // covers level switches without touching the matcher dispatch.
522        self.state.strategy_tag =
523            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level);
524        let cached_entropy = if use_dictionary_state {
525            self.dictionary_entropy_cache.as_ref()
526        } else {
527            None
528        };
529        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
530            // This state drives sequence encoding, while matcher priming below updates
531            // the match generator's internal repeat-offset history for match finding.
532            self.state.offset_hist = dict.offset_hist;
533            self.state
534                .matcher
535                .prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
536        }
537        if let Some(cache) = cached_entropy {
538            self.state.last_huff_table.clone_from(&cache.huff);
539        } else {
540            self.state.last_huff_table = None;
541        }
542        // `clone_from` keeps frame-to-frame seeding cheap for reused compressors by
543        // reusing existing allocations where possible instead of reallocating every frame.
544        if let Some(cache) = cached_entropy {
545            self.state
546                .fse_tables
547                .ll_previous
548                .clone_from(&cache.ll_previous);
549            self.state
550                .fse_tables
551                .ml_previous
552                .clone_from(&cache.ml_previous);
553            self.state
554                .fse_tables
555                .of_previous
556                .clone_from(&cache.of_previous);
557        } else {
558            self.state.fse_tables.ll_previous = None;
559            self.state.fse_tables.ml_previous = None;
560            self.state.fse_tables.of_previous = None;
561        }
562        let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
563            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
564            _ => None,
565        });
566        let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
567            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
568            _ => None,
569        });
570        let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
571            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
572            _ => None,
573        });
574        self.state.matcher.seed_dictionary_entropy(
575            self.state.last_huff_table.as_ref(),
576            ll_entropy,
577            ml_entropy,
578            of_entropy,
579        );
580        #[cfg(feature = "hash")]
581        {
582            self.hasher = XxHash64::with_seed(0);
583        }
584        let source = self.uncompressed_data.as_mut().unwrap();
585        let drain = self.compressed_data.as_mut().unwrap();
586        let window_size = self.state.matcher.window_size();
587        assert!(
588            window_size != 0,
589            "matcher reported window_size == 0, which is invalid"
590        );
591        // Accumulate all compressed blocks; the frame header is written
592        // after all input has been read so that Frame_Content_Size is
593        // known. The default seed is one donor block; smaller seeds for
594        // small payloads avoid pinning a full block worth of bytes when
595        // the compressed output fits in a few hundred bytes. For larger
596        // inputs the default seed amortises the first few `Vec::extend`
597        // doublings cheaply and the `peak - default_seed` residue is
598        // dominated by internal `compress_block_encoded` buffers anyway,
599        // so changing it produces no measurable savings.
600        //
601        // Seed-size tiers (mirrors donor `ZSTD_CStreamOutSize` naming):
602        //
603        // * `ALL_BLOCKS_TINY_CAP` — payload ≤ this size, seed equals
604        //   payload bound; ≥ everything compressed output could need
605        //   for a tiny input.
606        // * `ALL_BLOCKS_SMALL_CAP` — small-input seed picked to absorb
607        //   one or two doublings without over-allocating.
608        // * `ALL_BLOCKS_DEFAULT_CAP` — one donor block; the value the
609        //   rest of the encoder is sized around.
610        const ALL_BLOCKS_TINY_THRESHOLD: u64 = 4 * 1024;
611        const ALL_BLOCKS_SMALL_THRESHOLD: u64 = 64 * 1024;
612        const ALL_BLOCKS_TINY_CAP: usize = 4 * 1024;
613        const ALL_BLOCKS_SMALL_CAP: usize = 16 * 1024;
614        const ALL_BLOCKS_DEFAULT_CAP: usize = 130 * 1024;
615        let initial_all_blocks_cap = match initial_size_hint {
616            Some(h) if h <= ALL_BLOCKS_TINY_THRESHOLD => ALL_BLOCKS_TINY_CAP,
617            Some(h) if h <= ALL_BLOCKS_SMALL_THRESHOLD => ALL_BLOCKS_SMALL_CAP,
618            _ => ALL_BLOCKS_DEFAULT_CAP,
619        };
620        let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap);
621        let mut total_uncompressed: u64 = 0;
622        let mut pending_input: Vec<u8> = Vec::new();
623        let mut reached_eof = false;
624        let mut savings = 0i64;
625        // Compress block by block
626        loop {
627            // Read up to one donor block. When the pre-block splitter keeps a
628            // suffix, top it back up before compressing the next block, matching
629            // ZSTD_compress_frameChunk() over a contiguous input buffer.
630            let block_capacity = MAX_BLOCK_SIZE as usize;
631            let had_pending = !pending_input.is_empty();
632            let mut uncompressed_data = if had_pending {
633                core::mem::take(&mut pending_input)
634            } else {
635                self.state.matcher.get_next_space()
636            };
637            let mut filled = if had_pending {
638                uncompressed_data.len()
639            } else {
640                0
641            };
642            if uncompressed_data.len() < block_capacity {
643                uncompressed_data.resize(block_capacity, 0);
644            }
645            'read_loop: loop {
646                if reached_eof || filled == block_capacity {
647                    break 'read_loop;
648                }
649                let new_bytes = source
650                    .read(&mut uncompressed_data[filled..block_capacity])
651                    .unwrap();
652                if new_bytes == 0 {
653                    reached_eof = true;
654                    break 'read_loop;
655                }
656                filled += new_bytes;
657                total_uncompressed += new_bytes as u64;
658            }
659            uncompressed_data.truncate(filled);
660            let mut last_block = reached_eof;
661            let remaining_for_split = if reached_eof {
662                uncompressed_data.len()
663            } else {
664                block_capacity
665            };
666            if !matches!(self.compression_level, CompressionLevel::Uncompressed)
667                && uncompressed_data.len() == block_capacity
668            {
669                let block_len = donor_optimal_block_size(
670                    self.compression_level,
671                    &uncompressed_data,
672                    remaining_for_split,
673                    block_capacity,
674                    savings,
675                );
676                if block_len < uncompressed_data.len() {
677                    pending_input = uncompressed_data.split_off(block_len);
678                    // `split_off` returns a Vec whose capacity is typically
679                    // close to its length. Next iteration's `had_pending`
680                    // branch moves `pending_input` into `uncompressed_data`
681                    // and resizes to `block_capacity`, which would reallocate
682                    // from scratch on every pre-split. Pre-reserve here so
683                    // the resize stays in-place.
684                    if pending_input.capacity() < block_capacity {
685                        pending_input.reserve_exact(block_capacity - pending_input.len());
686                    }
687                    last_block = false;
688                }
689            }
690            // As we read, hash that data too
691            #[cfg(feature = "hash")]
692            self.hasher.write(&uncompressed_data);
693            // Special handling is needed for compression of a totally empty file
694            if uncompressed_data.is_empty() {
695                let header = BlockHeader {
696                    last_block: true,
697                    block_type: crate::blocks::block::BlockType::Raw,
698                    block_size: 0,
699                };
700                header.serialize(&mut all_blocks);
701                break;
702            }
703
704            match self.compression_level {
705                CompressionLevel::Uncompressed => {
706                    let header = BlockHeader {
707                        last_block,
708                        block_type: crate::blocks::block::BlockType::Raw,
709                        block_size: uncompressed_data.len().try_into().unwrap(),
710                    };
711                    header.serialize(&mut all_blocks);
712                    all_blocks.extend_from_slice(&uncompressed_data);
713                    savings +=
714                        uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
715                }
716                CompressionLevel::Fastest
717                | CompressionLevel::Default
718                | CompressionLevel::Better
719                | CompressionLevel::Best
720                | CompressionLevel::Level(_) => {
721                    let before_len = all_blocks.len();
722                    let block_len = uncompressed_data.len();
723                    compress_block_encoded(
724                        &mut self.state,
725                        self.compression_level,
726                        last_block,
727                        uncompressed_data,
728                        &mut all_blocks,
729                    );
730                    savings += block_len as i64 - (all_blocks.len() - before_len) as i64;
731                }
732            }
733            if last_block && pending_input.is_empty() {
734                break;
735            }
736        }
737
738        // Now that total_uncompressed is known, write the frame header with FCS.
739        // Match the donor framing policy for pledged one-shot inputs: use a
740        // single-segment frame whenever the source fits the active window.
741        let single_segment = !use_dictionary_state
742            && source_size_hint_known
743            && total_uncompressed >= 512
744            && total_uncompressed <= window_size;
745        let header = FrameHeader {
746            frame_content_size: Some(total_uncompressed),
747            single_segment,
748            content_checksum: cfg!(feature = "hash"),
749            dictionary_id: if use_dictionary_state {
750                self.dictionary.as_ref().map(|dict| dict.id as u64)
751            } else {
752                None
753            },
754            window_size: if single_segment {
755                None
756            } else {
757                Some(window_size)
758            },
759        };
760        // Write the frame header and compressed blocks separately to avoid
761        // shifting the entire `all_blocks` buffer to prepend the header.
762        let mut header_buf: Vec<u8> = Vec::with_capacity(14);
763        header.serialize(&mut header_buf);
764        drain.write_all(&header_buf).unwrap();
765        drain.write_all(&all_blocks).unwrap();
766
767        // If the `hash` feature is enabled, then `content_checksum` is set to true in the header
768        // and a 32 bit hash is written at the end of the data.
769        #[cfg(feature = "hash")]
770        {
771            // Because we only have the data as a reader, we need to read all of it to calculate the checksum
772            // Possible TODO: create a wrapper around self.uncompressed data that hashes the data as it's read?
773            let content_checksum = self.hasher.finish();
774            drain
775                .write_all(&(content_checksum as u32).to_le_bytes())
776                .unwrap();
777        }
778    }
779
780    /// Get a mutable reference to the source
781    pub fn source_mut(&mut self) -> Option<&mut R> {
782        self.uncompressed_data.as_mut()
783    }
784
785    /// Get a mutable reference to the drain
786    pub fn drain_mut(&mut self) -> Option<&mut W> {
787        self.compressed_data.as_mut()
788    }
789
790    /// Get a reference to the source
791    pub fn source(&self) -> Option<&R> {
792        self.uncompressed_data.as_ref()
793    }
794
795    /// Get a reference to the drain
796    pub fn drain(&self) -> Option<&W> {
797        self.compressed_data.as_ref()
798    }
799
800    /// Retrieve the source
801    pub fn take_source(&mut self) -> Option<R> {
802        self.uncompressed_data.take()
803    }
804
805    /// Retrieve the drain
806    pub fn take_drain(&mut self) -> Option<W> {
807        self.compressed_data.take()
808    }
809
810    /// Before calling [FrameCompressor::compress] you can replace the matcher
811    pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
812        core::mem::swap(&mut match_generator, &mut self.state.matcher);
813        match_generator
814    }
815
816    /// Before calling [FrameCompressor::compress] you can replace the compression level
817    pub fn set_compression_level(
818        &mut self,
819        compression_level: CompressionLevel,
820    ) -> CompressionLevel {
821        let old = self.compression_level;
822        self.compression_level = compression_level;
823        old
824    }
825
826    /// Get the current compression level
827    pub fn compression_level(&self) -> CompressionLevel {
828        self.compression_level
829    }
830
831    /// Attach a pre-parsed dictionary to be used for subsequent compressions.
832    ///
833    /// In compressed modes, the dictionary id is written only when the active
834    /// matcher supports dictionary priming.
835    /// Uncompressed mode and non-priming matchers ignore the attached dictionary
836    /// at encode time.
837    pub fn set_dictionary(
838        &mut self,
839        dictionary: crate::decoding::Dictionary,
840    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
841    {
842        if dictionary.id == 0 {
843            return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
844        }
845        if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
846            return Err(
847                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
848                    index: index as u8,
849                },
850            );
851        }
852        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
853            huff: dictionary.huf.table.to_encoder_table(),
854            ll_previous: dictionary
855                .fse
856                .literal_lengths
857                .to_encoder_table()
858                .map(|table| PreviousFseTable::Custom(Box::new(table))),
859            ml_previous: dictionary
860                .fse
861                .match_lengths
862                .to_encoder_table()
863                .map(|table| PreviousFseTable::Custom(Box::new(table))),
864            of_previous: dictionary
865                .fse
866                .offsets
867                .to_encoder_table()
868                .map(|table| PreviousFseTable::Custom(Box::new(table))),
869        });
870        Ok(self.dictionary.replace(dictionary))
871    }
872
873    /// Parse and attach a serialized dictionary blob.
874    pub fn set_dictionary_from_bytes(
875        &mut self,
876        raw_dictionary: &[u8],
877    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
878    {
879        let dictionary = crate::decoding::Dictionary::decode_dict(raw_dictionary)?;
880        self.set_dictionary(dictionary)
881    }
882
883    /// Remove the attached dictionary.
884    pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
885        self.dictionary_entropy_cache = None;
886        self.dictionary.take()
887    }
888}
889
890#[cfg(test)]
891mod tests {
892    #[cfg(all(feature = "dict_builder", feature = "std"))]
893    use alloc::format;
894    use alloc::vec;
895
896    use super::FrameCompressor;
897    use crate::blocks::block::BlockType;
898    use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
899    use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
900    use crate::encoding::{Matcher, Sequence};
901    use alloc::vec::Vec;
902
903    fn generate_data(seed: u64, len: usize) -> Vec<u8> {
904        let mut state = seed;
905        let mut data = Vec::with_capacity(len);
906        for _ in 0..len {
907            state = state
908                .wrapping_mul(6364136223846793005)
909                .wrapping_add(1442695040888963407);
910            data.push((state >> 33) as u8);
911        }
912        data
913    }
914
915    fn first_block_type(frame: &[u8]) -> BlockType {
916        let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
917        let mut decoder = block_decoder::new();
918        let (header, _) = decoder
919            .read_block_header(&frame[header_size as usize..])
920            .expect("block header should parse");
921        header.block_type
922    }
923
924    /// Frame content size is written correctly and C zstd can decompress the output.
925    #[cfg(feature = "std")]
926    #[test]
927    fn fcs_header_written_and_c_zstd_compatible() {
928        let levels = [
929            crate::encoding::CompressionLevel::Uncompressed,
930            crate::encoding::CompressionLevel::Fastest,
931            crate::encoding::CompressionLevel::Default,
932            crate::encoding::CompressionLevel::Better,
933            crate::encoding::CompressionLevel::Best,
934        ];
935        let fcs_2byte = vec![0xCDu8; 300]; // 300 bytes → 2-byte FCS (256..=65791 range)
936        let large = vec![0xABu8; 100_000];
937        let inputs: [&[u8]; 5] = [
938            &[],
939            &[0x00],
940            b"abcdefghijklmnopqrstuvwxy\n",
941            &fcs_2byte,
942            &large,
943        ];
944        for level in levels {
945            for data in &inputs {
946                let compressed = crate::encoding::compress_to_vec(*data, level);
947                // Verify FCS is present and correct
948                let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
949                    .unwrap()
950                    .0;
951                assert_eq!(
952                    header.frame_content_size(),
953                    data.len() as u64,
954                    "FCS mismatch for len={} level={:?}",
955                    data.len(),
956                    level,
957                );
958                // Confirm the FCS field is actually present in the header
959                // (not just the decoder returning 0 for absent FCS).
960                assert_ne!(
961                    header.descriptor.frame_content_size_bytes().unwrap(),
962                    0,
963                    "FCS field must be present for len={} level={:?}",
964                    data.len(),
965                    level,
966                );
967                // Verify C zstd can decompress
968                let mut decoded = Vec::new();
969                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
970                    |e| {
971                        panic!(
972                            "C zstd decode failed for len={} level={level:?}: {e}",
973                            data.len()
974                        )
975                    },
976                );
977                assert_eq!(
978                    decoded.as_slice(),
979                    *data,
980                    "C zstd roundtrip failed for len={}",
981                    data.len()
982                );
983            }
984        }
985    }
986
987    #[cfg(feature = "std")]
988    #[test]
989    fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
990        let data = vec![0xAB; 2047];
991        let compressed = {
992            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
993            compressor.set_source_size_hint(data.len() as u64);
994            compressor.set_source(data.as_slice());
995            let mut out = Vec::new();
996            compressor.set_drain(&mut out);
997            compressor.compress();
998            out
999        };
1000
1001        let mut decoded = Vec::new();
1002        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1003        assert_eq!(decoded, data);
1004    }
1005
1006    #[cfg(feature = "std")]
1007    #[test]
1008    fn small_hinted_default_frame_uses_single_segment_header() {
1009        let data = generate_data(0xD15E_A5ED, 1024);
1010        let compressed = {
1011            let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
1012            compressor.set_source_size_hint(data.len() as u64);
1013            compressor.set_source(data.as_slice());
1014            let mut out = Vec::new();
1015            compressor.set_drain(&mut out);
1016            compressor.compress();
1017            out
1018        };
1019
1020        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1021        assert!(
1022            frame_header.descriptor.single_segment_flag(),
1023            "small hinted default frames should use single-segment header for Rust/FFI parity"
1024        );
1025        assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1026        let mut decoded = Vec::new();
1027        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
1028            .expect("ffi decoder must accept single-segment small hinted default frame");
1029        assert_eq!(decoded, data);
1030    }
1031
1032    #[cfg(feature = "std")]
1033    #[test]
1034    fn small_hinted_numeric_default_levels_use_single_segment_header() {
1035        let data = generate_data(0xA11C_E003, 1024);
1036        for level in [
1037            super::CompressionLevel::Level(0),
1038            super::CompressionLevel::Level(3),
1039        ] {
1040            let compressed = {
1041                let mut compressor = FrameCompressor::new(level);
1042                compressor.set_source_size_hint(data.len() as u64);
1043                compressor.set_source(data.as_slice());
1044                let mut out = Vec::new();
1045                compressor.set_drain(&mut out);
1046                compressor.compress();
1047                out
1048            };
1049
1050            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1051            assert!(
1052                frame_header.descriptor.single_segment_flag(),
1053                "small hinted numeric default level frames should use single-segment header (level={level:?})"
1054            );
1055            assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1056            let mut decoded = Vec::new();
1057            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
1058                panic!(
1059                    "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
1060                )
1061            });
1062            assert_eq!(decoded, data);
1063        }
1064    }
1065
1066    #[cfg(feature = "std")]
1067    #[test]
1068    fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
1069        let levels = [
1070            super::CompressionLevel::Fastest,
1071            super::CompressionLevel::Default,
1072            super::CompressionLevel::Better,
1073            super::CompressionLevel::Best,
1074            super::CompressionLevel::Level(-1),
1075            super::CompressionLevel::Level(2),
1076            super::CompressionLevel::Level(3),
1077            super::CompressionLevel::Level(4),
1078            super::CompressionLevel::Level(11),
1079        ];
1080        let sizes = [
1081            511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
1082        ];
1083
1084        for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
1085            for &size in &sizes {
1086                let data = generate_data(seed + seed_idx as u64, size);
1087                for &level in &levels {
1088                    let compressed = {
1089                        let mut compressor = FrameCompressor::new(level);
1090                        compressor.set_source_size_hint(data.len() as u64);
1091                        compressor.set_source(data.as_slice());
1092                        let mut out = Vec::new();
1093                        compressor.set_drain(&mut out);
1094                        compressor.compress();
1095                        out
1096                    };
1097                    if matches!(size, 511 | 512) {
1098                        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1099                        assert_eq!(
1100                            frame_header.descriptor.single_segment_flag(),
1101                            size == 512,
1102                            "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
1103                        );
1104                    }
1105
1106                    let mut decoded = Vec::new();
1107                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1108                        |e| {
1109                            panic!(
1110                                "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
1111                                seed + seed_idx as u64
1112                            )
1113                        },
1114                    );
1115                    assert_eq!(
1116                        decoded,
1117                        data,
1118                        "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
1119                        seed + seed_idx as u64
1120                    );
1121                }
1122            }
1123        }
1124    }
1125
1126    #[cfg(feature = "std")]
1127    #[test]
1128    fn hinted_levels_use_single_segment_header_symmetrically() {
1129        let levels = [
1130            super::CompressionLevel::Fastest,
1131            super::CompressionLevel::Default,
1132            super::CompressionLevel::Better,
1133            super::CompressionLevel::Best,
1134            super::CompressionLevel::Level(0),
1135            super::CompressionLevel::Level(2),
1136            super::CompressionLevel::Level(3),
1137            super::CompressionLevel::Level(4),
1138            super::CompressionLevel::Level(11),
1139        ];
1140        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
1141            let size = 1024 + seed_idx * 97;
1142            let data = generate_data(seed, size);
1143            for &level in &levels {
1144                let compressed = {
1145                    let mut compressor = FrameCompressor::new(level);
1146                    compressor.set_source_size_hint(data.len() as u64);
1147                    compressor.set_source(data.as_slice());
1148                    let mut out = Vec::new();
1149                    compressor.set_drain(&mut out);
1150                    compressor.compress();
1151                    out
1152                };
1153                let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1154                assert!(
1155                    frame_header.descriptor.single_segment_flag(),
1156                    "hinted frame should be single-segment for level={level:?} size={}",
1157                    data.len()
1158                );
1159                assert_eq!(frame_header.frame_content_size(), data.len() as u64);
1160                let mut decoded = Vec::new();
1161                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
1162                    panic!(
1163                        "ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
1164                        data.len()
1165                    )
1166                });
1167                assert_eq!(decoded, data);
1168            }
1169        }
1170    }
1171
1172    #[cfg(feature = "std")]
1173    #[test]
1174    fn hinted_levels_pin_511_512_single_segment_boundary() {
1175        let levels = [
1176            super::CompressionLevel::Fastest,
1177            super::CompressionLevel::Default,
1178            super::CompressionLevel::Better,
1179            super::CompressionLevel::Best,
1180            super::CompressionLevel::Level(0),
1181            super::CompressionLevel::Level(2),
1182            super::CompressionLevel::Level(3),
1183            super::CompressionLevel::Level(4),
1184            super::CompressionLevel::Level(11),
1185        ];
1186        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
1187            for &size in &[511usize, 512] {
1188                let data = generate_data(seed + seed_idx as u64, size);
1189                for &level in &levels {
1190                    let compressed = {
1191                        let mut compressor = FrameCompressor::new(level);
1192                        compressor.set_source_size_hint(data.len() as u64);
1193                        compressor.set_source(data.as_slice());
1194                        let mut out = Vec::new();
1195                        compressor.set_drain(&mut out);
1196                        compressor.compress();
1197                        out
1198                    };
1199                    let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1200                    assert_eq!(
1201                        frame_header.descriptor.single_segment_flag(),
1202                        size == 512,
1203                        "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
1204                    );
1205                    let mut decoded = Vec::new();
1206                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
1207                        |e| {
1208                            panic!(
1209                                "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
1210                                seed + seed_idx as u64
1211                            )
1212                        },
1213                    );
1214                    assert_eq!(decoded, data);
1215                }
1216            }
1217        }
1218    }
1219
1220    #[cfg(feature = "std")]
1221    #[test]
1222    fn fastest_random_block_uses_raw_fast_path() {
1223        let data = generate_data(0xC0FF_EE11, 10 * 1024);
1224        let compressed =
1225            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Fastest);
1226
1227        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1228
1229        let mut decoded = Vec::new();
1230        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1231        assert_eq!(decoded, data);
1232    }
1233
1234    #[cfg(feature = "std")]
1235    #[test]
1236    fn default_random_block_uses_raw_fast_path() {
1237        let data = generate_data(0xD15E_A5ED, 10 * 1024);
1238        let compressed =
1239            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Default);
1240
1241        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1242
1243        let mut decoded = Vec::new();
1244        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1245        assert_eq!(decoded, data);
1246    }
1247
1248    #[cfg(feature = "std")]
1249    #[test]
1250    fn best_random_block_uses_raw_fast_path() {
1251        let data = generate_data(0xB35C_AFE1, 10 * 1024);
1252        let compressed =
1253            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Best);
1254
1255        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1256
1257        let mut decoded = Vec::new();
1258        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1259        assert_eq!(decoded, data);
1260    }
1261
1262    #[cfg(feature = "std")]
1263    #[test]
1264    fn level2_random_block_uses_raw_fast_path() {
1265        let data = generate_data(0xA11C_E222, 10 * 1024);
1266        let compressed =
1267            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Level(2));
1268
1269        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1270
1271        let mut decoded = Vec::new();
1272        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1273        assert_eq!(decoded, data);
1274    }
1275
1276    #[cfg(feature = "std")]
1277    #[test]
1278    fn better_random_block_uses_raw_fast_path() {
1279        let data = generate_data(0xBE77_E111, 10 * 1024);
1280        let compressed =
1281            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Better);
1282
1283        assert_eq!(first_block_type(&compressed), BlockType::Raw);
1284
1285        let mut decoded = Vec::new();
1286        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1287        assert_eq!(decoded, data);
1288    }
1289
1290    #[cfg(feature = "std")]
1291    #[test]
1292    fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
1293        let mut data = Vec::with_capacity(16 * 1024);
1294        const LINE: &[u8] =
1295            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1296        while data.len() < 16 * 1024 {
1297            let remaining = 16 * 1024 - data.len();
1298            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1299        }
1300
1301        fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
1302            let compressed = crate::encoding::compress_to_vec(data, level);
1303            assert_ne!(first_block_type(&compressed), BlockType::Raw);
1304            assert!(
1305                compressed.len() < data.len(),
1306                "compressible input should remain compressible for level={level:?}"
1307            );
1308            let mut decoded = Vec::new();
1309            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1310            assert_eq!(decoded, data);
1311        }
1312
1313        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Fastest);
1314        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Default);
1315        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Level(3));
1316        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Better);
1317        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Best);
1318    }
1319
1320    #[cfg(feature = "std")]
1321    #[test]
1322    fn hinted_small_compressible_frames_use_single_segment_across_levels() {
1323        let mut data = Vec::with_capacity(4 * 1024);
1324        const LINE: &[u8] =
1325            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
1326        while data.len() < 4 * 1024 {
1327            let remaining = 4 * 1024 - data.len();
1328            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
1329        }
1330
1331        for level in [
1332            super::CompressionLevel::Fastest,
1333            super::CompressionLevel::Default,
1334            super::CompressionLevel::Better,
1335            super::CompressionLevel::Best,
1336            super::CompressionLevel::Level(0),
1337            super::CompressionLevel::Level(3),
1338            super::CompressionLevel::Level(4),
1339            super::CompressionLevel::Level(11),
1340        ] {
1341            let compressed = {
1342                let mut compressor = FrameCompressor::new(level);
1343                compressor.set_source_size_hint(data.len() as u64);
1344                compressor.set_source(data.as_slice());
1345                let mut out = Vec::new();
1346                compressor.set_drain(&mut out);
1347                compressor.compress();
1348                out
1349            };
1350            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
1351            assert!(
1352                frame_header.descriptor.single_segment_flag(),
1353                "hinted small compressible frame should use single-segment (level={level:?})"
1354            );
1355            assert_ne!(
1356                first_block_type(&compressed),
1357                BlockType::Raw,
1358                "compressible hinted frame should stay off raw fast path (level={level:?})"
1359            );
1360            assert!(
1361                compressed.len() < data.len(),
1362                "compressible hinted frame should still shrink (level={level:?})"
1363            );
1364            let mut decoded = Vec::new();
1365            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
1366                .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
1367            assert_eq!(decoded, data);
1368        }
1369    }
1370
1371    struct NoDictionaryMatcher {
1372        last_space: Vec<u8>,
1373        window_size: u64,
1374    }
1375
1376    impl NoDictionaryMatcher {
1377        fn new(window_size: u64) -> Self {
1378            Self {
1379                last_space: Vec::new(),
1380                window_size,
1381            }
1382        }
1383    }
1384
1385    impl Matcher for NoDictionaryMatcher {
1386        fn get_next_space(&mut self) -> Vec<u8> {
1387            vec![0; self.window_size as usize]
1388        }
1389
1390        fn get_last_space(&mut self) -> &[u8] {
1391            self.last_space.as_slice()
1392        }
1393
1394        fn commit_space(&mut self, space: Vec<u8>) {
1395            self.last_space = space;
1396        }
1397
1398        fn skip_matching(&mut self) {}
1399
1400        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
1401            handle_sequence(Sequence::Literals {
1402                literals: self.last_space.as_slice(),
1403            });
1404        }
1405
1406        fn reset(&mut self, _level: super::CompressionLevel) {
1407            self.last_space.clear();
1408        }
1409
1410        fn window_size(&self) -> u64 {
1411            self.window_size
1412        }
1413    }
1414
1415    #[test]
1416    fn frame_starts_with_magic_num() {
1417        let mock_data = [1_u8, 2, 3].as_slice();
1418        let mut output: Vec<u8> = Vec::new();
1419        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1420        compressor.set_source(mock_data);
1421        compressor.set_drain(&mut output);
1422
1423        compressor.compress();
1424        assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
1425    }
1426
1427    #[test]
1428    fn very_simple_raw_compress() {
1429        let mock_data = [1_u8, 2, 3].as_slice();
1430        let mut output: Vec<u8> = Vec::new();
1431        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1432        compressor.set_source(mock_data);
1433        compressor.set_drain(&mut output);
1434
1435        compressor.compress();
1436    }
1437
1438    #[test]
1439    fn very_simple_compress() {
1440        let mut mock_data = vec![0; 1 << 17];
1441        mock_data.extend(vec![1; (1 << 17) - 1]);
1442        mock_data.extend(vec![2; (1 << 18) - 1]);
1443        mock_data.extend(vec![2; 1 << 17]);
1444        mock_data.extend(vec![3; (1 << 17) - 1]);
1445        let mut output: Vec<u8> = Vec::new();
1446        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1447        compressor.set_source(mock_data.as_slice());
1448        compressor.set_drain(&mut output);
1449
1450        compressor.compress();
1451
1452        let mut decoder = FrameDecoder::new();
1453        let mut decoded = Vec::with_capacity(mock_data.len());
1454        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1455        assert_eq!(mock_data, decoded);
1456
1457        let mut decoded = Vec::new();
1458        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1459        assert_eq!(mock_data, decoded);
1460    }
1461
/// Round-trips 512 KiB of zero bytes through the `Uncompressed` level and
/// this crate's `FrameDecoder`.
#[test]
fn rle_compress() {
    let zeros = vec![0u8; 1 << 19];
    let mut frame = Vec::<u8>::new();

    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder.set_source(zeros.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    let mut restored: Vec<u8> = Vec::with_capacity(zeros.len());
    let mut frame_decoder = FrameDecoder::new();
    frame_decoder
        .decode_all_to_vec(&frame, &mut restored)
        .unwrap();
    assert_eq!(zeros, restored);
}
1477
/// Round-trips a five-byte input through the `Uncompressed` level and checks
/// the frame with this crate's `FrameDecoder` and the `zstd` crate's decoder.
#[test]
fn aaa_compress() {
    let input: Vec<u8> = vec![0, 1, 3, 4, 5];
    let mut frame = Vec::<u8>::new();

    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder.set_source(input.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    // This crate's decoder.
    let mut own_result: Vec<u8> = Vec::with_capacity(input.len());
    let mut own_decoder = FrameDecoder::new();
    own_decoder
        .decode_all_to_vec(&frame, &mut own_result)
        .unwrap();
    assert_eq!(input, own_result);

    // The `zstd` crate's decoder.
    let mut reference_result: Vec<u8> = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut reference_result).unwrap();
    assert_eq!(input, reference_result);
}
1497
/// Compressing with an attached dictionary must record the dictionary id in
/// the frame header, fail to decode without that dictionary, and round-trip
/// once the dictionary is supplied (checked with this crate's decoder and
/// with `zstd::bulk::Decompressor`).
#[test]
fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
    use crate::decoding::Dictionary;
    use crate::decoding::errors::FrameDecoderError;

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let dict_for_encoder = Dictionary::decode_dict(dict_raw).unwrap();
    let dict_for_decoder = Dictionary::decode_dict(dict_raw).unwrap();

    // Payload: the first 2048 bytes of the dictionary content, eight times.
    let data: Vec<u8> = dict_for_decoder.dict_content[..2048].repeat(8);

    let mut with_dict: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);

    // First attach from raw bytes: nothing was installed before.
    let previous = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(
        previous.is_none(),
        "first dictionary insert should return None"
    );

    // Second attach must hand back the dictionary installed above.
    let replaced = compressor
        .set_dictionary(dict_for_encoder)
        .expect("valid dictionary should attach")
        .expect("set_dictionary_from_bytes inserted previous dictionary");
    assert_eq!(replaced.id, dict_for_decoder.id);

    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut with_dict);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));

    // Decoding without the dictionary must be rejected.
    let mut missing_dict_target = Vec::with_capacity(data.len());
    let mut dictless_decoder = FrameDecoder::new();
    let err = dictless_decoder
        .decode_all_to_vec(&with_dict, &mut missing_dict_target)
        .unwrap_err();
    assert!(
        matches!(&err, FrameDecoderError::DictNotProvided { .. }),
        "dict-compressed stream should require dictionary id, got: {err:?}"
    );

    // Decoding with the dictionary restores the payload.
    let mut primed_decoder = FrameDecoder::new();
    primed_decoder.add_dict(dict_for_decoder).unwrap();
    let mut decoded = Vec::with_capacity(data.len());
    primed_decoder
        .decode_all_to_vec(&with_dict, &mut decoded)
        .unwrap();
    assert_eq!(decoded, data);

    // Same check through the `zstd` crate's bulk decompressor.
    let mut ffi_decoded = Vec::with_capacity(data.len());
    let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
    let ffi_written = ffi_decoder
        .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
        .unwrap();
    assert_eq!(ffi_written, data.len());
    assert_eq!(ffi_decoded, data);
}
1561
/// Trains a raw dictionary with `create_raw_dict_from_source`, compresses a
/// related payload with and without it, and checks that the dictionary frame
/// advertises the id, round-trips, and is smaller than the baseline frame.
#[cfg(all(feature = "dict_builder", feature = "std"))]
#[test]
fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
    use crate::decoding::Dictionary;
    use std::io::Cursor;

    // Training corpus: 256 short records that differ only in `key`.
    let training: Vec<u8> = (0..256u32)
        .flat_map(|idx| format!("tenant=demo table=orders key={idx} region=eu\n").into_bytes())
        .collect();

    let mut raw_dict: Vec<u8> = Vec::new();
    crate::dictionary::create_raw_dict_from_source(
        Cursor::new(training.as_slice()),
        training.len(),
        &mut raw_dict,
        4096,
    )
    .expect("dict_builder training should succeed");
    assert!(
        !raw_dict.is_empty(),
        "dict_builder produced an empty dictionary"
    );

    let dict_id = 0xD1C7_0008;
    let encoder_dict = Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
    let decoder_dict = Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();

    // Payload: 96 records sharing most of their text with the corpus.
    let payload: Vec<u8> = (0..96u32)
        .flat_map(|idx| {
            format!("tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n")
                .into_bytes()
        })
        .collect();

    // Baseline frame without a dictionary.
    let mut without_dict: Vec<u8> = Vec::new();
    let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
    baseline.set_source(payload.as_slice());
    baseline.set_drain(&mut without_dict);
    baseline.compress();

    // Frame with the trained dictionary attached.
    let mut with_dict: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    compressor
        .set_dictionary(encoder_dict)
        .expect("valid dict_builder dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut with_dict);
    compressor.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_id));

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, payload);

    let dict_frame_len = with_dict.len();
    let baseline_frame_len = without_dict.len();
    assert!(
        dict_frame_len < baseline_frame_len,
        "trained dictionary should improve compression for this small payload"
    );
}
1630
/// After attaching a dictionary from raw bytes and compressing an empty
/// input, the compressor state must hold a previous Huffman table and
/// previous LL / ML / OF FSE tables.
#[test]
fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let empty_input: &[u8] = b"";
    let mut frame = Vec::new();

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let replaced = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(replaced.is_none());

    compressor.set_source(empty_input);
    compressor.set_drain(&mut frame);
    compressor.compress();

    let state = &compressor.state;
    let fse = &state.fse_tables;
    assert!(
        state.last_huff_table.is_some(),
        "dictionary entropy should seed previous huffman table before first block"
    );
    assert!(
        fse.ll_previous.is_some(),
        "dictionary entropy should seed previous ll table before first block"
    );
    assert!(
        fse.ml_previous.is_some(),
        "dictionary entropy should seed previous ml table before first block"
    );
    assert!(
        fse.of_previous.is_some(),
        "dictionary entropy should seed previous of table before first block"
    );
}
1664
/// `set_dictionary` must refuse a dictionary whose `id` is zero and report
/// `DictionaryDecodeError::ZeroDictionaryId`.
#[test]
fn set_dictionary_rejects_zero_dictionary_id() {
    use crate::decoding::errors::DictionaryDecodeError;
    use crate::decoding::scratch::{FSEScratch, HuffmanScratch};
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let zero_id_dict = crate::decoding::Dictionary {
        id: 0,
        fse: FSEScratch::new(),
        huf: HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [1, 4, 8],
    };

    // No source or drain is ever set, so the generics are spelled out.
    let mut compressor = FrameCompressor::<&[u8], Vec<u8>, MatchGeneratorDriver>::new(
        super::CompressionLevel::Fastest,
    );
    let outcome = compressor.set_dictionary(zero_id_dict);
    assert!(matches!(
        outcome,
        Err(DictionaryDecodeError::ZeroDictionaryId)
    ));
}
1686
/// `set_dictionary` must refuse a dictionary whose first repeat offset is
/// zero and report `ZeroRepeatOffsetInDictionary` with `index: 0`.
#[test]
fn set_dictionary_rejects_zero_repeat_offsets() {
    use crate::decoding::errors::DictionaryDecodeError;
    use crate::decoding::scratch::{FSEScratch, HuffmanScratch};
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let zero_offset_dict = crate::decoding::Dictionary {
        id: 1,
        fse: FSEScratch::new(),
        huf: HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [0, 4, 8],
    };

    // No source or drain is ever set, so the generics are spelled out.
    let mut compressor = FrameCompressor::<&[u8], Vec<u8>, MatchGeneratorDriver>::new(
        super::CompressionLevel::Fastest,
    );
    let outcome = compressor.set_dictionary(zero_offset_dict);
    assert!(matches!(
        outcome,
        Err(DictionaryDecodeError::ZeroRepeatOffsetInDictionary { index: 0 })
    ));
}
1712
/// At the `Uncompressed` level an attached dictionary must not show up in
/// the frame header, and the frame must decode without any dictionary.
#[test]
fn uncompressed_mode_does_not_require_dictionary() {
    let input: &[u8] = b"plain-bytes-that-should-stay-raw";
    let dictionary =
        crate::decoding::Dictionary::from_raw_content(0xABCD_0001, b"shared-history".to_vec())
            .expect("raw dictionary should be valid");

    let mut frame = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor
        .set_dictionary(dictionary)
        .expect("dictionary should attach in uncompressed mode");
    compressor.set_source(input);
    compressor.set_drain(&mut frame);
    compressor.compress();

    let header = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("encoded frame should have a header")
        .0;
    assert_eq!(
        header.dictionary_id(),
        None,
        "raw/uncompressed frames must not advertise dictionary dependency"
    );

    // Decode with a decoder that has no dictionary registered.
    let mut restored = Vec::with_capacity(input.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&frame, &mut restored).unwrap();
    assert_eq!(restored, input);
}
1743
/// Compresses a payload larger than the advertised window with and without
/// a dictionary. The dictionary must not change the advertised window size,
/// and the dictionary frame must still round-trip.
#[test]
fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
    use crate::decoding::Dictionary;
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let dict_id = 0xABCD_0002;
    let dict = Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");
    let dict_for_decoder = Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");

    // The payload has to be bigger than the window the encoder advertises
    // (512 KiB for Fastest after `window_log = 19` alignment with donor's
    // L1 fast row in `clevels.h`); otherwise the cross-window-boundary
    // behavior is never exercised.
    let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);

    // Compresses `payload` with a fresh matcher, optionally priming a dictionary.
    let compress_payload = |dictionary: Option<Dictionary>| -> Vec<u8> {
        let mut frame = Vec::new();
        {
            let matcher = MatchGeneratorDriver::new(1024, 1);
            let mut compressor =
                FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
            if let Some(dictionary) = dictionary {
                compressor
                    .set_dictionary(dictionary)
                    .expect("dictionary should attach");
            }
            compressor.set_source(payload.as_slice());
            compressor.set_drain(&mut frame);
            compressor.compress();
        }
        frame
    };

    // Baseline: same matcher configuration, no dictionary.
    let no_dict_output = compress_payload(None);
    let (no_dict_frame_header, _) =
        crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
            .expect("baseline frame should have a header");
    let no_dict_window = no_dict_frame_header
        .window_size()
        .expect("window size should be present");

    // Frame under test: dictionary attached.
    let output = compress_payload(Some(dict));
    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    let advertised_window = frame_header
        .window_size()
        .expect("window size should be present");

    assert_eq!(
        advertised_window, no_dict_window,
        "dictionary priming must not inflate advertised window size"
    );
    assert!(
        payload.len() > advertised_window as usize,
        "test must cross the advertised window boundary"
    );

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(dict_for_decoder).unwrap();
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
1806
/// With a 4 KiB dictionary attached, a source-size hint of 1 must not make
/// the advertised window larger than the un-hinted frame's window, and the
/// hinted frame must still round-trip.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
    use crate::decoding::Dictionary;

    let dict_id = 0xABCD_0004;
    // Every dictionary instance carries the same 4 KiB of history.
    let new_dict = || Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    let payload = b"abcdabcdabcdabcd".repeat(128);

    // Compresses `payload` with the dictionary and an optional size hint.
    let encode = |size_hint: Option<u64>| -> Vec<u8> {
        let mut frame = Vec::new();
        {
            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
            compressor.set_dictionary(new_dict()).unwrap();
            if let Some(hint) = size_hint {
                compressor.set_source_size_hint(hint);
            }
            compressor.set_source(payload.as_slice());
            compressor.set_drain(&mut frame);
            compressor.compress();
        }
        frame
    };
    // Reads the window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let hinted_output = encode(Some(1));
    let no_hint_output = encode(None);

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(new_dict()).unwrap();
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
1859
/// With a 4 KiB dictionary and a 4 KiB payload, hinting the exact payload
/// length must not make the advertised window larger than the un-hinted
/// frame's window, and the hinted frame must still round-trip.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
    use crate::decoding::Dictionary;

    let dict_id = 0xABCD_0005;
    // Every dictionary instance carries the same 4 KiB of history.
    let new_dict = || Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    let payload = b"abcd".repeat(1024); // 4 KiB payload
    let exact_len = payload.len() as u64;

    // Compresses `payload` with the dictionary and an optional size hint.
    let encode = |size_hint: Option<u64>| -> Vec<u8> {
        let mut frame = Vec::new();
        {
            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
            compressor.set_dictionary(new_dict()).unwrap();
            if let Some(hint) = size_hint {
                compressor.set_source_size_hint(hint);
            }
            compressor.set_source(payload.as_slice());
            compressor.set_drain(&mut frame);
            compressor.compress();
        }
        frame
    };
    // Reads the window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let hinted_output = encode(Some(exact_len));
    let no_hint_output = encode(None);

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(new_dict()).unwrap();
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
1913
/// When the compressor is built around `NoDictionaryMatcher` (defined
/// elsewhere in this module), attaching a dictionary must not put a
/// dictionary id in the frame header, and the frame must decode without one.
#[test]
fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
    let input: &[u8] = b"abcdefghabcdefgh";
    let dictionary =
        crate::decoding::Dictionary::from_raw_content(0xABCD_0003, b"abcdefgh".to_vec())
            .expect("raw dictionary should be valid");

    let mut frame = Vec::new();
    let mut compressor = FrameCompressor::new_with_matcher(
        NoDictionaryMatcher::new(64),
        super::CompressionLevel::Fastest,
    );
    compressor
        .set_dictionary(dictionary)
        .expect("dictionary should attach");
    compressor.set_source(input);
    compressor.set_drain(&mut frame);
    compressor.compress();

    let header = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("encoded frame should have a header")
        .0;
    assert_eq!(
        header.dictionary_id(),
        None,
        "matchers that do not support dictionary priming must not advertise dictionary dependency"
    );

    // Decode with a decoder that has no dictionary registered.
    let mut restored = Vec::with_capacity(input.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&frame, &mut restored).unwrap();
    assert_eq!(restored, input);
}
1945
/// Compresses the same input twice with one compressor and checks, for each
/// frame, that the checksum stored in the frame equals the checksum the
/// decoder calculates, and that both frames carry the same stored checksum.
///
/// A hasher that is not reset between frames would make the second frame's
/// stored checksum disagree with the decoder's calculation.
#[cfg(feature = "hash")]
#[test]
fn checksum_two_frames_reused_compressor() {
    /// Fully decodes `frame` and returns
    /// `(decoded bytes, checksum read from the frame, checksum calculated)`.
    fn decode_with_checksums(frame: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
        let mut remaining = frame;
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut remaining).unwrap();
        loop {
            if decoder.is_finished() {
                break;
            }
            decoder
                .decode_blocks(&mut remaining, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut bytes = Vec::new();
        decoder.collect_to_writer(&mut bytes).unwrap();
        let stored = decoder.get_checksum_from_data();
        let calculated = decoder.get_calculated_checksum();
        (bytes, stored, calculated)
    }

    // 1024 bytes cycling through 0..=255.
    let data: Vec<u8> = (0..1024usize).map(|i| (i % 256) as u8).collect();

    let mut first_frame = Vec::new();
    let mut second_frame = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);

    // Frame 1.
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut first_frame);
    compressor.compress();

    // Frame 2, produced by the very same compressor instance.
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut second_frame);
    compressor.compress();

    let (decoded1, stored1, calculated1) = decode_with_checksums(&first_frame);
    assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
    assert_eq!(stored1, calculated1, "frame 1: checksum mismatch");

    let (decoded2, stored2, calculated2) = decode_with_checksums(&second_frame);
    assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
    assert_eq!(stored2, calculated2, "frame 2: checksum mismatch");

    // Identical input must yield identical stored checksums across frames.
    assert_eq!(
        stored1, stored2,
        "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
    );
}
2009
/// Replays every regular file found in `fuzz/artifacts/interop` (if that
/// directory exists) through two interop directions:
///
/// * encode with the `zstd` crate, decode with this crate's
///   `StreamingDecoder` and `FrameDecoder`;
/// * encode with this crate (`Uncompressed` and `Fastest`), decode with the
///   `zstd` crate.
///
/// Does nothing when the artifact directory is absent.
#[cfg(feature = "std")]
#[test]
fn fuzz_targets() {
    use std::io::Read;

    const ARTIFACT_DIR: &str = "fuzz/artifacts/interop";

    /// Decodes a whole stream with this crate's `StreamingDecoder`.
    fn streaming_decode(source: &mut dyn std::io::Read) -> Vec<u8> {
        let mut out: Vec<u8> = Vec::new();
        let mut decoder = crate::decoding::StreamingDecoder::new(source).unwrap();
        decoder.read_to_end(&mut out).expect("Decoding failed");
        out
    }

    /// Decodes a frame with this crate's `FrameDecoder`, requesting up to
    /// 1 MiB per `decode_blocks` call and draining after each call.
    fn frame_decode(mut source: impl Read) -> Vec<u8> {
        let mut decoder = crate::decoding::FrameDecoder::new();
        decoder.reset(&mut source).unwrap();
        let mut out = vec![];
        loop {
            // Done once the frame is finished and nothing is left to collect.
            if decoder.is_finished() && decoder.can_collect() == 0 {
                break;
            }
            decoder
                .decode_blocks(
                    &mut source,
                    crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
                )
                .unwrap();
            decoder.collect_to_writer(&mut out).unwrap();
        }
        out
    }

    /// Encodes with the `zstd` crate at level 3.
    fn reference_encode(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        zstd::stream::encode_all(std::io::Cursor::new(data), 3)
    }

    /// Decodes with the `zstd` crate.
    fn reference_decode(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        let mut out = Vec::new();
        zstd::stream::copy_decode(data, &mut out)?;
        Ok(out)
    }

    /// Reads `source` to the end and encodes it with this crate at `level`.
    fn szstd_encode(
        source: &mut dyn std::io::Read,
        level: crate::encoding::CompressionLevel,
    ) -> Vec<u8> {
        let mut raw = Vec::new();
        source.read_to_end(&mut raw).unwrap();
        crate::encoding::compress_to_vec(raw.as_slice(), level)
    }

    if !std::fs::exists(ARTIFACT_DIR).unwrap_or(false) {
        return;
    }

    for entry in std::fs::read_dir(ARTIFACT_DIR).unwrap() {
        let entry = entry.unwrap();
        if !entry.file_type().unwrap().is_file() {
            continue;
        }
        let bytes = std::fs::read(entry.path()).unwrap();
        let original = bytes.as_slice();

        // Decoding: `zstd` crate encoder -> this crate's decoders.
        let reference_frame = reference_encode(original).unwrap();
        let via_stream = streaming_decode(&mut reference_frame.as_slice());
        let via_frame = frame_decode(&mut reference_frame.as_slice());
        assert!(
            via_stream == original,
            "Decoded data did not match the original input during decompression"
        );
        assert_eq!(
            via_frame, original,
            "Decoded data did not match the original input during decompression"
        );

        // Encoding: this crate's encoder (uncompressed, then compressed)
        // -> `zstd` crate decoder.
        let levels = [
            crate::encoding::CompressionLevel::Uncompressed,
            crate::encoding::CompressionLevel::Fastest,
        ];
        for level in levels {
            let mut input = original;
            let frame = szstd_encode(&mut input, level);
            let roundtrip = reference_decode(&frame).unwrap();
            assert_eq!(
                roundtrip, original,
                "Decoded data did not match the original input during compression"
            );
        }
    }
}
2105
/// A block whose bytes are all identical must be left unsplit: the two
/// border histograms coincide (all 512 hits land on one slot), so
/// `presplit_fingerprints_differ` reports `false` and the function returns
/// the full block size via the early-return path
/// (`zstd_preSplit.c:214`, returning `blockSize`).
#[test]
fn donor_split_block_from_borders_keeps_homogeneous_block() {
    let block_len = MAX_BLOCK_SIZE as usize;
    let uniform = vec![0xAAu8; block_len];

    let split_at = super::donor_split_block_from_borders(&uniform);

    assert_eq!(split_at, block_len);
}
2117
/// A block that is all zeros in its first half and a counter sequence in
/// its second half has clearly different border histograms, so the borders
/// heuristic chooses to split.
///
/// Because the content changes exactly at the block midpoint, the middle
/// 512-byte sample (`block[mid-256..mid+256]`) is half zeros and half
/// counter values, i.e. about equally far from both border fingerprints.
/// The `abs_diff(dist_from_begin, dist_from_end) < min_distance` branch
/// therefore fires and the midpoint (64 KiB) is returned, per
/// `zstd_preSplit.c:222`. The exact value is asserted, not merely
/// membership in {32K, 64K, 96K}, so that a regression to another
/// quantised arm is caught.
#[test]
fn donor_split_block_from_borders_returns_midpoint_for_centred_transition() {
    let block_len = MAX_BLOCK_SIZE as usize;
    let midpoint = block_len / 2;

    let mut block = vec![0u8; block_len];
    // Second half: value derived from the absolute position, never zero.
    for (offset, byte) in block[midpoint..].iter_mut().enumerate() {
        let position = midpoint + offset;
        *byte = (position % 251 + 1) as u8;
    }

    let split = super::donor_split_block_from_borders(&block);
    assert_eq!(
        split,
        64 * 1024,
        "centred-transition fixture must take the symmetric \
         midpoint arm (`abs_diff < min_distance`), got {split}"
    );
}
2149
/// `donor_pre_split_level` sends mid-range levels to the cheap borders
/// heuristic and high levels to the byChunks path. Levels below 11 are not
/// split at all, so the splitter never runs on fast / default presets where
/// its per-block cost would dominate.
#[test]
fn donor_pre_split_level_dispatches_by_compression_level() {
    use crate::encoding::CompressionLevel;

    // (compression level, expected result of `donor_pre_split_level`).
    let expectations = [
        (CompressionLevel::Fastest, None),
        (CompressionLevel::Default, None),
        (CompressionLevel::Better, None),
        (CompressionLevel::Level(7), None),
        (CompressionLevel::Level(11), Some(0)),
        (CompressionLevel::Level(15), Some(0)),
        (CompressionLevel::Level(16), Some(4)),
        (CompressionLevel::Level(22), Some(4)),
    ];

    for (level, expected) in expectations {
        assert_eq!(super::donor_pre_split_level(level), expected);
    }
}
2187
2188    /// End-to-end: a 256 KB heterogeneous payload compressed at
2189    /// Level(13) (borders heuristic active) round-trips through the
2190    /// crate's own decoder. The pre-split path runs over the first
2191    /// 128 KB block and emits two consecutive sub-blocks; the second
2192    /// 128 KB block goes through the splitter on its own. The test
2193    /// proves the split decisions do not corrupt the frame bitstream.
2194    #[test]
2195    fn level_13_borders_split_roundtrips_through_own_decoder() {
2196        use crate::encoding::CompressionLevel;
2197        let mut data = vec![0u8; 256 * 1024];
2198        // First 128 KB: low-entropy repeating run; second 128 KB:
2199        // counter sequence — clearly distinct border histograms.
2200        for (i, byte) in data.iter_mut().enumerate() {
2201            *byte = if i < 128 * 1024 {
2202                (i & 0x07) as u8
2203            } else {
2204                (i % 251 + 1) as u8
2205            };
2206        }
2207
2208        let mut compressed = Vec::new();
2209        let mut compressor = FrameCompressor::new(CompressionLevel::Level(13));
2210        compressor.set_source(data.as_slice());
2211        compressor.set_drain(&mut compressed);
2212        compressor.compress();
2213
2214        let mut decoder = FrameDecoder::new();
2215        let mut source = compressed.as_slice();
2216        decoder
2217            .reset(&mut source)
2218            .expect("frame header should parse");
2219        while !decoder.is_finished() {
2220            decoder
2221                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
2222                .expect("decode should succeed");
2223        }
2224        let mut decoded = Vec::with_capacity(data.len());
2225        decoder.collect_to_writer(&mut decoded).unwrap();
2226        assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
2227    }
2228
2229    /// Regression: `set_compression_level` followed by `compress()` must
2230    /// refresh `state.strategy_tag` through the reset-time sync so the
2231    /// literal-compression gates (`min_literals_to_compress`,
2232    /// `min_gain`) use the NEW level's strategy. Picks a level pair
2233    /// that genuinely crosses strategy bands — `Fastest` resolves to
2234    /// `Fast`, `Level(20)` resolves to `BtUltra2` — so a missed sync
2235    /// would leave the construction-time tag visible and trip the
2236    /// assertion. `CompressionLevel::Best` would also pass type-wise
2237    /// but resolves to `Lazy` today, which keeps `min_literals_to_compress`
2238    /// in the same `shift=3 → 64-byte` band as `Fast` and weakens the
2239    /// signal that the gate floor actually moved.
2240    #[cfg(feature = "std")]
2241    #[test]
2242    fn set_compression_level_then_compress_refreshes_strategy_tag() {
2243        use super::CompressionLevel;
2244        use crate::encoding::strategy::StrategyTag;
2245
2246        let data = vec![0xABu8; 256];
2247        let mut out = Vec::new();
2248        let mut compressor = FrameCompressor::new(CompressionLevel::Fastest);
2249        let initial_tag = compressor.state.strategy_tag;
2250        assert_eq!(
2251            initial_tag,
2252            StrategyTag::for_compression_level(CompressionLevel::Fastest),
2253            "construction-time strategy_tag must reflect initial level",
2254        );
2255
2256        // Switch to a level whose resolved strategy lives in a different
2257        // band, then run a full compress cycle — the matcher.reset()
2258        // inside `compress` is the only site that can refresh the tag.
2259        let new_level = CompressionLevel::Level(20);
2260        compressor.set_compression_level(new_level);
2261        compressor.set_source(data.as_slice());
2262        compressor.set_drain(&mut out);
2263        compressor.compress();
2264
2265        let new_tag = compressor.state.strategy_tag;
2266        let expected = StrategyTag::for_compression_level(new_level);
2267        assert_eq!(
2268            new_tag, expected,
2269            "strategy_tag must follow set_compression_level → compress, \
2270             got {new_tag:?} expected {expected:?}",
2271        );
2272        assert_eq!(
2273            expected,
2274            StrategyTag::BtUltra2,
2275            "test fixture invariant: Level(20) must resolve to BtUltra2 \
2276             so the post-switch tag visibly crosses the band boundary",
2277        );
2278        assert_ne!(
2279            new_tag, initial_tag,
2280            "test fixture invariant: chosen levels must resolve to \
2281             different StrategyTag variants",
2282        );
2283    }
2284}