Skip to main content

structured_zstd/encoding/
frame_compressor.rs

1//! Utilities and interfaces for encoding an entire frame. Allows reusing resources
2
3use alloc::vec::Vec;
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12    CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13    match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
20/// A dictionary prepared for the ENCODER side, analogous to zstd's `CDict`
21/// (vs the decoder's [`Dictionary`](crate::decoding::Dictionary) / `DDict`).
22///
23/// It carries the entropy tables, content, and repeat-offset history the
24/// compressor needs, but is a distinct type with **no decode path**: there is
25/// no way to turn it into a [`DictionaryHandle`](crate::decoding::DictionaryHandle)
26/// or feed it to a [`FrameDecoder`](crate::decoding::FrameDecoder). That keeps
27/// the compress-only state (which may have been parsed without building the
28/// decode lookup tables, see
29/// [`set_dictionary_from_bytes`](FrameCompressor::set_dictionary_from_bytes))
30/// from ever reaching the decode side — the encoder/decoder dictionary split
31/// mirrors C zstd's `CDict` / `DDict`.
32#[derive(Clone)]
33pub struct EncoderDictionary {
34    pub(crate) inner: crate::decoding::Dictionary,
35}
36
37impl EncoderDictionary {
38    /// Wrap an already-parsed [`Dictionary`](crate::decoding::Dictionary) for
39    /// encoder use. A fully-decoded dictionary is valid here; only the encoder
40    /// entropy tables, content, and offset history are read.
41    pub fn from_dictionary(dictionary: crate::decoding::Dictionary) -> Self {
42        Self { inner: dictionary }
43    }
44
45    /// Parse a serialized dictionary blob for encoder use, skipping the decode
46    /// lookup-table build the encoder never reads (see
47    /// `Dictionary::decode_dict_for_encoding`). The encoder entropy tables — and
48    /// thus the emitted frame — are identical to a full parse.
49    pub fn from_bytes(
50        raw_dictionary: &[u8],
51    ) -> Result<Self, crate::decoding::errors::DictionaryDecodeError> {
52        Ok(Self {
53            inner: crate::decoding::Dictionary::decode_dict_for_encoding(raw_dictionary)?,
54        })
55    }
56
57    /// The dictionary id.
58    ///
59    /// A dictionary attached for encoding always has a non-zero id (the
60    /// `set_dictionary*` / `set_encoder_dictionary` attach path rejects a
61    /// zero id). This getter, however, reflects the wrapped dictionary as-is:
62    /// an `EncoderDictionary` built via [`Self::from_dictionary`] from a raw
63    /// `Dictionary` with `id == 0` reports `0` here until it is attached.
64    pub fn id(&self) -> u32 {
65        self.inner.id
66    }
67}
68
/// An interface for compressing arbitrary data with the ZStandard compression algorithm.
///
/// `FrameCompressor` will generally be used by:
/// 1. Creating a compressor for a compression level using `FrameCompressor::new()`
/// 2. Attaching the input and the output with `set_source` and `set_drain`
/// 3. Running `compress`, which writes the compressed frame into the drain
///
/// # Examples
/// ```
/// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
/// let mock_data: &[_] = &[0x1, 0x2, 0x3, 0x4];
/// let mut output = std::vec::Vec::new();
/// // Initialize a compressor.
/// let mut compressor = FrameCompressor::new(CompressionLevel::Uncompressed);
/// compressor.set_source(mock_data);
/// compressor.set_drain(&mut output);
///
/// // `compress` writes the compressed output into the provided buffer.
/// compressor.compress();
/// ```
pub struct FrameCompressor<
    R: Read = &'static [u8],
    W: Write = Vec<u8>,
    M: Matcher = MatchGeneratorDriver,
> {
    /// Input reader to compress from; `None` while no source is attached
    /// (the example above attaches one with `set_source`).
    uncompressed_data: Option<R>,
    /// Output writer the frame is written to; `None` while no drain is
    /// attached (the example above attaches one with `set_drain`).
    compressed_data: Option<W>,
    /// The compression level this compressor runs at.
    compression_level: CompressionLevel,
    /// Encoder dictionary attached to this compressor, if any.
    dictionary: Option<EncoderDictionary>,
    /// Encoder entropy tables derived from a dictionary (see
    /// `CachedDictionaryEntropy::from_dictionary`).
    /// NOTE(review): presumably kept in sync with `dictionary` by the attach
    /// path, which is outside this view — confirm.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// Optional source-size hint.
    /// NOTE(review): meaning inferred from the name only; the setter and the
    /// consumers are outside this view — confirm.
    source_size_hint: Option<u64>,
    /// Matcher, entropy tables, scratch buffers and offset history (see
    /// `CompressState`).
    state: CompressState<M>,
    /// When true, emitted frames omit the 4-byte magic number prefix
    /// (`ZSTD_f_zstd1_magicless`). Default false. The caller is
    /// responsible for ensuring the decoder is configured for the
    /// matching format — wire-format only round-trips with a
    /// magicless-aware decoder.
    magicless: bool,
    /// Whether to emit a trailing XXH64 content checksum and set the frame
    /// header's `Content_Checksum_flag` (semantics of upstream
    /// `ZSTD_c_checksumFlag`). Default `false`, matching the upstream
    /// library default; combined with the `hash` feature at frame-build
    /// time, so without `hash` no checksum is emitted regardless. Set via
    /// [`Self::set_content_checksum`].
    content_checksum: bool,
    /// Whether to record `Frame_Content_Size` in the frame header when the
    /// total size is known (semantics of upstream `ZSTD_c_contentSizeFlag`).
    /// Default `true`, matching upstream. With the flag off the header
    /// carries a window descriptor instead (single-segment requires an FCS,
    /// so it is disabled too). Set via [`Self::set_content_size_flag`].
    content_size_flag: bool,
    /// Whether to record the dictionary ID in the frame header when a
    /// dictionary is attached (semantics of upstream `ZSTD_c_dictIDFlag`).
    /// Default `true`, matching upstream. Decoders can still decode the
    /// frame by being handed the right dictionary explicitly. Set via
    /// [`Self::set_dictionary_id_flag`].
    dict_id_flag: bool,
    /// Upper bound on emitted block sizes (semantics of upstream
    /// `ZSTD_c_targetCBlockSize`): capping the RAW block length at the
    /// target bounds every physical block's compressed payload at the
    /// target too (a compressed block never exceeds its raw input — the
    /// raw-block fallback fires otherwise), so blocks land at or under
    /// `target + 3` header bytes on the wire. `None` = no target (full
    /// 128 KiB blocks). Set via [`Self::set_target_block_size`].
    target_block_size: Option<u32>,
    /// XXH64 hasher state, only present with the `hash` feature.
    /// NOTE(review): presumably accumulates the frame content checksum
    /// described on `content_checksum` — confirm against `compress()`.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
    /// Block-layout introspection populated at the end of every
    /// successful `compress()`. `None` until the first call.
    /// Behind the `lsm` feature gate.
    #[cfg(feature = "lsm")]
    frame_emit_info: Option<crate::encoding::frame_emit_info::FrameEmitInfo>,
    /// When `true`, `compress()` XXH64-hashes each block's
    /// uncompressed bytes and appends the low-32-bit digest to
    /// `block_checksums`. Default `false` (zero cost). Gated on
    /// `all(lsm, hash)` because XXH64 lives behind the `hash`
    /// feature; an `lsm`-only build has no way to compute digests.
    #[cfg(all(feature = "lsm", feature = "hash"))]
    per_block_checksums_enabled: bool,
    /// Per-block XXH64 (low 32 bits) digests captured during
    /// `compress()` when `per_block_checksums_enabled` is set. Ordered
    /// by block-emit order. `None` until the first call after enabling.
    /// Gated on `all(lsm, hash)` (see `per_block_checksums_enabled`).
    #[cfg(all(feature = "lsm", feature = "hash"))]
    block_checksums: Option<alloc::vec::Vec<u32>>,
    /// Per-physical-block decompressed (regenerated) sizes captured
    /// during `compress()`, in block-emit order (1:1 with
    /// `frame_emit_info.blocks`). Always captured under `lsm` (no
    /// opt-in, unlike `block_checksums`) because `FrameEmitInfo` is
    /// always built under `lsm` and `decompressed_byte_range` needs
    /// the per-block sizes. Cleared and refilled per frame.
    #[cfg(feature = "lsm")]
    block_decompressed_sizes: alloc::vec::Vec<u32>,
    /// Effective strategy tag when a public-parameter
    /// [`Strategy`](crate::encoding::Strategy) override (#27) is active.
    /// `Some` overrides the level-derived `state.strategy_tag` so the
    /// literal-compression gates and dict-attach cutoff see the strategy
    /// the matcher actually runs, not the base level's. `None` keeps the
    /// level-derived tag.
    strategy_override: Option<crate::encoding::strategy::StrategyTag>,
}
169
170#[derive(Clone, Default)]
171pub(crate) struct CachedDictionaryEntropy {
172    pub(crate) huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
173    pub(crate) ll_previous: Option<PreviousFseTable>,
174    pub(crate) ml_previous: Option<PreviousFseTable>,
175    pub(crate) of_previous: Option<PreviousFseTable>,
176}
177
178impl CachedDictionaryEntropy {
179    /// Heap bytes the cached dictionary entropy holds: the literals Huffman
180    /// table plus any `Custom` LL/ML/OF FSE tables (the `Arc`-boxed `FSETable`
181    /// payload and its flat state array). `Default` / `Rle` variants own no heap.
182    pub(crate) fn heap_size(&self) -> usize {
183        let mut total = self.huff.as_ref().map_or(0, |h| h.heap_size());
184        for prev in [&self.ll_previous, &self.ml_previous, &self.of_previous] {
185            if let Some(PreviousFseTable::Custom(table)) = prev {
186                total +=
187                    core::mem::size_of::<crate::fse::fse_encoder::FSETable>() + table.heap_size();
188            }
189        }
190        total
191    }
192
193    /// Derive the encoder-side entropy tables a dictionary seeds for the first
194    /// block of each frame (the upstream zstd `cdict->cBlockState`): the literals
195    /// Huffman table plus the literal-length / match-length / offset FSE
196    /// "previous" tables. Shared by [`FrameCompressor`] and
197    /// [`crate::encoding::StreamingEncoder`] so both seed identically.
198    pub(crate) fn from_dictionary(dictionary: &crate::decoding::Dictionary) -> Self {
199        Self {
200            huff: dictionary.huf.table.to_encoder_table(),
201            ll_previous: dictionary
202                .fse
203                .literal_lengths
204                .to_encoder_table()
205                .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
206            ml_previous: dictionary
207                .fse
208                .match_lengths
209                .to_encoder_table()
210                .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
211            of_previous: dictionary
212                .fse
213                .offsets
214                .to_encoder_table()
215                .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
216        }
217    }
218}
219
/// Shared owner for a custom "previous" FSE encoder table. `Arc` on
/// atomic-pointer targets, `Rc` otherwise (keeps `no_std` no-atomics
/// builds compiling, single-thread there anyway), mirroring
/// `decoding::dictionary::SharedDictionary`. Cloning the cached
/// dictionary entropy into the per-frame state is then a refcount bump,
/// not a full `FSETable` copy — the upstream zstd references `cdict->cBlockState`
/// instead of rebuilding it per frame.
#[cfg(target_has_atomic = "ptr")]
pub(crate) type SharedFseTable = alloc::sync::Arc<FSETable>;
/// `Rc`-backed form of the shared FSE table handle, selected on targets
/// without pointer-sized atomics (where `Arc` is unavailable). Same role as
/// the `Arc` form: clones are refcount bumps rather than `FSETable` copies.
#[cfg(not(target_has_atomic = "ptr"))]
pub(crate) type SharedFseTable = alloc::rc::Rc<FSETable>;
231
232#[derive(Clone)]
233pub(crate) enum PreviousFseTable {
234    // Default tables are immutable and already stored alongside the state, so
235    // repeating them only needs a lightweight marker instead of cloning FSETable.
236    Default,
237    // Shared handle: cloning (per-frame dictionary entropy seed) is a refcount
238    // bump. The table is only ever read or REPLACED wholesale (a block that
239    // builds a new table swaps in a fresh `SharedFseTable`), never mutated in
240    // place, so sharing is sound.
241    Custom(SharedFseTable),
242    Rle(u8),
243}
244
245impl PreviousFseTable {
246    pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
247        match self {
248            Self::Default => Some(default),
249            Self::Custom(table) => Some(table),
250            Self::Rle(_) => None,
251        }
252    }
253}
254
255pub(crate) struct FseTables {
256    /// The three predefined LL/ML/OF tables are functions of
257    /// compile-time-constant distributions. The
258    /// [`fse_encoder::FseDefaultTable`] type alias resolves to
259    /// `&'static FSETable` when a process-wide cache is available
260    /// (atomic-pointer targets, or no-atomic targets with the
261    /// `critical-section` feature) and to `Box<FSETable>` on the
262    /// cache-less no-atomic path (one per-frame allocation, dropped
263    /// with the compressor — no `Box::leak`, no unbounded growth).
264    /// Both arms `Deref` to `FSETable`, so consumers in
265    /// `encoding/blocks/compressed.rs` borrow through `&` uniformly
266    /// without seeing the per-target divergence.
267    pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
268    pub(crate) ll_previous: Option<PreviousFseTable>,
269    pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
270    pub(crate) ml_previous: Option<PreviousFseTable>,
271    pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
272    pub(crate) of_previous: Option<PreviousFseTable>,
273}
274
275impl FseTables {
276    pub fn new() -> Self {
277        Self {
278            ll_default: default_ll_table(),
279            ll_previous: None,
280            ml_default: default_ml_table(),
281            ml_previous: None,
282            of_default: default_of_table(),
283            of_previous: None,
284        }
285    }
286
287    /// Borrow the LL default table as `&FSETable`. Abstracts the cfg
288    /// split in [`crate::fse::fse_encoder::FseDefaultTable`] —
289    /// `&'static FSETable` (atomic / `critical-section`) auto-derefs
290    /// directly; `Box<FSETable>` (cache-less no-atomic) derefs
291    /// through `Box`. Both arms yield `&FSETable` uniformly so
292    /// downstream consumers can stay cfg-agnostic.
293    #[inline]
294    #[allow(clippy::borrow_deref_ref)]
295    pub(crate) fn ll_default_ref(&self) -> &FSETable {
296        &*self.ll_default
297    }
298
299    /// Borrow the ML default table as `&FSETable`. See [`Self::ll_default_ref`].
300    #[inline]
301    #[allow(clippy::borrow_deref_ref)]
302    pub(crate) fn ml_default_ref(&self) -> &FSETable {
303        &*self.ml_default
304    }
305
306    /// Borrow the OF default table as `&FSETable`. See [`Self::ll_default_ref`].
307    #[inline]
308    #[allow(clippy::borrow_deref_ref)]
309    pub(crate) fn of_default_ref(&self) -> &FSETable {
310        &*self.of_default
311    }
312}
313
/// Lower clamp applied to the splitter's raw split point in
/// `optimal_block_size`.
const PRESPLIT_BLOCK_MIN: usize = 3500;
/// Divisor of the fingerprint-difference threshold computed in
/// `presplit_fingerprints_differ`:
/// `p50 * (PRESPLIT_THRESHOLD_BASE + penalty) / PRESPLIT_THRESHOLD_PENALTY_RATE`.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
/// Base numerator of that threshold; the per-call `penalty` is added on top.
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
/// Penalty `split_block_by_chunks` starts with; it is decremented (down to
/// zero) after each chunk that gets merged into the running fingerprint.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
/// Chunk size (8 KiB) that `split_block_by_chunks` fingerprints and compares
/// at a time; returned split points are multiples of it.
const PRESPLIT_CHUNK_SIZE: usize = 8 << 10;
/// Largest `hash_log` the fingerprint hash accepts (debug-asserted in
/// `presplit_hash2`).
const PRESPLIT_HASH_LOG_MAX: usize = 10;
/// Number of event slots in a `PreSplitFingerprint`: `2^PRESPLIT_HASH_LOG_MAX`.
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
/// Multiplier of the 2-byte multiplicative hash in `presplit_hash2`.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
/// Upstream zstd `SEGMENT_SIZE` in `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:201`).
/// Two `SEGMENT_SIZE`-byte fingerprints — one from the start, one from the end —
/// drive the cheap border heuristic; a third one from the middle disambiguates
/// where in the block the transition sits.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
327
328#[derive(Clone)]
329struct PreSplitFingerprint {
330    events: [u32; PRESPLIT_HASH_TABLE_SIZE],
331    nb_events: usize,
332}
333
334impl Default for PreSplitFingerprint {
335    fn default() -> Self {
336        Self {
337            events: [0; PRESPLIT_HASH_TABLE_SIZE],
338            nb_events: 0,
339        }
340    }
341}
342
/// Make room in `out` for the next block before it is emitted, so block
/// emission never triggers an amortized-doubling reallocation mid-frame (the
/// transient old+new copy spikes peak memory to ~3x the output). The
/// reservation is sized from the compression ratio observed so far rather
/// than from the whole-input worst case.
///
/// * `blocks_start` — offset in `out` where this frame's blocks begin.
/// * `consumed` — input bytes already emitted as blocks.
/// * `remaining` — input bytes still to compress. An estimate is fine: a low
///   one only costs one more re-estimate later.
/// * `block_capacity` — the active block-size cap
///   (`FrameCompressor::block_capacity`), so a small `targetCBlockSize` neither
///   keeps a 128 KiB floor in the buffer nor undercounts header density.
///
/// Incompressible input re-estimates to roughly the full `compress_bound`
/// after the first block — the worst case of the old up-front policy — while
/// compressible input stays at output scale.
fn reserve_for_next_block(
    out: &mut Vec<u8>,
    blocks_start: usize,
    consumed: u64,
    remaining: usize,
    block_capacity: usize,
) {
    // 3-byte block header.
    const BLOCK_HEADER_LEN: usize = 3;
    // Slack for the 4-byte frame checksum trailer and a few extra sub-block
    // headers from the post-split emitters, so neither can reallocate.
    const TRAILER_SLACK: usize = 16;

    // Worst-case output of the next single block: header + raw payload + slack.
    let next_block_worst_case = remaining.min(block_capacity) + BLOCK_HEADER_LEN + TRAILER_SLACK;
    let spare = out.capacity() - out.len();
    if spare >= next_block_worst_case {
        return;
    }

    let emitted = (out.len() - blocks_start) as u64;
    let projected = match consumed {
        // No ratio signal yet (capacity ran out before the first block — only
        // reachable with a caller-shrunk `out`): reserve one block's bound.
        0 => next_block_worst_case,
        _ => {
            // remaining * observed ratio, computed in u128 so the product stays
            // exact for multi-GiB frames.
            let ratio_scaled = ((remaining as u128 * emitted as u128) / consumed as u128) as u64;
            // One 3-byte header per block still to come.
            let header_bytes = (remaining as u64 / block_capacity.max(1) as u64 + 1) * 3;
            // Plus 1/16 slack so a slightly-worsening tail doesn't force a
            // reallocation on every block.
            usize::try_from(ratio_scaled + ratio_scaled / 16 + header_bytes + 64)
                .unwrap_or(usize::MAX)
        }
    };

    // Floor of one block bound plus everything produced so far: when the ratio
    // estimate lands BELOW one block's bound (highly compressible input) this
    // keeps growth geometric — the buffer at least doubles its produced span
    // per reallocation (O(log) copies) instead of reallocating block-by-block
    // (O(blocks) copies) — while the peak stays at output scale.
    let geometric_floor = next_block_worst_case + emitted as usize;

    // `reserve_exact`: the estimate already carries its own slack, and the
    // whole-buffer doubling policy is exactly what this function avoids.
    out.reserve_exact(projected.max(geometric_floor));
}
394
395fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
396    debug_assert!(hash_log >= 8);
397    if hash_log == 8 {
398        return bytes[0] as usize;
399    }
400    debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
401    let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
402    (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
403}
404
405fn presplit_record_fingerprint(
406    fp: &mut PreSplitFingerprint,
407    src: &[u8],
408    sampling_rate: usize,
409    hash_log: usize,
410) {
411    fp.events.fill(0);
412    fp.nb_events = 0;
413    if src.len() < 2 {
414        return;
415    }
416    let limit = src.len() - 1;
417    let mut n = 0usize;
418    while n < limit {
419        fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
420        n += sampling_rate;
421    }
422    // Upstream zstd parity: zstd_preSplit.c records the integer division, not the
423    // rounded-up number of sampled events from the loop above.
424    fp.nb_events += limit / sampling_rate;
425}
426
427/// Single-byte histogram pass — matches upstream zstd `HIST_add` over a small
428/// segment with `hashLog == 8` (the `hash2` shortcut at
429/// `zstd_preSplit.c:36` returns the raw byte). The byChunks path uses
430/// 2-byte hashing for `hashLog >= 9`; this helper exists so the borders
431/// heuristic doesn't pay for that wider hash on its 512-byte windows.
432fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
433    fp.events.fill(0);
434    for &b in src {
435        fp.events[b as usize] += 1;
436    }
437    // Upstream zstd `HIST_add` returns the maximum symbol; the caller then sets
438    // `nbEvents = SEGMENT_SIZE` explicitly (see `zstd_preSplit.c:213`).
439    fp.nb_events = src.len();
440}
441
442fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
443    let slots = 1usize << hash_log;
444    let mut distance = 0u64;
445    for idx in 0..slots {
446        let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
447        let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
448        // Plain `+`: events/nb_events are per-block sample counts (<= block
449        // size), so each |left-right| <= (2^17)^2 and the sum over <= 2^hash_log
450        // slots stays far under u64::MAX — no overflow.
451        distance += left.abs_diff(right) as u64;
452    }
453    distance
454}
455
456fn presplit_fingerprints_differ(
457    reference: &PreSplitFingerprint,
458    new_fp: &PreSplitFingerprint,
459    penalty: i32,
460    hash_log: usize,
461) -> bool {
462    debug_assert!(reference.nb_events > 0);
463    debug_assert!(new_fp.nb_events > 0);
464    let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
465    let deviation = presplit_distance(reference, new_fp, hash_log);
466    // Plain `*`: p50 <= (block-sample-count)^2 and the (base+penalty) factor is
467    // a small constant, so the product stays well under u64::MAX.
468    let threshold =
469        p50 * (PRESPLIT_THRESHOLD_BASE + penalty as u64) / PRESPLIT_THRESHOLD_PENALTY_RATE;
470    deviation >= threshold
471}
472
473fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
474    // Plain `+`: `acc` accumulates only the chunks of a single block (caller
475    // loops within one block, <= MAX_BLOCK_SIZE), so the merged sample counts
476    // stay far under u32 / usize bounds — no overflow.
477    for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
478        acc.events[idx] += new_fp.events[idx];
479    }
480    acc.nb_events += new_fp.nb_events;
481}
482
483fn split_block_by_chunks(block: &[u8], level: usize) -> usize {
484    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
485    debug_assert!((1..=4).contains(&level));
486    let (sampling_rate, hash_log) = match level - 1 {
487        0 => (43, 8),
488        1 => (11, 9),
489        2 => (5, 10),
490        _ => (1, 10),
491    };
492
493    let mut past = PreSplitFingerprint::default();
494    let mut new_events = PreSplitFingerprint::default();
495    let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
496    presplit_record_fingerprint(
497        &mut past,
498        &block[..PRESPLIT_CHUNK_SIZE],
499        sampling_rate,
500        hash_log,
501    );
502    let mut pos = PRESPLIT_CHUNK_SIZE;
503    while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
504        presplit_record_fingerprint(
505            &mut new_events,
506            &block[pos..pos + PRESPLIT_CHUNK_SIZE],
507            sampling_rate,
508            hash_log,
509        );
510        if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
511            return pos;
512        }
513        presplit_merge_events(&mut past, &new_events);
514        if penalty > 0 {
515            penalty -= 1;
516        }
517        pos += PRESPLIT_CHUNK_SIZE;
518    }
519    block.len()
520}
521
522/// Upstream zstd port of `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:198`).
523/// Records two 512-byte byte-histograms — one from each end of a 128 KB
524/// block — and a third from the middle as a tie-breaker; returns either
525/// a quantised split point (32 KB / 64 KB / 96 KB) or the full block
526/// size when the two ends look indistinguishable. Cheaper than the
527/// chunk-based path because it touches at most 1.5 KB of input
528/// regardless of block size.
529fn split_block_from_borders(block: &[u8]) -> usize {
530    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
531    let block_size = block.len();
532    let mut past = PreSplitFingerprint::default();
533    let mut new_fp = PreSplitFingerprint::default();
534    presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
535    presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
536    // Upstream zstd uses `penalty = 0, hash_log = 8` — i.e. raw byte histogram
537    // distance with no threshold padding (`zstd_preSplit.c:214`).
538    if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
539        return block_size;
540    }
541
542    let mut middle = PreSplitFingerprint::default();
543    let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
544    presplit_record_byte_histogram(
545        &mut middle,
546        &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
547    );
548
549    let dist_from_begin = presplit_distance(&past, &middle, 8);
550    let dist_from_end = presplit_distance(&new_fp, &middle, 8);
551    // Upstream zstd `SEGMENT_SIZE * SEGMENT_SIZE / 3` (`zstd_preSplit.c:221`):
552    // if the middle is roughly equidistant from both ends, the change
553    // sits near the centre — split at the midpoint.
554    let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
555    if dist_from_begin.abs_diff(dist_from_end) < min_distance {
556        return 64 * 1024;
557    }
558    // Larger `dist_from_begin` (i.e. `middle` farther from the head
559    // fingerprint, equivalently closer to the tail) means the new
560    // statistics already dominate the centre — the transition
561    // happened EARLY → emit a small 32 KB head and let the 96 KB
562    // tail absorb the rest. Inverse case: `dist_from_end` larger
563    // (middle still resembles the head) means the transition is
564    // LATE → emit a 96 KB head so the trailing 32 KB carries the
565    // new statistics alone.
566    if dist_from_begin > dist_from_end {
567        32 * 1024
568    } else {
569        96 * 1024
570    }
571}
572
/// Low 32 bits of the seed-0 XXH64 digest of `data`.
///
/// One shared helper for the per-physical-block checksum sidecar, so encoder
/// and decoder hash the exact same byte ranges with the exact same
/// parameters. Gated at `all(lsm, hash)`: the only consumer is the lsm-side
/// `block_checksums` sidecar, and non-lsm builds carry no reference to this
/// helper at all.
#[cfg(all(feature = "lsm", feature = "hash"))]
#[inline]
pub(crate) fn xxh64_block_low32(data: &[u8]) -> u32 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(data);
    let digest: u64 = hasher.finish();
    // Truncation is the point: only the low 32 bits are kept.
    digest as u32
}
586
/// Bench-only entry point for the upstream zstd-parity comparator test in
/// `tests/block_splitter_parity.rs`.
///
/// Routes to the same ports `optimal_block_size` dispatches to:
/// `split_block_from_borders` for `split_level == 0` and
/// `split_block_by_chunks` for `split_level` in `1..=4`.
///
/// # Panics
/// Panics unless `block` is exactly `MAX_BLOCK_SIZE` bytes (the upstream
/// `ZSTD_splitBlock` contract — "@blockSize must be == 128 KB" in
/// `zstd_preSplit.h`) and `split_level <= 4`.
#[cfg(feature = "bench_internals")]
pub(crate) fn block_splitter_decision_for_bench(block: &[u8], split_level: usize) -> usize {
    assert_eq!(
        block.len(),
        MAX_BLOCK_SIZE as usize,
        "block_splitter_decision_for_bench expects exactly MAX_BLOCK_SIZE bytes"
    );
    assert!(
        split_level <= 4,
        "block_splitter_decision_for_bench: split_level must be in 0..=4, got {split_level}"
    );
    match split_level {
        0 => split_block_from_borders(block),
        chunk_level => split_block_by_chunks(block, chunk_level),
    }
}
611
/// Warm a pre-split window with one sequential pass before the strided
/// fingerprint histogram and the match scan read it.
///
/// On the borrowed (no-copy) over-window path matching happens in place on
/// the caller's input, which makes the pre-split fingerprint the FIRST touch
/// of that 128 KiB region — a cache-cold read. `presplit_record_fingerprint`
/// walks it with a `sampling_rate` stride while scattering writes into the
/// events table, a latency-bound pattern that pays full DRAM miss latency per
/// line (measured ~3x the cost of an ERMS streaming read of the same bytes).
/// The owned path is unaffected because its history-mirror copy has already
/// warmed the bytes; this pass restores that warmth without the copy's write
/// half. Reading one byte per 64-byte line (the i9 line size) streams under
/// the hardware prefetcher, so the cold read is paid once at memory bandwidth
/// and the later strided samples hit L1/L2. `black_box` stops the pass from
/// being optimized away as a dead read.
#[inline]
fn warm_presplit_window(window: &[u8]) {
    // XOR-fold the first byte of every 64-byte line.
    let folded = window
        .iter()
        .step_by(64)
        .fold(0u8, |acc, &byte| acc ^ byte);
    core::hint::black_box(folded);
}
637
638pub(crate) fn optimal_block_size(
639    level: CompressionLevel,
640    block: &[u8],
641    remaining_src_size: usize,
642    block_size_max: usize,
643    savings: i64,
644) -> usize {
645    let Some(split_level) = crate::encoding::match_generator::level_pre_split(level) else {
646        return remaining_src_size.min(block_size_max);
647    };
648    if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
649        return remaining_src_size.min(block_size_max);
650    }
651    if savings < 3 {
652        return MAX_BLOCK_SIZE as usize;
653    }
654    if block.len() < MAX_BLOCK_SIZE as usize {
655        return remaining_src_size.min(block_size_max);
656    }
657    // Upstream zstd `ZSTD_splitBlock` dispatch (`zstd_preSplit.c:234`):
658    // `split_level == 0` → cheap borders heuristic;
659    // `split_level == 1..=4` → byChunks with internal sampling level
660    // `split_level - 1`.
661    let raw_split = if split_level == 0 {
662        split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
663    } else {
664        split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
665    };
666    raw_split
667        .max(PRESPLIT_BLOCK_MIN)
668        .min(MAX_BLOCK_SIZE as usize)
669}
670
/// Encoder state bundled with a compressor: the matcher, the Huffman and FSE
/// entropy tables, block scratch space, the repeat-offset history, and the
/// per-frame strategy settings. There is no `Default` impl; every
/// constructor supplies each field explicitly (see `strategy_tag`).
pub(crate) struct CompressState<M: Matcher> {
    /// The match generator (any `Matcher` implementation).
    pub(crate) matcher: M,
    /// Literals Huffman table slot.
    /// NOTE(review): going by the name and the `huff_table_spare` notes below,
    /// this is the table left by the most recent block or seeded from the
    /// dictionary — confirm against the block encoder.
    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// Recycled `HuffmanTable` buffers: when a block clears or replaces
    /// `last_huff_table`, the old table parks here instead of dropping, so
    /// the next frame's dictionary entropy seed `clone_from`s into existing
    /// allocations. Without this, every dict-seeded frame whose last block
    /// ended raw/RLE paid a fresh two-Vec table clone per frame.
    pub(crate) huff_table_spare: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    /// Default and "previous" FSE tables for the LL/ML/OF streams.
    pub(crate) fse_tables: FseTables,
    /// Scratch state for compressed-block encoding.
    /// NOTE(review): contents are defined in `encoding::blocks`, outside this
    /// view.
    pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
    /// Offset history for repeat offset encoding: [rep0, rep1, rep2].
    /// Initialized to [1, 4, 8] per RFC 8878 §3.1.2.5.
    pub(crate) offset_hist: [u32; 3],
    /// Strategy tag resolved from the current `CompressionLevel` at every
    /// `matcher.reset()` call. Used by the literal-compression gates
    /// (`min_literals_to_compress`, `min_gain`) in
    /// `encoding::blocks::compressed` to mirror upstream zstd's strategy-aware
    /// thresholds (`zstd_compress_literals.c:114-127, 187-188`).
    ///
    /// **Invariant (required of every construction site):** must be
    /// initialized from the active `CompressionLevel` via
    /// `StrategyTag::for_compression_level`, and re-synced from the
    /// active level alongside every `matcher.reset()` call so the
    /// level-aware gates stay correct after a level change. The two
    /// reset sites that own this sync are `FrameCompressor::compress`
    /// and `StreamingEncoder::ensure_frame_started`. There is no
    /// `Default` impl — production constructors
    /// (`FrameCompressor::new`, `new_with_matcher`, the streaming
    /// encoder constructor) plumb this explicitly. Tests that build
    /// `CompressState` by hand must also supply a value.
    pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
    /// Whether the HUF literal table build runs the #167 table-log search
    /// (`true`) or the cheap single-build (`false`). For the Fast strategy the
    /// search is a clean ratio win over upstream zstd but costs ~1.5 us per
    /// literal section — negligible on large inputs, ~20% on small ones — and
    /// the Fast matcher is byte-faithful to upstream zstd (so the cheap path
    /// ties it). So it is gated ON only for large Fast frames; non-Fast
    /// strategies always keep it (their matchers diverge, making the search
    /// load-bearing for ratio — gating those is deferred). Set per frame
    /// alongside `strategy_tag` via [`fast_huf_search_enabled`].
    pub(crate) huf_optimal_search: bool,
}
714
715/// Whether the Fast-strategy HUF literal build should run the #167 table-log
716/// search for a frame of `source_size` bytes (see
717/// [`CompressState::huf_optimal_search`]). Non-Fast strategies always search.
718/// For Fast, enable the search only above 128 KiB (one full block) — below
719/// that the per-frame setup dominates and the search's ~1.5 us is a large
720/// relative cost for a ratio gain the cheap path forgoes while still tying
721/// upstream zstd. Unknown size (streaming) keeps the search for conservative
722/// ratio.
723pub(crate) fn fast_huf_search_enabled(
724    strategy: crate::encoding::strategy::StrategyTag,
725    source_size: Option<u64>,
726) -> bool {
727    if strategy != crate::encoding::strategy::StrategyTag::Fast {
728        return true;
729    }
730    match source_size {
731        Some(size) => size > 128 * 1024,
732        None => true,
733    }
734}
735
736impl<M: Matcher> CompressState<M> {
737    /// Clears `last_huff_table`, parking the table's buffers in
738    /// `huff_table_spare` for reuse instead of dropping them.
739    #[inline]
740    pub(crate) fn clear_huff_table(&mut self) {
741        if let Some(table) = self.last_huff_table.take() {
742            self.huff_table_spare = Some(table);
743        }
744    }
745
746    /// Replaces `last_huff_table` with `table`, parking any displaced table
747    /// in `huff_table_spare` for reuse.
748    #[inline]
749    pub(crate) fn replace_huff_table(&mut self, table: crate::huff0::huff0_encoder::HuffmanTable) {
750        if let Some(old) = self.last_huff_table.replace(table) {
751            self.huff_table_spare = Some(old);
752        }
753    }
754}
755
/// Per-frame setup resolved once by [`FrameCompressor::prepare_frame`] and
/// consumed by the block loop + [`FrameCompressor::finish_frame`]. Lets the
/// owned `compress()` and the borrowed one-shot path share identical
/// reset / dict-prime / entropy-seed setup and frame-tail emission.
struct FramePrep {
    /// Window size resolved for this frame.
    /// NOTE(review): presumably the value advertised in the frame header —
    /// confirm in `prepare_frame`.
    window_size: u64,
    /// Whether this frame runs with dictionary state. `borrowed_eligible`
    /// reads it to select the dictionary-specific eligibility checks.
    use_dictionary_state: bool,
    /// NOTE(review): presumably records whether a source-size hint was
    /// available when the frame was prepared — confirm in `prepare_frame`.
    source_size_hint_known: bool,
    /// Source-size hint for the frame, if any; `run_one_frame` forwards it
    /// to `run_owned_block_loop`.
    initial_size_hint: Option<u64>,
}
766
/// Picks the starting capacity of the `all_blocks` accumulator from the
/// source-size hint.
///
/// Compressed blocks pile up in memory before the frame header is written
/// (the header needs Frame_Content_Size, which is only known once all input
/// has been read), so the accumulator is seeded by tier (names mirror
/// upstream zstd `ZSTD_CStreamOutSize`):
///
/// - tiny (hint `<= 4 KiB`): a 4 KiB payload-bound seed, `>=` anything a
///   tiny input's compressed output could need.
/// - small (hint `<= 64 KiB`): 16 KiB, which absorbs one or two
///   `Vec::extend` doublings without over-allocating.
/// - default (larger or unknown hint): `130 KiB`, one upstream zstd block and
///   the value the rest of the encoder is sized around; larger inputs
///   amortise the first doublings cheaply and the residue is dominated by
///   internal `compress_block_encoded` buffers.
///
/// Every tier is additionally capped by `block_capacity` (the active
/// `targetCBlockSize` cap, or the 128 KiB format ceiling) plus
/// header/checksum slack, so with a small target the first allocation tracks
/// one capped block instead of keeping the upstream zstd-sized floor that
/// only later growth respects.
///
/// Shared by the owned (`run_owned_block_loop`) and borrowed
/// (`run_borrowed_block_loop`) paths so the tier table can't drift between
/// them.
fn initial_all_blocks_cap(initial_size_hint: Option<u64>, block_capacity: usize) -> usize {
    const KIB: usize = 1024;
    const TINY_HINT_MAX: u64 = 4 * 1024;
    const SMALL_HINT_MAX: u64 = 64 * 1024;

    // One capped block plus 3 + 16 bytes of header/checksum slack.
    let one_block_ceiling = block_capacity + 3 + 16;

    let tier_seed = match initial_size_hint {
        Some(0..=TINY_HINT_MAX) => 4 * KIB,
        // The tiny arm already took hints up to 4 KiB, so this arm only
        // sees hints in (4 KiB, 64 KiB].
        Some(0..=SMALL_HINT_MAX) => 16 * KIB,
        _ => 130 * KIB,
    };
    tier_seed.min(one_block_ceiling)
}
799
/// Source of input bytes for `run_owned_block_loop`, pulled one block at a
/// time.
///
/// `fill_block` appends to `buf` — which may already contain a carried
/// pre-split suffix — until either `buf.len() == block_capacity` or the
/// source runs dry, and returns `(bytes_appended, reached_eof)`.
///
/// `reached_eof` is `true` exactly when the block could NOT be topped up to
/// `block_capacity`. This reproduces the boundary of the historical
/// `Read`-loop: an input whose length is an exact multiple of the block size
/// reports `false` on its final full block and only signals EOF through a
/// trailing empty block on the following call.
///
/// Two implementations exist because their costs differ: slice entry points
/// (`compress_independent_frame_into`, `compress_oneshot_*` fallbacks) can
/// append with a single `extend_from_slice`, whereas a generic reader must
/// first `resize` an initialized target region for `Read::read` to write
/// into, paying a zero-fill memset of the whole block on every frame.
pub(crate) trait OwnedBlockSource {
    fn fill_block(
        &mut self,
        buf: &mut Vec<u8>,
        block_capacity: usize,
        size_hint_remaining: Option<u64>,
    ) -> (usize, bool);
}

impl OwnedBlockSource for &[u8] {
    /// Moves up to `block_capacity - buf.len()` bytes from the front of the
    /// slice into `buf` and advances the slice past them. The size hint is
    /// not needed: the slice already knows how much is left.
    fn fill_block(
        &mut self,
        buf: &mut Vec<u8>,
        block_capacity: usize,
        _size_hint_remaining: Option<u64>,
    ) -> (usize, bool) {
        let room = block_capacity - buf.len();
        let (head, tail) = self.split_at(room.min(self.len()));
        buf.extend_from_slice(head);
        *self = tail;
        // Fewer bytes than there was room for means the source is exhausted.
        (head.len(), head.len() < room)
    }
}
838
/// Adapter routing a generic [`Read`] source through [`OwnedBlockSource`]:
/// preserves the historical sizing behaviour — an initialized target region
/// bounded by the source-size hint, grown (doubling, capped) only when the
/// hint under-counted.
pub(crate) struct ReaderBlockSource<Rd>(pub(crate) Rd);

impl<Rd: Read> OwnedBlockSource for ReaderBlockSource<Rd> {
    /// Reads from the wrapped reader into `buf` until the buffer holds
    /// `block_capacity` bytes or a read returns 0 bytes.
    ///
    /// Bytes already in `buf` on entry are kept; new bytes are appended
    /// after them. `size_hint_remaining`, when `Some`, only sizes the first
    /// zero-filled read window; it never limits how much is read.
    ///
    /// Returns `(bytes_appended, reached_eof)`, where `reached_eof` is set
    /// only when a read returned 0 bytes before the block was full.
    ///
    /// # Panics
    ///
    /// Panics if the underlying `read` returns an error (the result is
    /// `unwrap`ped; the return type has no error channel).
    fn fill_block(
        &mut self,
        buf: &mut Vec<u8>,
        block_capacity: usize,
        size_hint_remaining: Option<u64>,
    ) -> (usize, bool) {
        // `start` is the carried prefix length; `filled` tracks how many
        // bytes of `buf` hold real data (prefix + bytes read so far).
        let start = buf.len();
        let mut filled = start;
        let mut reached_eof = false;
        // Size the read buffer to the bytes this block actually expects
        // rather than always zero-filling a full MAX_BLOCK_SIZE: a small
        // frame otherwise pays a 128 KiB `resize(_, 0)` memset per block
        // just to read a few KiB (the zero-fill past `filled` is then
        // truncated away).
        //
        // Overflow-free by construction (no `saturating_*` masking):
        // `filled <= block_capacity` always (the read only ever targets
        // `[filled..len]` with `len <= block_capacity`, and a carried-over
        // pre-split suffix is a `split_off` below `block_capacity`), so
        // `block_capacity - filled` never underflows; pinning `remaining`
        // to `block_capacity` before the `usize` cast keeps the cast and
        // the final add within `usize` on every target.
        let initial_target = match size_hint_remaining {
            Some(remaining) => {
                let remaining = remaining.min(block_capacity as u64) as usize;
                filled + remaining.min(block_capacity - filled)
            }
            // Unknown hint, or an inexact hint already met by prior blocks:
            // read against the full block window.
            None => block_capacity,
        };
        if buf.len() < initial_target {
            buf.resize(initial_target, 0);
        }
        loop {
            if reached_eof || filled == block_capacity {
                break;
            }
            if filled == buf.len() {
                // Hint under-counted the block; grow toward block_capacity
                // (doubling, capped) so reading continues without paying a
                // full-buffer zero up front. `len <= block_capacity` so the
                // double stays well within `usize`; `filled < block_capacity`
                // here (the `== block_capacity` break fired otherwise), so
                // `filled + 1 <= block_capacity`.
                let grow_to = (buf.len() * 2).clamp(filled + 1, block_capacity);
                buf.resize(grow_to, 0);
            }
            let read_end = buf.len();
            // A read error panics here via `unwrap`.
            let new_bytes = self.0.read(&mut buf[filled..read_end]).unwrap();
            if new_bytes == 0 {
                // A zero-length read into a non-empty window is treated as
                // end of input.
                reached_eof = true;
                break;
            }
            filled += new_bytes;
        }
        // Drop the zero-filled tail that no read reached.
        buf.truncate(filled);
        (filled - start, reached_eof)
    }
}
906
907impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
908    /// Create a new `FrameCompressor`
909    pub fn new(compression_level: CompressionLevel) -> Self {
910        Self {
911            uncompressed_data: None,
912            compressed_data: None,
913            compression_level,
914            dictionary: None,
915            dictionary_entropy_cache: None,
916            source_size_hint: None,
917            state: CompressState {
918                matcher: MatchGeneratorDriver::new(1024 * 128, 1),
919                last_huff_table: None,
920                huff_table_spare: None,
921                fse_tables: FseTables::new(),
922                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
923                offset_hist: [1, 4, 8],
924                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
925                    compression_level,
926                ),
927                huf_optimal_search: true,
928            },
929            magicless: false,
930            content_checksum: false,
931            content_size_flag: true,
932            dict_id_flag: true,
933            target_block_size: None,
934            #[cfg(feature = "hash")]
935            hasher: XxHash64::with_seed(0),
936            #[cfg(feature = "lsm")]
937            frame_emit_info: None,
938            #[cfg(all(feature = "lsm", feature = "hash"))]
939            per_block_checksums_enabled: false,
940            #[cfg(all(feature = "lsm", feature = "hash"))]
941            block_checksums: None,
942            #[cfg(feature = "lsm")]
943            block_decompressed_sizes: alloc::vec::Vec::new(),
944            strategy_override: None,
945        }
946    }
947
948    /// Configure fine-grained compression parameters (#27).
949    ///
950    /// Resets the base [`CompressionLevel`](crate::encoding::CompressionLevel)
951    /// to the parameters' level and installs the per-knob overrides
952    /// (window/hash/chain/search logs, strategy, LDM) applied at the next
953    /// frame. Pass `None`-equivalent (a builder that overrides nothing)
954    /// to fall back to plain level-based compression.
955    ///
956    /// ```rust
957    /// use structured_zstd::encoding::{
958    ///     CompressionLevel, CompressionParameters, FrameCompressor, Strategy,
959    /// };
960    /// let params = CompressionParameters::builder(CompressionLevel::Level(19))
961    ///     .strategy(Strategy::Btultra2)
962    ///     .enable_long_distance_matching(true)
963    ///     .build()
964    ///     .unwrap();
965    /// let mut compressor: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
966    /// compressor.set_parameters(&params);
967    /// let compressed = compressor.compress_independent_frame(b"some data to compress");
968    /// assert!(!compressed.is_empty());
969    /// ```
970    pub fn set_parameters(&mut self, params: &crate::encoding::CompressionParameters) {
971        self.compression_level = params.level();
972        let overrides = params.overrides();
973        self.strategy_override = overrides.strategy.map(|s| s.tag());
974        // Keep `state.strategy_tag` consistent immediately so the borrowed
975        // one-shot eligibility gate (`borrowed_eligible`) and literal gates
976        // are correct even before the next `compress()` re-sync.
977        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
978            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
979        });
980        self.state.huf_optimal_search =
981            fast_huf_search_enabled(self.state.strategy_tag, self.source_size_hint);
982        self.state.matcher.set_param_overrides(Some(overrides));
983    }
984
985    /// Whether the borrowed (no per-block history copy) one-shot loop is
986    /// valid for an `input_len`-byte slice under the resolved `prep`.
987    ///
988    /// `Uncompressed` resolves to `StrategyTag::Fast` but must emit stored
989    /// Raw blocks, which the borrowed loop's
990    /// `compress_block_encoded_borrowed` (RLE/raw-fast/compressed) does NOT
991    /// do, so exclude it; it then takes the owned path's dedicated
992    /// Uncompressed arm.
993    ///
994    /// No window-size gate: over-window inputs are handled too. The owned
995    /// path bounds matches to the last `advertised_window` bytes via
996    /// `window_low` and evicts/rehashes its history; the borrowed path
997    /// computes the identical `window_low = block_end - advertised_window`
998    /// and the kernel rejects any hash candidate below it, while the
999    /// per-position `put` during the scan keeps in-window slots current,
1000    /// so it produces byte-identical output to the owned (evicting) path
1001    /// without ever copying the input into `history`, even when the input
1002    /// far exceeds the window.
1003    ///
1004    /// BUT gate on `input_len <= u32::MAX`: the Fast kernel stores ABSOLUTE
1005    /// positions in a `u32` hash table, and the borrowed scan walks
1006    /// absolute input offsets up to `block_end == input.len()`. Past 4 GiB
1007    /// those offsets truncate / overflow the `u32` position math
1008    /// (`base_off + ip0 as u32`, `window_low`), panicking or corrupting.
1009    /// The owned/evicting path keeps the scanned window bounded (positions
1010    /// stay small), so >4 GiB inputs fall back to it.
1011    fn borrowed_eligible(&self, input_len: usize, prep: &FramePrep) -> bool {
1012        if matches!(self.compression_level, CompressionLevel::Uncompressed)
1013            || input_len > u32::MAX as usize
1014        {
1015            return false;
1016        }
1017        if prep.use_dictionary_state {
1018            // The borrowed dict scan runs in VIRTUAL `[dict][input]` coordinates,
1019            // so the position space is `dict_content.len() + input_len`, not just
1020            // `input_len`. A large attached dictionary plus an otherwise-allowed
1021            // input can exceed the `u32` floor the kernel asserts — fall back to
1022            // the owned (copy) path in that case.
1023            let fits_u32 = self
1024                .dictionary
1025                .as_ref()
1026                .and_then(|dict| dict.inner.dict_content.len().checked_add(input_len))
1027                .is_some_and(|virtual_len| virtual_len <= u32::MAX as usize);
1028            if !fits_u32 {
1029                return false;
1030            }
1031            // Dictionary frames: only the Simple (Fast) backend in attach mode
1032            // has a borrowed (no input copy) dict scan. Copy-mode dict frames
1033            // and the other backends still take the owned path.
1034            return self.state.matcher.borrowed_dict_supported();
1035        }
1036        // The borrowed (no-copy, in-place over-window) scan exists for the
1037        // Simple (Fast), Dfast, and Row backends, and for the HashChain
1038        // backend's lazy CHAIN parser; BT/optimal (BinaryTree search) stay on
1039        // the owned path. Every borrowed scan applies the per-position
1040        // `window_low = abs_ip - advertised_window` offset cap so over-window
1041        // inputs are matched in place (no input->history copy), matching C's
1042        // continuous-index + windowLow one-shot behaviour.
1043        self.state.matcher.borrowed_supported()
1044    }
1045
1046    /// Compress `input` as one frame's worth of blocks into `out` (appended
1047    /// from its current end): the borrowed in-place loop when
1048    /// [`Self::borrowed_eligible`], else the owned (history-copying) loop fed
1049    /// an in-place `&[u8]` cursor. Returns `total_uncompressed`; the caller
1050    /// emits the frame header (before this call, when the content size is
1051    /// known) or the drain tail.
1052    fn run_one_frame(&mut self, input: &[u8], prep: &FramePrep, out: &mut Vec<u8>) -> u64 {
1053        if self.borrowed_eligible(input.len(), prep) {
1054            self.run_borrowed_block_loop(input, out)
1055        } else {
1056            let mut cursor: &[u8] = input;
1057            self.run_owned_block_loop(&mut cursor, prep.initial_size_hint, true, out)
1058        }
1059    }
1060
    /// Compress one contiguous `&[u8]` as a single independent Zstd frame,
    /// writing the frame bytes into `out` (its previous contents are
    /// replaced and its allocation reused), reusing this compressor's heavy
    /// state across calls.
    ///
    /// This is the reusable-compression-context (CCtx-equivalent) entry
    /// point, mirroring C `ZSTD_compress2` over a reused `ZSTD_CCtx`:
    /// construct ONE `FrameCompressor` and call this in a loop to emit N
    /// independent, self-describing frames (each carrying its own header,
    /// blocks, and checksum, decodable in isolation, with no cross-frame
    /// match history). Every call resets the per-frame state via
    /// [`Self::prepare_frame`]: only the allocations are kept, so the
    /// dominant per-frame setup cost (table allocation + dictionary prime)
    /// is paid once instead of N times. Passing the same `out` buffer each
    /// call additionally reuses the output allocation, matching C's
    /// caller-owned `dst` buffer (no per-frame output allocation).
    ///
    /// Reusing the context + `out` across many small frames (the typical
    /// per-block-frame workload) is far cheaper than a fresh
    /// [`compress_slice_to_vec`](crate::encoding::compress_slice_to_vec)
    /// per block, which allocates and primes from scratch each time.
    ///
    /// The input is read in place: no [`Self::set_source`] /
    /// [`Self::set_drain`] setup is required, and the input lifetime is not
    /// baked into the compressor type, so successive calls may pass slices
    /// with unrelated lifetimes. When the frame passes the
    /// `borrowed_eligible` gate, the matcher references the input directly
    /// (no per-block history copy); otherwise each block is copied into
    /// history by the owned loop, exactly as the streaming
    /// [`compress`](Self::compress) path does. The source-size hint is
    /// derived from the input length on every call, so per-frame table
    /// sizing tracks each frame's actual size regardless of any earlier
    /// hint.
    ///
    /// A sticky dictionary set via
    /// [`set_dictionary`](Self::set_dictionary) (or its variants) is primed
    /// into every frame, mirroring `ZSTD_CCtx_loadDictionary` /
    /// `ZSTD_CCtx_refCDict`.
    ///
    /// # Panics
    ///
    /// Panics on encoder error, matching [`Self::compress`] and
    /// [`compress_slice_to_vec`](crate::encoding::compress_slice_to_vec).
    pub fn compress_independent_frame_into(&mut self, input: &[u8], out: &mut Vec<u8>) {
        // Size the next frame from the actual payload, not a stale hint a
        // previous call may have left behind (a wrong hint would change the
        // resolved window/header and could flip borrowed eligibility).
        self.source_size_hint = Some(input.len() as u64);
        let prep = self.prepare_frame();
        // Content size is known up front (one-shot), so write the frame
        // header FIRST and emit blocks STRAIGHT into `out` — no separate
        // `all_blocks` accumulator and no header+blocks copy (which was the
        // dominant per-frame memmove + the only un-amortized per-frame alloc
        // even when the compressor is reused).
        let total_uncompressed = input.len() as u64;
        // The checksum is only emitted when the `hash` feature is compiled
        // in AND the caller enabled it; `checksum_len` mirrors that choice.
        let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
        let checksum_len = if emit_checksum { 4 } else { 0 };
        out.clear();
        // Reserve the header plus ONE block's worst case up front; the block
        // loops then grow `out` from the compression ratio observed so far
        // (`reserve_for_next_block`). Reserving `compress_bound(input_len)`
        // here held a whole-input-sized allocation for the entire frame —
        // ~100 MiB peak on a 100 MiB stream whose compressed output is a few
        // MiB, where the reference implementation's context peaks at
        // window-sized state. Small frames (<= one block) still get their
        // full bound in one shot, so the reused-`out` steady state is
        // unchanged. 18 = max frame header (magic 4 + descriptor 1 + window
        // 1 + dict id 4 + FCS 8).
        let first_block_bound = input.len().min(self.block_capacity()) + 3;
        out.reserve(18 + first_block_bound + checksum_len);
        self.append_frame_header(total_uncompressed, &prep, out);
        // Everything appended after this offset is block data (then the
        // optional checksum).
        let header_len = out.len();
        // The returned byte count is already known (`input.len()`), so it
        // is discarded.
        let _ = self.run_one_frame(input, &prep, out);
        #[cfg(feature = "hash")]
        if self.content_checksum {
            // Frame checksum: low 32 bits of the running hash, little-endian.
            out.extend_from_slice(&(self.hasher.finish() as u32).to_le_bytes());
        }
        #[cfg(feature = "lsm")]
        {
            // Hand only the block region (header and checksum excluded) to
            // the emit-info bookkeeping.
            let blocks_end = out.len() - checksum_len;
            self.populate_frame_emit_info(header_len, &out[header_len..blocks_end], emit_checksum);
        }
        // Without `lsm` nothing reads `header_len`; consume it so the build
        // stays free of an unused-variable warning.
        #[cfg(not(feature = "lsm"))]
        let _ = header_len;
    }
1146
1147    /// Convenience wrapper over [`Self::compress_independent_frame_into`]
1148    /// that allocates and returns a fresh `Vec` per call. Prefer the
1149    /// `_into` form in tight per-block-frame loops to reuse one output
1150    /// buffer across frames (the CCtx-equivalent zero-per-call-alloc
1151    /// output, matching C's caller-owned `dst`).
1152    ///
1153    /// ```rust
1154    /// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
1155    /// let mut cctx: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
1156    /// let frame_a = cctx.compress_independent_frame(b"first block payload");
1157    /// let frame_b = cctx.compress_independent_frame(b"second block payload");
1158    /// assert!(!frame_a.is_empty() && !frame_b.is_empty());
1159    /// ```
1160    pub fn compress_independent_frame(&mut self, input: &[u8]) -> Vec<u8> {
1161        let mut out = Vec::new();
1162        self.compress_independent_frame_into(input, &mut out);
1163        out
1164    }
1165
    /// Borrowed one-shot block loop: walks `input` block by block, with
    /// each block's length chosen by `optimal_block_size` (a full block, or
    /// a shorter pre-split block when the level pre-splits and the split
    /// pays), scanning each block range in place against the borrowed
    /// window via `compress_block_encoded_borrowed` — no per-block
    /// `commit_space` copy.
    ///
    /// Blocks are appended to `out` after whatever it already holds (for
    /// example a frame header). Returns `total_uncompressed`, which is
    /// always `input.len()`.
    ///
    /// The caller must have established [`Self::borrowed_eligible`] for
    /// this input (see `run_one_frame`). Over-window inputs are fine
    /// (matches are bounded by `window_low` exactly as the owned evicting
    /// path). A dictionary may be active: `dict_active` is forwarded to the
    /// block encoder for every block.
    fn run_borrowed_block_loop(&mut self, input: &[u8], out: &mut Vec<u8>) -> u64 {
        // Blocks are appended to `out` starting here. `out` may already hold
        // the frame header (the one-shot compress-into-Vec path writes it
        // first, since the content size is known up front, and the loop
        // emits blocks straight after it — no separate `all_blocks` Vec and
        // no header+blocks copy). Output-size reads below are taken RELATIVE
        // to `blocks_start` so a header prefix never skews the upstream zstd split
        // `savings` gate (which would change block boundaries / wire output).
        let blocks_start = out.len();
        let total_uncompressed = input.len() as u64;
        // Empty input: emit a single empty last Raw block (mirrors the
        // owned loop's empty-file special case).
        if input.is_empty() {
            let header = BlockHeader {
                last_block: true,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(out);
            // Keep the per-block bookkeeping in step with the one block
            // just emitted.
            #[cfg(feature = "lsm")]
            self.block_decompressed_sizes.push(0);
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(checksums) = self.block_checksums.as_mut() {
                checksums.push(xxh64_block_low32(&[]));
            }
            return total_uncompressed;
        }
        // SAFETY: `input` outlives this call (held by the caller across
        // the call) and is not mutated. The visible caller
        // (`run_one_frame`) only reaches this loop when `borrowed_eligible`
        // holds, i.e. the active matcher reports a borrowed scan.
        // NOTE(review): the exact contract of `set_borrowed_window` is
        // defined outside this view — confirm it asks for nothing more.
        unsafe {
            self.state.matcher.set_borrowed_window(input);
        }
        // Panic-safety: clear the borrowed `(ptr, len)` on EVERY exit,
        // including an unwind from an `assert!` inside the block loop, so
        // a caught-and-reused compressor never retains a dangling window.
        // (The next frame's `reset()` also clears it before any read, but
        // this guard makes the invariant local and unwind-proof.)
        struct ClearBorrowedOnDrop(*mut MatchGeneratorDriver);
        impl Drop for ClearBorrowedOnDrop {
            fn drop(&mut self) {
                // SAFETY: at drop (normal return or unwind) the loop's
                // borrows of the matcher have ended, so this is the only
                // access. `addr_of_mut!` produced this pointer without an
                // intermediate `&mut`, so the interleaved `&mut` uses in
                // the loop did not invalidate it.
                // NOTE(review): worth confirming under `cargo miri test`
                // that the later `&mut self.state` reborrows really leave
                // this raw pointer usable under the aliasing model in use.
                unsafe { (*self.0).clear_borrowed_window() };
            }
        }
        let _clear_guard = ClearBorrowedOnDrop(core::ptr::addr_of_mut!(self.state.matcher));
        let block_capacity = self.block_capacity();
        // `start` is the absolute offset of the next block within `input`.
        let mut start = 0usize;
        while start < input.len() {
            reserve_for_next_block(
                out,
                blocks_start,
                start as u64,
                input.len() - start,
                block_capacity,
            );
            // Upstream zstd `ZSTD_compress_frameChunk`: size each block via the cheap
            // fingerprint pre-splitter so a full 128 KiB block is cut at a
            // statistical boundary when it pays. `savings = consumed -
            // produced` mirrors the upstream zstd gate (the first block and
            // incompressible input keep the full 128 KiB). The borrowed window
            // already spans the whole input, so a smaller block is just a
            // narrower `(block_start, block_end)` range into it.
            let savings = start as i64 - (out.len() - blocks_start) as i64;
            // Borrowed path only: warm the pre-split window before the
            // cache-cold strided fingerprint read. Gated to exactly the
            // conditions under which `optimal_block_size` reads `block`
            // (a pre-split level, a full 128 KiB block remaining, the
            // block-size cap admits a full block, and `savings >= 3` so the
            // splitter actually runs) — so non-pre-split levels, the first
            // block, and the trailing partial block pay nothing. See
            // `warm_presplit_window`.
            if savings >= 3
                && input.len() - start >= MAX_BLOCK_SIZE as usize
                && block_capacity >= MAX_BLOCK_SIZE as usize
                && crate::encoding::match_generator::level_pre_split(self.compression_level)
                    .is_some()
            {
                warm_presplit_window(&input[start..start + MAX_BLOCK_SIZE as usize]);
            }
            let block_len = optimal_block_size(
                self.compression_level,
                &input[start..],
                input.len() - start,
                block_capacity,
                savings,
            );
            // Clamp to the input end; the block that reaches it is the last.
            let end = (start + block_len).min(input.len());
            let block = &input[start..end];
            let last_block = end == input.len();
            // The content checksum covers the uncompressed bytes, fed block
            // by block.
            #[cfg(feature = "hash")]
            if self.content_checksum {
                self.hasher.write(block);
            }
            let dict_active =
                self.dictionary.is_some() && self.state.matcher.supports_dictionary_priming();
            crate::encoding::levels::compress_block_encoded_borrowed(
                &mut self.state,
                self.compression_level,
                last_block,
                block,
                start,
                end,
                dict_active,
                out,
                #[cfg(feature = "lsm")]
                Some(&mut self.block_decompressed_sizes),
                #[cfg(all(feature = "lsm", feature = "hash"))]
                self.block_checksums.as_mut(),
            );
            start = end;
        }
        // `_clear_guard` drops here, clearing the borrowed window.
        total_uncompressed
    }
1293}
1294
1295impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
1296    /// Create a new `FrameCompressor` with a custom matching algorithm implementation
1297    pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
1298        Self {
1299            uncompressed_data: None,
1300            compressed_data: None,
1301            dictionary: None,
1302            dictionary_entropy_cache: None,
1303            source_size_hint: None,
1304            state: CompressState {
1305                matcher,
1306                last_huff_table: None,
1307                huff_table_spare: None,
1308                fse_tables: FseTables::new(),
1309                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
1310                offset_hist: [1, 4, 8],
1311                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
1312                    compression_level,
1313                ),
1314                huf_optimal_search: true,
1315            },
1316            compression_level,
1317            magicless: false,
1318            content_checksum: false,
1319            content_size_flag: true,
1320            dict_id_flag: true,
1321            target_block_size: None,
1322            #[cfg(feature = "hash")]
1323            hasher: XxHash64::with_seed(0),
1324            #[cfg(feature = "lsm")]
1325            frame_emit_info: None,
1326            #[cfg(all(feature = "lsm", feature = "hash"))]
1327            per_block_checksums_enabled: false,
1328            #[cfg(all(feature = "lsm", feature = "hash"))]
1329            block_checksums: None,
1330            #[cfg(feature = "lsm")]
1331            block_decompressed_sizes: alloc::vec::Vec::new(),
1332            strategy_override: None,
1333        }
1334    }
1335
1336    /// Enable or disable magicless frame format (`ZSTD_f_zstd1_magicless`).
1337    ///
1338    /// When set to `true`, emitted frames omit the 4-byte magic number
1339    /// prefix. The matching decoder must be configured to expect a
1340    /// magicless stream — wire-format only round-trips with a
1341    /// magicless-aware decoder.
1342    pub fn set_magicless(&mut self, magicless: bool) {
1343        self.magicless = magicless;
1344    }
1345
1346    /// Enable or disable the trailing XXH64 content checksum
1347    /// (semantics of upstream `ZSTD_c_checksumFlag`). Default `false`,
1348    /// matching the upstream library default (`ZSTD_c_checksumFlag = 0`)
1349    /// so out-of-the-box frames carry the same layout and pay the same
1350    /// costs as the reference implementation.
1351    ///
1352    /// When `false`, emitted frames set `Content_Checksum_flag = 0` and carry
1353    /// no trailing digest; such frames are valid (RFC 8878) and decode
1354    /// correctly in any [`ContentChecksum`](crate::decoding::ContentChecksum)
1355    /// mode. Without the `hash` feature no checksum is emitted regardless of
1356    /// this setting.
1357    pub fn set_content_checksum(&mut self, emit: bool) {
1358        self.content_checksum = emit;
1359    }
1360
1361    /// Enable or disable recording `Frame_Content_Size` in the frame header
1362    /// when the total size is known (semantics of upstream
1363    /// `ZSTD_c_contentSizeFlag`). Default `true`, matching upstream. With
1364    /// the flag off the header carries a window descriptor instead (and the
1365    /// single-segment layout, which requires an FCS, is disabled).
1366    pub fn set_content_size_flag(&mut self, emit: bool) {
1367        self.content_size_flag = emit;
1368    }
1369
1370    /// Enable or disable recording the dictionary ID in the frame header
1371    /// when a dictionary is attached (semantics of upstream
1372    /// `ZSTD_c_dictIDFlag`). Default `true`, matching upstream. Frames
1373    /// emitted with the flag off still decode when the decoder is handed
1374    /// the dictionary explicitly.
1375    pub fn set_dictionary_id_flag(&mut self, emit: bool) {
1376        self.dict_id_flag = emit;
1377    }
1378
1379    /// Set an upper bound on emitted block sizes (semantics of upstream
1380    /// `ZSTD_c_targetCBlockSize`): every physical block's payload is capped
1381    /// at `target` bytes (+3-byte block header on the wire), trading some
1382    /// ratio for bounded per-block latency. The value is clamped to
1383    /// `[MIN_TARGET_BLOCK_SIZE, MAX_BLOCK_SIZE]` (the upstream bounds).
1384    /// `None` removes the target.
1385    pub fn set_target_block_size(&mut self, target: Option<u32>) {
1386        self.target_block_size = target.map(|t| {
1387            t.clamp(
1388                crate::common::MIN_TARGET_BLOCK_SIZE,
1389                crate::common::MAX_BLOCK_SIZE,
1390            )
1391        });
1392    }
1393
1394    /// The active block-size cap: the configured target, or the format's
1395    /// 128 KiB block ceiling.
1396    fn block_capacity(&self) -> usize {
1397        self.target_block_size
1398            .map_or(crate::common::MAX_BLOCK_SIZE as usize, |t| t as usize)
1399    }
1400
1401    /// Before calling [FrameCompressor::compress] you need to set the source.
1402    ///
1403    /// This is the data that is compressed and written into the drain.
1404    pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
1405        self.uncompressed_data.replace(uncompressed_data)
1406    }
1407
1408    /// Before calling [FrameCompressor::compress] you need to set the drain.
1409    ///
1410    /// As the compressor compresses data, the drain serves as a place for the output to be writte.
1411    pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
1412        self.compressed_data.replace(compressed_data)
1413    }
1414
1415    /// Provide a hint about the total uncompressed size for the next frame.
1416    ///
1417    /// When set, the encoder selects smaller hash tables and windows for
1418    /// small inputs, matching the C zstd source-size-class behavior.
1419    ///
1420    /// This hint applies only to frame payload bytes (`size`). Dictionary
1421    /// history is primed separately and does not inflate the hinted size or
1422    /// advertised frame window.
1423    /// Must be called before [`compress`](Self::compress).
1424    pub fn set_source_size_hint(&mut self, size: u64) {
1425        self.source_size_hint = Some(size);
1426    }
1427
1428    /// Total heap bytes this compressor's allocations hold, excluding the
1429    /// inline struct: the match-finder tables / history / recycled buffers and
1430    /// the primed-dictionary snapshot (via the matcher), the retained
1431    /// Huffman tables (active + recycled spare), the retained dictionary
1432    /// content, the cached dictionary entropy tables (literals Huffman +
1433    /// LL/ML/OF FSE), and the per-block sidecar buffers. Lets a context
1434    /// report its true footprint through `ZSTD_sizeof_CCtx`.
1435    pub fn heap_size(&self) -> usize {
1436        let mut total = self.state.matcher.heap_size();
1437        total += self
1438            .state
1439            .last_huff_table
1440            .as_ref()
1441            .map_or(0, |table| table.heap_size());
1442        total += self
1443            .state
1444            .huff_table_spare
1445            .as_ref()
1446            .map_or(0, |table| table.heap_size());
1447        total += self
1448            .dictionary
1449            .as_ref()
1450            .map_or(0, |d| d.inner.dict_content.capacity());
1451        total += self
1452            .dictionary_entropy_cache
1453            .as_ref()
1454            .map_or(0, CachedDictionaryEntropy::heap_size);
1455        #[cfg(all(feature = "lsm", feature = "hash"))]
1456        {
1457            total += self
1458                .block_checksums
1459                .as_ref()
1460                .map_or(0, |v| v.capacity() * core::mem::size_of::<u32>());
1461        }
1462        #[cfg(feature = "lsm")]
1463        {
1464            total += self.block_decompressed_sizes.capacity() * core::mem::size_of::<u32>();
1465        }
1466        total
1467    }
1468
1469    /// Compress the uncompressed data from the provided source as one Zstd frame and write it to the provided drain
1470    ///
1471    /// This will repeatedly call [Read::read] on the source to fill up blocks until the source returns 0 on the read call.
1472    /// All compressed blocks are buffered in memory so that the frame header can include the
1473    /// `Frame_Content_Size` field (which requires knowing the total uncompressed size). The
1474    /// entire frame — header, blocks, and optional checksum — is then written to the drain
1475    /// at the end. This means peak memory usage is O(compressed_size).
1476    ///
1477    /// To avoid endlessly encoding from a potentially endless source (like a network socket) you can use the
1478    /// [Read::take] function
1479    /// Per-frame setup values resolved by [`Self::prepare_frame`] and
1480    /// consumed by the block loop + [`Self::finish_frame`]. Lets the
1481    /// owned `compress()` and the borrowed one-shot path share the exact
1482    /// same reset / dict-prime / entropy-seed setup and frame tail.
1483    pub fn compress(&mut self) {
1484        let prep = self.prepare_frame();
1485        // Take the reader out so `run_owned_block_loop` can borrow it
1486        // mutably alongside `&mut self` (the rest of the loop touches
1487        // `self.state` / `self.hasher`, disjoint from the reader). Restored
1488        // before the frame tail so a reused compressor keeps its source.
1489        //
1490        // Deliberately NOT restored on unwind: if the block loop panics the
1491        // source has been partially consumed, so handing it back would let a
1492        // `catch_unwind` caller "successfully" compress the remaining tail
1493        // from an arbitrary midpoint — silent data corruption. Leaving the
1494        // slot empty makes any post-panic reuse fail loudly at the `expect`
1495        // below (matcher/entropy state is equally unre-usable after an
1496        // unwind; the reference implementation likewise requires a context
1497        // reset after an error).
1498        let mut source = self
1499            .uncompressed_data
1500            .take()
1501            .expect("source must be set via set_source before compress()");
1502        // Streaming drain: the content size is only known at EOF, so the
1503        // frame header can't precede the blocks — accumulate them in a local
1504        // buffer and let `finish_frame` write header + blocks to the drain.
1505        let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap(
1506            prep.initial_size_hint,
1507            self.block_capacity(),
1508        ));
1509        let mut block_source = ReaderBlockSource(&mut source);
1510        let total_uncompressed = self.run_owned_block_loop(
1511            &mut block_source,
1512            prep.initial_size_hint,
1513            false,
1514            &mut all_blocks,
1515        );
1516        self.uncompressed_data = Some(source);
1517        self.finish_frame(all_blocks, total_uncompressed, &prep);
1518    }
1519
    /// Resets all per-frame state and returns the `FramePrep` values that the
    /// block loop and frame tail consume.
    ///
    /// In order, this:
    /// 1. clears the `lsm` introspection buffers left by a previous frame;
    /// 2. consumes the pending source-size hint and forwards it to the
    ///    matcher, plus the dictionary content length when a dictionary will
    ///    be primed;
    /// 3. resets the matcher, the repeat-offset history, and the strategy tag
    ///    (honouring `strategy_override`);
    /// 4. primes, restores, or re-applies the dictionary in the matcher;
    /// 5. seeds the Huffman / FSE "previous" tables from the dictionary
    ///    entropy cache, or clears them when no dictionary state is used;
    /// 6. resets the content-checksum hasher (with the `hash` feature).
    ///
    /// # Panics
    ///
    /// Panics if the matcher reports a window size of zero after the reset.
1520    fn prepare_frame(&mut self) -> FramePrep {
1521        // Reset per-frame introspection state so a re-used compressor
1522        // doesn't carry over the previous frame's layout/checksums.
1523        #[cfg(feature = "lsm")]
1524        {
1525            self.frame_emit_info = None;
1526            // Always captured under lsm (drives `decompressed_byte_range`);
1527            // clear, keep the allocation for a reused compressor.
1528            self.block_decompressed_sizes.clear();
1529        }
1530        #[cfg(all(feature = "lsm", feature = "hash"))]
1531        {
1532            if self.per_block_checksums_enabled {
1533                self.block_checksums = Some(alloc::vec::Vec::new());
1534            } else {
1535                self.block_checksums = None;
1536            }
1537        }
1538        let initial_size_hint = self.source_size_hint;
1539        let source_size_hint_known = initial_size_hint.is_some();
1540        let use_dictionary_state =
1541            !matches!(self.compression_level, CompressionLevel::Uncompressed)
1542                && self.state.matcher.supports_dictionary_priming()
1543                && self.dictionary.is_some();
1544        if let Some(size_hint) = self.source_size_hint.take() {
1545            // Keep source-size hint scoped to payload bytes; dictionary priming
1546            // is applied separately and should not force larger matcher sizing.
1547            self.state.matcher.set_source_size_hint(size_hint);
1548        }
1549        // Hand the matcher the dictionary's content size so its binary-tree /
1550        // hash-chain tables shrink to the dictionary's cParams tier (upstream zstd CDict
1551        // economics: the dictionary supplies long matches, so a source-sized live
1552        // table is wasted peak memory). The eviction window stays source-sized so
1553        // the dictionary bytes remain referenceable. Set before `reset` (which
1554        // consumes it) and only when a dictionary will actually be primed.
1555        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
1556            self.state
1557                .matcher
1558                .set_dictionary_size_hint(dict.inner.dict_content.len());
1559        }
1560        // Clearing buffers to allow re-using of the compressor
1561        self.state.matcher.reset(self.compression_level);
1562        self.state.offset_hist = [1, 4, 8];
1563        // Sync `state.strategy_tag` to the level resolved at this reset so
1564        // the literal-compression gates (`min_literals_to_compress` /
1565        // `min_gain` in `encoding::blocks::compressed`) see the correct
1566        // strategy for the next frame. Frame-by-frame level changes go
1567        // through this same `compress()` entry point, so re-syncing here
1568        // covers level switches without touching the matcher dispatch.
1569        // A public-parameter strategy override (#27) wins over the level's
1570        // derived tag so the literal-compression gates and dict-attach
1571        // cutoff below see the strategy the matcher actually runs.
1572        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
1573            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
1574        });
1575        // `initial_size_hint` (captured before the `.take()` above) — by here
1576        // `self.source_size_hint` is None.
1577        self.state.huf_optimal_search =
1578            fast_huf_search_enabled(self.state.strategy_tag, initial_size_hint);
1579        let cached_entropy = if use_dictionary_state {
1580            self.dictionary_entropy_cache.as_ref()
1581        } else {
1582            None
1583        };
1584        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
1585            // This state drives sequence encoding, while matcher priming below updates
1586            // the match generator's internal repeat-offset history for match finding.
1587            self.state.offset_hist = dict.inner.offset_hist;
1588            // Upstream zstd `ZSTD_shouldAttachDict` (`zstd_compress.c`): a
1589            // precomputed-dictionary table is COPIED into the working context
1590            // only when the source is larger than a per-strategy cutoff; at or
1591            // below it (and for unknown size) the upstream zstd ATTACHES the dictionary
1592            // tables by reference (no per-frame table touch at all). We don't
1593            // have an attach-by-reference path yet, so:
1594            //   - large source (> cutoff): reuse the captured prime snapshot
1595            //     (a table copy) instead of re-hashing the dictionary — the
1596            //     upstream zstd COPY regime, where the copy is cheaper than re-priming;
1597            //   - small / unknown source: re-prime (the snapshot copy of the
1598            //     whole table would cost MORE than the sparse re-prime here,
1599            //     which is exactly why the upstream zstd attaches by reference instead).
1600            // `attachDictSizeCutoffs` per strategy: fast 8K, dfast 16K,
1601            // greedy/lazy/btopt 32K, btultra/btultra2 8K. Expressed as the
1602            // ceil-log bucket (8K = 2^13, 16K = 2^14, 32K = 2^15) so the
1603            // decision uses the SAME bucketed representation as the driver's
1604            // attach/copy gate (`reset_size_log`) — comparing
1605            // `source_size_ceil_log(hint)` on the full u64 avoids the `as usize`
1606            // truncation that could diverge from the driver on 32-bit targets.
1607            // For a power-of-two cutoff `2^k`, `ceil_log2(hint) > k` is exactly
1608            // `hint > 2^k`, so this is identical to the raw `hint > cutoff` on
1609            // 64-bit.
1610            let cutoff_log = match self.state.strategy_tag {
1611                // Fast always attaches now (the copy-mode owned path memmoved the
1612                // whole input into history every frame); keep the copy-snapshot
1613                // gate in sync with the matcher's attach cutoff so Fast never
1614                // captures/restores a copy snapshot it can no longer use.
1615                crate::encoding::strategy::StrategyTag::Fast => {
1616                    crate::encoding::match_generator::FAST_ATTACH_DICT_CUTOFF_LOG
1617                }
1618                crate::encoding::strategy::StrategyTag::BtUltra
1619                | crate::encoding::strategy::StrategyTag::BtUltra2 => 13,
1620                crate::encoding::strategy::StrategyTag::Dfast => 14,
1621                crate::encoding::strategy::StrategyTag::Greedy
1622                | crate::encoding::strategy::StrategyTag::Lazy
1623                | crate::encoding::strategy::StrategyTag::Btlazy2
1624                | crate::encoding::strategy::StrategyTag::BtOpt => 15,
1625            };
1626            if self.state.matcher.dictionary_is_resident() {
1627                // Re-borrow fast path: the previous frame's reset kept this
1628                // dict's bytes + cached index resident, so skip the re-commit /
1629                // re-index and only reapply the offset history.
1630                self.state
1631                    .matcher
1632                    .reapply_resident_dictionary(dict.inner.offset_hist);
1633            } else {
1634                let prefer_copy_snapshot = initial_size_hint.is_some_and(|s| {
1635                    crate::encoding::match_generator::source_size_ceil_log(s) > cutoff_log
1636                });
1637                let restored = prefer_copy_snapshot
1638                    && self
1639                        .state
1640                        .matcher
1641                        .restore_primed_dictionary(self.compression_level);
1642                if !restored {
1643                    self.state.matcher.prime_with_dictionary(
1644                        dict.inner.dict_content.as_slice(),
1645                        dict.inner.offset_hist,
1646                    );
1647                    if prefer_copy_snapshot {
1648                        self.state
1649                            .matcher
1650                            .capture_primed_dictionary(self.compression_level);
1651                    }
1652                }
1653            }
1654        }
1655        if let Some(cache) = cached_entropy {
1656            // Refill an empty slot from the recycled spare before
1657            // `clone_from`: `Option::clone_from(None ← Some)` falls back to
1658            // a fresh clone (two Vec allocations), while `Some ← Some`
1659            // delegates to the table's buffer-reusing `clone_from`. Frames
1660            // whose last block cleared the table would otherwise re-clone
1661            // the dict seed every frame.
1662            match &cache.huff {
1663                Some(src) => {
1664                    if self.state.last_huff_table.is_none() {
1665                        self.state.last_huff_table = self.state.huff_table_spare.take();
1666                    }
1667                    match &mut self.state.last_huff_table {
1668                        Some(dst) => dst.clone_from(src),
1669                        slot => *slot = Some(src.clone()),
1670                    }
1671                }
1672                None => self.state.clear_huff_table(),
1673            }
1674        } else {
1675            self.state.clear_huff_table();
1676        }
1677        // `clone_from` keeps frame-to-frame seeding cheap for reused compressors by
1678        // reusing existing allocations where possible instead of reallocating every frame.
1679        if let Some(cache) = cached_entropy {
1680            self.state
1681                .fse_tables
1682                .ll_previous
1683                .clone_from(&cache.ll_previous);
1684            self.state
1685                .fse_tables
1686                .ml_previous
1687                .clone_from(&cache.ml_previous);
1688            self.state
1689                .fse_tables
1690                .of_previous
1691                .clone_from(&cache.of_previous);
1692        } else {
1693            self.state.fse_tables.ll_previous = None;
1694            self.state.fse_tables.ml_previous = None;
1695            self.state.fse_tables.of_previous = None;
1696        }
1697        let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
1698            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
1699            _ => None,
1700        });
1701        let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
1702            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
1703            _ => None,
1704        });
1705        let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
1706            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
1707            _ => None,
1708        });
1709        self.state.matcher.seed_dictionary_entropy(
1710            self.state.last_huff_table.as_ref(),
1711            ll_entropy,
1712            ml_entropy,
1713            of_entropy,
1714        );
        // Start this frame's content checksum from a fresh XXH64 state (seed 0).
1715        #[cfg(feature = "hash")]
1716        {
1717            self.hasher = XxHash64::with_seed(0);
1718        }
        // Queried after the reset and dictionary setup above; a zero window is
        // rejected here instead of being handed on in the returned `FramePrep`.
1719        let window_size = self.state.matcher.window_size();
1720        assert!(
1721            window_size != 0,
1722            "matcher reported window_size == 0, which is invalid"
1723        );
1724        FramePrep {
1725            window_size,
1726            use_dictionary_state,
1727            source_size_hint_known,
1728            initial_size_hint,
1729        }
1730    }
1731
1732    /// Owned streaming block loop: reads blocks from the caller-provided
1733    /// `source` reader, optionally pre-splits, hashes for the content
1734    /// checksum, and emits each block via `compress_block_encoded`,
1735    /// accumulating the block bytes. Returns `(all_blocks,
1736    /// total_uncompressed)`. The source is passed in (rather than read
1737    /// from `self.uncompressed_data`) so the streaming `compress` path can
1738    /// feed the configured reader while the slice paths
1739    /// (`compress_oneshot_borrowed`, `compress_independent_frame`) feed an
1740    /// in-place `&[u8]` cursor without baking its lifetime into the
1741    /// compressor type.
1742    fn run_owned_block_loop<S: OwnedBlockSource>(
1743        &mut self,
1744        source: &mut S,
1745        initial_size_hint: Option<u64>,
1746        // Whether `initial_size_hint` is the input's exact length (the
1747        // one-shot slice paths) or a caller-provided estimate (the streaming
1748        // `Read` path, where `set_source_size_hint` is advisory). An exact
1749        // hint drives the one-shot ratio reservation; an estimate is only
1750        // trusted up to a small lookahead past the bytes actually read.
1751        hint_is_exact: bool,
1752        out: &mut Vec<u8>,
1753    ) -> u64 {
1754        // Compressed blocks are appended to `out` from its current end. The
1755        // streaming drain path passes a fresh buffer (the frame header is
1756        // written to the drain afterward, since Frame_Content_Size is only
1757        // known once the reader hits EOF); the one-shot compress-into-Vec
1758        // path passes `out` already holding the header. The upstream zstd split
1759        // `savings` gate below accumulates block-relative (`before_len`)
1760        // output deltas, so a header prefix never skews it.
1761        let blocks_start = out.len();
1762        let mut total_uncompressed: u64 = 0;
1763        let mut pending_input: Vec<u8> = Vec::new();
1764        let mut reached_eof = false;
1765        let mut savings = 0i64;
1766        // Compress block by block
1767        loop {
1768            // Read up to one upstream zstd block. When the pre-block splitter keeps a
1769            // suffix, top it back up before compressing the next block, matching
1770            // ZSTD_compress_frameChunk() over a contiguous input buffer.
1771            let block_capacity = self.block_capacity();
1772            // Always draw the block buffer from the matcher's recycled pool
1773            // (its capacity already covers the block size, so the resize below
1774            // stays in-place). Any carried pre-split suffix is copied in, and
1775            // `pending_input` is retained as a reusable carry buffer. The prior
1776            // approach `split_off`'d a fresh suffix Vec per pre-split and
1777            // `reserve_exact`-grew it to `block_capacity` every block; on a
1778            // heavily pre-split frame that churned one block-sized allocation
1779            // per split (~12 MB over ~90 splits on a 1 MiB corpus input).
1780            let mut uncompressed_data = self.state.matcher.get_next_space();
1781            uncompressed_data.clear();
1782            uncompressed_data.extend_from_slice(&pending_input);
1783            pending_input.clear();
1784            if !reached_eof {
1785                // Remaining-bytes expectation for the reader source's sizing
1786                // (`None` = unknown, or an inexact hint already met by prior
1787                // blocks). The slice source appends directly and ignores it.
1788                let size_hint_remaining = match initial_size_hint {
1789                    Some(hint) if hint > total_uncompressed => Some(hint - total_uncompressed),
1790                    _ => None,
1791                };
1792                let (appended, eof) =
1793                    source.fill_block(&mut uncompressed_data, block_capacity, size_hint_remaining);
1794                total_uncompressed += appended as u64;
1795                reached_eof = eof;
1796            }
1797            let mut last_block = reached_eof;
1798            let remaining_for_split = if reached_eof {
1799                uncompressed_data.len()
1800            } else {
1801                block_capacity
1802            };
1803            if !matches!(self.compression_level, CompressionLevel::Uncompressed)
1804                && uncompressed_data.len() == block_capacity
1805            {
1806                let block_len = optimal_block_size(
1807                    self.compression_level,
1808                    &uncompressed_data,
1809                    remaining_for_split,
1810                    block_capacity,
1811                    savings,
1812                );
1813                if block_len < uncompressed_data.len() {
1814                    // Carry the kept suffix into the reusable `pending_input`
1815                    // buffer (cleared, capacity retained) instead of allocating
1816                    // a fresh Vec via `split_off`. Next iteration copies it back
1817                    // into a pooled block buffer. The block currently being
1818                    // compressed is truncated to the chosen split length.
1819                    pending_input.clear();
1820                    pending_input.extend_from_slice(&uncompressed_data[block_len..]);
1821                    uncompressed_data.truncate(block_len);
1822                    last_block = false;
1823                }
1824            }
1825            // As we read, hash that data too (skipped when the content
1826            // checksum is disabled).
1827            #[cfg(feature = "hash")]
1828            if self.content_checksum {
1829                self.hasher.write(&uncompressed_data);
1830            }
1831            // Per-physical-block XXH64 (low 32 bits) for the optional
1832            // per-block checksum sidecar. Hashing happens INSIDE the
1833            // block emitters (RLE / Raw fast-path / Compressed /
1834            // post-split partitions), so the digests vector has
1835            // exactly one entry per physical Block_Header written to
1836            // `all_blocks` — 1:1 with `FrameEmitInfo.blocks`. See
1837            // `enable_per_block_checksums` rustdoc.
1838            // Size the output ahead of this block's emission from the ratio
1839            // observed so far (see `reserve_for_next_block`); with no usable
1840            // size hint, ensure one block's worst case and let the doubling
1841            // growth policy amortize across blocks.
1842            let emitted =
1843                total_uncompressed - uncompressed_data.len() as u64 - pending_input.len() as u64;
1844            match initial_size_hint {
1845                Some(hint) if hint >= total_uncompressed => {
1846                    // An advisory hint (streaming path) is only trusted up to
1847                    // a small lookahead past the bytes actually read: a hint
1848                    // far above the real input would otherwise reserve the
1849                    // whole phantom remainder up front.
1850                    let hint_remaining = hint - emitted;
1851                    let remaining = if hint_is_exact {
1852                        hint_remaining
1853                    } else {
1854                        let buffered = total_uncompressed - emitted;
1855                        const HINT_LOOKAHEAD: u64 = 64 * 1024;
1856                        hint_remaining.min(buffered + HINT_LOOKAHEAD)
1857                    };
1858                    reserve_for_next_block(
1859                        out,
1860                        blocks_start,
1861                        emitted,
1862                        remaining as usize,
1863                        self.block_capacity(),
1864                    );
1865                }
1866                _ => {
1867                    out.reserve(uncompressed_data.len() + 3 + 16);
1868                }
1869            }
1870            // Special handling is needed for compression of a totally empty file
1871            if uncompressed_data.is_empty() {
1872                let header = BlockHeader {
1873                    last_block: true,
1874                    block_type: crate::blocks::block::BlockType::Raw,
1875                    block_size: 0,
1876                };
1877                header.serialize(out);
1878                #[cfg(feature = "lsm")]
1879                self.block_decompressed_sizes.push(0);
1880                #[cfg(all(feature = "lsm", feature = "hash"))]
1881                if let Some(checksums) = self.block_checksums.as_mut() {
1882                    checksums.push(xxh64_block_low32(&[]));
1883                }
1884                break;
1885            }
1886
1887            match self.compression_level {
1888                CompressionLevel::Uncompressed => {
1889                    let header = BlockHeader {
1890                        last_block,
1891                        block_type: crate::blocks::block::BlockType::Raw,
1892                        block_size: uncompressed_data.len().try_into().unwrap(),
1893                    };
1894                    header.serialize(out);
1895                    #[cfg(feature = "lsm")]
1896                    self.block_decompressed_sizes
1897                        .push(uncompressed_data.len() as u32);
1898                    #[cfg(all(feature = "lsm", feature = "hash"))]
1899                    if let Some(checksums) = self.block_checksums.as_mut() {
1900                        checksums.push(xxh64_block_low32(&uncompressed_data));
1901                    }
1902                    out.extend_from_slice(&uncompressed_data);
1903                    savings +=
1904                        uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
1905                }
1906                CompressionLevel::Fastest
1907                | CompressionLevel::Default
1908                | CompressionLevel::Better
1909                | CompressionLevel::Best
1910                | CompressionLevel::Level(_) => {
1911                    let before_len = out.len();
1912                    let block_len = uncompressed_data.len();
1913                    // A primed dictionary makes "incompressible-looking"
1914                    // blocks matchable against the dict, so the raw-fast-
1915                    // path inside must be bypassed (it skips matching).
1916                    // Mirror prepare_frame's `use_dictionary_state`: a dict
1917                    // is only PRIMED (and thus matchable) when the matcher
1918                    // supports priming — a non-priming matcher ignores an
1919                    // attached dictionary, so the raw-fast-path must stay
1920                    // enabled for it. (This arm is already non-Uncompressed.)
1921                    let dict_active = self.dictionary.is_some()
1922                        && self.state.matcher.supports_dictionary_priming();
1923                    compress_block_encoded(
1924                        &mut self.state,
1925                        self.compression_level,
1926                        last_block,
1927                        uncompressed_data,
1928                        out,
1929                        dict_active,
1930                        #[cfg(feature = "lsm")]
1931                        Some(&mut self.block_decompressed_sizes),
1932                        #[cfg(all(feature = "lsm", feature = "hash"))]
1933                        self.block_checksums.as_mut(),
1934                    );
1935                    savings += block_len as i64 - (out.len() - before_len) as i64;
1936                }
1937            }
1938            if last_block && pending_input.is_empty() {
1939                break;
1940            }
1941        }
1942        total_uncompressed
1943    }
1944
1945    /// Append the frame header bytes onto `out` once the total payload size
1946    /// is known (so `Frame_Content_Size` / `single_segment` can be set).
1947    /// Appends rather than returns so the one-shot path serializes straight
1948    /// into the reused output buffer with no per-frame header `Vec`.
1949    fn append_frame_header(&self, total_uncompressed: u64, prep: &FramePrep, out: &mut Vec<u8>) {
1950        // Match the upstream zstd framing policy (`ZSTD_writeFrameHeader`):
1951        // single-segment whenever the content size is known and the whole
1952        // source fits the active window (`contentSizeFlag && windowSize >=
1953        // srcSize`). A single-segment frame REQUIRES an FCS field, so
1954        // suppressing the content size (`content_size_flag` off) forces the
1955        // windowed layout. There is no lower size bound: small payloads
1956        // benefit most, since a windowed frame cannot encode a content size
1957        // below 256 in fewer than 4 FCS bytes (the 1-byte FCS class is
1958        // single-segment-only, see `find_fcs_field_size`), whereas a
1959        // single-segment frame stores it in one byte and omits the window
1960        // descriptor. The single-segment window equals the FCS, so a block
1961        // must never reference past the content: the post-hoc raw fallback in
1962        // the block emitters guarantees any non-shrinking block is stored raw,
1963        // and genuine matches stay within the already-emitted output.
1964        // Dictionary frames qualify too (the dictionary is decoder setup
1965        // state, not part of the regenerated segment), keeping the decoder's
1966        // single-allocation path (our decoder caps reservation to
1967        // min(window, FCS) either way).
1968        let single_segment = self.content_size_flag
1969            && prep.source_size_hint_known
1970            && total_uncompressed <= prep.window_size;
1971        let header = FrameHeader {
1972            frame_content_size: self.content_size_flag.then_some(total_uncompressed),
1973            single_segment,
1974            content_checksum: cfg!(feature = "hash") && self.content_checksum,
1975            dictionary_id: if prep.use_dictionary_state && self.dict_id_flag {
1976                self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
1977            } else {
1978                None
1979            },
1980            window_size: if single_segment {
1981                None
1982            } else {
1983                Some(prep.window_size)
1984            },
1985            magicless: self.magicless,
1986        };
1987        header.serialize(out);
1988    }
1989
1990    /// Write the frame header, accumulated block bytes, and optional
1991    /// trailing content checksum to the configured drain; populate
1992    /// `frame_emit_info` (lsm). Header and blocks are written separately to
1993    /// avoid shifting `all_blocks` to prepend the header. Used by
1994    /// `compress` and `compress_oneshot_borrowed`.
1995    fn finish_frame(&mut self, all_blocks: Vec<u8>, total_uncompressed: u64, prep: &FramePrep) {
1996        let mut header_buf: Vec<u8> = Vec::with_capacity(18);
1997        self.append_frame_header(total_uncompressed, prep, &mut header_buf);
1998        // Snapshot the checksum before borrowing the drain field so the
1999        // `self.hasher` read and the `self.compressed_data` write don't
2000        // both need `&mut self` simultaneously.
2001        #[cfg(feature = "hash")]
2002        let checksum_bytes = self
2003            .content_checksum
2004            .then(|| (self.hasher.finish() as u32).to_le_bytes());
2005        let drain = self.compressed_data.as_mut().unwrap();
2006        drain.write_all(&header_buf).unwrap();
2007        drain.write_all(&all_blocks).unwrap();
2008        // With the `hash` feature AND the content checksum enabled, the header
2009        // set `Content_Checksum_flag` and the 32-bit digest is written at the
2010        // end of the frame. Disabled => no trailing bytes, flag stays 0.
2011        #[cfg(feature = "hash")]
2012        if let Some(checksum_bytes) = checksum_bytes {
2013            drain.write_all(&checksum_bytes).unwrap();
2014        }
2015        #[cfg(feature = "lsm")]
2016        {
2017            let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
2018            self.populate_frame_emit_info(header_buf.len(), &all_blocks, emit_checksum);
2019        }
2020    }
2021
2022    /// Assemble the frame (header + blocks + optional checksum) into the
2023    /// caller-provided `out` buffer, replacing its contents, and populate
2024    /// `frame_emit_info` (lsm). `out` is cleared first (its allocation is
2025    /// reused, the CCtx-equivalent zero-per-call-alloc output path) then
2026    /// grown once to the exact frame size. Used by
2027    /// `compress_independent_frame_into`. The single `all_blocks` copy into
2028    /// `out` is the same one copy `finish_frame` performs writing
2029    /// `all_blocks` into a `Vec` drain, no extra buffering vs the drain
2030    /// path.
2031    /// Walk `all_blocks` to recover per-block layout and store it in
2032    /// `frame_emit_info`. Each Block_Header is 3 bytes LE packing
2033    /// `(block_size << 3) | (block_type << 1) | last_block`. Physical body
2034    /// size differs by type: RLE bodies are always 1 byte (the repeated
2035    /// byte), Raw/Compressed bodies span `block_size`. `header_len` is the
2036    /// serialized frame-header length (frame offset of the first block).
2037    #[cfg(feature = "lsm")]
2038    fn populate_frame_emit_info(
2039        &mut self,
2040        header_len: usize,
2041        all_blocks: &[u8],
2042        emit_checksum: bool,
2043    ) {
2044        use crate::blocks::block::BlockType as BT;
2045        use crate::encoding::frame_emit_info::{FrameBlock, FrameEmitInfo};
2046        // All frame-offset arithmetic below is bounded by u32 on the wire
2047        // (Block_Size is a 21-bit field, frames bounded by MAX_BLOCK_SIZE *
2048        // #blocks). A pathologically large frame whose total emitted size
2049        // exceeds u32::MAX would overflow the cast; bail out by leaving
2050        // `frame_emit_info` at `None` rather than handing the caller a
2051        // silently-truncated layout. The overflow path is statically
2052        // unreachable on every realistic frame so the predictor amortises
2053        // the branch to zero cost.
2054        let frame_header_len: u32 = match u32::try_from(header_len) {
2055            Ok(v) => v,
2056            Err(_) => return,
2057        };
2058        let all_blocks_len_u32: u32 = match u32::try_from(all_blocks.len()) {
2059            Ok(v) => v,
2060            Err(_) => return,
2061        };
2062        let mut blocks: Vec<FrameBlock> = Vec::new();
2063        let mut cursor: usize = 0;
2064        while cursor + 3 <= all_blocks.len() {
2065            let mut header_u32 = [0u8; 4];
2066            header_u32[..3].copy_from_slice(&all_blocks[cursor..cursor + 3]);
2067            let raw = u32::from_le_bytes(header_u32);
2068            let last_block = (raw & 1) != 0;
2069            let block_type = match (raw >> 1) & 0b11 {
2070                0 => BT::Raw,
2071                1 => BT::RLE,
2072                2 => BT::Compressed,
2073                _ => BT::Reserved,
2074            };
2075            let block_size_field = raw >> 3;
2076            // RLE bodies are always 1 byte physical on the wire (the single
2077            // repeated byte); the spec's Block_Size field carries the
2078            // logical repeat count. Raw and Compressed bodies physically
2079            // span block_size_field bytes. Store the physical length in
2080            // body_size so the 'offset + header + body_size' arithmetic
2081            // always lands on the next block boundary, and surface the raw
2082            // spec field separately as block_size_field.
2083            let physical_body: u32 = match block_type {
2084                BT::RLE => 1,
2085                _ => block_size_field,
2086            };
2087            let cursor_u32: u32 = match u32::try_from(cursor) {
2088                Ok(v) => v,
2089                Err(_) => return,
2090            };
2091            let offset_in_frame = match frame_header_len.checked_add(cursor_u32) {
2092                Some(v) => v,
2093                None => return,
2094            };
2095            // Decompressed (regenerated) size, captured per physical block
2096            // during emit (1:1 with the wire blocks scanned here). Raw/RLE are
2097            // wire-derivable (`block_size_field`), so a short sidecar still
2098            // yields the correct value for them. A Compressed block's size is
2099            // NOT on the wire: if the sidecar is missing its entry, fabricating
2100            // 0 would publish a silently-wrong `decompressed_byte_range`. Since
2101            // this metadata is the authoritative mapping for a successful
2102            // encode, bail out (leave `frame_emit_info` at `None`) rather than
2103            // hand back a corrupt layout; the 1:1 push invariant makes this
2104            // unreachable in practice (debug_assert catches a regression).
2105            let decompressed_size = match self.block_decompressed_sizes.get(blocks.len()).copied() {
2106                Some(size) => size,
2107                None if matches!(block_type, BT::Raw | BT::RLE) => block_size_field,
2108                None => {
2109                    debug_assert!(
2110                        false,
2111                        "missing decompressed-size sidecar entry for compressed block {}",
2112                        blocks.len()
2113                    );
2114                    return;
2115                }
2116            };
2117            blocks.push(FrameBlock {
2118                offset_in_frame,
2119                header_size: 3,
2120                body_size: physical_body,
2121                block_size_field,
2122                block_type,
2123                last_block,
2124                decompressed_size,
2125            });
2126            cursor += 3 + physical_body as usize;
2127            if last_block {
2128                break;
2129            }
2130        }
2131        // Fail closed on a structurally incomplete scan: the loop must have
2132        // consumed the whole block section AND ended on a parsed last block.
2133        // A premature `last_block` (bytes left over) or a run-off without any
2134        // last block would otherwise publish an invalid public `FrameEmitInfo`.
2135        // Unreachable for a well-formed self-produced frame (debug_assert
2136        // catches a regression); on release we bail, leaving `frame_emit_info`
2137        // at `None` rather than handing back a corrupt layout.
2138        if cursor != all_blocks.len() || !blocks.last().is_some_and(|b| b.last_block) {
2139            debug_assert!(
2140                false,
2141                "incomplete block scan in populate_frame_emit_info: cursor={} len={} last_block={:?}",
2142                cursor,
2143                all_blocks.len(),
2144                blocks.last().map(|b| b.last_block)
2145            );
2146            return;
2147        }
2148        let checksum_range = if emit_checksum {
2149            let cs_start = match frame_header_len.checked_add(all_blocks_len_u32) {
2150                Some(v) => v,
2151                None => return,
2152            };
2153            let cs_end = match cs_start.checked_add(4) {
2154                Some(v) => v,
2155                None => return,
2156            };
2157            Some(cs_start..cs_end)
2158        } else {
2159            None
2160        };
2161        let body_total = match frame_header_len.checked_add(all_blocks_len_u32) {
2162            Some(v) => v,
2163            None => return,
2164        };
2165        let total_size = if checksum_range.is_some() {
2166            match body_total.checked_add(4) {
2167                Some(v) => v,
2168                None => return,
2169            }
2170        } else {
2171            body_total
2172        };
2173        self.frame_emit_info = Some(FrameEmitInfo {
2174            frame_header_range: 0..frame_header_len,
2175            blocks,
2176            checksum_range,
2177            total_size,
2178        });
2179    }
2180
2181    /// Layout of the most recently emitted frame.
2182    ///
2183    /// Returns `None` if [`compress`](Self::compress) has not been
2184    /// called yet on this compressor. After a successful `compress()`
2185    /// the returned `FrameEmitInfo` describes the frame header range,
2186    /// every emitted block's offset / size / type, and the optional
2187    /// trailing content-checksum range — all in frame-absolute byte
2188    /// offsets matching the bytes written to the drain.
2189    ///
2190    /// Behind the `lsm` Cargo feature.
2191    #[cfg(feature = "lsm")]
2192    pub fn last_frame_emit_info(&self) -> Option<&crate::encoding::frame_emit_info::FrameEmitInfo> {
2193        self.frame_emit_info.as_ref()
2194    }
2195
2196    /// Opt in to per-block XXH64 checksum computation during
2197    /// [`compress`](Self::compress). Default off; zero cost when
2198    /// disabled. The captured digests are accessible via
2199    /// [`last_frame_block_checksums`](Self::last_frame_block_checksums).
2200    ///
2201    /// One checksum is emitted per physical FrameBlock written to
2202    /// the drain: 1:1 cardinality with
2203    /// [`last_frame_emit_info`](Self::last_frame_emit_info)'s
2204    /// `blocks` vector. On the post-split optimization path
2205    /// (Level 16-22 with large window) the per-partition decompressed
2206    /// range is hashed inside the partition loop so the digest count
2207    /// still matches the emitted block count. The decoder collects
2208    /// per-physical-block digests on the same granularity, so
2209    /// element-wise equality holds round-trip.
2210    ///
2211    /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
2212    /// primitive lives behind the `hash` feature, so this method only
2213    /// compiles when both are enabled.
2214    #[cfg(all(feature = "lsm", feature = "hash"))]
2215    pub fn enable_per_block_checksums(&mut self) {
2216        self.per_block_checksums_enabled = true;
2217    }
2218
2219    /// Per-block XXH64 (low 32 bits) digests captured during the most
2220    /// recent `compress()` call. `None` unless
2221    /// [`enable_per_block_checksums`](Self::enable_per_block_checksums)
2222    /// was called before `compress()`.
2223    ///
2224    /// Behind `all(feature = "lsm", feature = "hash")`.
2225    #[cfg(all(feature = "lsm", feature = "hash"))]
2226    pub fn last_frame_block_checksums(&self) -> Option<&[u32]> {
2227        self.block_checksums.as_deref()
2228    }
2229
2230    /// Get a mutable reference to the source
2231    pub fn source_mut(&mut self) -> Option<&mut R> {
2232        self.uncompressed_data.as_mut()
2233    }
2234
2235    /// Get a mutable reference to the drain
2236    pub fn drain_mut(&mut self) -> Option<&mut W> {
2237        self.compressed_data.as_mut()
2238    }
2239
2240    /// Get a reference to the source
2241    pub fn source(&self) -> Option<&R> {
2242        self.uncompressed_data.as_ref()
2243    }
2244
2245    /// Get a reference to the drain
2246    pub fn drain(&self) -> Option<&W> {
2247        self.compressed_data.as_ref()
2248    }
2249
2250    /// Retrieve the source
2251    pub fn take_source(&mut self) -> Option<R> {
2252        self.uncompressed_data.take()
2253    }
2254
2255    /// Retrieve the drain
2256    pub fn take_drain(&mut self) -> Option<W> {
2257        self.compressed_data.take()
2258    }
2259
2260    /// Before calling [FrameCompressor::compress] you can replace the matcher
2261    pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
2262        core::mem::swap(&mut match_generator, &mut self.state.matcher);
2263        match_generator
2264    }
2265
2266    /// Before calling [FrameCompressor::compress] you can replace the compression level.
2267    ///
2268    /// This also clears any fine-grained parameter overrides installed via
2269    /// [`set_parameters`](Self::set_parameters): reverting to a bare level
2270    /// means plain level-based tuning, not the previous frame's customized
2271    /// strategy / LDM / log overrides. To keep overriding, call
2272    /// [`set_parameters`](Self::set_parameters) again with the new base level.
2273    pub fn set_compression_level(
2274        &mut self,
2275        compression_level: CompressionLevel,
2276    ) -> CompressionLevel {
2277        let old = self.compression_level;
2278        self.compression_level = compression_level;
2279        // Drop sticky overrides so the level switch yields plain geometry.
2280        self.strategy_override = None;
2281        self.state.matcher.clear_param_overrides();
2282        old
2283    }
2284
2285    /// Get the current compression level
2286    pub fn compression_level(&self) -> CompressionLevel {
2287        self.compression_level
2288    }
2289
2290    /// Attach a pre-parsed dictionary to be used for subsequent compressions.
2291    ///
2292    /// In compressed modes, the dictionary id is written only when the active
2293    /// matcher supports dictionary priming.
2294    /// Uncompressed mode and non-priming matchers ignore the attached dictionary
2295    /// at encode time.
2296    pub fn set_dictionary(
2297        &mut self,
2298        dictionary: crate::decoding::Dictionary,
2299    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2300        self.attach_dictionary(EncoderDictionary::from_dictionary(dictionary))
2301    }
2302
2303    /// Parse and attach a serialized dictionary blob.
2304    ///
2305    /// Parses with the encoder-only path (skips the FSE/HUF decode lookup-table
2306    /// build the encoder never reads); the entropy ENCODER tables — and thus
2307    /// the emitted frame — are identical to a full parse.
2308    pub fn set_dictionary_from_bytes(
2309        &mut self,
2310        raw_dictionary: &[u8],
2311    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2312        self.attach_dictionary(EncoderDictionary::from_bytes(raw_dictionary)?)
2313    }
2314
2315    /// Attach an already-parsed [`EncoderDictionary`] without reparsing a raw
2316    /// blob.
2317    ///
2318    /// Accepts an `EncoderDictionary` produced once via
2319    /// [`EncoderDictionary::from_bytes`] / [`EncoderDictionary::from_dictionary`]
2320    /// or handed back by [`Self::clear_dictionary`] / the `set_dictionary*`
2321    /// return value, so callers can reattach or reuse a prepared dictionary
2322    /// across compressions without re-running the dictionary parse each time.
2323    /// Returns the previously-attached dictionary, if any.
2324    pub fn set_encoder_dictionary(
2325        &mut self,
2326        dictionary: EncoderDictionary,
2327    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2328        self.attach_dictionary(dictionary)
2329    }
2330
2331    /// Remove the attached dictionary, returning it as an [`EncoderDictionary`].
2332    pub fn clear_dictionary(&mut self) -> Option<EncoderDictionary> {
2333        self.dictionary_entropy_cache = None;
2334        // Drop the CDict prime snapshot — it is keyed to the dictionary
2335        // being removed and must not be restored against a different (or no)
2336        // dictionary on the next frame.
2337        self.state.matcher.invalidate_primed_dictionary();
2338        self.dictionary.take()
2339    }
2340
2341    /// Validate `enc`, build the encoder entropy cache from it, store it, and
2342    /// return the previously-attached dictionary. Shared by every public
2343    /// attach entry point: `set_dictionary`, `set_dictionary_from_bytes`, and
2344    /// `set_encoder_dictionary`.
2345    fn attach_dictionary(
2346        &mut self,
2347        enc: EncoderDictionary,
2348    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2349        let dictionary = &enc.inner;
2350        if dictionary.id == 0 {
2351            return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
2352        }
2353        if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
2354            return Err(
2355                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
2356                    index: index as u8,
2357                },
2358            );
2359        }
2360        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(dictionary));
2361        // A previously-captured CDict prime snapshot belongs to the OLD
2362        // dictionary; drop it so the first frame with the new dictionary
2363        // re-primes (and re-captures) instead of restoring stale tables.
2364        self.state.matcher.invalidate_primed_dictionary();
2365        Ok(self.dictionary.replace(enc))
2366    }
2367}
2368
2369#[cfg(test)]
2370mod tests {
2371    // `format!` is used by ungated tests (e.g. the btlazy2 dict-reuse
2372    // byte-identity test), so the import must not be feature-gated — under
2373    // default features (no `dict_builder`) the gated form left `format!`
2374    // unresolved when the test module is compiled.
2375    use alloc::format;
2376    use alloc::vec;
2377
2378    use super::FrameCompressor;
2379    use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
2380    use crate::decoding::FrameDecoder;
2381    use crate::encoding::{Matcher, Sequence};
2382    use alloc::vec::Vec;
2383
2384    fn generate_data(seed: u64, len: usize) -> Vec<u8> {
2385        let mut state = seed;
2386        let mut data = Vec::with_capacity(len);
2387        for _ in 0..len {
2388            state = state
2389                .wrapping_mul(6364136223846793005)
2390                .wrapping_add(1442695040888963407);
2391            data.push((state >> 33) as u8);
2392        }
2393        data
2394    }
2395
2396    // Cross-implementation parity tests (compress here, decode through the C
2397    // bindings) moved to `ffi-bench/tests/frame_compressor_ffi.rs` so the
2398    // library crate never links libzstd.
2399
2400    struct NoDictionaryMatcher {
2401        last_space: Vec<u8>,
2402        window_size: u64,
2403    }
2404
2405    impl NoDictionaryMatcher {
2406        fn new(window_size: u64) -> Self {
2407            Self {
2408                last_space: Vec::new(),
2409                window_size,
2410            }
2411        }
2412    }
2413
2414    impl Matcher for NoDictionaryMatcher {
2415        fn get_next_space(&mut self) -> Vec<u8> {
2416            vec![0; self.window_size as usize]
2417        }
2418
2419        fn get_last_space(&mut self) -> &[u8] {
2420            self.last_space.as_slice()
2421        }
2422
2423        fn commit_space(&mut self, space: Vec<u8>) {
2424            self.last_space = space;
2425        }
2426
2427        fn skip_matching(&mut self) {}
2428
2429        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
2430            handle_sequence(Sequence::Literals {
2431                literals: self.last_space.as_slice(),
2432            });
2433        }
2434
2435        fn reset(&mut self, _level: super::CompressionLevel) {
2436            self.last_space.clear();
2437        }
2438
2439        fn window_size(&self) -> u64 {
2440            self.window_size
2441        }
2442    }
2443
2444    #[test]
2445    fn frame_starts_with_magic_num() {
2446        let mock_data = [1_u8, 2, 3].as_slice();
2447        let mut output: Vec<u8> = Vec::new();
2448        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2449        compressor.set_source(mock_data);
2450        compressor.set_drain(&mut output);
2451
2452        compressor.compress();
2453        assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
2454    }
2455
2456    #[test]
2457    fn very_simple_raw_compress() {
2458        let mock_data = [1_u8, 2, 3].as_slice();
2459        let mut output: Vec<u8> = Vec::new();
2460        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2461        compressor.set_source(mock_data);
2462        compressor.set_drain(&mut output);
2463
2464        compressor.compress();
2465    }
2466
2467    #[test]
2468    fn very_simple_compress() {
2469        let mut mock_data = vec![0; 1 << 17];
2470        mock_data.extend(vec![1; (1 << 17) - 1]);
2471        mock_data.extend(vec![2; (1 << 18) - 1]);
2472        mock_data.extend(vec![2; 1 << 17]);
2473        mock_data.extend(vec![3; (1 << 17) - 1]);
2474        let mut output: Vec<u8> = Vec::new();
2475        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2476        compressor.set_source(mock_data.as_slice());
2477        compressor.set_drain(&mut output);
2478
2479        compressor.compress();
2480
2481        let mut decoder = FrameDecoder::new();
2482        let mut decoded = Vec::with_capacity(mock_data.len());
2483        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2484        assert_eq!(mock_data, decoded);
2485    }
2486
2487    #[test]
2488    fn rle_compress() {
2489        let mock_data = vec![0; 1 << 19];
2490        let mut output: Vec<u8> = Vec::new();
2491        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2492        compressor.set_source(mock_data.as_slice());
2493        compressor.set_drain(&mut output);
2494
2495        compressor.compress();
2496
2497        let mut decoder = FrameDecoder::new();
2498        let mut decoded = Vec::with_capacity(mock_data.len());
2499        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2500        assert_eq!(mock_data, decoded);
2501    }
2502
2503    #[test]
2504    fn aaa_compress() {
2505        let mock_data = vec![0, 1, 3, 4, 5];
2506        let mut output: Vec<u8> = Vec::new();
2507        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2508        compressor.set_source(mock_data.as_slice());
2509        compressor.set_drain(&mut output);
2510
2511        compressor.compress();
2512
2513        let mut decoder = FrameDecoder::new();
2514        let mut decoded = Vec::with_capacity(mock_data.len());
2515        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2516        assert_eq!(mock_data, decoded);
2517    }
2518
2519    #[test]
2520    fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
2521        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2522        let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2523        let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2524
2525        let mut data = Vec::new();
2526        for _ in 0..8 {
2527            data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
2528        }
2529
2530        let mut with_dict = Vec::new();
2531        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2532        let previous = compressor
2533            .set_dictionary_from_bytes(dict_raw)
2534            .expect("dictionary bytes should parse");
2535        assert!(
2536            previous.is_none(),
2537            "first dictionary insert should return None"
2538        );
2539        assert_eq!(
2540            compressor
2541                .set_dictionary(dict_for_encoder)
2542                .expect("valid dictionary should attach")
2543                .expect("set_dictionary_from_bytes inserted previous dictionary")
2544                .id(),
2545            dict_for_decoder.id
2546        );
2547        compressor.set_source(data.as_slice());
2548        compressor.set_drain(&mut with_dict);
2549        compressor.compress();
2550
2551        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
2552            .expect("encoded stream should have a frame header");
2553        assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));
2554
2555        let mut decoder = FrameDecoder::new();
2556        let mut missing_dict_target = Vec::with_capacity(data.len());
2557        let err = decoder
2558            .decode_all_to_vec(&with_dict, &mut missing_dict_target)
2559            .unwrap_err();
2560        assert!(
2561            matches!(
2562                &err,
2563                crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
2564            ),
2565            "dict-compressed stream should require dictionary id, got: {err:?}"
2566        );
2567
2568        let mut decoder = FrameDecoder::new();
2569        decoder.add_dict(dict_for_decoder).unwrap();
2570        let mut decoded = Vec::with_capacity(data.len());
2571        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
2572        assert_eq!(decoded, data);
2573    }
2574
2575    #[cfg(all(feature = "dict_builder", feature = "std"))]
2576    #[test]
2577    fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
2578        use std::io::Cursor;
2579
2580        let mut training = Vec::new();
2581        for idx in 0..256u32 {
2582            training.extend_from_slice(
2583                format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
2584            );
2585        }
2586        let mut raw_dict = Vec::new();
2587        crate::dictionary::create_raw_dict_from_source(
2588            Cursor::new(training.as_slice()),
2589            training.len(),
2590            &mut raw_dict,
2591            4096,
2592        )
2593        .expect("dict_builder training should succeed");
2594        assert!(
2595            !raw_dict.is_empty(),
2596            "dict_builder produced an empty dictionary"
2597        );
2598
2599        let dict_id = 0xD1C7_0008;
2600        let encoder_dict =
2601            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
2602        let decoder_dict =
2603            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
2604
2605        let mut payload = Vec::new();
2606        for idx in 0..96u32 {
2607            payload.extend_from_slice(
2608                format!(
2609                    "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
2610                )
2611                .as_bytes(),
2612            );
2613        }
2614
2615        let mut without_dict = Vec::new();
2616        let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
2617        baseline.set_source(payload.as_slice());
2618        baseline.set_drain(&mut without_dict);
2619        baseline.compress();
2620
2621        let mut with_dict = Vec::new();
2622        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2623        compressor
2624            .set_dictionary(encoder_dict)
2625            .expect("valid dict_builder dictionary should attach");
2626        compressor.set_source(payload.as_slice());
2627        compressor.set_drain(&mut with_dict);
2628        compressor.compress();
2629
2630        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
2631            .expect("encoded stream should have a frame header");
2632        assert_eq!(frame_header.dictionary_id(), Some(dict_id));
2633        let mut decoder = FrameDecoder::new();
2634        decoder.add_dict(decoder_dict).unwrap();
2635        let mut decoded = Vec::with_capacity(payload.len());
2636        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
2637        assert_eq!(decoded, payload);
2638        assert!(
2639            with_dict.len() < without_dict.len(),
2640            "trained dictionary should improve compression for this small payload"
2641        );
2642    }
2643
2644    #[test]
2645    fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
2646        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2647        let mut output = Vec::new();
2648        let input = b"";
2649
2650        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2651        let previous = compressor
2652            .set_dictionary_from_bytes(dict_raw)
2653            .expect("dictionary bytes should parse");
2654        assert!(previous.is_none());
2655
2656        compressor.set_source(input.as_slice());
2657        compressor.set_drain(&mut output);
2658        compressor.compress();
2659
2660        assert!(
2661            compressor.state.last_huff_table.is_some(),
2662            "dictionary entropy should seed previous huffman table before first block"
2663        );
2664        assert!(
2665            compressor.state.fse_tables.ll_previous.is_some(),
2666            "dictionary entropy should seed previous ll table before first block"
2667        );
2668        assert!(
2669            compressor.state.fse_tables.ml_previous.is_some(),
2670            "dictionary entropy should seed previous ml table before first block"
2671        );
2672        assert!(
2673            compressor.state.fse_tables.of_previous.is_some(),
2674            "dictionary entropy should seed previous of table before first block"
2675        );
2676    }
2677
2678    // `set_content_size_flag(false)`: the header must omit the FCS field
2679    // (and the single-segment layout that requires it) while the frame
2680    // still round-trips through our decoder.
2681    #[test]
2682    fn content_size_flag_off_omits_fcs_and_roundtrips() {
2683        let payload = alloc::vec![0x42u8; 4096];
2684
2685        let mut compressor: FrameCompressor =
2686            FrameCompressor::new(super::CompressionLevel::Fastest);
2687        let mut with_fcs = Vec::new();
2688        compressor.compress_independent_frame_into(&payload, &mut with_fcs);
2689
2690        compressor.set_content_size_flag(false);
2691        let mut without_fcs = Vec::new();
2692        compressor.compress_independent_frame_into(&payload, &mut without_fcs);
2693
2694        let parsed_with = crate::decoding::frame::read_frame_header(with_fcs.as_slice())
2695            .expect("flag-on frame header must parse")
2696            .0;
2697        assert_eq!(parsed_with.frame_content_size(), 4096);
2698
2699        let parsed_without = crate::decoding::frame::read_frame_header(without_fcs.as_slice())
2700            .expect("flag-off frame header must parse")
2701            .0;
2702        // 0 is the decoder's "unknown content size" sentinel...
2703        assert_eq!(
2704            parsed_without.frame_content_size(),
2705            0,
2706            "FCS must be omitted with the content-size flag off"
2707        );
2708        // ...and the descriptor must confirm the field is ABSENT (0 bytes),
2709        // not present with an explicit zero value.
2710        assert_eq!(
2711            parsed_without
2712                .descriptor
2713                .frame_content_size_bytes()
2714                .expect("descriptor must parse"),
2715            0,
2716            "the FCS field itself must be omitted, not written as zero"
2717        );
2718
2719        let mut decoder = crate::decoding::FrameDecoder::new();
2720        // `decode_all_to_vec` fills existing capacity (no FCS to pre-size
2721        // from with the flag off), so reserve the expected payload upfront.
2722        let mut decoded = Vec::with_capacity(payload.len() + 64);
2723        decoder
2724            .decode_all_to_vec(&without_fcs, &mut decoded)
2725            .expect("flag-off frame must decode");
2726        assert_eq!(decoded, payload);
2727    }
2728
2729    // `set_dictionary_id_flag(false)`: a dict-compressed frame must omit
2730    // the dictionary ID and still decode when the dictionary is handed to
2731    // the decoder explicitly.
2732    #[test]
2733    fn dict_id_flag_off_omits_dictionary_id_and_roundtrips() {
2734        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2735        let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(8);
2736
2737        let mut compressor: FrameCompressor =
2738            FrameCompressor::new(super::CompressionLevel::Fastest);
2739        compressor
2740            .set_dictionary_from_bytes(dict_raw)
2741            .expect("dictionary bytes should parse");
2742        compressor.set_dictionary_id_flag(false);
2743        let mut frame = Vec::new();
2744        compressor.compress_independent_frame_into(&payload, &mut frame);
2745
2746        let parsed = crate::decoding::frame::read_frame_header(frame.as_slice())
2747            .expect("frame header must parse")
2748            .0;
2749        assert_eq!(
2750            parsed.dictionary_id(),
2751            None,
2752            "dictionary id must be omitted with the dict-id flag off"
2753        );
2754
2755        // With the ID omitted the decoder cannot look the dictionary up by
2756        // header; hand it explicitly (the `reset_with_dict_handle` path).
2757        let mut sd = crate::decoding::StreamingDecoder::new_with_dictionary_bytes(
2758            frame.as_slice(),
2759            dict_raw,
2760        )
2761        .expect("decoder must accept the dictionary");
2762        let mut dec = Vec::new();
2763        std::io::Read::read_to_end(&mut sd, &mut dec)
2764            .expect("frame must decode with the dictionary handed explicitly");
2765        assert_eq!(dec, payload);
2766    }
2767
2768    // The output reservation must track the observed compression ratio, not
2769    // the whole-input `compress_bound`: a multi-MiB compressible stream's
2770    // output buffer stays at output scale (the old up-front bound held an
2771    // input-sized allocation for the whole frame). Incompressible input may
2772    // still re-estimate to ~the full bound — that is the genuine worst case.
2773    #[test]
2774    fn compressible_stream_output_capacity_stays_at_output_scale() {
2775        // 4 MiB of highly repetitive log-like lines.
2776        let line = b"ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo\n";
2777        let mut input = Vec::with_capacity(4 << 20);
2778        while input.len() < (4 << 20) {
2779            let take = line.len().min((4 << 20) - input.len());
2780            input.extend_from_slice(&line[..take]);
2781        }
2782
2783        let mut compressor: FrameCompressor =
2784            FrameCompressor::new(super::CompressionLevel::Fastest);
2785        let mut out = Vec::new();
2786        compressor.compress_independent_frame_into(&input, &mut out);
2787
2788        assert!(!out.is_empty());
2789        assert!(
2790            out.capacity() < input.len() / 4,
2791            "capacity {} must stay at output scale (input {}, output {})",
2792            out.capacity(),
2793            input.len(),
2794            out.len()
2795        );
2796
2797        // Round-trip: the adaptive reservation must not affect the bytes.
2798        let mut decoder = crate::decoding::FrameDecoder::new();
2799        let mut decoded = Vec::with_capacity(input.len() + 64);
2800        decoder
2801            .decode_all_to_vec(&out, &mut decoded)
2802            .expect("frame must decode");
2803        assert_eq!(decoded, input);
2804    }
2805
2806    // A dictionary frame with a known content size that fits the window
2807    // must take the single-segment layout (reference parity): the
2808    // dictionary is decoder setup state, not part of the regenerated
2809    // segment, so it must not force the windowed multi-segment layout.
2810    #[test]
2811    fn dict_frame_with_known_size_is_single_segment() {
2812        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2813        let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(64);
2814
2815        let mut compressor: FrameCompressor =
2816            FrameCompressor::new(super::CompressionLevel::Fastest);
2817        compressor
2818            .set_dictionary_from_bytes(dict_raw)
2819            .expect("dictionary bytes should parse");
2820        let mut frame = Vec::new();
2821        compressor.compress_independent_frame_into(&payload, &mut frame);
2822
2823        let parsed = crate::decoding::frame::read_frame_header(frame.as_slice())
2824            .expect("frame header must parse")
2825            .0;
2826        assert!(
2827            parsed.descriptor.single_segment_flag(),
2828            "dict frame with known size <= window must be single-segment"
2829        );
2830        assert!(parsed.dictionary_id().is_some());
2831        assert_eq!(parsed.frame_content_size(), payload.len() as u64);
2832
2833        // Round-trip through our own decoder with the dictionary.
2834        let mut decoder = crate::decoding::FrameDecoder::new();
2835        decoder
2836            .add_dict_from_bytes(dict_raw)
2837            .expect("decoder must accept the dictionary");
2838        let mut decoded = Vec::with_capacity(payload.len() + 64);
2839        decoder
2840            .decode_all_to_vec(&frame, &mut decoded)
2841            .expect("single-segment dict frame must decode");
2842        assert_eq!(decoded, payload);
2843    }
2844
2845    // Regression test: `heap_size()` must count the retained Huffman tables
2846    // (the active `last_huff_table` and the recycled `huff_table_spare`).
2847    // A reused context that parks a table would otherwise under-report its
2848    // footprint through the public size API.
2849    #[test]
2850    fn heap_size_counts_active_and_spare_huffman_tables() {
2851        let mut compressor: FrameCompressor =
2852            FrameCompressor::new(super::CompressionLevel::Fastest);
2853        let base = compressor.heap_size();
2854
2855        let active = crate::huff0::huff0_encoder::HuffmanTable::build_from_data(
2856            b"abacabadabacabaeabacabadabacaba",
2857        );
2858        let active_bytes = active.heap_size();
2859        assert!(active_bytes > 0, "built table must own heap buffers");
2860        compressor.state.last_huff_table = Some(active);
2861        assert_eq!(
2862            compressor.heap_size(),
2863            base + active_bytes,
2864            "heap_size must include the active last_huff_table"
2865        );
2866
2867        let spare = crate::huff0::huff0_encoder::HuffmanTable::build_from_data(
2868            b"the quick brown fox jumps over the lazy dog",
2869        );
2870        let spare_bytes = spare.heap_size();
2871        assert!(spare_bytes > 0, "built table must own heap buffers");
2872        compressor.state.huff_table_spare = Some(spare);
2873        assert_eq!(
2874            compressor.heap_size(),
2875            base + active_bytes + spare_bytes,
2876            "heap_size must include the parked huff_table_spare"
2877        );
2878    }
2879
2880    #[test]
2881    fn set_encoder_dictionary_reattaches_prepared_dict_without_reparse() {
2882        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2883        let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
2884              tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";
2885
2886        // Prepare the EncoderDictionary once, then attach it via the prepared-
2887        // dictionary API (no raw-blob reparse at attach time).
2888        let prepared =
2889            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
2890        let dict_id = prepared.id();
2891
2892        let mut with_dict = Vec::new();
2893        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2894        let previous = compressor
2895            .set_encoder_dictionary(prepared)
2896            .expect("prepared dictionary should attach");
2897        assert!(previous.is_none());
2898        compressor.set_source(payload.as_slice());
2899        compressor.set_drain(&mut with_dict);
2900        compressor.compress();
2901        // clear_dictionary hands the prepared dictionary back (last use of
2902        // `compressor`, so its `&mut with_dict` drain borrow ends here).
2903        let returned = compressor
2904            .clear_dictionary()
2905            .expect("dictionary was attached");
2906        assert_eq!(returned.id(), dict_id);
2907
2908        // The reattached dictionary drives the frame: its id is advertised and
2909        // the stream round-trips through a decoder primed with the same dict.
2910        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
2911            .expect("encoded stream should have a frame header");
2912        assert_eq!(frame_header.dictionary_id(), Some(dict_id));
2913        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2914        let mut decoder = FrameDecoder::new();
2915        decoder.add_dict(decoder_dict).unwrap();
2916        let mut decoded = Vec::with_capacity(payload.len());
2917        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
2918        assert_eq!(decoded.as_slice(), payload.as_slice());
2919
2920        // The dictionary handed back by clear_dictionary reattaches to another
2921        // compressor without touching the raw bytes again, producing an
2922        // identical frame.
2923        let mut with_dict2 = Vec::new();
2924        let mut compressor2 = FrameCompressor::new(super::CompressionLevel::Fastest);
2925        compressor2
2926            .set_encoder_dictionary(returned)
2927            .expect("returned dictionary should reattach");
2928        compressor2.set_source(payload.as_slice());
2929        compressor2.set_drain(&mut with_dict2);
2930        compressor2.compress();
2931        assert_eq!(
2932            with_dict2, with_dict,
2933            "reattached prepared dict must produce an identical frame"
2934        );
2935    }
2936
2937    #[test]
2938    fn dict_primed_matcher_snapshot_reused_across_frames_is_byte_identical() {
2939        // CDict-equivalent: a compressor reused across frames with the same
2940        // dictionary restores the primed matcher snapshot on frames 2..N
2941        // (a table copy) instead of re-hashing the dictionary. The restored
2942        // state must reproduce the first-frame (freshly-primed) output
2943        // byte-for-byte, and every frame must round-trip through a
2944        // dict-primed decoder.
2945        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2946        // Source must exceed the Fast strategy's 8 KiB attach cutoff so the
2947        // copy-snapshot (restore) path is taken on frame 2 — at or below the
2948        // cutoff the upstream zstd attaches by reference and we fall back to re-prime,
2949        // which would not exercise restore.
2950        let mut payload = Vec::new();
2951        while payload.len() < 16 * 1024 {
2952            payload.extend_from_slice(
2953                b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n",
2954            );
2955        }
2956
2957        let prepared =
2958            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
2959        let dict_id = prepared.id();
2960        let mut compressor: FrameCompressor =
2961            FrameCompressor::new(super::CompressionLevel::Fastest);
2962        compressor
2963            .set_encoder_dictionary(prepared)
2964            .expect("prepared dictionary should attach");
2965
2966        // Frame 1 primes + captures the snapshot; frame 2 restores it.
2967        let frame1 = compressor.compress_independent_frame(payload.as_slice());
2968        let frame2 = compressor.compress_independent_frame(payload.as_slice());
2969        assert_eq!(
2970            frame1, frame2,
2971            "restored prime snapshot must reproduce the freshly-primed frame byte-for-byte"
2972        );
2973
2974        // Both frames advertise the dict id and round-trip through a
2975        // dict-primed decoder.
2976        for frame in [&frame1, &frame2] {
2977            let (hdr, _) =
2978                crate::decoding::frame::read_frame_header(frame.as_slice()).expect("frame header");
2979            assert_eq!(hdr.dictionary_id(), Some(dict_id));
2980            let mut decoder = FrameDecoder::new();
2981            decoder
2982                .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
2983                .unwrap();
2984            let mut decoded = Vec::with_capacity(payload.len());
2985            decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
2986            assert_eq!(decoded.as_slice(), payload.as_slice());
2987        }
2988    }
2989
2990    #[test]
2991    fn dict_primed_matcher_cache_reused_across_small_attach_frames_is_byte_identical() {
2992        // CDict-equivalent ATTACH path (small source, at/below the Fast 8 KiB
2993        // attach cutoff): frames 2..N re-prime — re-committing the dict bytes
2994        // to history — but reuse the already-built dict table instead of
2995        // re-hashing it. The cached-table frame must reproduce the
2996        // freshly-primed first frame byte-for-byte, and a fresh single-frame
2997        // compressor (no prior dict cache) must produce the identical bytes
2998        // too, proving the cache changes timing, not output.
2999        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3000        // Stay under the 8 KiB cutoff so the attach (re-prime) path is taken
3001        // every frame rather than the copy-snapshot restore.
3002        let mut payload = Vec::new();
3003        while payload.len() < 2 * 1024 {
3004            payload.extend_from_slice(b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n");
3005        }
3006
3007        let prepared =
3008            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
3009        let dict_id = prepared.id();
3010        let mut compressor: FrameCompressor =
3011            FrameCompressor::new(super::CompressionLevel::Fastest);
3012        compressor
3013            .set_encoder_dictionary(prepared)
3014            .expect("prepared dictionary should attach");
3015
3016        // Frame 1 builds + marks the dict table; frame 2 reuses it.
3017        let frame1 = compressor.compress_independent_frame(payload.as_slice());
3018        let frame2 = compressor.compress_independent_frame(payload.as_slice());
3019        assert_eq!(
3020            frame1, frame2,
3021            "reused dict table (attach path) must reproduce the freshly-built frame byte-for-byte"
3022        );
3023
3024        // A fresh compressor (cold dict cache) must emit the same bytes — the
3025        // cache is a timing optimization, never a content change.
3026        let fresh_prepared =
3027            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
3028        let mut fresh: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3029        fresh
3030            .set_encoder_dictionary(fresh_prepared)
3031            .expect("prepared dictionary should attach");
3032        let fresh_frame = fresh.compress_independent_frame(payload.as_slice());
3033        assert_eq!(
3034            fresh_frame, frame1,
3035            "cold-cache compressor must match the warm-cache frame byte-for-byte"
3036        );
3037
3038        for frame in [&frame1, &frame2] {
3039            let (hdr, _) =
3040                crate::decoding::frame::read_frame_header(frame.as_slice()).expect("frame header");
3041            assert_eq!(hdr.dictionary_id(), Some(dict_id));
3042            let mut decoder = FrameDecoder::new();
3043            decoder
3044                .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3045                .unwrap();
3046            let mut decoded = Vec::with_capacity(payload.len());
3047            decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
3048            assert_eq!(decoded.as_slice(), payload.as_slice());
3049        }
3050    }
3051
3052    #[test]
3053    fn dict_reused_across_many_lazy_frames_stays_applied() {
3054        // Regression: a reused HashChain-backed dictionary frame (lazy levels)
3055        // re-primes the dict at the live `history_abs_start` every frame. The
3056        // floor-advance reset let that base climb frame-over-frame until the
3057        // freshly-primed dict region dropped below `window_low`, after which
3058        // every dict match was silently lost and the output ballooned to the
3059        // no-dict size (observed at frame 3-4 of a reused compressor). Drive
3060        // many frames and require every one to stay byte-identical to the
3061        // first — the dict must keep applying, not decay after a few frames.
3062        // Multiple distinct lines so the dictionary is load-bearing: without it
3063        // frame 0 must emit each distinct line as literals; with it those lines
3064        // match the primed dict immediately. A single repeated line matches
3065        // in-frame regardless and would hide the regression.
3066        let lines: &[&[u8]] = &[
3067            b"ts=2026 level=INFO msg=\"flush memtable\" tenant=demo table=orders\n",
3068            b"ts=2026 level=INFO msg=\"rotate segment\" tenant=demo table=orders\n",
3069            b"ts=2026 level=INFO msg=\"compact level\" tenant=demo table=orders\n",
3070            b"ts=2026 level=INFO msg=\"write block\" tenant=demo table=orders\n",
3071        ];
3072        let fill = |n: usize| -> Vec<u8> {
3073            let mut b = Vec::with_capacity(n);
3074            while b.len() < n {
3075                for l in lines {
3076                    if b.len() >= n {
3077                        break;
3078                    }
3079                    let take = (n - b.len()).min(l.len());
3080                    b.extend_from_slice(&l[..take]);
3081                }
3082            }
3083            b
3084        };
3085        let dict = fill(8 * 1024);
3086        let payload = fill(16 * 1024);
3087        let dict_obj =
3088            crate::decoding::Dictionary::from_raw_content(1, dict).expect("raw dict should build");
3089
3090        let mut compressor: FrameCompressor =
3091            FrameCompressor::new(super::CompressionLevel::Level(6));
3092        compressor.set_dictionary_id_flag(false);
3093        compressor
3094            .set_dictionary(dict_obj)
3095            .expect("dict should attach");
3096
3097        let first = compressor.compress_independent_frame(payload.as_slice());
3098
3099        // No-dict baseline at the same level: the dictionary must be
3100        // load-bearing, so the dict-applied frame has to beat it. Without this
3101        // anchor the equal-length loop below would also pass if EVERY frame
3102        // decayed to the no-dict size in lockstep.
3103        let mut nodict: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(6));
3104        let no_dict_frame = nodict.compress_independent_frame(payload.as_slice());
3105        assert!(
3106            first.len() < no_dict_frame.len(),
3107            "dict must be load-bearing: dict frame {} should beat the no-dict baseline {}",
3108            first.len(),
3109            no_dict_frame.len(),
3110        );
3111
3112        for i in 1..16 {
3113            let frame = compressor.compress_independent_frame(payload.as_slice());
3114            // Byte-identity, not just equal length: a same-size divergence (e.g.
3115            // a different match decision once the resident dict bookkeeping
3116            // drifts) would slip past a length-only check.
3117            assert_eq!(
3118                frame, first,
3119                "frame {i} of a reused dict compressor must stay byte-identical to \
3120                 the first (dict still applied, no decay or bookkeeping drift)"
3121            );
3122        }
3123    }
3124
3125    #[test]
3126    fn dict_fast_epoch_reset_many_frames_and_attach_copy_alternation_byte_identical() {
3127        // The Fast attach path invalidates the main hash table between
3128        // frames with an epoch-bias advance instead of a memset. Two things
3129        // need proving against a fresh-compressor reference:
3130        // 1. the bias accumulates across MANY reused frames without ever
3131        //    letting a stale entry through (every frame byte-identical);
3132        // 2. crossing the 8 KiB attach/copy cutoff in both directions
3133        //    (attach → copy clears the bias for the raw-slice kernel,
3134        //    copy → attach re-enters epoch mode) stays byte-identical.
3135        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3136        let mut small = Vec::new();
3137        while small.len() < 2 * 1024 {
3138            small.extend_from_slice(b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n");
3139        }
3140        // Over the Fast 8 KiB attach cutoff → copy-mode frame.
3141        let mut large = Vec::new();
3142        while large.len() < 64 * 1024 {
3143            large.extend_from_slice(b"tenant=demo op=scan range=[k0,k9) limit=500 order=asc\n");
3144        }
3145
3146        let mut reused: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3147        reused
3148            .set_encoder_dictionary(
3149                super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse"),
3150            )
3151            .expect("prepared dictionary should attach");
3152
3153        let reference = |payload: &[u8]| -> alloc::vec::Vec<u8> {
3154            let mut fresh: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3155            fresh
3156                .set_encoder_dictionary(
3157                    super::EncoderDictionary::from_bytes(dict_raw)
3158                        .expect("dict bytes should parse"),
3159                )
3160                .expect("prepared dictionary should attach");
3161            fresh.compress_independent_frame(payload)
3162        };
3163
3164        let small_expected = reference(&small);
3165        let large_expected = reference(&large);
3166
3167        // 1. Long attach-only run: every frame advances the epoch bias.
3168        for i in 0..32 {
3169            let frame = reused.compress_independent_frame(small.as_slice());
3170            assert_eq!(
3171                frame, small_expected,
3172                "attach frame {i} diverged from the fresh-compressor reference"
3173            );
3174        }
3175        // 2. Cutoff alternation: attach → copy → attach → copy.
3176        for i in 0..4 {
3177            let frame = reused.compress_independent_frame(large.as_slice());
3178            assert_eq!(
3179                frame, large_expected,
3180                "copy frame {i} diverged from the fresh-compressor reference"
3181            );
3182            let frame = reused.compress_independent_frame(small.as_slice());
3183            assert_eq!(
3184                frame, small_expected,
3185                "attach frame after copy {i} diverged from the fresh-compressor reference"
3186            );
3187        }
3188    }
3189
3190    #[test]
3191    fn dict_primed_btlazy2_reused_across_attach_and_copy_boundary_is_byte_identical() {
3192        // Btlazy2 (Level 15) uses the 32 KiB dict attach/copy cutoff in
3193        // prepare_frame. Exercise BOTH sides of that boundary on a reused
3194        // compressor: a sub-cutoff payload (re-prime/attach path) and an
3195        // over-cutoff payload (copy-snapshot restore path). In each case the
3196        // warm-cache second frame must reproduce the cold-cache first frame
3197        // byte-for-byte (the dict cache is a timing optimization, never a
3198        // content change), and every frame must round-trip.
3199        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3200        let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
3201            .expect("dict bytes should parse")
3202            .id();
3203        // Distinct lines so the payload does not trivially self-compress; the
3204        // BT finder + dict dual-probe both get exercised.
3205        let make_payload = |target: usize| {
3206            let mut p = Vec::with_capacity(target);
3207            let mut i = 0u64;
3208            while p.len() < target {
3209                p.extend_from_slice(
3210                    format!(
3211                        "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
3212                        i % 97
3213                    )
3214                    .as_bytes(),
3215                );
3216                i += 1;
3217            }
3218            p
3219        };
3220        // Below the 32 KiB cutoff (attach/re-prime) and above it (copy-snapshot).
3221        for target in [16 * 1024usize, 64 * 1024usize] {
3222            let payload = make_payload(target);
3223            let mut warm: FrameCompressor =
3224                FrameCompressor::new(super::CompressionLevel::Level(15));
3225            warm.set_encoder_dictionary(
3226                super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3227            )
3228            .expect("dict attach");
3229            // Frame 1 builds + marks the dict tables; frame 2 reuses them.
3230            let frame1 = warm.compress_independent_frame(payload.as_slice());
3231            let frame2 = warm.compress_independent_frame(payload.as_slice());
3232            assert_eq!(
3233                frame1, frame2,
3234                "reused dict cache must reproduce the freshly-primed frame byte-for-byte \
3235                 (Level 15, target={target})"
3236            );
3237            // Cold-cache compressor: must match the warm-cache bytes.
3238            let mut cold: FrameCompressor =
3239                FrameCompressor::new(super::CompressionLevel::Level(15));
3240            cold.set_encoder_dictionary(
3241                super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3242            )
3243            .expect("dict attach");
3244            let cold_frame = cold.compress_independent_frame(payload.as_slice());
3245            assert_eq!(
3246                cold_frame, frame1,
3247                "cold-cache compressor must match warm-cache frame (Level 15, target={target})"
3248            );
3249            // Round-trip through a decoder primed with the same dict.
3250            for frame in [&frame1, &frame2] {
3251                let (hdr, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
3252                    .expect("frame header");
3253                assert_eq!(hdr.dictionary_id(), Some(dict_id));
3254                let mut decoder = FrameDecoder::new();
3255                decoder
3256                    .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3257                    .unwrap();
3258                let mut decoded = Vec::with_capacity(payload.len());
3259                decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
3260                assert_eq!(decoded.as_slice(), payload.as_slice());
3261            }
3262        }
3263    }
3264
3265    #[test]
3266    fn dict_primed_btultra2_restore_is_floor_safe_and_byte_identical() {
3267        // Regression guard for the dictionary primed-snapshot RESTORE path on
3268        // the binary-tree (btultra2 / Level 22) backend — the path a minimal /
3269        // decoupled prepared-dict refactor rewrites.
3270        //
3271        // The trap it pins: a reused compressor compresses frame A (which fills
3272        // the live hash/chain tables with frame-A positions and advances the
3273        // window floor), then frame B of the SAME resolved shape (same size →
3274        // same PrimedKey → the snapshot RESTORE path) but DIFFERENT content. The
3275        // restore must reinstate the clean post-prime dict state with NO live
3276        // frame-A entries surviving above the restored floor; a restore that
3277        // leaks stale frame-A positions would surface FALSE matches and produce
3278        // a different (or undecodable) frame B. The invariant: a snapshot
3279        // restore is a pure timing optimization and MUST be byte-identical to a
3280        // cold compressor compressing frame B from scratch, and must round-trip.
3281        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3282        let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
3283            .expect("dict bytes should parse")
3284            .id();
3285        // 48 KiB > the btultra2 8 KiB attach cutoff → the copy-snapshot
3286        // capture/restore path. Two distinct payloads of the SAME size so frame
3287        // B resolves to frame A's snapshot key and takes the restore path.
3288        let make_payload = |seed: u64, target: usize| {
3289            let mut p = Vec::with_capacity(target);
3290            let mut i = seed;
3291            while p.len() < target {
3292                p.extend_from_slice(
3293                    format!(
3294                        "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
3295                        i % 89
3296                    )
3297                    .as_bytes(),
3298                );
3299                i = i.wrapping_add(1);
3300            }
3301            p.truncate(target);
3302            p
3303        };
3304        let size = 48 * 1024usize;
3305        let frame_a = make_payload(0, size);
3306        let frame_b = make_payload(1_000_000, size);
3307
3308        let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3309        warm.set_encoder_dictionary(
3310            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3311        )
3312        .expect("dict attach");
3313        // Frame A: cold cache — primes the dict + captures the snapshot, and
3314        // fills the live tables with frame-A positions.
3315        let _wa = warm.compress_independent_frame(frame_a.as_slice());
3316        // Frame B: warm cache — takes the snapshot RESTORE path (same size).
3317        let warm_b = warm.compress_independent_frame(frame_b.as_slice());
3318
3319        // Cold compressor compressing frame B from scratch: the ground truth.
3320        let mut cold: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3321        cold.set_encoder_dictionary(
3322            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3323        )
3324        .expect("dict attach");
3325        let cold_b = cold.compress_independent_frame(frame_b.as_slice());
3326
3327        assert_eq!(
3328            warm_b, cold_b,
3329            "frame B via snapshot restore must be byte-identical to a cold compress \
3330             (a restore that leaks frame-A live-table entries would diverge here)"
3331        );
3332
3333        // Round-trip frame B through a dict-primed decoder.
3334        let (hdr, _) =
3335            crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
3336        assert_eq!(hdr.dictionary_id(), Some(dict_id));
3337        let mut decoder = FrameDecoder::new();
3338        decoder
3339            .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3340            .unwrap();
3341        let mut decoded = Vec::with_capacity(frame_b.len());
3342        decoder
3343            .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
3344            .unwrap();
3345        assert_eq!(decoded.as_slice(), frame_b.as_slice());
3346    }
3347
3348    #[test]
3349    fn dict_primed_btultra2_ldm_restore_is_byte_identical() {
3350        // Same restore-path byte-identity guard as
3351        // `dict_primed_btultra2_restore_is_floor_safe_and_byte_identical`, but
3352        // with long-distance matching ENABLED. The BtMatcher's LDM producer is
3353        // part of the snapshot; a refactor that decouples it (so the snapshot
3354        // does not retain the empty LDM table) must reinstate an equivalent
3355        // empty producer on restore. This pins that the warm-cache (restore)
3356        // frame stays byte-identical to a cold compress when LDM is on.
3357        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3358        let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
3359            .expect("dict bytes should parse")
3360            .id();
3361        let make_payload = |seed: u64, target: usize| {
3362            let mut p = Vec::with_capacity(target);
3363            let mut i = seed;
3364            while p.len() < target {
3365                p.extend_from_slice(
3366                    format!(
3367                        "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
3368                        i % 89
3369                    )
3370                    .as_bytes(),
3371                );
3372                i = i.wrapping_add(1);
3373            }
3374            p.truncate(target);
3375            p
3376        };
3377        let ldm_params =
3378            crate::encoding::CompressionParameters::builder(super::CompressionLevel::Level(22))
3379                .enable_long_distance_matching(true)
3380                .build()
3381                .expect("LDM-only params build");
3382        let size = 48 * 1024usize;
3383        let frame_a = make_payload(0, size);
3384        let frame_b = make_payload(1_000_000, size);
3385
3386        let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3387        warm.set_parameters(&ldm_params);
3388        warm.set_encoder_dictionary(
3389            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3390        )
3391        .expect("dict attach");
3392        let _wa = warm.compress_independent_frame(frame_a.as_slice());
3393        let warm_b = warm.compress_independent_frame(frame_b.as_slice());
3394
3395        let mut cold: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3396        cold.set_parameters(&ldm_params);
3397        cold.set_encoder_dictionary(
3398            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3399        )
3400        .expect("dict attach");
3401        let cold_b = cold.compress_independent_frame(frame_b.as_slice());
3402
3403        assert_eq!(
3404            warm_b, cold_b,
3405            "LDM-on frame B via snapshot restore must be byte-identical to a cold compress"
3406        );
3407
3408        let (hdr, _) =
3409            crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
3410        assert_eq!(hdr.dictionary_id(), Some(dict_id));
3411        let mut decoder = FrameDecoder::new();
3412        decoder
3413            .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3414            .unwrap();
3415        let mut decoded = Vec::with_capacity(frame_b.len());
3416        decoder
3417            .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
3418            .unwrap();
3419        assert_eq!(decoded.as_slice(), frame_b.as_slice());
3420    }
3421
3422    #[test]
3423    fn set_dictionary_from_bytes_matches_full_decode_byte_for_byte() {
3424        // The encoder-only dict parse (`decode_dict_for_encoding`, used by
3425        // `set_dictionary_from_bytes`) skips the FSE/HUF decoder-table build and
3426        // the enrich passes. The encoder entropy tables are derived purely from
3427        // the symbol probabilities / Huffman weights, so the compressed output
3428        // MUST be byte-identical to the full-decode path. This pins the
3429        // load-bearing equivalence so a future FSE/HUF parsing refactor that
3430        // still round-trips but silently diverges on the probabilities/weights
3431        // fails loudly here instead of producing a different (but valid) frame.
3432        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3433        let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
3434              tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";
3435
3436        // Path A: encoder-only parse straight from the raw blob.
3437        let mut from_bytes_out = Vec::new();
3438        {
3439            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3440            compressor
3441                .set_dictionary_from_bytes(dict_raw)
3442                .expect("dictionary bytes should parse");
3443            compressor.set_source(payload.as_slice());
3444            compressor.set_drain(&mut from_bytes_out);
3445            compressor.compress();
3446        }
3447
3448        // Path B: full decode (builds the decoder tables too), then attach for
3449        // encoding via the `Dictionary` setter.
3450        let full_decode = crate::decoding::Dictionary::decode_dict(dict_raw)
3451            .expect("dictionary bytes should fully decode");
3452        let mut full_decode_out = Vec::new();
3453        {
3454            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3455            compressor
3456                .set_dictionary(full_decode)
3457                .expect("full-decode dictionary should attach");
3458            compressor.set_source(payload.as_slice());
3459            compressor.set_drain(&mut full_decode_out);
3460            compressor.compress();
3461        }
3462
3463        assert_eq!(
3464            from_bytes_out, full_decode_out,
3465            "encoder-only dict parse must produce byte-identical output to the full decode"
3466        );
3467    }
3468
3469    #[test]
3470    fn set_dictionary_rejects_zero_dictionary_id() {
3471        let invalid = crate::decoding::Dictionary {
3472            id: 0,
3473            fse: crate::decoding::scratch::FSEScratch::new(),
3474            huf: crate::decoding::scratch::HuffmanScratch::new(),
3475            dict_content: vec![1, 2, 3],
3476            offset_hist: [1, 4, 8],
3477        };
3478
3479        let mut compressor: FrameCompressor<
3480            &[u8],
3481            Vec<u8>,
3482            crate::encoding::match_generator::MatchGeneratorDriver,
3483        > = FrameCompressor::new(super::CompressionLevel::Fastest);
3484        let result = compressor.set_dictionary(invalid);
3485        assert!(matches!(
3486            result,
3487            Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
3488        ));
3489    }
3490
3491    #[test]
3492    fn set_dictionary_rejects_zero_repeat_offsets() {
3493        let invalid = crate::decoding::Dictionary {
3494            id: 1,
3495            fse: crate::decoding::scratch::FSEScratch::new(),
3496            huf: crate::decoding::scratch::HuffmanScratch::new(),
3497            dict_content: vec![1, 2, 3],
3498            offset_hist: [0, 4, 8],
3499        };
3500
3501        let mut compressor: FrameCompressor<
3502            &[u8],
3503            Vec<u8>,
3504            crate::encoding::match_generator::MatchGeneratorDriver,
3505        > = FrameCompressor::new(super::CompressionLevel::Fastest);
3506        let result = compressor.set_dictionary(invalid);
3507        assert!(matches!(
3508            result,
3509            Err(
3510                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
3511                    index: 0
3512                }
3513            )
3514        ));
3515    }
3516
3517    #[test]
3518    fn uncompressed_mode_does_not_require_dictionary() {
3519        let dict_id = 0xABCD_0001;
3520        let dict =
3521            crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
3522                .expect("raw dictionary should be valid");
3523
3524        let payload = b"plain-bytes-that-should-stay-raw";
3525        let mut output = Vec::new();
3526        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
3527        compressor
3528            .set_dictionary(dict)
3529            .expect("dictionary should attach in uncompressed mode");
3530        compressor.set_source(payload.as_slice());
3531        compressor.set_drain(&mut output);
3532        compressor.compress();
3533
3534        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
3535            .expect("encoded frame should have a header");
3536        assert_eq!(
3537            frame_header.dictionary_id(),
3538            None,
3539            "raw/uncompressed frames must not advertise dictionary dependency"
3540        );
3541
3542        let mut decoder = FrameDecoder::new();
3543        let mut decoded = Vec::with_capacity(payload.len());
3544        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
3545        assert_eq!(decoded, payload);
3546    }
3547
3548    #[test]
3549    fn default_level_tiny_raw_dict_compresses_cleanly() {
3550        // Coverage for the dfast dict-attach fast path with a
3551        // sub-min-match raw-content dictionary: the dict-table probe in
3552        // `start_matching_fast_loop` is gated on the dict table actually
3553        // existing (`table().is_some()`), not merely on `is_attached()`,
3554        // so a dictionary whose hashable region is shorter than the
3555        // short-hash lookahead (where `prime_dict_tables_for_range`
3556        // returns before allocating the tables) never dereferences a
3557        // null dict pointer. Compressing at the default (dfast) level
3558        // with such a dict must succeed.
3559        let dict_id = 0xABCD_0009;
3560        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
3561            .expect("raw dictionary should be valid");
3562        let payload = b"the quick brown fox jumps over the lazy dog, repeatedly and at length";
3563        let mut output = Vec::new();
3564        let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
3565        compressor
3566            .set_dictionary(dict)
3567            .expect("tiny raw dictionary should attach");
3568        compressor.set_source(payload.as_slice());
3569        compressor.set_drain(&mut output);
3570        compressor.compress();
3571        assert!(!output.is_empty(), "compression should produce a frame");
3572
3573        // The emitted frame must advertise the attached dictionary id, proving
3574        // the tiny-dict path stayed active (the payload round-trips either way,
3575        // so without this the test would also pass on a silent no-dict frame).
3576        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
3577            .expect("encoded frame should have a readable header");
3578        assert_eq!(
3579            frame_header.dictionary_id(),
3580            Some(dict_id),
3581            "tiny raw dict frame should still advertise its dictionary id",
3582        );
3583
3584        // Full roundtrip: decode the dict-compressed frame with the SAME
3585        // dictionary attached and confirm byte-exact recovery — proves the
3586        // tiny-dict fast path produces a correct frame, not just a non-empty
3587        // one.
3588        let decode_dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
3589            .expect("raw dictionary should be valid");
3590        let mut decoder = FrameDecoder::new();
3591        decoder
3592            .add_dict(decode_dict)
3593            .expect("decoder dict should attach");
3594        let mut decoded = Vec::with_capacity(payload.len());
3595        decoder
3596            .decode_all_to_vec(&output, &mut decoded)
3597            .expect("dict roundtrip should decode");
3598        assert_eq!(decoded, payload, "tiny-dict roundtrip mismatch");
3599    }
3600
3601    /// Exercises the dictionary dual-probe (live + immutable dict tables)
3602    /// in the Fast / dfast / Row match finders with a dict whose content
3603    /// the payload actually reuses, so each backend's dict long/short
3604    /// probe (and the dfast `ip+1` dict-long retry) is reached and the
3605    /// dict-compressed frame round-trips through a decoder primed with the
3606    /// same dict. The 3-byte-dict test above only proves the null-table
3607    /// guard; this proves the full attach path produces correct frames.
3608    #[test]
3609    fn dict_attach_roundtrips_across_backends_with_matching_payload() {
3610        let dict_id = 0xD1C7_0001;
3611        // Distinct lines so the payload does NOT self-compress: each line
3612        // appears exactly once in the payload, so without the dictionary there
3613        // are no in-frame back-references to exploit. The dictionary holds the
3614        // SAME lines, so the only way the output shrinks is if the dict probe
3615        // actually fires. A no-dict baseline below pins that the dict path ran
3616        // (self-compressible payloads would round-trip + stay small via
3617        // in-frame matches alone, proving nothing).
3618        let line = |i: u32| {
3619            alloc::format!(
3620                "ts=2026-03-26T21:{:02}:{:02}Z level=INFO msg=\"event {i:05}\" tenant=t{i} region=eu\n",
3621                i / 60 % 60,
3622                i % 60,
3623            )
3624            .into_bytes()
3625        };
3626        let mut dict_content = Vec::new();
3627        for i in 0..256u32 {
3628            dict_content.extend_from_slice(&line(i));
3629        }
3630        // Payload = the same distinct lines in a different (stride) order, each
3631        // once → no self-repeats, every line is a dictionary match.
3632        let mut payload = Vec::new();
3633        let mut i = 0u32;
3634        for _ in 0..256u32 {
3635            payload.extend_from_slice(&line(i));
3636            i = (i + 97) % 256; // coprime stride → permutation, no adjacency
3637        }
3638
3639        let compress_at = |level, dict: Option<Vec<u8>>| -> Vec<u8> {
3640            let mut compressor = FrameCompressor::new(level);
3641            if let Some(bytes) = dict {
3642                let d = crate::decoding::Dictionary::from_raw_content(dict_id, bytes)
3643                    .expect("raw dictionary should be valid");
3644                compressor
3645                    .set_dictionary(d)
3646                    .expect("dictionary should attach");
3647            }
3648            let mut out = Vec::new();
3649            compressor.set_source(payload.as_slice());
3650            compressor.set_drain(&mut out);
3651            compressor.compress();
3652            out
3653        };
3654
3655        for level in [
3656            super::CompressionLevel::Level(-5), // Fast (negative)
3657            super::CompressionLevel::Level(1),  // Fast
3658            super::CompressionLevel::Default,   // dfast (L3)
3659            super::CompressionLevel::Level(8),  // Row-backed lazy2
3660        ] {
3661            let out = compress_at(level, Some(dict_content.clone()));
3662            let no_dict = compress_at(level, None);
3663            // The dict path MUST measurably beat no-dict on this
3664            // non-self-compressible payload — otherwise the dict probe never
3665            // fired and the roundtrip below would prove nothing.
3666            assert!(
3667                out.len() < no_dict.len(),
3668                "level {level:?}: dict-primed output ({}) must beat no-dict ({}) — dict probe did not fire",
3669                out.len(),
3670                no_dict.len(),
3671            );
3672
3673            let ddict =
3674                crate::decoding::Dictionary::from_raw_content(dict_id, dict_content.clone())
3675                    .expect("raw dictionary should be valid");
3676            let mut decoder = FrameDecoder::new();
3677            decoder.add_dict(ddict).expect("decoder dict should attach");
3678            let mut decoded = Vec::with_capacity(payload.len());
3679            decoder
3680                .decode_all_to_vec(&out, &mut decoded)
3681                .unwrap_or_else(|e| panic!("level {level:?}: dict roundtrip decode failed: {e:?}"));
3682            assert_eq!(decoded, payload, "level {level:?}: dict roundtrip mismatch");
3683        }
3684    }
3685
3686    /// Reusing one compressor across independent frames with DIFFERENT
3687    /// dictionaries must drop the per-backend dict cache on each swap
3688    /// (Simple/Dfast/Row keep the attach index across frames). Without the
3689    /// invalidation a later frame would reuse the previous dict's rows.
3690    /// Each frame round-trips through a decoder primed with its own dict.
3691    #[test]
3692    fn dict_swap_across_reused_compressor_roundtrips() {
3693        // Distinct lines per dict (not a single repeated line) so payloads do
3694        // NOT self-compress: each line appears once, so a frame only shrinks if
3695        // the dict probe fires, and — crucially for the invalidation check — if
3696        // frame B reused dict A's stale rows it would emit offsets into A's
3697        // distinct content, which decode under dict B reconstructs as WRONG
3698        // bytes (caught by the roundtrip). A single repeated line would hide
3699        // pollution behind in-frame matches.
3700        let lines = |tag: &str| -> (Vec<u8>, Vec<u8>) {
3701            let line =
3702                |i: u32| alloc::format!("{tag} record {i:05} field=value{i} end\n").into_bytes();
3703            let mut dict = Vec::new();
3704            for i in 0..256u32 {
3705                dict.extend_from_slice(&line(i));
3706            }
3707            let mut payload = Vec::new();
3708            let mut i = 0u32;
3709            for _ in 0..256u32 {
3710                payload.extend_from_slice(&line(i));
3711                i = (i + 97) % 256;
3712            }
3713            (dict, payload)
3714        };
3715        let (dict_a, payload_a) = lines("alpha");
3716        let (dict_b, payload_b) = lines("bravo");
3717
3718        for level in [
3719            super::CompressionLevel::Default,
3720            super::CompressionLevel::Level(8),
3721        ] {
3722            let no_dict = |payload: &[u8]| -> usize {
3723                let mut c: FrameCompressor = FrameCompressor::new(level);
3724                c.compress_independent_frame(payload).len()
3725            };
3726            let no_dict_a = no_dict(&payload_a);
3727            let no_dict_b = no_dict(&payload_b);
3728
3729            let mut compressor: FrameCompressor = FrameCompressor::new(level);
3730            for (dict_bytes, payload, no_dict_len) in [
3731                (&dict_a, &payload_a, no_dict_a),
3732                (&dict_b, &payload_b, no_dict_b),
3733            ] {
3734                let dict =
3735                    crate::decoding::Dictionary::from_raw_content(0xD1C7_0002, dict_bytes.clone())
3736                        .expect("raw dictionary should be valid");
3737                compressor
3738                    .set_dictionary(dict)
3739                    .expect("dictionary should attach");
3740                let out = compressor.compress_independent_frame(payload.as_slice());
3741                assert!(
3742                    out.len() < no_dict_len,
3743                    "level {level:?}: dict frame ({}) must beat no-dict ({}) — dict probe did not fire",
3744                    out.len(),
3745                    no_dict_len,
3746                );
3747
3748                let ddict =
3749                    crate::decoding::Dictionary::from_raw_content(0xD1C7_0002, dict_bytes.clone())
3750                        .expect("raw dictionary should be valid");
3751                let mut decoder = FrameDecoder::new();
3752                decoder.add_dict(ddict).expect("decoder dict should attach");
3753                let mut decoded = Vec::with_capacity(payload.len());
3754                decoder
3755                    .decode_all_to_vec(&out, &mut decoded)
3756                    .unwrap_or_else(|e| panic!("level {level:?}: dict-swap decode failed: {e:?}"));
3757                assert_eq!(
3758                    decoded, *payload,
3759                    "level {level:?}: dict-swap roundtrip mismatch (stale dict rows?)"
3760                );
3761            }
3762        }
3763    }
3764
3765    #[test]
3766    fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
3767        use crate::encoding::match_generator::MatchGeneratorDriver;
3768
3769        let dict_id = 0xABCD_0002;
3770        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
3771            .expect("raw dictionary should be valid");
3772        let dict_for_decoder =
3773            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
3774                .expect("raw dictionary should be valid");
3775
3776        // Payload must exceed the encoder's advertised window (512 KiB
3777        // for Fastest after `window_log = 19` alignment with upstream zstd's
3778        // L1 fast row in `clevels.h`) so the test actually exercises
3779        // cross-window-boundary behavior.
3780        let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);
3781        let matcher = MatchGeneratorDriver::new(1024, 1);
3782
3783        let mut no_dict_output = Vec::new();
3784        let mut no_dict_compressor =
3785            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
3786        no_dict_compressor.set_source(payload.as_slice());
3787        no_dict_compressor.set_drain(&mut no_dict_output);
3788        no_dict_compressor.compress();
3789        let (no_dict_frame_header, _) =
3790            crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
3791                .expect("baseline frame should have a header");
3792        let no_dict_window = no_dict_frame_header
3793            .window_size()
3794            .expect("window size should be present");
3795
3796        let mut output = Vec::new();
3797        let matcher = MatchGeneratorDriver::new(1024, 1);
3798        let mut compressor =
3799            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
3800        compressor
3801            .set_dictionary(dict)
3802            .expect("dictionary should attach");
3803        compressor.set_source(payload.as_slice());
3804        compressor.set_drain(&mut output);
3805        compressor.compress();
3806
3807        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
3808            .expect("encoded frame should have a header");
3809        let advertised_window = frame_header
3810            .window_size()
3811            .expect("window size should be present");
3812        assert_eq!(
3813            advertised_window, no_dict_window,
3814            "dictionary priming must not inflate advertised window size"
3815        );
3816        assert!(
3817            payload.len() > advertised_window as usize,
3818            "test must cross the advertised window boundary"
3819        );
3820
3821        let mut decoder = FrameDecoder::new();
3822        decoder.add_dict(dict_for_decoder).unwrap();
3823        let mut decoded = Vec::with_capacity(payload.len());
3824        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
3825        assert_eq!(decoded, payload);
3826    }
3827
3828    #[test]
3829    fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
3830        let dict_id = 0xABCD_0004;
3831        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
3832        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
3833        let dict_for_decoder =
3834            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
3835        let payload = b"abcdabcdabcdabcd".repeat(128);
3836
3837        let mut hinted_output = Vec::new();
3838        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
3839        hinted.set_dictionary(dict).unwrap();
3840        hinted.set_source_size_hint(1);
3841        hinted.set_source(payload.as_slice());
3842        hinted.set_drain(&mut hinted_output);
3843        hinted.compress();
3844
3845        let mut no_hint_output = Vec::new();
3846        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
3847        no_hint
3848            .set_dictionary(
3849                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
3850                    .unwrap(),
3851            )
3852            .unwrap();
3853        no_hint.set_source(payload.as_slice());
3854        no_hint.set_drain(&mut no_hint_output);
3855        no_hint.compress();
3856
3857        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
3858            .expect("encoded frame should have a header")
3859            .0
3860            .window_size()
3861            .expect("window size should be present");
3862        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
3863            .expect("encoded frame should have a header")
3864            .0
3865            .window_size()
3866            .expect("window size should be present");
3867        assert!(
3868            hinted_window <= no_hint_window,
3869            "source-size hint should not increase advertised window with dictionary priming",
3870        );
3871
3872        let mut decoder = FrameDecoder::new();
3873        decoder.add_dict(dict_for_decoder).unwrap();
3874        let mut decoded = Vec::with_capacity(payload.len());
3875        decoder
3876            .decode_all_to_vec(&hinted_output, &mut decoded)
3877            .unwrap();
3878        assert_eq!(decoded, payload);
3879    }
3880
3881    /// A dictionary segment embedded ONCE in otherwise-incompressible
3882    /// input must be matched against the dictionary. Before the fix the
3883    /// raw-fast-path (which skips matching) fired on the
3884    /// incompressible-looking block and the dictionary was never searched,
3885    /// so `with_dict` came out the same size as `no_dict` (the embedded
3886    /// match was lost). Now the block compresses against the dict.
3887    #[test]
3888    fn dictionary_segment_in_incompressible_input_is_matched() {
3889        // Deterministic LCG bytes: high-entropy, so the only compressible
3890        // content is the embedded dictionary segment.
3891        fn lcg(seed: u64, n: usize) -> alloc::vec::Vec<u8> {
3892            let mut s = seed;
3893            (0..n)
3894                .map(|_| {
3895                    s = s
3896                        .wrapping_mul(6364136223846793005)
3897                        .wrapping_add(1442695040888963407);
3898                    (s >> 56) as u8
3899                })
3900                .collect()
3901        }
3902        let dict_id = 0x00DC_7777;
3903        let r = lcg(1, 512); // the dictionary content
3904        let mut payload = lcg(2, 2000); // incompressible filler before
3905        payload.extend_from_slice(&r); // the single dict-matchable segment
3906        payload.extend_from_slice(&lcg(3, 1500)); // filler after
3907
3908        // Precondition: the payload must actually look incompressible so
3909        // that the raw-fast-path WOULD fire (and skip matching) without
3910        // the fix. If the heuristic ever changes and this no longer holds,
3911        // the test below would pass vacuously — assert it up front.
3912        assert!(
3913            crate::encoding::incompressible::block_looks_incompressible(&payload),
3914            "test payload must look incompressible to exercise the raw-fast-path",
3915        );
3916
3917        let compress =
3918            |level: super::CompressionLevel, dict: Option<&[u8]>| -> alloc::vec::Vec<u8> {
3919                let mut out = alloc::vec::Vec::new();
3920                let mut c = FrameCompressor::new(level);
3921                if let Some(d) = dict {
3922                    c.set_dictionary(
3923                        crate::decoding::Dictionary::from_raw_content(dict_id, d.to_vec()).unwrap(),
3924                    )
3925                    .unwrap();
3926                }
3927                c.set_source(payload.as_slice());
3928                c.set_drain(&mut out);
3929                c.compress();
3930                out
3931            };
3932
3933        for lvl in [
3934            super::CompressionLevel::Level(2),
3935            super::CompressionLevel::Level(6),
3936            super::CompressionLevel::Level(19),
3937        ] {
3938            let with_dict = compress(lvl, Some(&r));
3939            let no_dict = compress(lvl, None);
3940            // The 512-byte dict segment should be matched, saving most of
3941            // its length (generous slack for sequence/header coding).
3942            assert!(
3943                with_dict.len() + 300 < no_dict.len(),
3944                "{lvl:?}: dict segment not matched (with_dict={}, no_dict={})",
3945                with_dict.len(),
3946                no_dict.len(),
3947            );
3948            // The dict-compressed frame must round-trip through the decoder.
3949            let mut decoder = FrameDecoder::new();
3950            decoder
3951                .add_dict(
3952                    crate::decoding::Dictionary::from_raw_content(dict_id, r.clone()).unwrap(),
3953                )
3954                .unwrap();
3955            let mut decoded = Vec::with_capacity(payload.len());
3956            decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
3957            assert_eq!(decoded, payload, "{lvl:?}: dict round-trip mismatch");
3958
3959            // A dictionary that does NOT appear in the input must not make
3960            // the output larger than the no-dict (raw) encoding: the
3961            // post-compress raw fallback covers incompressible-with-dict.
3962            let unrelated = lcg(99, 512);
3963            let with_bad_dict = compress(lvl, Some(&unrelated));
3964            assert!(
3965                with_bad_dict.len() <= no_dict.len() + 16,
3966                "{lvl:?}: unhelpful dict expanded output (with={}, no_dict={})",
3967                with_bad_dict.len(),
3968                no_dict.len(),
3969            );
3970        }
3971    }
3972
3973    #[test]
3974    fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
3975        let dict_id = 0xABCD_0005;
3976        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
3977        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
3978        let dict_for_decoder =
3979            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
3980        let payload = b"abcd".repeat(1024); // 4 KiB payload
3981        let payload_len = payload.len() as u64;
3982
3983        let mut hinted_output = Vec::new();
3984        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
3985        hinted.set_dictionary(dict).unwrap();
3986        hinted.set_source_size_hint(payload_len);
3987        hinted.set_source(payload.as_slice());
3988        hinted.set_drain(&mut hinted_output);
3989        hinted.compress();
3990
3991        let mut no_hint_output = Vec::new();
3992        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
3993        no_hint
3994            .set_dictionary(
3995                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
3996                    .unwrap(),
3997            )
3998            .unwrap();
3999        no_hint.set_source(payload.as_slice());
4000        no_hint.set_drain(&mut no_hint_output);
4001        no_hint.compress();
4002
4003        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
4004            .expect("encoded frame should have a header")
4005            .0
4006            .window_size()
4007            .expect("window size should be present");
4008        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
4009            .expect("encoded frame should have a header")
4010            .0
4011            .window_size()
4012            .expect("window size should be present");
4013        assert!(
4014            hinted_window <= no_hint_window,
4015            "source-size hint should not increase advertised window with dictionary priming",
4016        );
4017
4018        let mut decoder = FrameDecoder::new();
4019        decoder.add_dict(dict_for_decoder).unwrap();
4020        let mut decoded = Vec::with_capacity(payload.len());
4021        decoder
4022            .decode_all_to_vec(&hinted_output, &mut decoded)
4023            .unwrap();
4024        assert_eq!(decoded, payload);
4025    }
4026
4027    #[test]
4028    fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
4029        let dict_id = 0xABCD_0003;
4030        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
4031            .expect("raw dictionary should be valid");
4032        let payload = b"abcdefghabcdefgh";
4033
4034        let mut output = Vec::new();
4035        let matcher = NoDictionaryMatcher::new(64);
4036        let mut compressor =
4037            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
4038        compressor
4039            .set_dictionary(dict)
4040            .expect("dictionary should attach");
4041        compressor.set_source(payload.as_slice());
4042        compressor.set_drain(&mut output);
4043        compressor.compress();
4044
4045        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
4046            .expect("encoded frame should have a header");
4047        assert_eq!(
4048            frame_header.dictionary_id(),
4049            None,
4050            "matchers that do not support dictionary priming must not advertise dictionary dependency"
4051        );
4052
4053        let mut decoder = FrameDecoder::new();
4054        let mut decoded = Vec::with_capacity(payload.len());
4055        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
4056        assert_eq!(decoded, payload);
4057    }
4058
4059    #[cfg(feature = "hash")]
4060    #[test]
4061    fn checksum_two_frames_reused_compressor() {
4062        // Compress the same data twice using the same compressor and verify that:
4063        // 1. The checksum written in each frame matches what the decoder calculates.
4064        // 2. The hasher is correctly reset between frames (no cross-contamination).
4065        //    If the hasher were NOT reset, the second frame's calculated checksum
4066        //    would differ from the one stored in the frame data, causing assert_eq to fail.
4067        let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();
4068
4069        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
4070
4071        // --- Frame 1 ---
4072        let mut compressed1 = Vec::new();
4073        compressor.set_source(data.as_slice());
4074        compressor.set_drain(&mut compressed1);
4075        compressor.compress();
4076
4077        // --- Frame 2 (reuse the same compressor) ---
4078        let mut compressed2 = Vec::new();
4079        compressor.set_source(data.as_slice());
4080        compressor.set_drain(&mut compressed2);
4081        compressor.compress();
4082
4083        fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
4084            let mut decoder = FrameDecoder::new();
4085            let mut source = compressed;
4086            decoder.reset(&mut source).unwrap();
4087            while !decoder.is_finished() {
4088                decoder
4089                    .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
4090                    .unwrap();
4091            }
4092            let mut decoded = Vec::new();
4093            decoder.collect_to_writer(&mut decoded).unwrap();
4094            (
4095                decoded,
4096                decoder.get_checksum_from_data(),
4097                decoder.get_calculated_checksum(),
4098            )
4099        }
4100
4101        let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
4102        assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
4103        assert_eq!(
4104            chksum_from_data1, chksum_calculated1,
4105            "frame 1: checksum mismatch"
4106        );
4107
4108        let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
4109        assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
4110        assert_eq!(
4111            chksum_from_data2, chksum_calculated2,
4112            "frame 2: checksum mismatch"
4113        );
4114
4115        // Same data compressed twice must produce the same checksum.
4116        // If state leaked across frames, the second calculated checksum would differ.
4117        assert_eq!(
4118            chksum_from_data1, chksum_from_data2,
4119            "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
4120        );
4121    }
4122
    /// Checks that the per-block emit info recorded by `compress()` maps
    /// onto the real decoded bytes, at `Default` and at `Level(22)`:
    /// ranges are contiguous from zero, each width equals the block's
    /// `decompressed_size`, each block decoded alone equals the matching
    /// slice of the full decode, and an out-of-range index yields `None`.
    #[cfg(feature = "lsm")]
    #[test]
    fn frame_emit_info_decompressed_ranges_match_decoded_output() {
        // Part A correctness: the per-block `decompressed_size` captured during
        // encode (and the `decompressed_byte_range` prefix sum derived from it)
        // must describe the real decoded output exactly — one entry per
        // physical block, contiguous, summing to the full decompressed length.
        // A multi-block compressible payload exercises the Compressed-block
        // path (whose regenerated size is NOT on the wire, so it relies on the
        // encode-side capture this test guards).
        let data = emit_info_fixture_data();

        // Cover both the single-block-per-chunk path (Default) and the
        // Level(16..=22) post-split path (multiple physical partitions per
        // input chunk), since lsm-tree compresses at zstd:22 and post-split
        // is the riskiest capture site (per-partition `src_size`).
        for level in [
            super::CompressionLevel::Default,
            super::CompressionLevel::Level(22),
        ] {
            let mut compressed = Vec::new();
            let mut compressor = FrameCompressor::new(level);
            // Pledge the source size so the high-level (22) window shrinks to
            // fit the payload, keeping the frame compact (no oversized window
            // descriptor for a small input). Still >= 128 KiB, so post-split
            // eligibility is preserved.
            compressor.set_source_size_hint(data.len() as u64);
            compressor.set_source(data.as_slice());
            compressor.set_drain(&mut compressed);
            compressor.compress();

            // Take an owned copy of the emit info; the compressor is not
            // used again below.
            let info = compressor
                .last_frame_emit_info()
                .expect("emit info populated after compress")
                .clone();

            // Reference: full decode of the same frame.
            let mut decoder = FrameDecoder::new();
            let mut source = compressed.as_slice();
            decoder.reset(&mut source).unwrap();
            while !decoder.is_finished() {
                decoder
                    .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                    .unwrap();
            }
            let mut decoded = Vec::new();
            decoder.collect_to_writer(&mut decoded).unwrap();
            assert_eq!(decoded, data, "sanity: frame must round-trip ({level:?})");

            assert!(
                info.blocks.len() >= 2,
                "fixture must span multiple blocks to exercise the mapping ({level:?}, got {})",
                info.blocks.len()
            );
            assert!(
                info.blocks.last().unwrap().last_block,
                "final block must carry last_block ({level:?})"
            );

            // Pin the Level(22) post-split path: the owned loop feeds the
            // encoder MAX_BLOCK_SIZE input chunks, so without post-split the
            // block count cannot exceed the chunk count. More blocks than
            // chunks proves at least one chunk was split into multiple physical
            // partitions (the per-partition `src_size` capture under test).
            if matches!(level, super::CompressionLevel::Level(22)) {
                let max_block = crate::common::MAX_BLOCK_SIZE as usize;
                let n_chunks = data.len().div_ceil(max_block);
                assert!(
                    info.blocks.len() > n_chunks,
                    "Level(22) must exercise post-split: {} blocks for {} input chunks",
                    info.blocks.len(),
                    n_chunks
                );
            }

            // Per-block ranges: contiguous, zero-based, summing to the full output.
            let mut expected_start = 0u64;
            for i in 0..info.blocks.len() {
                let range = info
                    .decompressed_byte_range(i)
                    .expect("in-bounds block has a range");
                assert_eq!(
                    range.start, expected_start,
                    "block {i} range must start where the previous ended ({level:?})"
                );
                assert_eq!(
                    u64::from(info.blocks[i].decompressed_size),
                    range.end - range.start,
                    "block {i} decompressed_size must equal its range width ({level:?})"
                );
                // Validate the mapping against REAL per-block bytes, not just
                // prefix-sum consistency: decode block `i` alone and require it
                // to equal the corresponding slice of the full decode. A
                // sidecar that swapped sizes between adjacent blocks (same sum,
                // same contiguity) would fail here.
                let mut psrc = compressed.as_slice();
                let mut pdec = FrameDecoder::new();
                pdec.reset(&mut psrc).unwrap();
                let pd = pdec
                    .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
                    .unwrap();
                assert!(
                    pd.stopped_at.is_none(),
                    "block {i} must decode cleanly ({level:?})"
                );
                assert_eq!(
                    pd.data.as_slice(),
                    &decoded[range.start as usize..range.end as usize],
                    "block {i} partial-decode bytes must equal the full-decode slice ({level:?})"
                );
                // The next block's range must begin exactly where this one ended.
                expected_start = range.end;
            }
            assert_eq!(
                expected_start,
                decoded.len() as u64,
                "block decompressed sizes must sum to the full decoded length ({level:?})"
            );
            assert_eq!(
                info.decompressed_byte_range(info.blocks.len()),
                None,
                "out-of-range index yields None ({level:?})"
            );
        }
    }
4247
4248    /// ~400 KiB semi-repetitive payload (long runs interleaved with a stride
4249    /// phrase) that compresses into several multi-block frames across levels.
4250    #[cfg(feature = "lsm")]
4251    fn emit_info_fixture_data() -> Vec<u8> {
4252        let mut data: Vec<u8> = Vec::with_capacity(400 * 1024);
4253        let mut x = 0x9E37_79B9u32;
4254        while data.len() < 400 * 1024 {
4255            x ^= x << 13;
4256            x ^= x >> 17;
4257            x ^= x << 5;
4258            let run = 16 + (x as usize % 48);
4259            let byte = (x >> 24) as u8;
4260            for _ in 0..run {
4261                data.push(byte);
4262            }
4263            data.extend_from_slice(b"the quick brown fox jumps over the lazy dog\n");
4264        }
4265        data
4266    }
4267
4268    #[cfg(feature = "lsm")]
4269    #[test]
4270    fn frame_emit_info_decompressed_ranges_match_on_borrowed_oneshot_path() {
4271        // The borrowed one-shot path (`compress_independent_frame` ->
4272        // `run_borrowed_block_loop` -> `compress_block_encoded_borrowed`)
4273        // threads the decompressed-size sidecar through a DIFFERENT emit site
4274        // than the owned/streaming loop, so it needs its own per-block mapping
4275        // check. A Fast level keeps the encoder on the borrowed-eligible
4276        // (Simple matcher) path.
4277        let data = emit_info_fixture_data();
4278
4279        let mut compressor: FrameCompressor =
4280            FrameCompressor::new(super::CompressionLevel::Fastest);
4281        let compressed = compressor.compress_independent_frame(data.as_slice());
4282        let info = compressor
4283            .last_frame_emit_info()
4284            .expect("emit info populated after compress_independent_frame")
4285            .clone();
4286        // Pin the compressed-block path: without this the fixture could regress
4287        // into the raw-fast fallback and still pass via the Raw wire-size
4288        // fallback in populate_frame_emit_info, never exercising the borrowed
4289        // compressed-block sidecar capture this test targets.
4290        assert!(
4291            info.blocks
4292                .iter()
4293                .any(|b| matches!(b.block_type, crate::blocks::block::BlockType::Compressed)),
4294            "borrowed-path fixture must emit at least one compressed block"
4295        );
4296        assert!(
4297            info.blocks.len() >= 2,
4298            "borrowed fixture must span multiple blocks (got {})",
4299            info.blocks.len()
4300        );
4301        assert!(info.blocks.last().unwrap().last_block);
4302
4303        // Full decode reference.
4304        let mut decoder = FrameDecoder::new();
4305        let mut source = compressed.as_slice();
4306        decoder.reset(&mut source).unwrap();
4307        while !decoder.is_finished() {
4308            decoder
4309                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
4310                .unwrap();
4311        }
4312        let mut decoded = Vec::new();
4313        decoder.collect_to_writer(&mut decoded).unwrap();
4314        assert_eq!(decoded, data, "borrowed one-shot frame must round-trip");
4315
4316        // Each block's mapping must match real per-block bytes.
4317        let mut expected_start = 0u64;
4318        for i in 0..info.blocks.len() {
4319            let range = info.decompressed_byte_range(i).unwrap();
4320            assert_eq!(range.start, expected_start, "block {i} range contiguity");
4321            let mut psrc = compressed.as_slice();
4322            let mut pdec = FrameDecoder::new();
4323            pdec.reset(&mut psrc).unwrap();
4324            let pd = pdec
4325                .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
4326                .unwrap();
4327            assert!(pd.stopped_at.is_none(), "block {i} must decode cleanly");
4328            assert_eq!(
4329                pd.data.as_slice(),
4330                &decoded[range.start as usize..range.end as usize],
4331                "borrowed block {i} partial-decode bytes must equal the full-decode slice"
4332            );
4333            expected_start = range.end;
4334        }
4335        assert_eq!(
4336            expected_start,
4337            decoded.len() as u64,
4338            "ranges sum to full length"
4339        );
4340    }
4341
4342    // The fuzz-artifact interop replay (C-compress -> our-decode and
4343    // our-compress -> C-decode) moved to `ffi-bench/tests/fuzz_interop.rs` so
4344    // the library crate never links libzstd.
4345
4346    /// Homogeneous input — every byte the same — must NOT be split:
4347    /// both border histograms are identical (all 512 hits on a single
4348    /// slot), so `presplit_fingerprints_differ` returns `false` and the
4349    /// function takes the early-return path at
4350    /// `zstd_preSplit.c:214` returning `blockSize`.
4351    #[test]
4352    fn split_block_from_borders_keeps_homogeneous_block() {
4353        let block = vec![0xAAu8; MAX_BLOCK_SIZE as usize];
4354        let split = super::split_block_from_borders(&block);
4355        assert_eq!(split, MAX_BLOCK_SIZE as usize);
4356    }
4357
4358    /// Heterogeneous input — first half all zeros, second half a
4359    /// counter sequence — has clearly distinguishable border
4360    /// histograms, so the borders heuristic decides to split.
4361    ///
4362    /// The transition sits at exactly the block midpoint, so the
4363    /// middle 512-byte sample (`block[mid-256..mid+256]`) is half
4364    /// zeros + half counter values. That makes it roughly
4365    /// equidistant from both border fingerprints — the
4366    /// `abs_diff(dist_from_begin, dist_from_end) < min_distance`
4367    /// branch fires and the heuristic returns the midpoint (64 KiB)
4368    /// per `zstd_preSplit.c:222`. The test asserts the exact value
4369    /// rather than just "one of {32K, 64K, 96K}" so a regression
4370    /// to a different quantised arm cannot silently slip through.
4371    #[test]
4372    fn split_block_from_borders_returns_midpoint_for_centred_transition() {
4373        let mut block = vec![0u8; MAX_BLOCK_SIZE as usize];
4374        for (i, byte) in block
4375            .iter_mut()
4376            .enumerate()
4377            .skip(MAX_BLOCK_SIZE as usize / 2)
4378        {
4379            *byte = (i % 251 + 1) as u8;
4380        }
4381        let split = super::split_block_from_borders(&block);
4382        assert_eq!(
4383            split,
4384            64 * 1024,
4385            "centred-transition fixture must take the symmetric \
4386             midpoint arm (`abs_diff < min_distance`), got {split}"
4387        );
4388    }
4389
    /// `level_pre_split` resolves the per-level split knob through the
    /// `LevelParams` table, modelled on the upstream zstd `splitLevels[]` by
    /// strategy (`ZSTD_optimalBlockSize`). The values pinned here are:
    /// fast → 0 (from-borders), dfast → 1, greedy/lazy → 2, and
    /// lazy2/btlazy2 as well as btopt/btultra/btultra2 → 4. The lazy2 and
    /// btlazy2 rows deliberately use 4 (rate-1 full-scan splitter) rather
    /// than 3 (rate-5 sampler); see the inline comment in the body.
    /// `Uncompressed` has no numeric level so it stays `None`.
    #[test]
    fn pre_split_level_dispatches_by_compression_level() {
        use crate::encoding::CompressionLevel;
        use crate::encoding::match_generator::level_pre_split;
        assert_eq!(level_pre_split(CompressionLevel::Uncompressed), None);
        // Fastest = level 1 (fast) → 0 (from-borders).
        assert_eq!(level_pre_split(CompressionLevel::Fastest), Some(0));
        // Default = level 3 (dfast) → 1.
        assert_eq!(level_pre_split(CompressionLevel::Default), Some(1));
        // Better is a pure alias for level 7 (lazy): same as Level(7).
        assert_eq!(
            level_pre_split(CompressionLevel::Better),
            level_pre_split(CompressionLevel::Level(7)),
        );
        // Best resolves to the level-13 table row (btlazy2): pin it to that
        // numeric route so the named path can't drift from the pre-split
        // table.
        assert_eq!(
            level_pre_split(CompressionLevel::Best),
            level_pre_split(CompressionLevel::Level(13)),
        );
        assert_eq!(level_pre_split(CompressionLevel::Level(2)), Some(0)); // fast
        assert_eq!(level_pre_split(CompressionLevel::Level(4)), Some(1)); // dfast
        assert_eq!(level_pre_split(CompressionLevel::Level(5)), Some(2)); // greedy
        assert_eq!(level_pre_split(CompressionLevel::Level(7)), Some(2)); // lazy (depth 1)
        // lazy2 / btlazy2 use the rate-1 full-scan splitter (4), not the
        // rate-5 sampler (3): the sampler phantom-splits homogeneous periodic
        // input (see `pre_split` comment + `periodic_stream_not_oversplit`).
        assert_eq!(level_pre_split(CompressionLevel::Level(8)), Some(4)); // lazy2 lower bound
        assert_eq!(level_pre_split(CompressionLevel::Level(11)), Some(4)); // lazy2 (depth 2)
        assert_eq!(level_pre_split(CompressionLevel::Level(12)), Some(4)); // lazy2 upper bound
        assert_eq!(level_pre_split(CompressionLevel::Level(13)), Some(4)); // btlazy2 lower bound
        assert_eq!(level_pre_split(CompressionLevel::Level(15)), Some(4)); // btlazy2 (depth 2)
        assert_eq!(level_pre_split(CompressionLevel::Level(16)), Some(4)); // btopt
        assert_eq!(level_pre_split(CompressionLevel::Level(22)), Some(4)); // btultra2
    }
4432
4433    /// Regression: a homogeneous but periodic multi-block stream must not be
4434    /// pre-split into tiny blocks at the lazy2 / btlazy2 levels. The rate-5
4435    /// chunk sampler used to phantom-split such input at every 8 KB chunk,
4436    /// cascading a large stream into hundreds of tiny blocks whose per-block
4437    /// headers ballooned the output (~5x vs the lazy level next door). With
4438    /// the rate-1 full-scan splitter the periodic stream is seen as uniform
4439    /// and stays a few full blocks. We assert the lazy2 (L8) and btlazy2 (L15)
4440    /// outputs stay within 2x of the lazy (L7) output on the same input, and
4441    /// that every output round-trips.
4442    #[test]
4443    fn periodic_stream_not_oversplit() {
4444        use crate::encoding::{CompressionLevel, compress_slice_to_vec};
4445        const LINES: &[&str] = &[
4446            "ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo table=orders region=eu-west\n",
4447            "ts=2026-03-26T21:39:29Z level=INFO msg=\"rotate segment\" tenant=demo table=orders region=eu-west\n",
4448            "ts=2026-03-26T21:39:30Z level=INFO msg=\"compact level\" tenant=demo table=orders region=eu-west\n",
4449            "ts=2026-03-26T21:39:31Z level=INFO msg=\"write block\" tenant=demo table=orders region=eu-west\n",
4450        ];
4451        // 512 KB = 4 upstream zstd blocks, enough for the cascade to manifest.
4452        let target = 512 * 1024usize;
4453        let mut data = Vec::with_capacity(target);
4454        let mut i = 0;
4455        while data.len() < target {
4456            let line = LINES[i % LINES.len()].as_bytes();
4457            let take = line.len().min(target - data.len());
4458            data.extend_from_slice(&line[..take]);
4459            i += 1;
4460        }
4461        let l7 = compress_slice_to_vec(&data, CompressionLevel::Level(7)); // lazy depth1
4462        let l8 = compress_slice_to_vec(&data, CompressionLevel::Level(8)); // lazy2
4463        let l15 = compress_slice_to_vec(&data, CompressionLevel::Level(15)); // btlazy2
4464        assert!(
4465            l8.len() < l7.len() * 2,
4466            "lazy2 over-split periodic stream: l7={} l8={}",
4467            l7.len(),
4468            l8.len()
4469        );
4470        assert!(
4471            l15.len() < l7.len() * 2,
4472            "btlazy2 over-split periodic stream: l7={} l15={}",
4473            l7.len(),
4474            l15.len()
4475        );
4476        for out in [&l7, &l8, &l15] {
4477            let mut decoder = FrameDecoder::new();
4478            let mut round = Vec::with_capacity(data.len());
4479            decoder
4480                .decode_all_to_vec(out, &mut round)
4481                .expect("decode periodic stream");
4482            assert_eq!(round, data, "periodic stream roundtrip mismatch");
4483        }
4484    }
4485
/// Greedy pre-split round-trip: compress a 256 KB payload at `Level(5)` and
/// decode it again with the crate's own `FrameDecoder`.
///
/// Fixture layout, per 128 KB upstream zstd block:
///
/// * Block 1 is a homogeneous, compressible `i & 7` run. The upstream zstd
///   `savings < 3` gate skips splitting while savings still start at 0, so
///   this block exists to bank savings.
/// * Block 2 repeats that run for its first 64 KB and then switches to a
///   counter sequence. That intra-block fingerprint transition (the 192 KB
///   mark of the payload) is what `split_block_by_chunks()` is expected to
///   resolve into a sub-block boundary (the `pending_input.split_off(...)`
///   path).
///
/// The split decision is asserted directly through `optimal_block_size`, so
/// the test cannot silently stop exercising the split path if the fixture or
/// the parameters drift. Only after that is the emitted frame round-tripped.
/// `Level(5)` (greedy) is used because Level 13 (lazy) no longer pre-splits.
#[test]
fn greedy_chunk_split_roundtrips_through_own_decoder() {
    use crate::decoding::BlockDecodingStrategy;
    use crate::encoding::CompressionLevel;

    const KIB: usize = 1024;
    let level = CompressionLevel::Level(5);
    let block_limit = MAX_BLOCK_SIZE as usize;

    // Low-entropy run up to the 192 KB mark, counter sequence afterwards.
    let data: Vec<u8> = (0..256 * KIB)
        .map(|pos| {
            if pos < 192 * KIB {
                (pos & 0x07) as u8
            } else {
                (pos % 251 + 1) as u8
            }
        })
        .collect();

    // Pin the splitter's verdict on the second block before compressing:
    // it must propose a boundary short of a full block.
    let tail = &data[128 * KIB..];
    let split = super::optimal_block_size(level, tail, tail.len(), block_limit, 100);
    assert!(
        split < block_limit,
        "second upstream zstd block must chunk-split at its intra-block transition, got {split}",
    );

    // Compress the whole payload through the streaming source/drain API.
    let mut frame = Vec::new();
    let mut compressor = FrameCompressor::new(level);
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut frame);
    compressor.compress();

    // Decode block by block until the decoder reports the frame finished.
    let mut decoder = FrameDecoder::new();
    let mut cursor = frame.as_slice();
    decoder
        .reset(&mut cursor)
        .expect("frame header should parse");
    loop {
        if decoder.is_finished() {
            break;
        }
        decoder
            .decode_blocks(&mut cursor, BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }

    let mut restored = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut restored).unwrap();
    assert_eq!(restored, data, "roundtrip must reproduce the input verbatim");
}
4553
/// Coverage for the Fast one-shot path.
///
/// On a Fast level, `compress_slice_to_vec` / `compress_independent_frame`
/// route through `run_borrowed_block_loop` rather than the owned loop that
/// the greedy chunk-split test covers. That loop must honour
/// `optimal_block_size` and emit a sub-`MAX_BLOCK_SIZE` boundary instead of
/// fixed 128 KiB blocks.
///
/// A 256 KiB input is two 128 KiB blocks when unsplit, so a chunk boundary
/// inside the second block shows up as at least three decoded blocks; that
/// count is asserted on the round-trip.
#[test]
fn fast_oneshot_borrowed_split_emits_subblock() {
    use crate::decoding::BlockDecodingStrategy;
    use crate::encoding::CompressionLevel;

    const KIB: usize = 1024;
    let block_limit = MAX_BLOCK_SIZE as usize;

    // Zeros up to the 192 KiB mark (banks the savings the split gate needs),
    // then a counter sequence. The flip sits at the 64 KiB midpoint of the
    // second 128 KiB block: a fingerprint transition for the Fast
    // from-borders splitter (split level 0) to resolve into a boundary.
    let data: Vec<u8> = (0..256 * KIB)
        .map(|pos| {
            if pos >= 192 * KIB {
                (pos % 251 + 1) as u8
            } else {
                0u8
            }
        })
        .collect();

    // Pin the splitter decision for the Fast path up front, so the block
    // count assertion at the end cannot pass vacuously.
    let tail = &data[128 * KIB..];
    let proposed = super::optimal_block_size(
        CompressionLevel::Fastest,
        tail,
        tail.len(),
        block_limit,
        100,
    );
    assert!(
        proposed < block_limit,
        "fixture must resolve to a sub-block split in the second upstream zstd block",
    );

    // Drive the borrowed one-shot route explicitly (Fast level ->
    // run_borrowed_block_loop via compress_independent_frame).
    let mut compressor: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
    let frame = compressor.compress_independent_frame(&data);

    let mut decoder = FrameDecoder::new();
    let mut cursor = frame.as_slice();
    decoder
        .reset(&mut cursor)
        .expect("frame header should parse");
    loop {
        if decoder.is_finished() {
            break;
        }
        decoder
            .decode_blocks(&mut cursor, BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }

    let mut restored = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut restored).unwrap();
    assert_eq!(restored, data, "roundtrip must reproduce the input verbatim");

    let block_count = decoder.blocks_decoded();
    assert!(
        block_count >= 3,
        "fast one-shot borrowed path must split the second upstream zstd block \
         (256 KiB unsplit = 2 blocks), got {} blocks",
        block_count,
    );
}
4616
/// Regression: after `set_compression_level`, the next `compress()` must
/// refresh `state.strategy_tag` through the reset-time sync, so that the
/// literal-compression gates (`min_literals_to_compress`, `min_gain`) use
/// the NEW level's strategy.
///
/// The level pair deliberately crosses strategy bands: `Fastest` resolves to
/// `Fast` and `Level(20)` resolves to `BtUltra2`. A missed sync would leave
/// the construction-time tag visible and trip the assertion.
/// `CompressionLevel::Best` would also type-check, but it resolves to `Lazy`
/// today, which keeps `min_literals_to_compress` in the same
/// `shift=3 → 64-byte` band as `Fast` and weakens the signal that the gate
/// floor actually moved.
#[cfg(feature = "std")]
#[test]
fn set_compression_level_then_compress_refreshes_strategy_tag() {
    use super::CompressionLevel;
    use crate::encoding::strategy::StrategyTag;

    let start_level = CompressionLevel::Fastest;
    let new_level = CompressionLevel::Level(20);

    let payload = vec![0xABu8; 256];
    let mut sink = Vec::new();
    let mut compressor = FrameCompressor::new(start_level);

    // Baseline: the tag right after construction mirrors the initial level.
    let initial_tag = compressor.state.strategy_tag;
    assert_eq!(
        initial_tag,
        StrategyTag::for_compression_level(start_level),
        "construction-time strategy_tag must reflect initial level",
    );

    // Move to a level in a different strategy band and run one full
    // compress cycle; the matcher.reset() inside `compress` is the only
    // site that can refresh the tag.
    compressor.set_compression_level(new_level);
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut sink);
    compressor.compress();

    let new_tag = compressor.state.strategy_tag;
    let expected = StrategyTag::for_compression_level(new_level);
    assert_eq!(
        new_tag, expected,
        "strategy_tag must follow set_compression_level → compress, \
         got {new_tag:?} expected {expected:?}",
    );

    // Fixture guards: the chosen levels must really sit in different bands,
    // otherwise the equality above would prove nothing.
    assert_eq!(
        expected,
        StrategyTag::BtUltra2,
        "test fixture invariant: Level(20) must resolve to BtUltra2 \
         so the post-switch tag visibly crosses the band boundary",
    );
    assert_ne!(
        new_tag, initial_tag,
        "test fixture invariant: chosen levels must resolve to \
         different StrategyTag variants",
    );
}
4672
/// Magicless mode (`ZSTD_f_zstd1_magicless`) contract, checked three ways:
///
/// 1. the encoded frame does NOT start with the 4-byte magic prefix;
/// 2. a decoder switched to magicless mode round-trips the payload;
/// 3. a standard decoder refuses the frame while reading the header.
#[test]
fn magicless_frame_omits_magic_and_roundtrips() {
    use crate::common::MAGIC_NUM;
    use crate::decoding::errors::{FrameDecoderError, ReadFrameHeaderError};
    use crate::decoding::{BlockDecodingStrategy, FrameDecoder};

    let input: Vec<u8> = (0..512u32).map(|n| (n ^ 0xA5) as u8).collect();

    // Encode with the magic prefix switched off.
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
    compressor.set_magicless(true);
    compressor.set_source(input.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    // 1. The output must not open with the little-endian zstd magic number.
    let magic_prefix = MAGIC_NUM.to_le_bytes();
    assert!(
        !output.starts_with(&magic_prefix),
        "magicless frame must omit the 4-byte magic prefix",
    );

    // 2. A magicless-aware decoder reproduces the input.
    let mut decoder = FrameDecoder::new();
    decoder.set_magicless(true);
    let mut cursor: &[u8] = output.as_slice();
    decoder.init(&mut cursor).expect("magicless init");
    decoder
        .decode_blocks(&mut cursor, BlockDecodingStrategy::All)
        .expect("decode_blocks");
    let mut decoded: Vec<u8> = Vec::new();
    decoder
        .collect_to_writer(&mut decoded)
        .expect("collect_to_writer");
    assert_eq!(decoded, input, "magicless roundtrip must preserve bytes");

    // 3. A standard (magicful) decoder must reject the frame at the
    //    header-read step: its first 4 bytes are the frame-header
    //    descriptor plus window / dictionary / FCS metadata, not the magic.
    //    Two errors are accepted. `BadMagicNumber` is the typical case (the
    //    bytes match neither `MAGIC_NUM` nor the skippable-frame range).
    //    `SkipFrame` is the rare case where the bytes coincidentally land in
    //    `0x184D2A50..=0x184D2A5F`. Either one proves the standard decoder
    //    did not treat the bytes as a real magicful frame.
    let mut std_decoder = FrameDecoder::new();
    let other = std_decoder.init(output.as_slice());
    let rejected = matches!(
        other,
        Err(FrameDecoderError::ReadFrameHeaderError(
            ReadFrameHeaderError::BadMagicNumber(_) | ReadFrameHeaderError::SkipFrame { .. },
        ))
    );
    if !rejected {
        panic!(
            "standard decoder must reject a magicless frame with \
             ReadFrameHeaderError::BadMagicNumber or SkipFrame, got {other:?}",
        );
    }
}
4732
/// Reuse equivalence: for every input, a long-lived `FrameCompressor` must
/// produce exactly the bytes a fresh compressor would, across both the
/// borrowed (Fast) and owned (Dfast/Lazy/Greedy/Uncompressed) backends.
///
/// Byte equality demonstrates that `prepare_frame` fully resets per-frame
/// state (matcher window, content hasher, FSE/Huffman seeds) between
/// independent frames; a missed reset would corrupt the header checksum or
/// the matches of frame N>=2. Every emitted frame is also decoded and
/// compared against its input.
#[test]
fn compress_independent_frame_reuse_matches_fresh_and_roundtrips() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};

    // Payloads from empty up to multi-block, compressible and not.
    let inputs: Vec<Vec<u8>> = vec![
        Vec::new(),
        vec![0x00],
        b"the quick brown fox jumps over the lazy dog\n".to_vec(),
        vec![0x7Eu8; 50_000],          // highly compressible
        generate_data(0xABCD, 70_000), // pseudo-random
        generate_data(0x1234, 200_000),
    ];
    let levels = [
        CompressionLevel::Uncompressed,
        CompressionLevel::Fastest,
        CompressionLevel::Default,
        CompressionLevel::Better,
        CompressionLevel::Best,
        CompressionLevel::Level(5),
    ];

    for level in levels {
        // One compressor per level, shared by every payload below.
        let mut reused_ctx: FrameCompressor = FrameCompressor::new(level);
        for payload in inputs.iter() {
            let from_reused = reused_ctx.compress_independent_frame(payload);
            let from_fresh = compress_slice_to_vec(payload, level);
            assert_eq!(
                from_reused,
                from_fresh,
                "reused frame != fresh frame for len={} level={:?}",
                payload.len(),
                level,
            );

            let mut decoder = FrameDecoder::new();
            let mut restored = Vec::with_capacity(payload.len());
            decoder
                .decode_all_to_vec(&from_reused, &mut restored)
                .unwrap();
            assert_eq!(
                restored,
                *payload,
                "roundtrip failed for len={} level={:?}",
                payload.len(),
                level,
            );
        }
    }
}
4784
/// `compress_independent_frame_into` must replace (not append to) the
/// caller's buffer on every call: a smaller frame written after a larger one
/// must leave exactly the smaller frame in the buffer, identical to a fresh
/// compression of the same input.
///
/// The first payload is deliberately incompressible. A run of identical
/// bytes would shrink to a frame of only a few bytes, possibly shorter than
/// the frame for the short second payload, so the "smaller after larger"
/// premise would not actually be exercised. The size relationship is
/// asserted as a fixture invariant so it cannot drift unnoticed.
#[test]
fn compress_independent_frame_into_replaces_buffer_contents() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};

    // 40,000 pseudo-random bytes: the top byte of a 64-bit LCG state
    // (Knuth's MMIX multiplier/increment). The high bits are used because
    // the low bits of a power-of-two LCG have very short periods and would
    // be trivially compressible.
    let mut lcg_state: u64 = 0x9E37_79B9_7F4A_7C15;
    let large: Vec<u8> = (0..40_000)
        .map(|_| {
            lcg_state = lcg_state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1_442_695_040_888_963_407);
            (lcg_state >> 56) as u8
        })
        .collect();
    let small = b"short payload".to_vec();

    let mut cctx: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
    let mut out = Vec::new();
    cctx.compress_independent_frame_into(&large, &mut out);
    // Snapshot the first frame before the buffer is reused.
    let frame_large = out.clone();

    // Reusing the same buffer for a smaller frame must clear it first.
    cctx.compress_independent_frame_into(&small, &mut out);
    let expected_small = compress_slice_to_vec(&small, CompressionLevel::Default);
    assert!(
        expected_small.len() < frame_large.len(),
        "test fixture invariant: second frame ({} bytes) must be smaller than \
         the first ({} bytes) so stale trailing bytes would be detectable",
        expected_small.len(),
        frame_large.len(),
    );
    assert_eq!(
        out, expected_small,
        "reused buffer must hold exactly the second frame",
    );

    // The first frame, captured before reuse, still round-trips.
    let mut decoder = FrameDecoder::new();
    let mut decoded = Vec::with_capacity(large.len());
    decoder
        .decode_all_to_vec(&frame_large, &mut decoded)
        .unwrap();
    assert_eq!(decoded, large);
}
4813
/// Sticky-dictionary reuse (mirroring `ZSTD_CCtx_loadDictionary`): a
/// dictionary set once on a long-lived compressor must be primed into every
/// independent frame it emits.
///
/// For each payload, the reused compressor's frame must be byte-identical to
/// one from a fresh compressor carrying the same dictionary, and it must
/// decode with that dictionary loaded. This shows `prepare_frame` re-primes
/// the dictionary (matcher content + offset history + entropy seed) on every
/// call rather than only on the first.
#[test]
fn compress_independent_frame_reuses_sticky_dictionary() {
    use crate::decoding::Dictionary;
    use crate::encoding::CompressionLevel;

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let parsed = Dictionary::decode_dict(dict_raw).unwrap();

    // First payload: eight copies of the dictionary's first 2048 content
    // bytes. Second payload: a short, unrelated frame.
    let inputs = [
        parsed.dict_content[..2048].repeat(8),
        b"a different second frame payload, still dict-attached".to_vec(),
    ];

    // Builds a Fastest-level compressor with the dictionary attached.
    let dict_encoder = || {
        let mut enc: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
        enc.set_dictionary_from_bytes(dict_raw)
            .expect("dictionary bytes should parse");
        enc
    };

    let mut cctx = dict_encoder();
    for payload in inputs.iter() {
        let reused = cctx.compress_independent_frame(payload);

        // Fresh compressor carrying the same sticky dictionary.
        let fresh = dict_encoder().compress_independent_frame(payload);
        assert_eq!(
            reused,
            fresh,
            "reused dict frame != fresh dict frame, len={}",
            payload.len(),
        );

        // Round-trip with the dictionary on the decode side.
        let mut decoder = FrameDecoder::new();
        decoder
            .add_dict(Dictionary::decode_dict(dict_raw).unwrap())
            .unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(&reused, &mut decoded).unwrap();
        assert_eq!(
            &decoded,
            payload,
            "dict roundtrip failed, len={}",
            payload.len()
        );
    }
}
4859}