Skip to main content

structured_zstd/encoding/
frame_compressor.rs

//! Utilities and interfaces for encoding an entire frame, allowing encoder resources (matcher state, entropy tables, buffers) to be reused across frames.
2
3use alloc::vec::Vec;
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12    CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13    match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
20/// A dictionary prepared for the ENCODER side, analogous to zstd's `CDict`
21/// (vs the decoder's [`Dictionary`](crate::decoding::Dictionary) / `DDict`).
22///
23/// It carries the entropy tables, content, and repeat-offset history the
24/// compressor needs, but is a distinct type with **no decode path**: there is
25/// no way to turn it into a [`DictionaryHandle`](crate::decoding::DictionaryHandle)
26/// or feed it to a [`FrameDecoder`](crate::decoding::FrameDecoder). That keeps
27/// the compress-only state (which may have been parsed without building the
28/// decode lookup tables, see
29/// [`set_dictionary_from_bytes`](FrameCompressor::set_dictionary_from_bytes))
30/// from ever reaching the decode side — the encoder/decoder dictionary split
31/// mirrors C zstd's `CDict` / `DDict`.
32#[derive(Clone)]
33pub struct EncoderDictionary {
34    pub(crate) inner: crate::decoding::Dictionary,
35}
36
37impl EncoderDictionary {
38    /// Wrap an already-parsed [`Dictionary`](crate::decoding::Dictionary) for
39    /// encoder use. A fully-decoded dictionary is valid here; only the encoder
40    /// entropy tables, content, and offset history are read.
41    pub fn from_dictionary(dictionary: crate::decoding::Dictionary) -> Self {
42        Self { inner: dictionary }
43    }
44
45    /// Parse a serialized dictionary blob for encoder use, skipping the decode
46    /// lookup-table build the encoder never reads (see
47    /// `Dictionary::decode_dict_for_encoding`). The encoder entropy tables — and
48    /// thus the emitted frame — are identical to a full parse.
49    pub fn from_bytes(
50        raw_dictionary: &[u8],
51    ) -> Result<Self, crate::decoding::errors::DictionaryDecodeError> {
52        Ok(Self {
53            inner: crate::decoding::Dictionary::decode_dict_for_encoding(raw_dictionary)?,
54        })
55    }
56
57    /// The dictionary id.
58    ///
59    /// A dictionary attached for encoding always has a non-zero id (the
60    /// `set_dictionary*` / `set_encoder_dictionary` attach path rejects a
61    /// zero id). This getter, however, reflects the wrapped dictionary as-is:
62    /// an `EncoderDictionary` built via [`Self::from_dictionary`] from a raw
63    /// `Dictionary` with `id == 0` reports `0` here until it is attached.
64    pub fn id(&self) -> u32 {
65        self.inner.id
66    }
67}
68
69/// An interface for compressing arbitrary data with the ZStandard compression algorithm.
70///
71/// `FrameCompressor` will generally be used by:
72/// 1. Initializing a compressor by providing a buffer of data using `FrameCompressor::new()`
73/// 2. Starting compression and writing that compression into a vec using `FrameCompressor::begin`
74///
75/// # Examples
76/// ```
77/// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
78/// let mock_data: &[_] = &[0x1, 0x2, 0x3, 0x4];
79/// let mut output = std::vec::Vec::new();
80/// // Initialize a compressor.
81/// let mut compressor = FrameCompressor::new(CompressionLevel::Uncompressed);
82/// compressor.set_source(mock_data);
83/// compressor.set_drain(&mut output);
84///
85/// // `compress` writes the compressed output into the provided buffer.
86/// compressor.compress();
87/// ```
88pub struct FrameCompressor<
89    R: Read = &'static [u8],
90    W: Write = Vec<u8>,
91    M: Matcher = MatchGeneratorDriver,
92> {
93    uncompressed_data: Option<R>,
94    compressed_data: Option<W>,
95    compression_level: CompressionLevel,
96    dictionary: Option<EncoderDictionary>,
97    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
98    source_size_hint: Option<u64>,
99    state: CompressState<M>,
100    /// When true, emitted frames omit the 4-byte magic number prefix
101    /// (`ZSTD_f_zstd1_magicless`). Default false. The caller is
102    /// responsible for ensuring the decoder is configured for the
103    /// matching format — wire-format only round-trips with a
104    /// magicless-aware decoder.
105    magicless: bool,
106    /// Whether to emit a trailing XXH64 content checksum and set the frame
107    /// header's `Content_Checksum_flag` (semantics of upstream
108    /// `ZSTD_c_checksumFlag`). Default `false`, matching the upstream
109    /// library default; combined with the `hash` feature at frame-build
110    /// time, so without `hash` no checksum is emitted regardless. Set via
111    /// [`Self::set_content_checksum`].
112    content_checksum: bool,
113    /// Whether to record `Frame_Content_Size` in the frame header when the
114    /// total size is known (semantics of upstream `ZSTD_c_contentSizeFlag`).
115    /// Default `true`, matching upstream. With the flag off the header
116    /// carries a window descriptor instead (single-segment requires an FCS,
117    /// so it is disabled too). Set via [`Self::set_content_size_flag`].
118    content_size_flag: bool,
119    /// Whether to record the dictionary ID in the frame header when a
120    /// dictionary is attached (semantics of upstream `ZSTD_c_dictIDFlag`).
121    /// Default `true`, matching upstream. Decoders can still decode the
122    /// frame by being handed the right dictionary explicitly. Set via
123    /// [`Self::set_dictionary_id_flag`].
124    dict_id_flag: bool,
125    /// Upper bound on emitted block sizes (semantics of upstream
126    /// `ZSTD_c_targetCBlockSize`): capping the RAW block length at the
127    /// target bounds every physical block's compressed payload at the
128    /// target too (a compressed block never exceeds its raw input — the
129    /// raw-block fallback fires otherwise), so blocks land at or under
130    /// `target + 3` header bytes on the wire. `None` = no target (full
131    /// 128 KiB blocks). Set via [`Self::set_target_block_size`].
132    target_block_size: Option<u32>,
133    #[cfg(feature = "hash")]
134    hasher: XxHash64,
135    /// Block-layout introspection populated at the end of every
136    /// successful `compress()`. `None` until the first call.
137    /// Behind the `lsm` feature gate.
138    #[cfg(feature = "lsm")]
139    frame_emit_info: Option<crate::encoding::frame_emit_info::FrameEmitInfo>,
140    /// When `true`, `compress()` XXH64-hashes each block's
141    /// uncompressed bytes and appends the low-32-bit digest to
142    /// `block_checksums`. Default `false` (zero cost). Gated on
143    /// `all(lsm, hash)` because XXH64 lives behind the `hash`
144    /// feature; an `lsm`-only build has no way to compute digests.
145    #[cfg(all(feature = "lsm", feature = "hash"))]
146    per_block_checksums_enabled: bool,
147    /// Per-block XXH64 (low 32 bits) digests captured during
148    /// `compress()` when `per_block_checksums_enabled` is set. Ordered
149    /// by block-emit order. `None` until the first call after enabling.
150    /// Gated on `all(lsm, hash)` (see `per_block_checksums_enabled`).
151    #[cfg(all(feature = "lsm", feature = "hash"))]
152    block_checksums: Option<alloc::vec::Vec<u32>>,
153    /// Per-physical-block decompressed (regenerated) sizes captured
154    /// during `compress()`, in block-emit order (1:1 with
155    /// `frame_emit_info.blocks`). Always captured under `lsm` (no
156    /// opt-in, unlike `block_checksums`) because `FrameEmitInfo` is
157    /// always built under `lsm` and `decompressed_byte_range` needs
158    /// the per-block sizes. Cleared and refilled per frame.
159    #[cfg(feature = "lsm")]
160    block_decompressed_sizes: alloc::vec::Vec<u32>,
161    /// Effective strategy tag when a public-parameter
162    /// [`Strategy`](crate::encoding::Strategy) override (#27) is active.
163    /// `Some` overrides the level-derived `state.strategy_tag` so the
164    /// literal-compression gates and dict-attach cutoff see the strategy
165    /// the matcher actually runs, not the base level's. `None` keeps the
166    /// level-derived tag.
167    strategy_override: Option<crate::encoding::strategy::StrategyTag>,
168}
169
170#[derive(Clone, Default)]
171pub(crate) struct CachedDictionaryEntropy {
172    pub(crate) huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
173    pub(crate) ll_previous: Option<PreviousFseTable>,
174    pub(crate) ml_previous: Option<PreviousFseTable>,
175    pub(crate) of_previous: Option<PreviousFseTable>,
176}
177
178impl CachedDictionaryEntropy {
179    /// Heap bytes the cached dictionary entropy holds: the literals Huffman
180    /// table plus any `Custom` LL/ML/OF FSE tables (the `Arc`-boxed `FSETable`
181    /// payload and its flat state array). `Default` / `Rle` variants own no heap.
182    pub(crate) fn heap_size(&self) -> usize {
183        let mut total = self.huff.as_ref().map_or(0, |h| h.heap_size());
184        for prev in [&self.ll_previous, &self.ml_previous, &self.of_previous] {
185            if let Some(PreviousFseTable::Custom(table)) = prev {
186                total +=
187                    core::mem::size_of::<crate::fse::fse_encoder::FSETable>() + table.heap_size();
188            }
189        }
190        total
191    }
192
193    /// Derive the encoder-side entropy tables a dictionary seeds for the first
194    /// block of each frame (the donor `cdict->cBlockState`): the literals
195    /// Huffman table plus the literal-length / match-length / offset FSE
196    /// "previous" tables. Shared by [`FrameCompressor`] and
197    /// [`crate::encoding::StreamingEncoder`] so both seed identically.
198    pub(crate) fn from_dictionary(dictionary: &crate::decoding::Dictionary) -> Self {
199        Self {
200            huff: dictionary.huf.table.to_encoder_table(),
201            ll_previous: dictionary
202                .fse
203                .literal_lengths
204                .to_encoder_table()
205                .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
206            ml_previous: dictionary
207                .fse
208                .match_lengths
209                .to_encoder_table()
210                .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
211            of_previous: dictionary
212                .fse
213                .offsets
214                .to_encoder_table()
215                .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
216        }
217    }
218}
219
220/// Shared owner for a custom "previous" FSE encoder table. `Arc` on
221/// atomic-pointer targets, `Rc` otherwise (keeps `no_std` no-atomics
222/// builds compiling, single-thread there anyway), mirroring
223/// `decoding::dictionary::SharedDictionary`. Cloning the cached
224/// dictionary entropy into the per-frame state is then a refcount bump,
225/// not a full `FSETable` copy — the donor references `cdict->cBlockState`
226/// instead of rebuilding it per frame.
227#[cfg(target_has_atomic = "ptr")]
228pub(crate) type SharedFseTable = alloc::sync::Arc<FSETable>;
229#[cfg(not(target_has_atomic = "ptr"))]
230pub(crate) type SharedFseTable = alloc::rc::Rc<FSETable>;
231
232#[derive(Clone)]
233pub(crate) enum PreviousFseTable {
234    // Default tables are immutable and already stored alongside the state, so
235    // repeating them only needs a lightweight marker instead of cloning FSETable.
236    Default,
237    // Shared handle: cloning (per-frame dictionary entropy seed) is a refcount
238    // bump. The table is only ever read or REPLACED wholesale (a block that
239    // builds a new table swaps in a fresh `SharedFseTable`), never mutated in
240    // place, so sharing is sound.
241    Custom(SharedFseTable),
242    Rle(u8),
243}
244
245impl PreviousFseTable {
246    pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
247        match self {
248            Self::Default => Some(default),
249            Self::Custom(table) => Some(table),
250            Self::Rle(_) => None,
251        }
252    }
253}
254
255pub(crate) struct FseTables {
256    /// The three predefined LL/ML/OF tables are functions of
257    /// compile-time-constant distributions. The
258    /// [`fse_encoder::FseDefaultTable`] type alias resolves to
259    /// `&'static FSETable` when a process-wide cache is available
260    /// (atomic-pointer targets, or no-atomic targets with the
261    /// `critical-section` feature) and to `Box<FSETable>` on the
262    /// cache-less no-atomic path (one per-frame allocation, dropped
263    /// with the compressor — no `Box::leak`, no unbounded growth).
264    /// Both arms `Deref` to `FSETable`, so consumers in
265    /// `encoding/blocks/compressed.rs` borrow through `&` uniformly
266    /// without seeing the per-target divergence.
267    pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
268    pub(crate) ll_previous: Option<PreviousFseTable>,
269    pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
270    pub(crate) ml_previous: Option<PreviousFseTable>,
271    pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
272    pub(crate) of_previous: Option<PreviousFseTable>,
273}
274
275impl FseTables {
276    pub fn new() -> Self {
277        Self {
278            ll_default: default_ll_table(),
279            ll_previous: None,
280            ml_default: default_ml_table(),
281            ml_previous: None,
282            of_default: default_of_table(),
283            of_previous: None,
284        }
285    }
286
287    /// Borrow the LL default table as `&FSETable`. Abstracts the cfg
288    /// split in [`crate::fse::fse_encoder::FseDefaultTable`] —
289    /// `&'static FSETable` (atomic / `critical-section`) auto-derefs
290    /// directly; `Box<FSETable>` (cache-less no-atomic) derefs
291    /// through `Box`. Both arms yield `&FSETable` uniformly so
292    /// downstream consumers can stay cfg-agnostic.
293    #[inline]
294    #[allow(clippy::borrow_deref_ref)]
295    pub(crate) fn ll_default_ref(&self) -> &FSETable {
296        &*self.ll_default
297    }
298
299    /// Borrow the ML default table as `&FSETable`. See [`Self::ll_default_ref`].
300    #[inline]
301    #[allow(clippy::borrow_deref_ref)]
302    pub(crate) fn ml_default_ref(&self) -> &FSETable {
303        &*self.ml_default
304    }
305
306    /// Borrow the OF default table as `&FSETable`. See [`Self::ll_default_ref`].
307    #[inline]
308    #[allow(clippy::borrow_deref_ref)]
309    pub(crate) fn of_default_ref(&self) -> &FSETable {
310        &*self.of_default
311    }
312}
313
/// Lower clamp applied to the split point computed by the pre-split heuristic
/// in `optimal_block_size`.
const PRESPLIT_BLOCK_MIN: usize = 3500;
/// Denominator of the fingerprint-difference threshold ratio.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
/// Numerator base of the threshold ratio (`RATE - 2`). The current penalty
/// is added to it before dividing by the rate.
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
/// Starting penalty of the chunked splitter. It decays by one per merged
/// chunk, down to zero.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
/// Chunk granularity of the chunked splitter (8 KiB).
const PRESPLIT_CHUNK_SIZE: usize = 8 * 1024;
/// Largest hash log a fingerprint may use.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
/// Number of buckets in a fingerprint histogram (`1 << PRESPLIT_HASH_LOG_MAX`).
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
/// Multiplier of the 2-byte multiplicative hash: floor(2^32 / phi), Knuth's
/// golden-ratio constant.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
/// Donor `SEGMENT_SIZE` in `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:201`).
///
/// The cheap border heuristic compares two fingerprints of this many bytes,
/// one from the start of the block and one from the end. A third one, taken
/// from the middle, decides where in the block the transition sits.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
327
328#[derive(Clone)]
329struct PreSplitFingerprint {
330    events: [u32; PRESPLIT_HASH_TABLE_SIZE],
331    nb_events: usize,
332}
333
334impl Default for PreSplitFingerprint {
335    fn default() -> Self {
336        Self {
337            events: [0; PRESPLIT_HASH_TABLE_SIZE],
338            nb_events: 0,
339        }
340    }
341}
342
/// Reserves room in `out` for the next block before that block is emitted.
///
/// Growing ahead of time means block emission never hits an
/// amortized-doubling reallocation mid-frame, whose transient old+new copy
/// spikes peak memory to roughly 3x the output. The reservation is sized
/// from the compression ratio observed so far, not from the whole-input
/// worst case.
///
/// * `blocks_start` — offset in `out` where this frame's blocks begin.
/// * `consumed` — input bytes already emitted as blocks.
/// * `remaining` — input bytes still to compress. An estimate is fine: an
///   underestimate only costs one more re-estimate later.
/// * `block_capacity` — the active block-size cap
///   (`FrameCompressor::block_capacity`). This keeps a small
///   `targetCBlockSize` from pinning a 128 KiB floor in the buffer or
///   undercounting header density.
///
/// After the first block, incompressible input re-estimates to about the
/// full `compress_bound`, which was the old up-front policy's worst case.
/// Compressible input stays at output scale.
fn reserve_for_next_block(
    out: &mut Vec<u8>,
    blocks_start: usize,
    consumed: u64,
    remaining: usize,
    block_capacity: usize,
) {
    // Worst case for one block is its raw payload plus a 3-byte header. The
    // extra 16 bytes cover the 4-byte checksum trailer and the few additional
    // sub-block headers the post-split emitters may write, so neither of
    // those can trigger a reallocation.
    let next_block_bound = remaining.min(block_capacity) + 3 + 16;
    let spare = out.capacity() - out.len();
    if spare < next_block_bound {
        let emitted = (out.len() - blocks_start) as u64;
        let projected = match consumed {
            // No ratio observed yet. Capacity ran out before the first block,
            // which only happens with a caller-shrunk `out`, so reserve one
            // block's bound.
            0 => next_block_bound,
            _ => {
                // Remaining input at the observed ratio. Add 1/16 so a
                // slightly worse tail does not reallocate on every block, plus
                // 3 header bytes per remaining block and a fixed 64 bytes of
                // slack. u128 keeps the product exact for multi-GiB frames.
                let at_ratio =
                    ((remaining as u128 * emitted as u128) / consumed as u128) as u64;
                let block_count = remaining as u64 / block_capacity.max(1) as u64 + 1;
                let header_bytes = block_count * 3;
                usize::try_from(at_ratio + at_ratio / 16 + header_bytes + 64)
                    .unwrap_or(usize::MAX)
            }
        };
        // `reserve_exact`: the projection already includes its own slack, and
        // whole-buffer doubling is exactly what this function exists to avoid.
        //
        // The floor (one block bound plus everything emitted so far) matters
        // when the projection falls BELOW one block's bound, as it does for
        // highly compressible input. Without it, every block would trigger a
        // block-sized reallocation, i.e. O(blocks) buffer copies. With it, the
        // emitted span at least doubles per reallocation, so there are only
        // O(log) copies and the peak stays at output scale.
        let floor = next_block_bound + emitted as usize;
        out.reserve_exact(projected.max(floor));
    }
}
394
395fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
396    debug_assert!(hash_log >= 8);
397    if hash_log == 8 {
398        return bytes[0] as usize;
399    }
400    debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
401    let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
402    (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
403}
404
405fn presplit_record_fingerprint(
406    fp: &mut PreSplitFingerprint,
407    src: &[u8],
408    sampling_rate: usize,
409    hash_log: usize,
410) {
411    fp.events.fill(0);
412    fp.nb_events = 0;
413    if src.len() < 2 {
414        return;
415    }
416    let limit = src.len() - 1;
417    let mut n = 0usize;
418    while n < limit {
419        fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
420        n += sampling_rate;
421    }
422    // Donor parity: zstd_preSplit.c records the integer division, not the
423    // rounded-up number of sampled events from the loop above.
424    fp.nb_events += limit / sampling_rate;
425}
426
427/// Single-byte histogram pass — matches donor `HIST_add` over a small
428/// segment with `hashLog == 8` (the `hash2` shortcut at
429/// `zstd_preSplit.c:36` returns the raw byte). The byChunks path uses
430/// 2-byte hashing for `hashLog >= 9`; this helper exists so the borders
431/// heuristic doesn't pay for that wider hash on its 512-byte windows.
432fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
433    fp.events.fill(0);
434    for &b in src {
435        fp.events[b as usize] += 1;
436    }
437    // Donor `HIST_add` returns the maximum symbol; the caller then sets
438    // `nbEvents = SEGMENT_SIZE` explicitly (see `zstd_preSplit.c:213`).
439    fp.nb_events = src.len();
440}
441
442fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
443    let slots = 1usize << hash_log;
444    let mut distance = 0u64;
445    for idx in 0..slots {
446        let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
447        let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
448        // Plain `+`: events/nb_events are per-block sample counts (<= block
449        // size), so each |left-right| <= (2^17)^2 and the sum over <= 2^hash_log
450        // slots stays far under u64::MAX — no overflow.
451        distance += left.abs_diff(right) as u64;
452    }
453    distance
454}
455
456fn presplit_fingerprints_differ(
457    reference: &PreSplitFingerprint,
458    new_fp: &PreSplitFingerprint,
459    penalty: i32,
460    hash_log: usize,
461) -> bool {
462    debug_assert!(reference.nb_events > 0);
463    debug_assert!(new_fp.nb_events > 0);
464    let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
465    let deviation = presplit_distance(reference, new_fp, hash_log);
466    // Plain `*`: p50 <= (block-sample-count)^2 and the (base+penalty) factor is
467    // a small constant, so the product stays well under u64::MAX.
468    let threshold =
469        p50 * (PRESPLIT_THRESHOLD_BASE + penalty as u64) / PRESPLIT_THRESHOLD_PENALTY_RATE;
470    deviation >= threshold
471}
472
473fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
474    // Plain `+`: `acc` accumulates only the chunks of a single block (caller
475    // loops within one block, <= MAX_BLOCK_SIZE), so the merged sample counts
476    // stay far under u32 / usize bounds — no overflow.
477    for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
478        acc.events[idx] += new_fp.events[idx];
479    }
480    acc.nb_events += new_fp.nb_events;
481}
482
483fn split_block_by_chunks(block: &[u8], level: usize) -> usize {
484    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
485    debug_assert!((1..=4).contains(&level));
486    let (sampling_rate, hash_log) = match level - 1 {
487        0 => (43, 8),
488        1 => (11, 9),
489        2 => (5, 10),
490        _ => (1, 10),
491    };
492
493    let mut past = PreSplitFingerprint::default();
494    let mut new_events = PreSplitFingerprint::default();
495    let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
496    presplit_record_fingerprint(
497        &mut past,
498        &block[..PRESPLIT_CHUNK_SIZE],
499        sampling_rate,
500        hash_log,
501    );
502    let mut pos = PRESPLIT_CHUNK_SIZE;
503    while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
504        presplit_record_fingerprint(
505            &mut new_events,
506            &block[pos..pos + PRESPLIT_CHUNK_SIZE],
507            sampling_rate,
508            hash_log,
509        );
510        if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
511            return pos;
512        }
513        presplit_merge_events(&mut past, &new_events);
514        if penalty > 0 {
515            penalty -= 1;
516        }
517        pos += PRESPLIT_CHUNK_SIZE;
518    }
519    block.len()
520}
521
522/// Donor port of `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:198`).
523/// Records two 512-byte byte-histograms — one from each end of a 128 KB
524/// block — and a third from the middle as a tie-breaker; returns either
525/// a quantised split point (32 KB / 64 KB / 96 KB) or the full block
526/// size when the two ends look indistinguishable. Cheaper than the
527/// chunk-based path because it touches at most 1.5 KB of input
528/// regardless of block size.
529fn split_block_from_borders(block: &[u8]) -> usize {
530    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
531    let block_size = block.len();
532    let mut past = PreSplitFingerprint::default();
533    let mut new_fp = PreSplitFingerprint::default();
534    presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
535    presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
536    // Donor uses `penalty = 0, hash_log = 8` — i.e. raw byte histogram
537    // distance with no threshold padding (`zstd_preSplit.c:214`).
538    if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
539        return block_size;
540    }
541
542    let mut middle = PreSplitFingerprint::default();
543    let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
544    presplit_record_byte_histogram(
545        &mut middle,
546        &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
547    );
548
549    let dist_from_begin = presplit_distance(&past, &middle, 8);
550    let dist_from_end = presplit_distance(&new_fp, &middle, 8);
551    // Donor `SEGMENT_SIZE * SEGMENT_SIZE / 3` (`zstd_preSplit.c:221`):
552    // if the middle is roughly equidistant from both ends, the change
553    // sits near the centre — split at the midpoint.
554    let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
555    if dist_from_begin.abs_diff(dist_from_end) < min_distance {
556        return 64 * 1024;
557    }
558    // Larger `dist_from_begin` (i.e. `middle` farther from the head
559    // fingerprint, equivalently closer to the tail) means the new
560    // statistics already dominate the centre — the transition
561    // happened EARLY → emit a small 32 KB head and let the 96 KB
562    // tail absorb the rest. Inverse case: `dist_from_end` larger
563    // (middle still resembles the head) means the transition is
564    // LATE → emit a 96 KB head so the trailing 32 KB carries the
565    // new statistics alone.
566    if dist_from_begin > dist_from_end {
567        32 * 1024
568    } else {
569        96 * 1024
570    }
571}
572
/// Low 32 bits of XXH64 (seed 0) over `data`.
///
/// This is the single hashing helper for the per-physical-block checksum
/// sidecar, so the encoder and decoder hash the same byte ranges with the
/// same parameters.
///
/// It is gated on `all(lsm, hash)` because the lsm-side `block_checksums`
/// sidecar is its only consumer; non-lsm builds never reference it.
#[cfg(all(feature = "lsm", feature = "hash"))]
#[inline]
pub(crate) fn xxh64_block_low32(data: &[u8]) -> u32 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(data);
    let digest: u64 = hasher.finish();
    (digest & u64::from(u32::MAX)) as u32
}
586
/// Bench-only hook for the donor-parity comparator test in
/// `tests/block_splitter_donor_parity.rs`.
///
/// It routes exactly as `optimal_block_size` does: `split_level == 0` goes to
/// the borders port, and `1..=4` goes to the chunk-based port.
///
/// # Panics
///
/// Panics unless `block` is exactly `MAX_BLOCK_SIZE` bytes long (the donor
/// `ZSTD_splitBlock` contract, "@blockSize must be == 128 KB" in
/// `zstd_preSplit.h`). Also panics if `split_level > 4`.
#[cfg(feature = "bench_internals")]
pub(crate) fn block_splitter_decision_for_bench(block: &[u8], split_level: usize) -> usize {
    assert_eq!(
        block.len(),
        MAX_BLOCK_SIZE as usize,
        "block_splitter_decision_for_bench expects exactly MAX_BLOCK_SIZE bytes"
    );
    assert!(
        split_level <= 4,
        "block_splitter_decision_for_bench: split_level must be in 0..=4, got {split_level}"
    );
    match split_level {
        0 => split_block_from_borders(block),
        chunk_level => split_block_by_chunks(block, chunk_level),
    }
}
611
/// Streams a pre-split window into cache with one sequential pass, before the
/// strided fingerprint histogram and the match scan read it.
///
/// On the borrowed (no-copy) over-window path, matching runs in place on the
/// caller's input. The pre-split fingerprint is then the first touch of that
/// 128 KiB region, a cache-cold read.
///
/// `presplit_record_fingerprint` reads with a `sampling_rate` stride while
/// scattering writes into the 1 KiB events table. That pattern pays full DRAM
/// miss latency per line (measured at about 3x the cost of an ERMS streaming
/// read of the same bytes). The owned path avoids this because its
/// history-mirror copy already warmed the bytes. This pass restores that
/// warmth without the copy's write half.
///
/// Touching one byte per 64-byte line (the i9 line size) streams under the
/// hardware prefetcher. The cold read is paid once at memory bandwidth, and
/// later strided samples hit L1/L2. `black_box` stops the otherwise-dead
/// reads from being optimized away.
#[inline]
fn warm_presplit_window(window: &[u8]) {
    let folded = window
        .iter()
        .step_by(64)
        .fold(0u8, |acc, &byte| acc ^ byte);
    core::hint::black_box(folded);
}
637
638pub(crate) fn optimal_block_size(
639    level: CompressionLevel,
640    block: &[u8],
641    remaining_src_size: usize,
642    block_size_max: usize,
643    savings: i64,
644) -> usize {
645    let Some(split_level) = crate::encoding::match_generator::level_pre_split(level) else {
646        return remaining_src_size.min(block_size_max);
647    };
648    if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
649        return remaining_src_size.min(block_size_max);
650    }
651    if savings < 3 {
652        return MAX_BLOCK_SIZE as usize;
653    }
654    if block.len() < MAX_BLOCK_SIZE as usize {
655        return remaining_src_size.min(block_size_max);
656    }
657    // Donor `ZSTD_splitBlock` dispatch (`zstd_preSplit.c:234`):
658    // `split_level == 0` → cheap borders heuristic;
659    // `split_level == 1..=4` → byChunks with internal sampling level
660    // `split_level - 1`.
661    let raw_split = if split_level == 0 {
662        split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
663    } else {
664        split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
665    };
666    raw_split
667        .max(PRESPLIT_BLOCK_MIN)
668        .min(MAX_BLOCK_SIZE as usize)
669}
670
671pub(crate) struct CompressState<M: Matcher> {
672    pub(crate) matcher: M,
673    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
674    /// Recycled `HuffmanTable` buffers: when a block clears or replaces
675    /// `last_huff_table`, the old table parks here instead of dropping, so
676    /// the next frame's dictionary entropy seed `clone_from`s into existing
677    /// allocations. Without this, every dict-seeded frame whose last block
678    /// ended raw/RLE paid a fresh two-Vec table clone per frame.
679    pub(crate) huff_table_spare: Option<crate::huff0::huff0_encoder::HuffmanTable>,
680    pub(crate) fse_tables: FseTables,
681    pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
682    /// Offset history for repeat offset encoding: [rep0, rep1, rep2].
683    /// Initialized to [1, 4, 8] per RFC 8878 §3.1.2.5.
684    pub(crate) offset_hist: [u32; 3],
685    /// Strategy tag resolved from the current `CompressionLevel` at every
686    /// `matcher.reset()` call. Used by the literal-compression gates
687    /// (`min_literals_to_compress`, `min_gain`) in
688    /// `encoding::blocks::compressed` to mirror donor's strategy-aware
689    /// thresholds (`zstd_compress_literals.c:114-127, 187-188`).
690    ///
691    /// **Invariant (required of every construction site):** must be
692    /// initialized from the active `CompressionLevel` via
693    /// `StrategyTag::for_compression_level`, and re-synced from the
694    /// active level alongside every `matcher.reset()` call so the
695    /// level-aware gates stay correct after a level change. The two
696    /// reset sites that own this sync are `FrameCompressor::compress`
697    /// and `StreamingEncoder::ensure_frame_started`. There is no
698    /// `Default` impl — production constructors
699    /// (`FrameCompressor::new`, `new_with_matcher`, the streaming
700    /// encoder constructor) plumb this explicitly. Tests that build
701    /// `CompressState` by hand must also supply a value.
702    pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
703}
704
705impl<M: Matcher> CompressState<M> {
706    /// Clears `last_huff_table`, parking the table's buffers in
707    /// `huff_table_spare` for reuse instead of dropping them.
708    #[inline]
709    pub(crate) fn clear_huff_table(&mut self) {
710        if let Some(table) = self.last_huff_table.take() {
711            self.huff_table_spare = Some(table);
712        }
713    }
714
715    /// Replaces `last_huff_table` with `table`, parking any displaced table
716    /// in `huff_table_spare` for reuse.
717    #[inline]
718    pub(crate) fn replace_huff_table(&mut self, table: crate::huff0::huff0_encoder::HuffmanTable) {
719        if let Some(old) = self.last_huff_table.replace(table) {
720            self.huff_table_spare = Some(old);
721        }
722    }
723}
724
/// Frame-level settings computed once per frame by
/// [`FrameCompressor::prepare_frame`], then read by the block loop and
/// [`FrameCompressor::finish_frame`]. Sharing this value lets the owned
/// `compress()` path and the borrowed one-shot path run the same
/// reset / dictionary-prime / entropy-seed setup and emit the same frame tail.
struct FramePrep {
    /// Source-size hint forwarded to the owned block loop
    /// (`run_owned_block_loop`).
    initial_size_hint: Option<u64>,
    /// Whether dictionary state is primed for this frame; when `true` the
    /// borrowed one-shot loop is never selected.
    use_dictionary_state: bool,
    /// Whether a source-size hint was known when the frame was prepared
    /// (exact consumer not visible here — confirm in `prepare_frame`).
    source_size_hint_known: bool,
    /// Window size resolved for the frame (presumably the window advertised
    /// in the frame header — confirm in `prepare_frame`).
    window_size: u64,
}
735
/// Initial capacity for the `all_blocks` accumulator, chosen from the
/// source-size hint.
///
/// The frame header can only be written once all input has been read (so
/// Frame_Content_Size is known), which means compressed blocks are buffered
/// in memory first. The seed size comes from a tier (naming mirrors donor
/// `ZSTD_CStreamOutSize`):
/// - tiny (hint `<= 4 KiB`): a 4 KiB seed, about the size of the payload
///   itself. An incompressible input right at the threshold can still
///   outgrow it by its block header bytes and trigger a single growth.
/// - small (hint `<= 64 KiB`): 16 KiB, which absorbs one or two
///   `Vec::extend` doublings without over-allocating.
/// - default (no hint or larger): one donor block, `130 KiB`. This is the
///   size the rest of the encoder is built around. Larger inputs amortise
///   the first doublings cheaply, and the remainder is dominated by the
///   internal `compress_block_encoded` buffers.
///
/// Whatever the tier, the result never exceeds `block_capacity` plus a
/// 3-byte block header and 16 bytes of slack. `block_capacity` is the active
/// `targetCBlockSize` cap, or the 128 KiB format ceiling. So with a small
/// target, the first allocation tracks one capped block rather than the
/// donor-sized floor.
///
/// Shared by the owned (`run_owned_block_loop`) and borrowed
/// (`run_borrowed_block_loop`) paths so the tier table can't drift between
/// them.
fn initial_all_blocks_cap(initial_size_hint: Option<u64>, block_capacity: usize) -> usize {
    const TINY_THRESHOLD: u64 = 4 * 1024;
    const SMALL_THRESHOLD: u64 = 64 * 1024;
    const TINY_CAP: usize = 4 * 1024;
    const SMALL_CAP: usize = 16 * 1024;
    const DEFAULT_CAP: usize = 130 * 1024;
    // 3-byte block header plus 16 bytes of slack on top of one capped block.
    const PER_BLOCK_SLACK: usize = 3 + 16;

    let tier_cap = match initial_size_hint {
        Some(hint) if hint <= TINY_THRESHOLD => TINY_CAP,
        Some(hint) if hint <= SMALL_THRESHOLD => SMALL_CAP,
        _ => DEFAULT_CAP,
    };
    core::cmp::min(tier_cap, block_capacity + PER_BLOCK_SLACK)
}
768
/// Source of uncompressed input for `run_owned_block_loop`, consumed one
/// block at a time.
///
/// It is implemented for `&[u8]`, an in-place cursor used by the slice
/// entry points (`compress_independent_frame_into`, the
/// `compress_oneshot_*` fallbacks). That impl appends each block with a
/// single `extend_from_slice`. `ReaderBlockSource` adapts any `Read`
/// source instead. That path must first `resize` (zero-fill) the
/// destination region so `Read::read` has initialized bytes to write into,
/// which costs a memset of the block region on every frame.
pub(crate) trait OwnedBlockSource {
    /// Appends bytes from the source to `buf` until `buf.len()` reaches
    /// `block_capacity` or the source is exhausted. Returns
    /// `(bytes_appended, reached_eof)`.
    ///
    /// `buf` may already hold a carried pre-split suffix on entry.
    /// `reached_eof` is `true` exactly when the block could NOT be filled
    /// to `block_capacity`. That matches the historical `Read`-loop
    /// boundaries: an input whose length is an exact multiple of the block
    /// size still produces a trailing empty last block on the following
    /// call.
    ///
    /// `size_hint_remaining` is an optional estimate of the bytes still to
    /// come, which implementations may use to size their reads.
    fn fill_block(
        &mut self,
        buf: &mut Vec<u8>,
        block_capacity: usize,
        size_hint_remaining: Option<u64>,
    ) -> (usize, bool);
}
792
793impl OwnedBlockSource for &[u8] {
794    fn fill_block(
795        &mut self,
796        buf: &mut Vec<u8>,
797        block_capacity: usize,
798        _size_hint_remaining: Option<u64>,
799    ) -> (usize, bool) {
800        let want = block_capacity - buf.len();
801        let take = want.min(self.len());
802        buf.extend_from_slice(&self[..take]);
803        *self = &self[take..];
804        (take, take < want)
805    }
806}
807
/// [`OwnedBlockSource`] adapter over any [`Read`] implementation.
///
/// It keeps the historical sizing strategy. The destination region is
/// zero-initialized only up to what the source-size hint predicts for the
/// block. It grows (doubling, capped at the block capacity) only when that
/// hint turns out to be an under-count.
pub(crate) struct ReaderBlockSource<Rd>(pub(crate) Rd);
813
814impl<Rd: Read> OwnedBlockSource for ReaderBlockSource<Rd> {
815    fn fill_block(
816        &mut self,
817        buf: &mut Vec<u8>,
818        block_capacity: usize,
819        size_hint_remaining: Option<u64>,
820    ) -> (usize, bool) {
821        let start = buf.len();
822        let mut filled = start;
823        let mut reached_eof = false;
824        // Size the read buffer to the bytes this block actually expects
825        // rather than always zero-filling a full MAX_BLOCK_SIZE: a small
826        // frame otherwise pays a 128 KiB `resize(_, 0)` memset per block
827        // just to read a few KiB (the zero-fill past `filled` is then
828        // truncated away).
829        //
830        // Overflow-free by construction (no `saturating_*` masking):
831        // `filled <= block_capacity` always (the read only ever targets
832        // `[filled..len]` with `len <= block_capacity`, and a carried-over
833        // pre-split suffix is a `split_off` below `block_capacity`), so
834        // `block_capacity - filled` never underflows; pinning `remaining`
835        // to `block_capacity` before the `usize` cast keeps the cast and
836        // the final add within `usize` on every target.
837        let initial_target = match size_hint_remaining {
838            Some(remaining) => {
839                let remaining = remaining.min(block_capacity as u64) as usize;
840                filled + remaining.min(block_capacity - filled)
841            }
842            // Unknown hint, or an inexact hint already met by prior blocks:
843            // read against the full block window.
844            None => block_capacity,
845        };
846        if buf.len() < initial_target {
847            buf.resize(initial_target, 0);
848        }
849        loop {
850            if reached_eof || filled == block_capacity {
851                break;
852            }
853            if filled == buf.len() {
854                // Hint under-counted the block; grow toward block_capacity
855                // (doubling, capped) so reading continues without paying a
856                // full-buffer zero up front. `len <= block_capacity` so the
857                // double stays well within `usize`; `filled < block_capacity`
858                // here (the `== block_capacity` break fired otherwise), so
859                // `filled + 1 <= block_capacity`.
860                let grow_to = (buf.len() * 2).clamp(filled + 1, block_capacity);
861                buf.resize(grow_to, 0);
862            }
863            let read_end = buf.len();
864            let new_bytes = self.0.read(&mut buf[filled..read_end]).unwrap();
865            if new_bytes == 0 {
866                reached_eof = true;
867                break;
868            }
869            filled += new_bytes;
870        }
871        buf.truncate(filled);
872        (filled - start, reached_eof)
873    }
874}
875
876impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
877    /// Create a new `FrameCompressor`
878    pub fn new(compression_level: CompressionLevel) -> Self {
879        Self {
880            uncompressed_data: None,
881            compressed_data: None,
882            compression_level,
883            dictionary: None,
884            dictionary_entropy_cache: None,
885            source_size_hint: None,
886            state: CompressState {
887                matcher: MatchGeneratorDriver::new(1024 * 128, 1),
888                last_huff_table: None,
889                huff_table_spare: None,
890                fse_tables: FseTables::new(),
891                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
892                offset_hist: [1, 4, 8],
893                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
894                    compression_level,
895                ),
896            },
897            magicless: false,
898            content_checksum: false,
899            content_size_flag: true,
900            dict_id_flag: true,
901            target_block_size: None,
902            #[cfg(feature = "hash")]
903            hasher: XxHash64::with_seed(0),
904            #[cfg(feature = "lsm")]
905            frame_emit_info: None,
906            #[cfg(all(feature = "lsm", feature = "hash"))]
907            per_block_checksums_enabled: false,
908            #[cfg(all(feature = "lsm", feature = "hash"))]
909            block_checksums: None,
910            #[cfg(feature = "lsm")]
911            block_decompressed_sizes: alloc::vec::Vec::new(),
912            strategy_override: None,
913        }
914    }
915
916    /// Configure fine-grained compression parameters (#27).
917    ///
918    /// Resets the base [`CompressionLevel`](crate::encoding::CompressionLevel)
919    /// to the parameters' level and installs the per-knob overrides
920    /// (window/hash/chain/search logs, strategy, LDM) applied at the next
921    /// frame. Pass `None`-equivalent (a builder that overrides nothing)
922    /// to fall back to plain level-based compression.
923    ///
924    /// ```rust
925    /// use structured_zstd::encoding::{
926    ///     CompressionLevel, CompressionParameters, FrameCompressor, Strategy,
927    /// };
928    /// let params = CompressionParameters::builder(CompressionLevel::Level(19))
929    ///     .strategy(Strategy::Btultra2)
930    ///     .enable_long_distance_matching(true)
931    ///     .build()
932    ///     .unwrap();
933    /// let mut compressor: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
934    /// compressor.set_parameters(&params);
935    /// let compressed = compressor.compress_independent_frame(b"some data to compress");
936    /// assert!(!compressed.is_empty());
937    /// ```
938    pub fn set_parameters(&mut self, params: &crate::encoding::CompressionParameters) {
939        self.compression_level = params.level();
940        let overrides = params.overrides();
941        self.strategy_override = overrides.strategy.map(|s| s.tag());
942        // Keep `state.strategy_tag` consistent immediately so the borrowed
943        // one-shot eligibility gate (`borrowed_eligible`) and literal gates
944        // are correct even before the next `compress()` re-sync.
945        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
946            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
947        });
948        self.state.matcher.set_param_overrides(Some(overrides));
949    }
950
951    /// Whether the borrowed (no per-block history copy) one-shot loop is
952    /// valid for an `input_len`-byte slice under the resolved `prep`.
953    ///
954    /// `Uncompressed` resolves to `StrategyTag::Fast` but must emit stored
955    /// Raw blocks, which the borrowed loop's
956    /// `compress_block_encoded_borrowed` (RLE/raw-fast/compressed) does NOT
957    /// do, so exclude it; it then takes the owned path's dedicated
958    /// Uncompressed arm.
959    ///
960    /// No window-size gate: over-window inputs are handled too. The owned
961    /// path bounds matches to the last `advertised_window` bytes via
962    /// `window_low` and evicts/rehashes its history; the borrowed path
963    /// computes the identical `window_low = block_end - advertised_window`
964    /// and the kernel rejects any hash candidate below it, while the
965    /// per-position `put` during the scan keeps in-window slots current,
966    /// so it produces byte-identical output to the owned (evicting) path
967    /// without ever copying the input into `history`, even when the input
968    /// far exceeds the window.
969    ///
970    /// BUT gate on `input_len <= u32::MAX`: the Fast kernel stores ABSOLUTE
971    /// positions in a `u32` hash table, and the borrowed scan walks
972    /// absolute input offsets up to `block_end == input.len()`. Past 4 GiB
973    /// those offsets truncate / overflow the `u32` position math
974    /// (`base_off + ip0 as u32`, `window_low`), panicking or corrupting.
975    /// The owned/evicting path keeps the scanned window bounded (positions
976    /// stay small), so >4 GiB inputs fall back to it.
977    fn borrowed_eligible(&self, input_len: usize, prep: &FramePrep) -> bool {
978        if prep.use_dictionary_state
979            || matches!(self.compression_level, CompressionLevel::Uncompressed)
980            || input_len > u32::MAX as usize
981        {
982            return false;
983        }
984        // The borrowed (no-copy, in-place over-window) scan exists for the
985        // Simple (Fast), Dfast, and Row backends, and for the HashChain
986        // backend's lazy CHAIN parser; BT/optimal (BinaryTree search) stay on
987        // the owned path. Every borrowed scan applies the per-position
988        // `window_low = abs_ip - advertised_window` offset cap so over-window
989        // inputs are matched in place (no input->history copy), matching C's
990        // continuous-index + windowLow one-shot behaviour.
991        self.state.matcher.borrowed_supported()
992    }
993
994    /// Compress `input` as one frame's worth of blocks into `out` (appended
995    /// from its current end): the borrowed in-place loop when
996    /// [`Self::borrowed_eligible`], else the owned (history-copying) loop fed
997    /// an in-place `&[u8]` cursor. Returns `total_uncompressed`; the caller
998    /// emits the frame header (before this call, when the content size is
999    /// known) or the drain tail.
1000    fn run_one_frame(&mut self, input: &[u8], prep: &FramePrep, out: &mut Vec<u8>) -> u64 {
1001        if self.borrowed_eligible(input.len(), prep) {
1002            self.run_borrowed_block_loop(input, out)
1003        } else {
1004            let mut cursor: &[u8] = input;
1005            self.run_owned_block_loop(&mut cursor, prep.initial_size_hint, true, out)
1006        }
1007    }
1008
1009    /// Compress one contiguous `&[u8]` as a single independent Zstd frame,
1010    /// writing the frame bytes into `out` (its previous contents are
1011    /// replaced and its allocation reused), reusing this compressor's heavy
1012    /// state across calls.
1013    ///
1014    /// This is the reusable-compression-context (CCtx-equivalent) entry
1015    /// point, mirroring C `ZSTD_compress2` over a reused `ZSTD_CCtx`:
1016    /// construct ONE `FrameCompressor` and call this in a loop to emit N
1017    /// independent, self-describing frames (each carrying its own header,
1018    /// blocks, and checksum, decodable in isolation, with no cross-frame
1019    /// match history). Every call resets the per-frame state via
1020    /// [`Self::prepare_frame`]: only the allocations are kept, so the
1021    /// dominant per-frame setup cost (table allocation + dictionary prime)
1022    /// is paid once instead of N times. Passing the same `out` buffer each
1023    /// call additionally reuses the output allocation, matching C's
1024    /// caller-owned `dst` buffer (no per-frame output allocation).
1025    ///
1026    /// Reusing the context + `out` across many small frames (the typical
1027    /// per-block-frame workload) is far cheaper than a fresh
1028    /// [`compress_slice_to_vec`](crate::encoding::compress_slice_to_vec)
1029    /// per block, which allocates and primes from scratch each time.
1030    ///
1031    /// The input is read in place: no [`Self::set_source`] /
1032    /// [`Self::set_drain`] setup is required, and the input lifetime is not
1033    /// baked into the compressor type, so successive calls may pass slices
1034    /// with unrelated lifetimes. When the Fast (Simple) backend is active
1035    /// and no dictionary is set, the matcher references the input directly
1036    /// (no per-block history copy); other backends / dictionary use copy
1037    /// each block into history exactly as the streaming
1038    /// [`compress`](Self::compress) path does. The source-size hint is
1039    /// derived from the input length on every call, so per-frame table
1040    /// sizing tracks each frame's actual size regardless of any earlier
1041    /// hint.
1042    ///
1043    /// A sticky dictionary set via
1044    /// [`set_dictionary`](Self::set_dictionary) (or its variants) is primed
1045    /// into every frame, mirroring `ZSTD_CCtx_loadDictionary` /
1046    /// `ZSTD_CCtx_refCDict`.
1047    ///
1048    /// # Panics
1049    ///
1050    /// Panics on encoder error, matching [`Self::compress`] and
1051    /// [`compress_slice_to_vec`](crate::encoding::compress_slice_to_vec).
1052    pub fn compress_independent_frame_into(&mut self, input: &[u8], out: &mut Vec<u8>) {
1053        // Size the next frame from the actual payload, not a stale hint a
1054        // previous call may have left behind (a wrong hint would change the
1055        // resolved window/header and could flip borrowed eligibility).
1056        self.source_size_hint = Some(input.len() as u64);
1057        let prep = self.prepare_frame();
1058        // Content size is known up front (one-shot), so write the frame
1059        // header FIRST and emit blocks STRAIGHT into `out` — no separate
1060        // `all_blocks` accumulator and no header+blocks copy (which was the
1061        // dominant per-frame memmove + the only un-amortized per-frame alloc
1062        // even when the compressor is reused).
1063        let total_uncompressed = input.len() as u64;
1064        let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
1065        let checksum_len = if emit_checksum { 4 } else { 0 };
1066        out.clear();
1067        // Reserve the header plus ONE block's worst case up front; the block
1068        // loops then grow `out` from the compression ratio observed so far
1069        // (`reserve_for_next_block`). Reserving `compress_bound(input_len)`
1070        // here held a whole-input-sized allocation for the entire frame —
1071        // ~100 MiB peak on a 100 MiB stream whose compressed output is a few
1072        // MiB, where the reference implementation's context peaks at
1073        // window-sized state. Small frames (<= one block) still get their
1074        // full bound in one shot, so the reused-`out` steady state is
1075        // unchanged. 18 = max frame header (magic 4 + descriptor 1 + window
1076        // 1 + dict id 4 + FCS 8).
1077        let first_block_bound = input.len().min(self.block_capacity()) + 3;
1078        out.reserve(18 + first_block_bound + checksum_len);
1079        self.append_frame_header(total_uncompressed, &prep, out);
1080        let header_len = out.len();
1081        let _ = self.run_one_frame(input, &prep, out);
1082        #[cfg(feature = "hash")]
1083        if self.content_checksum {
1084            out.extend_from_slice(&(self.hasher.finish() as u32).to_le_bytes());
1085        }
1086        #[cfg(feature = "lsm")]
1087        {
1088            let blocks_end = out.len() - checksum_len;
1089            self.populate_frame_emit_info(header_len, &out[header_len..blocks_end], emit_checksum);
1090        }
1091        #[cfg(not(feature = "lsm"))]
1092        let _ = header_len;
1093    }
1094
1095    /// Convenience wrapper over [`Self::compress_independent_frame_into`]
1096    /// that allocates and returns a fresh `Vec` per call. Prefer the
1097    /// `_into` form in tight per-block-frame loops to reuse one output
1098    /// buffer across frames (the CCtx-equivalent zero-per-call-alloc
1099    /// output, matching C's caller-owned `dst`).
1100    ///
1101    /// ```rust
1102    /// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
1103    /// let mut cctx: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
1104    /// let frame_a = cctx.compress_independent_frame(b"first block payload");
1105    /// let frame_b = cctx.compress_independent_frame(b"second block payload");
1106    /// assert!(!frame_a.is_empty() && !frame_b.is_empty());
1107    /// ```
1108    pub fn compress_independent_frame(&mut self, input: &[u8]) -> Vec<u8> {
1109        let mut out = Vec::new();
1110        self.compress_independent_frame_into(input, &mut out);
1111        out
1112    }
1113
1114    /// Borrowed one-shot block loop: walks `input` in `MAX_BLOCK_SIZE`
1115    /// strides (the Fast backend never pre-splits, so boundaries match the
1116    /// owned loop), scanning each block range in place against the
1117    /// borrowed window via `compress_block_encoded_borrowed` — no
1118    /// per-block `commit_space` copy. Returns `(all_blocks,
1119    /// total_uncompressed)`. Caller guarantees Fast backend + no
1120    /// dictionary; over-window inputs are fine (matches are bounded by
1121    /// `window_low` exactly as the owned evicting path).
1122    fn run_borrowed_block_loop(&mut self, input: &[u8], out: &mut Vec<u8>) -> u64 {
1123        // Blocks are appended to `out` starting here. `out` may already hold
1124        // the frame header (the one-shot compress-into-Vec path writes it
1125        // first, since the content size is known up front, and the loop
1126        // emits blocks straight after it — no separate `all_blocks` Vec and
1127        // no header+blocks copy). Output-size reads below are taken RELATIVE
1128        // to `blocks_start` so a header prefix never skews the donor split
1129        // `savings` gate (which would change block boundaries / wire output).
1130        let blocks_start = out.len();
1131        let total_uncompressed = input.len() as u64;
1132        // Empty input: emit a single empty last Raw block (mirrors the
1133        // owned loop's empty-file special case).
1134        if input.is_empty() {
1135            let header = BlockHeader {
1136                last_block: true,
1137                block_type: crate::blocks::block::BlockType::Raw,
1138                block_size: 0,
1139            };
1140            header.serialize(out);
1141            #[cfg(feature = "lsm")]
1142            self.block_decompressed_sizes.push(0);
1143            #[cfg(all(feature = "lsm", feature = "hash"))]
1144            if let Some(checksums) = self.block_checksums.as_mut() {
1145                checksums.push(xxh64_block_low32(&[]));
1146            }
1147            return total_uncompressed;
1148        }
1149        // SAFETY: `input` outlives this call (held by the caller across
1150        // the call) and is not mutated. Only the Simple backend is active
1151        // (gated by `compress_oneshot_borrowed`).
1152        unsafe {
1153            self.state.matcher.set_borrowed_window(input);
1154        }
1155        // Panic-safety: clear the borrowed `(ptr, len)` on EVERY exit,
1156        // including an unwind from an `assert!` inside the block loop, so
1157        // a caught-and-reused compressor never retains a dangling window.
1158        // (The next frame's `reset()` also clears it before any read, but
1159        // this guard makes the invariant local and unwind-proof.)
1160        struct ClearBorrowedOnDrop(*mut MatchGeneratorDriver);
1161        impl Drop for ClearBorrowedOnDrop {
1162            fn drop(&mut self) {
1163                // SAFETY: at drop (normal return or unwind) the loop's
1164                // borrows of the matcher have ended, so this is the only
1165                // access. `addr_of_mut!` produced this pointer without an
1166                // intermediate `&mut`, so the interleaved `&mut` uses in
1167                // the loop did not invalidate it.
1168                unsafe { (*self.0).clear_borrowed_window() };
1169            }
1170        }
1171        let _clear_guard = ClearBorrowedOnDrop(core::ptr::addr_of_mut!(self.state.matcher));
1172        let block_capacity = self.block_capacity();
1173        let mut start = 0usize;
1174        while start < input.len() {
1175            reserve_for_next_block(
1176                out,
1177                blocks_start,
1178                start as u64,
1179                input.len() - start,
1180                block_capacity,
1181            );
1182            // Donor `ZSTD_compress_frameChunk`: size each block via the cheap
1183            // fingerprint pre-splitter so a full 128 KiB block is cut at a
1184            // statistical boundary when it pays. `savings = consumed -
1185            // produced` mirrors the donor gate (the first block and
1186            // incompressible input keep the full 128 KiB). The borrowed window
1187            // already spans the whole input, so a smaller block is just a
1188            // narrower `(block_start, block_end)` range into it.
1189            let savings = start as i64 - (out.len() - blocks_start) as i64;
1190            // Borrowed path only: warm the pre-split window before the
1191            // cache-cold strided fingerprint read. Gated to exactly the
1192            // conditions under which `optimal_block_size` reads `block`
1193            // (a pre-split level, a full 128 KiB block remaining, the
1194            // block-size cap admits a full block, and `savings >= 3` so the
1195            // splitter actually runs) — so non-pre-split levels, the first
1196            // block, and the trailing partial block pay nothing. See
1197            // `warm_presplit_window`.
1198            if savings >= 3
1199                && input.len() - start >= MAX_BLOCK_SIZE as usize
1200                && block_capacity >= MAX_BLOCK_SIZE as usize
1201                && crate::encoding::match_generator::level_pre_split(self.compression_level)
1202                    .is_some()
1203            {
1204                warm_presplit_window(&input[start..start + MAX_BLOCK_SIZE as usize]);
1205            }
1206            let block_len = optimal_block_size(
1207                self.compression_level,
1208                &input[start..],
1209                input.len() - start,
1210                block_capacity,
1211                savings,
1212            );
1213            let end = (start + block_len).min(input.len());
1214            let block = &input[start..end];
1215            let last_block = end == input.len();
1216            #[cfg(feature = "hash")]
1217            if self.content_checksum {
1218                self.hasher.write(block);
1219            }
1220            crate::encoding::levels::compress_block_encoded_borrowed(
1221                &mut self.state,
1222                self.compression_level,
1223                last_block,
1224                block,
1225                start,
1226                end,
1227                out,
1228                #[cfg(feature = "lsm")]
1229                Some(&mut self.block_decompressed_sizes),
1230                #[cfg(all(feature = "lsm", feature = "hash"))]
1231                self.block_checksums.as_mut(),
1232            );
1233            start = end;
1234        }
1235        // `_clear_guard` drops here, clearing the borrowed window.
1236        total_uncompressed
1237    }
1238}
1239
1240impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
1241    /// Create a new `FrameCompressor` with a custom matching algorithm implementation
1242    pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
1243        Self {
1244            uncompressed_data: None,
1245            compressed_data: None,
1246            dictionary: None,
1247            dictionary_entropy_cache: None,
1248            source_size_hint: None,
1249            state: CompressState {
1250                matcher,
1251                last_huff_table: None,
1252                huff_table_spare: None,
1253                fse_tables: FseTables::new(),
1254                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
1255                offset_hist: [1, 4, 8],
1256                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
1257                    compression_level,
1258                ),
1259            },
1260            compression_level,
1261            magicless: false,
1262            content_checksum: false,
1263            content_size_flag: true,
1264            dict_id_flag: true,
1265            target_block_size: None,
1266            #[cfg(feature = "hash")]
1267            hasher: XxHash64::with_seed(0),
1268            #[cfg(feature = "lsm")]
1269            frame_emit_info: None,
1270            #[cfg(all(feature = "lsm", feature = "hash"))]
1271            per_block_checksums_enabled: false,
1272            #[cfg(all(feature = "lsm", feature = "hash"))]
1273            block_checksums: None,
1274            #[cfg(feature = "lsm")]
1275            block_decompressed_sizes: alloc::vec::Vec::new(),
1276            strategy_override: None,
1277        }
1278    }
1279
1280    /// Enable or disable magicless frame format (`ZSTD_f_zstd1_magicless`).
1281    ///
1282    /// When set to `true`, emitted frames omit the 4-byte magic number
1283    /// prefix. The matching decoder must be configured to expect a
1284    /// magicless stream — wire-format only round-trips with a
1285    /// magicless-aware decoder.
1286    pub fn set_magicless(&mut self, magicless: bool) {
1287        self.magicless = magicless;
1288    }
1289
1290    /// Enable or disable the trailing XXH64 content checksum
1291    /// (semantics of upstream `ZSTD_c_checksumFlag`). Default `false`,
1292    /// matching the upstream library default (`ZSTD_c_checksumFlag = 0`)
1293    /// so out-of-the-box frames carry the same layout and pay the same
1294    /// costs as the reference implementation.
1295    ///
1296    /// When `false`, emitted frames set `Content_Checksum_flag = 0` and carry
1297    /// no trailing digest; such frames are valid (RFC 8878) and decode
1298    /// correctly in any [`ContentChecksum`](crate::decoding::ContentChecksum)
1299    /// mode. Without the `hash` feature no checksum is emitted regardless of
1300    /// this setting.
1301    pub fn set_content_checksum(&mut self, emit: bool) {
1302        self.content_checksum = emit;
1303    }
1304
1305    /// Enable or disable recording `Frame_Content_Size` in the frame header
1306    /// when the total size is known (semantics of upstream
1307    /// `ZSTD_c_contentSizeFlag`). Default `true`, matching upstream. With
1308    /// the flag off the header carries a window descriptor instead (and the
1309    /// single-segment layout, which requires an FCS, is disabled).
1310    pub fn set_content_size_flag(&mut self, emit: bool) {
1311        self.content_size_flag = emit;
1312    }
1313
1314    /// Enable or disable recording the dictionary ID in the frame header
1315    /// when a dictionary is attached (semantics of upstream
1316    /// `ZSTD_c_dictIDFlag`). Default `true`, matching upstream. Frames
1317    /// emitted with the flag off still decode when the decoder is handed
1318    /// the dictionary explicitly.
1319    pub fn set_dictionary_id_flag(&mut self, emit: bool) {
1320        self.dict_id_flag = emit;
1321    }
1322
1323    /// Set an upper bound on emitted block sizes (semantics of upstream
1324    /// `ZSTD_c_targetCBlockSize`): every physical block's payload is capped
1325    /// at `target` bytes (+3-byte block header on the wire), trading some
1326    /// ratio for bounded per-block latency. The value is clamped to
1327    /// `[MIN_TARGET_BLOCK_SIZE, MAX_BLOCK_SIZE]` (the upstream bounds).
1328    /// `None` removes the target.
1329    pub fn set_target_block_size(&mut self, target: Option<u32>) {
1330        self.target_block_size = target.map(|t| {
1331            t.clamp(
1332                crate::common::MIN_TARGET_BLOCK_SIZE,
1333                crate::common::MAX_BLOCK_SIZE,
1334            )
1335        });
1336    }
1337
1338    /// The active block-size cap: the configured target, or the format's
1339    /// 128 KiB block ceiling.
1340    fn block_capacity(&self) -> usize {
1341        self.target_block_size
1342            .map_or(crate::common::MAX_BLOCK_SIZE as usize, |t| t as usize)
1343    }
1344
1345    /// Before calling [FrameCompressor::compress] you need to set the source.
1346    ///
1347    /// This is the data that is compressed and written into the drain.
1348    pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
1349        self.uncompressed_data.replace(uncompressed_data)
1350    }
1351
1352    /// Before calling [FrameCompressor::compress] you need to set the drain.
1353    ///
1354    /// As the compressor compresses data, the drain serves as a place for the output to be writte.
1355    pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
1356        self.compressed_data.replace(compressed_data)
1357    }
1358
1359    /// Provide a hint about the total uncompressed size for the next frame.
1360    ///
1361    /// When set, the encoder selects smaller hash tables and windows for
1362    /// small inputs, matching the C zstd source-size-class behavior.
1363    ///
1364    /// This hint applies only to frame payload bytes (`size`). Dictionary
1365    /// history is primed separately and does not inflate the hinted size or
1366    /// advertised frame window.
1367    /// Must be called before [`compress`](Self::compress).
1368    pub fn set_source_size_hint(&mut self, size: u64) {
1369        self.source_size_hint = Some(size);
1370    }
1371
1372    /// Total heap bytes this compressor's allocations hold, excluding the
1373    /// inline struct: the match-finder tables / history / recycled buffers and
1374    /// the primed-dictionary snapshot (via the matcher), the retained
1375    /// Huffman tables (active + recycled spare), the retained dictionary
1376    /// content, the cached dictionary entropy tables (literals Huffman +
1377    /// LL/ML/OF FSE), and the per-block sidecar buffers. Lets a context
1378    /// report its true footprint through `ZSTD_sizeof_CCtx`.
1379    pub fn heap_size(&self) -> usize {
1380        let mut total = self.state.matcher.heap_size();
1381        total += self
1382            .state
1383            .last_huff_table
1384            .as_ref()
1385            .map_or(0, |table| table.heap_size());
1386        total += self
1387            .state
1388            .huff_table_spare
1389            .as_ref()
1390            .map_or(0, |table| table.heap_size());
1391        total += self
1392            .dictionary
1393            .as_ref()
1394            .map_or(0, |d| d.inner.dict_content.capacity());
1395        total += self
1396            .dictionary_entropy_cache
1397            .as_ref()
1398            .map_or(0, CachedDictionaryEntropy::heap_size);
1399        #[cfg(all(feature = "lsm", feature = "hash"))]
1400        {
1401            total += self
1402                .block_checksums
1403                .as_ref()
1404                .map_or(0, |v| v.capacity() * core::mem::size_of::<u32>());
1405        }
1406        #[cfg(feature = "lsm")]
1407        {
1408            total += self.block_decompressed_sizes.capacity() * core::mem::size_of::<u32>();
1409        }
1410        total
1411    }
1412
1413    /// Compress the uncompressed data from the provided source as one Zstd frame and write it to the provided drain
1414    ///
1415    /// This will repeatedly call [Read::read] on the source to fill up blocks until the source returns 0 on the read call.
1416    /// All compressed blocks are buffered in memory so that the frame header can include the
1417    /// `Frame_Content_Size` field (which requires knowing the total uncompressed size). The
1418    /// entire frame — header, blocks, and optional checksum — is then written to the drain
1419    /// at the end. This means peak memory usage is O(compressed_size).
1420    ///
1421    /// To avoid endlessly encoding from a potentially endless source (like a network socket) you can use the
1422    /// [Read::take] function
1423    /// Per-frame setup values resolved by [`Self::prepare_frame`] and
1424    /// consumed by the block loop + [`Self::finish_frame`]. Lets the
1425    /// owned `compress()` and the borrowed one-shot path share the exact
1426    /// same reset / dict-prime / entropy-seed setup and frame tail.
1427    pub fn compress(&mut self) {
1428        let prep = self.prepare_frame();
1429        // Take the reader out so `run_owned_block_loop` can borrow it
1430        // mutably alongside `&mut self` (the rest of the loop touches
1431        // `self.state` / `self.hasher`, disjoint from the reader). Restored
1432        // before the frame tail so a reused compressor keeps its source.
1433        //
1434        // Deliberately NOT restored on unwind: if the block loop panics the
1435        // source has been partially consumed, so handing it back would let a
1436        // `catch_unwind` caller "successfully" compress the remaining tail
1437        // from an arbitrary midpoint — silent data corruption. Leaving the
1438        // slot empty makes any post-panic reuse fail loudly at the `expect`
1439        // below (matcher/entropy state is equally unre-usable after an
1440        // unwind; the reference implementation likewise requires a context
1441        // reset after an error).
1442        let mut source = self
1443            .uncompressed_data
1444            .take()
1445            .expect("source must be set via set_source before compress()");
1446        // Streaming drain: the content size is only known at EOF, so the
1447        // frame header can't precede the blocks — accumulate them in a local
1448        // buffer and let `finish_frame` write header + blocks to the drain.
1449        let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap(
1450            prep.initial_size_hint,
1451            self.block_capacity(),
1452        ));
1453        let mut block_source = ReaderBlockSource(&mut source);
1454        let total_uncompressed = self.run_owned_block_loop(
1455            &mut block_source,
1456            prep.initial_size_hint,
1457            false,
1458            &mut all_blocks,
1459        );
1460        self.uncompressed_data = Some(source);
1461        self.finish_frame(all_blocks, total_uncompressed, &prep);
1462    }
1463
    /// Per-frame setup shared by `compress` and the one-shot slice paths.
    ///
    /// In order, this:
    /// - clears per-frame introspection state (`lsm` builds);
    /// - consumes the pending source-size hint, handing it to the matcher;
    /// - resets the matcher for `self.compression_level` and re-syncs
    ///   `state.strategy_tag`;
    /// - primes or restores the dictionary history when a dictionary is
    ///   active;
    /// - seeds the Huffman / FSE "previous table" state from the cached
    ///   dictionary entropy, or clears it;
    /// - resets the content-checksum hasher.
    ///
    /// Returns the resolved [`FramePrep`]:
    /// - the matcher window size;
    /// - whether dictionary state is in use;
    /// - the source-size hint as it was before being consumed, and whether
    ///   one was set.
    ///
    /// The block loop and the frame-header writer both read these values.
    ///
    /// # Panics
    ///
    /// Panics if the matcher reports a window size of 0 after the reset.
    fn prepare_frame(&mut self) -> FramePrep {
        // Reset per-frame introspection state so a re-used compressor
        // doesn't carry over the previous frame's layout/checksums.
        #[cfg(feature = "lsm")]
        {
            self.frame_emit_info = None;
            // Always captured under lsm (drives `decompressed_byte_range`);
            // clear, keep the allocation for a reused compressor.
            self.block_decompressed_sizes.clear();
        }
        #[cfg(all(feature = "lsm", feature = "hash"))]
        {
            // Per-block digests are only collected when explicitly enabled;
            // otherwise the sidecar stays `None` for this frame.
            if self.per_block_checksums_enabled {
                self.block_checksums = Some(alloc::vec::Vec::new());
            } else {
                self.block_checksums = None;
            }
        }
        // Snapshot the hint before `take()` below consumes it, so the frame
        // header / buffer sizing can still see what the caller promised.
        let initial_size_hint = self.source_size_hint;
        let source_size_hint_known = initial_size_hint.is_some();
        // Dictionary state is only used for compressing levels, with a matcher
        // that can be primed, and when a dictionary is attached.
        let use_dictionary_state =
            !matches!(self.compression_level, CompressionLevel::Uncompressed)
                && self.state.matcher.supports_dictionary_priming()
                && self.dictionary.is_some();
        if let Some(size_hint) = self.source_size_hint.take() {
            // Keep source-size hint scoped to payload bytes; dictionary priming
            // is applied separately and should not force larger matcher sizing.
            self.state.matcher.set_source_size_hint(size_hint);
        }
        // Hand the matcher the dictionary's content size so its binary-tree /
        // hash-chain tables shrink to the dictionary's cParams tier (donor CDict
        // economics: the dictionary supplies long matches, so a source-sized live
        // table is wasted peak memory). The eviction window stays source-sized so
        // the dictionary bytes remain referenceable. Set before `reset` (which
        // consumes it) and only when a dictionary will actually be primed.
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            self.state
                .matcher
                .set_dictionary_size_hint(dict.inner.dict_content.len());
        }
        // Clearing buffers to allow re-using of the compressor
        self.state.matcher.reset(self.compression_level);
        // Initial repeat-offset history for a frame without a dictionary
        // (RFC 8878 starts Repeat_Offset1..3 at 1, 4, 8); overwritten below
        // when a dictionary supplies its own history.
        self.state.offset_hist = [1, 4, 8];
        // Sync `state.strategy_tag` to the level resolved at this reset so
        // the literal-compression gates (`min_literals_to_compress` /
        // `min_gain` in `encoding::blocks::compressed`) see the correct
        // strategy for the next frame. Frame-by-frame level changes go
        // through this same `compress()` entry point, so re-syncing here
        // covers level switches without touching the matcher dispatch.
        // A public-parameter strategy override (#27) wins over the level's
        // derived tag so the literal-compression gates and dict-attach
        // cutoff below see the strategy the matcher actually runs.
        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
        });
        // Cached dictionary entropy is only meaningful when the dictionary is
        // actually in use for this frame.
        let cached_entropy = if use_dictionary_state {
            self.dictionary_entropy_cache.as_ref()
        } else {
            None
        };
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            // This state drives sequence encoding, while matcher priming below updates
            // the match generator's internal repeat-offset history for match finding.
            self.state.offset_hist = dict.inner.offset_hist;
            // Donor `ZSTD_shouldAttachDict` (`zstd_compress.c`): a
            // precomputed-dictionary table is COPIED into the working context
            // only when the source is larger than a per-strategy cutoff; at or
            // below it (and for unknown size) the donor ATTACHES the dictionary
            // tables by reference (no per-frame table touch at all). We don't
            // have an attach-by-reference path yet, so:
            //   - large source (> cutoff): reuse the captured prime snapshot
            //     (a table copy) instead of re-hashing the dictionary — the
            //     donor COPY regime, where the copy is cheaper than re-priming;
            //   - small / unknown source: re-prime (the snapshot copy of the
            //     whole table would cost MORE than the sparse re-prime here,
            //     which is exactly why the donor attaches by reference instead).
            // `attachDictSizeCutoffs` per strategy: fast 8K, dfast 16K,
            // greedy/lazy/btopt 32K, btultra/btultra2 8K. Expressed as the
            // ceil-log bucket (8K = 2^13, 16K = 2^14, 32K = 2^15) so the
            // decision uses the SAME bucketed representation as the driver's
            // attach/copy gate (`reset_size_log`) — comparing
            // `source_size_ceil_log(hint)` on the full u64 avoids the `as usize`
            // truncation that could diverge from the driver on 32-bit targets.
            // For a power-of-two cutoff `2^k`, `ceil_log2(hint) > k` is exactly
            // `hint > 2^k`, so this is identical to the raw `hint > cutoff` on
            // 64-bit.
            let cutoff_log = match self.state.strategy_tag {
                crate::encoding::strategy::StrategyTag::Fast
                | crate::encoding::strategy::StrategyTag::BtUltra
                | crate::encoding::strategy::StrategyTag::BtUltra2 => 13,
                crate::encoding::strategy::StrategyTag::Dfast => 14,
                crate::encoding::strategy::StrategyTag::Greedy
                | crate::encoding::strategy::StrategyTag::Lazy
                | crate::encoding::strategy::StrategyTag::Btlazy2
                | crate::encoding::strategy::StrategyTag::BtOpt => 15,
            };
            let prefer_copy_snapshot = initial_size_hint.is_some_and(|s| {
                crate::encoding::match_generator::source_size_ceil_log(s) > cutoff_log
            });
            // Short-circuit: only attempt a snapshot restore in the copy regime.
            let restored = prefer_copy_snapshot
                && self
                    .state
                    .matcher
                    .restore_primed_dictionary(self.compression_level);
            if !restored {
                self.state.matcher.prime_with_dictionary(
                    dict.inner.dict_content.as_slice(),
                    dict.inner.offset_hist,
                );
                // First large frame for this level: capture the freshly primed
                // tables so later large frames can restore instead of re-prime.
                if prefer_copy_snapshot {
                    self.state
                        .matcher
                        .capture_primed_dictionary(self.compression_level);
                }
            }
        }
        if let Some(cache) = cached_entropy {
            // Refill an empty slot from the recycled spare before
            // `clone_from`: `Option::clone_from(None ← Some)` falls back to
            // a fresh clone (two Vec allocations), while `Some ← Some`
            // delegates to the table's buffer-reusing `clone_from`. Frames
            // whose last block cleared the table would otherwise re-clone
            // the dict seed every frame.
            match &cache.huff {
                Some(src) => {
                    if self.state.last_huff_table.is_none() {
                        self.state.last_huff_table = self.state.huff_table_spare.take();
                    }
                    match &mut self.state.last_huff_table {
                        Some(dst) => dst.clone_from(src),
                        slot => *slot = Some(src.clone()),
                    }
                }
                None => self.state.clear_huff_table(),
            }
        } else {
            self.state.clear_huff_table();
        }
        // `clone_from` keeps frame-to-frame seeding cheap for reused compressors by
        // reusing existing allocations where possible instead of reallocating every frame.
        if let Some(cache) = cached_entropy {
            self.state
                .fse_tables
                .ll_previous
                .clone_from(&cache.ll_previous);
            self.state
                .fse_tables
                .ml_previous
                .clone_from(&cache.ml_previous);
            self.state
                .fse_tables
                .of_previous
                .clone_from(&cache.of_previous);
        } else {
            self.state.fse_tables.ll_previous = None;
            self.state.fse_tables.ml_previous = None;
            self.state.fse_tables.of_previous = None;
        }
        // Only `Custom` (dictionary-supplied) FSE tables are forwarded to the
        // matcher as entropy seeds; any other variant maps to `None`.
        let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
            _ => None,
        });
        let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
            _ => None,
        });
        let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
            Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
            _ => None,
        });
        self.state.matcher.seed_dictionary_entropy(
            self.state.last_huff_table.as_ref(),
            ll_entropy,
            ml_entropy,
            of_entropy,
        );
        // Fresh content-checksum state for this frame.
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }
        let window_size = self.state.matcher.window_size();
        assert!(
            window_size != 0,
            "matcher reported window_size == 0, which is invalid"
        );
        FramePrep {
            window_size,
            use_dictionary_state,
            source_size_hint_known,
            initial_size_hint,
        }
    }
1656
1657    /// Owned streaming block loop: reads blocks from the caller-provided
1658    /// `source` reader, optionally pre-splits, hashes for the content
1659    /// checksum, and emits each block via `compress_block_encoded`,
1660    /// accumulating the block bytes. Returns `(all_blocks,
1661    /// total_uncompressed)`. The source is passed in (rather than read
1662    /// from `self.uncompressed_data`) so the streaming `compress` path can
1663    /// feed the configured reader while the slice paths
1664    /// (`compress_oneshot_borrowed`, `compress_independent_frame`) feed an
1665    /// in-place `&[u8]` cursor without baking its lifetime into the
1666    /// compressor type.
1667    fn run_owned_block_loop<S: OwnedBlockSource>(
1668        &mut self,
1669        source: &mut S,
1670        initial_size_hint: Option<u64>,
1671        // Whether `initial_size_hint` is the input's exact length (the
1672        // one-shot slice paths) or a caller-provided estimate (the streaming
1673        // `Read` path, where `set_source_size_hint` is advisory). An exact
1674        // hint drives the one-shot ratio reservation; an estimate is only
1675        // trusted up to a small lookahead past the bytes actually read.
1676        hint_is_exact: bool,
1677        out: &mut Vec<u8>,
1678    ) -> u64 {
1679        // Compressed blocks are appended to `out` from its current end. The
1680        // streaming drain path passes a fresh buffer (the frame header is
1681        // written to the drain afterward, since Frame_Content_Size is only
1682        // known once the reader hits EOF); the one-shot compress-into-Vec
1683        // path passes `out` already holding the header. The donor split
1684        // `savings` gate below accumulates block-relative (`before_len`)
1685        // output deltas, so a header prefix never skews it.
1686        let blocks_start = out.len();
1687        let mut total_uncompressed: u64 = 0;
1688        let mut pending_input: Vec<u8> = Vec::new();
1689        let mut reached_eof = false;
1690        let mut savings = 0i64;
1691        // Compress block by block
1692        loop {
1693            // Read up to one donor block. When the pre-block splitter keeps a
1694            // suffix, top it back up before compressing the next block, matching
1695            // ZSTD_compress_frameChunk() over a contiguous input buffer.
1696            let block_capacity = self.block_capacity();
1697            // Always draw the block buffer from the matcher's recycled pool
1698            // (its capacity already covers the block size, so the resize below
1699            // stays in-place). Any carried pre-split suffix is copied in, and
1700            // `pending_input` is retained as a reusable carry buffer. The prior
1701            // approach `split_off`'d a fresh suffix Vec per pre-split and
1702            // `reserve_exact`-grew it to `block_capacity` every block; on a
1703            // heavily pre-split frame that churned one block-sized allocation
1704            // per split (~12 MB over ~90 splits on a 1 MiB corpus input).
1705            let mut uncompressed_data = self.state.matcher.get_next_space();
1706            uncompressed_data.clear();
1707            uncompressed_data.extend_from_slice(&pending_input);
1708            pending_input.clear();
1709            if !reached_eof {
1710                // Remaining-bytes expectation for the reader source's sizing
1711                // (`None` = unknown, or an inexact hint already met by prior
1712                // blocks). The slice source appends directly and ignores it.
1713                let size_hint_remaining = match initial_size_hint {
1714                    Some(hint) if hint > total_uncompressed => Some(hint - total_uncompressed),
1715                    _ => None,
1716                };
1717                let (appended, eof) =
1718                    source.fill_block(&mut uncompressed_data, block_capacity, size_hint_remaining);
1719                total_uncompressed += appended as u64;
1720                reached_eof = eof;
1721            }
1722            let mut last_block = reached_eof;
1723            let remaining_for_split = if reached_eof {
1724                uncompressed_data.len()
1725            } else {
1726                block_capacity
1727            };
1728            if !matches!(self.compression_level, CompressionLevel::Uncompressed)
1729                && uncompressed_data.len() == block_capacity
1730            {
1731                let block_len = optimal_block_size(
1732                    self.compression_level,
1733                    &uncompressed_data,
1734                    remaining_for_split,
1735                    block_capacity,
1736                    savings,
1737                );
1738                if block_len < uncompressed_data.len() {
1739                    // Carry the kept suffix into the reusable `pending_input`
1740                    // buffer (cleared, capacity retained) instead of allocating
1741                    // a fresh Vec via `split_off`. Next iteration copies it back
1742                    // into a pooled block buffer. The block currently being
1743                    // compressed is truncated to the chosen split length.
1744                    pending_input.clear();
1745                    pending_input.extend_from_slice(&uncompressed_data[block_len..]);
1746                    uncompressed_data.truncate(block_len);
1747                    last_block = false;
1748                }
1749            }
1750            // As we read, hash that data too (skipped when the content
1751            // checksum is disabled).
1752            #[cfg(feature = "hash")]
1753            if self.content_checksum {
1754                self.hasher.write(&uncompressed_data);
1755            }
1756            // Per-physical-block XXH64 (low 32 bits) for the optional
1757            // per-block checksum sidecar. Hashing happens INSIDE the
1758            // block emitters (RLE / Raw fast-path / Compressed /
1759            // post-split partitions), so the digests vector has
1760            // exactly one entry per physical Block_Header written to
1761            // `all_blocks` — 1:1 with `FrameEmitInfo.blocks`. See
1762            // `enable_per_block_checksums` rustdoc.
1763            // Size the output ahead of this block's emission from the ratio
1764            // observed so far (see `reserve_for_next_block`); with no usable
1765            // size hint, ensure one block's worst case and let the doubling
1766            // growth policy amortize across blocks.
1767            let emitted =
1768                total_uncompressed - uncompressed_data.len() as u64 - pending_input.len() as u64;
1769            match initial_size_hint {
1770                Some(hint) if hint >= total_uncompressed => {
1771                    // An advisory hint (streaming path) is only trusted up to
1772                    // a small lookahead past the bytes actually read: a hint
1773                    // far above the real input would otherwise reserve the
1774                    // whole phantom remainder up front.
1775                    let hint_remaining = hint - emitted;
1776                    let remaining = if hint_is_exact {
1777                        hint_remaining
1778                    } else {
1779                        let buffered = total_uncompressed - emitted;
1780                        const HINT_LOOKAHEAD: u64 = 64 * 1024;
1781                        hint_remaining.min(buffered + HINT_LOOKAHEAD)
1782                    };
1783                    reserve_for_next_block(
1784                        out,
1785                        blocks_start,
1786                        emitted,
1787                        remaining as usize,
1788                        self.block_capacity(),
1789                    );
1790                }
1791                _ => {
1792                    out.reserve(uncompressed_data.len() + 3 + 16);
1793                }
1794            }
1795            // Special handling is needed for compression of a totally empty file
1796            if uncompressed_data.is_empty() {
1797                let header = BlockHeader {
1798                    last_block: true,
1799                    block_type: crate::blocks::block::BlockType::Raw,
1800                    block_size: 0,
1801                };
1802                header.serialize(out);
1803                #[cfg(feature = "lsm")]
1804                self.block_decompressed_sizes.push(0);
1805                #[cfg(all(feature = "lsm", feature = "hash"))]
1806                if let Some(checksums) = self.block_checksums.as_mut() {
1807                    checksums.push(xxh64_block_low32(&[]));
1808                }
1809                break;
1810            }
1811
1812            match self.compression_level {
1813                CompressionLevel::Uncompressed => {
1814                    let header = BlockHeader {
1815                        last_block,
1816                        block_type: crate::blocks::block::BlockType::Raw,
1817                        block_size: uncompressed_data.len().try_into().unwrap(),
1818                    };
1819                    header.serialize(out);
1820                    #[cfg(feature = "lsm")]
1821                    self.block_decompressed_sizes
1822                        .push(uncompressed_data.len() as u32);
1823                    #[cfg(all(feature = "lsm", feature = "hash"))]
1824                    if let Some(checksums) = self.block_checksums.as_mut() {
1825                        checksums.push(xxh64_block_low32(&uncompressed_data));
1826                    }
1827                    out.extend_from_slice(&uncompressed_data);
1828                    savings +=
1829                        uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
1830                }
1831                CompressionLevel::Fastest
1832                | CompressionLevel::Default
1833                | CompressionLevel::Better
1834                | CompressionLevel::Best
1835                | CompressionLevel::Level(_) => {
1836                    let before_len = out.len();
1837                    let block_len = uncompressed_data.len();
1838                    // A primed dictionary makes "incompressible-looking"
1839                    // blocks matchable against the dict, so the raw-fast-
1840                    // path inside must be bypassed (it skips matching).
1841                    // Mirror prepare_frame's `use_dictionary_state`: a dict
1842                    // is only PRIMED (and thus matchable) when the matcher
1843                    // supports priming — a non-priming matcher ignores an
1844                    // attached dictionary, so the raw-fast-path must stay
1845                    // enabled for it. (This arm is already non-Uncompressed.)
1846                    let dict_active = self.dictionary.is_some()
1847                        && self.state.matcher.supports_dictionary_priming();
1848                    compress_block_encoded(
1849                        &mut self.state,
1850                        self.compression_level,
1851                        last_block,
1852                        uncompressed_data,
1853                        out,
1854                        dict_active,
1855                        #[cfg(feature = "lsm")]
1856                        Some(&mut self.block_decompressed_sizes),
1857                        #[cfg(all(feature = "lsm", feature = "hash"))]
1858                        self.block_checksums.as_mut(),
1859                    );
1860                    savings += block_len as i64 - (out.len() - before_len) as i64;
1861                }
1862            }
1863            if last_block && pending_input.is_empty() {
1864                break;
1865            }
1866        }
1867        total_uncompressed
1868    }
1869
1870    /// Append the frame header bytes onto `out` once the total payload size
1871    /// is known (so `Frame_Content_Size` / `single_segment` can be set).
1872    /// Appends rather than returns so the one-shot path serializes straight
1873    /// into the reused output buffer with no per-frame header `Vec`.
1874    fn append_frame_header(&self, total_uncompressed: u64, prep: &FramePrep, out: &mut Vec<u8>) {
1875        // Match the donor framing policy for pledged one-shot inputs: use a
1876        // single-segment frame whenever the source fits the active window.
1877        // A single-segment frame REQUIRES an FCS field, so suppressing the
1878        // content size (`content_size_flag` off) also forces the windowed
1879        // layout, mirroring upstream. Dictionary frames qualify (the
1880        // reference emits single-segment + dictionary-ID headers): the
1881        // dictionary is decoder setup state, not part of the regenerated
1882        // segment, so the frame keeps single-segment wire layout and
1883        // decoders keep their single-allocation paths (our own decoder
1884        // already caps reservation to min(window, FCS) either way).
1885        let single_segment = self.content_size_flag
1886            && prep.source_size_hint_known
1887            && total_uncompressed >= 512
1888            && total_uncompressed <= prep.window_size;
1889        let header = FrameHeader {
1890            frame_content_size: self.content_size_flag.then_some(total_uncompressed),
1891            single_segment,
1892            content_checksum: cfg!(feature = "hash") && self.content_checksum,
1893            dictionary_id: if prep.use_dictionary_state && self.dict_id_flag {
1894                self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
1895            } else {
1896                None
1897            },
1898            window_size: if single_segment {
1899                None
1900            } else {
1901                Some(prep.window_size)
1902            },
1903            magicless: self.magicless,
1904        };
1905        header.serialize(out);
1906    }
1907
1908    /// Write the frame header, accumulated block bytes, and optional
1909    /// trailing content checksum to the configured drain; populate
1910    /// `frame_emit_info` (lsm). Header and blocks are written separately to
1911    /// avoid shifting `all_blocks` to prepend the header. Used by
1912    /// `compress` and `compress_oneshot_borrowed`.
1913    fn finish_frame(&mut self, all_blocks: Vec<u8>, total_uncompressed: u64, prep: &FramePrep) {
1914        let mut header_buf: Vec<u8> = Vec::with_capacity(18);
1915        self.append_frame_header(total_uncompressed, prep, &mut header_buf);
1916        // Snapshot the checksum before borrowing the drain field so the
1917        // `self.hasher` read and the `self.compressed_data` write don't
1918        // both need `&mut self` simultaneously.
1919        #[cfg(feature = "hash")]
1920        let checksum_bytes = self
1921            .content_checksum
1922            .then(|| (self.hasher.finish() as u32).to_le_bytes());
1923        let drain = self.compressed_data.as_mut().unwrap();
1924        drain.write_all(&header_buf).unwrap();
1925        drain.write_all(&all_blocks).unwrap();
1926        // With the `hash` feature AND the content checksum enabled, the header
1927        // set `Content_Checksum_flag` and the 32-bit digest is written at the
1928        // end of the frame. Disabled => no trailing bytes, flag stays 0.
1929        #[cfg(feature = "hash")]
1930        if let Some(checksum_bytes) = checksum_bytes {
1931            drain.write_all(&checksum_bytes).unwrap();
1932        }
1933        #[cfg(feature = "lsm")]
1934        {
1935            let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
1936            self.populate_frame_emit_info(header_buf.len(), &all_blocks, emit_checksum);
1937        }
1938    }
1939
1940    /// Assemble the frame (header + blocks + optional checksum) into the
1941    /// caller-provided `out` buffer, replacing its contents, and populate
1942    /// `frame_emit_info` (lsm). `out` is cleared first (its allocation is
1943    /// reused, the CCtx-equivalent zero-per-call-alloc output path) then
1944    /// grown once to the exact frame size. Used by
1945    /// `compress_independent_frame_into`. The single `all_blocks` copy into
1946    /// `out` is the same one copy `finish_frame` performs writing
1947    /// `all_blocks` into a `Vec` drain, no extra buffering vs the drain
1948    /// path.
1949    /// Walk `all_blocks` to recover per-block layout and store it in
1950    /// `frame_emit_info`. Each Block_Header is 3 bytes LE packing
1951    /// `(block_size << 3) | (block_type << 1) | last_block`. Physical body
1952    /// size differs by type: RLE bodies are always 1 byte (the repeated
1953    /// byte), Raw/Compressed bodies span `block_size`. `header_len` is the
1954    /// serialized frame-header length (frame offset of the first block).
1955    #[cfg(feature = "lsm")]
1956    fn populate_frame_emit_info(
1957        &mut self,
1958        header_len: usize,
1959        all_blocks: &[u8],
1960        emit_checksum: bool,
1961    ) {
1962        use crate::blocks::block::BlockType as BT;
1963        use crate::encoding::frame_emit_info::{FrameBlock, FrameEmitInfo};
1964        // All frame-offset arithmetic below is bounded by u32 on the wire
1965        // (Block_Size is a 21-bit field, frames bounded by MAX_BLOCK_SIZE *
1966        // #blocks). A pathologically large frame whose total emitted size
1967        // exceeds u32::MAX would overflow the cast; bail out by leaving
1968        // `frame_emit_info` at `None` rather than handing the caller a
1969        // silently-truncated layout. The overflow path is statically
1970        // unreachable on every realistic frame so the predictor amortises
1971        // the branch to zero cost.
1972        let frame_header_len: u32 = match u32::try_from(header_len) {
1973            Ok(v) => v,
1974            Err(_) => return,
1975        };
1976        let all_blocks_len_u32: u32 = match u32::try_from(all_blocks.len()) {
1977            Ok(v) => v,
1978            Err(_) => return,
1979        };
1980        let mut blocks: Vec<FrameBlock> = Vec::new();
1981        let mut cursor: usize = 0;
1982        while cursor + 3 <= all_blocks.len() {
1983            let mut header_u32 = [0u8; 4];
1984            header_u32[..3].copy_from_slice(&all_blocks[cursor..cursor + 3]);
1985            let raw = u32::from_le_bytes(header_u32);
1986            let last_block = (raw & 1) != 0;
1987            let block_type = match (raw >> 1) & 0b11 {
1988                0 => BT::Raw,
1989                1 => BT::RLE,
1990                2 => BT::Compressed,
1991                _ => BT::Reserved,
1992            };
1993            let block_size_field = raw >> 3;
1994            // RLE bodies are always 1 byte physical on the wire (the single
1995            // repeated byte); the spec's Block_Size field carries the
1996            // logical repeat count. Raw and Compressed bodies physically
1997            // span block_size_field bytes. Store the physical length in
1998            // body_size so the 'offset + header + body_size' arithmetic
1999            // always lands on the next block boundary, and surface the raw
2000            // spec field separately as block_size_field.
2001            let physical_body: u32 = match block_type {
2002                BT::RLE => 1,
2003                _ => block_size_field,
2004            };
2005            let cursor_u32: u32 = match u32::try_from(cursor) {
2006                Ok(v) => v,
2007                Err(_) => return,
2008            };
2009            let offset_in_frame = match frame_header_len.checked_add(cursor_u32) {
2010                Some(v) => v,
2011                None => return,
2012            };
2013            // Decompressed (regenerated) size, captured per physical block
2014            // during emit (1:1 with the wire blocks scanned here). Raw/RLE are
2015            // wire-derivable (`block_size_field`), so a short sidecar still
2016            // yields the correct value for them. A Compressed block's size is
2017            // NOT on the wire: if the sidecar is missing its entry, fabricating
2018            // 0 would publish a silently-wrong `decompressed_byte_range`. Since
2019            // this metadata is the authoritative mapping for a successful
2020            // encode, bail out (leave `frame_emit_info` at `None`) rather than
2021            // hand back a corrupt layout; the 1:1 push invariant makes this
2022            // unreachable in practice (debug_assert catches a regression).
2023            let decompressed_size = match self.block_decompressed_sizes.get(blocks.len()).copied() {
2024                Some(size) => size,
2025                None if matches!(block_type, BT::Raw | BT::RLE) => block_size_field,
2026                None => {
2027                    debug_assert!(
2028                        false,
2029                        "missing decompressed-size sidecar entry for compressed block {}",
2030                        blocks.len()
2031                    );
2032                    return;
2033                }
2034            };
2035            blocks.push(FrameBlock {
2036                offset_in_frame,
2037                header_size: 3,
2038                body_size: physical_body,
2039                block_size_field,
2040                block_type,
2041                last_block,
2042                decompressed_size,
2043            });
2044            cursor += 3 + physical_body as usize;
2045            if last_block {
2046                break;
2047            }
2048        }
2049        // Fail closed on a structurally incomplete scan: the loop must have
2050        // consumed the whole block section AND ended on a parsed last block.
2051        // A premature `last_block` (bytes left over) or a run-off without any
2052        // last block would otherwise publish an invalid public `FrameEmitInfo`.
2053        // Unreachable for a well-formed self-produced frame (debug_assert
2054        // catches a regression); on release we bail, leaving `frame_emit_info`
2055        // at `None` rather than handing back a corrupt layout.
2056        if cursor != all_blocks.len() || !blocks.last().is_some_and(|b| b.last_block) {
2057            debug_assert!(
2058                false,
2059                "incomplete block scan in populate_frame_emit_info: cursor={} len={} last_block={:?}",
2060                cursor,
2061                all_blocks.len(),
2062                blocks.last().map(|b| b.last_block)
2063            );
2064            return;
2065        }
2066        let checksum_range = if emit_checksum {
2067            let cs_start = match frame_header_len.checked_add(all_blocks_len_u32) {
2068                Some(v) => v,
2069                None => return,
2070            };
2071            let cs_end = match cs_start.checked_add(4) {
2072                Some(v) => v,
2073                None => return,
2074            };
2075            Some(cs_start..cs_end)
2076        } else {
2077            None
2078        };
2079        let body_total = match frame_header_len.checked_add(all_blocks_len_u32) {
2080            Some(v) => v,
2081            None => return,
2082        };
2083        let total_size = if checksum_range.is_some() {
2084            match body_total.checked_add(4) {
2085                Some(v) => v,
2086                None => return,
2087            }
2088        } else {
2089            body_total
2090        };
2091        self.frame_emit_info = Some(FrameEmitInfo {
2092            frame_header_range: 0..frame_header_len,
2093            blocks,
2094            checksum_range,
2095            total_size,
2096        });
2097    }
2098
2099    /// Layout of the most recently emitted frame.
2100    ///
2101    /// Returns `None` if [`compress`](Self::compress) has not been
2102    /// called yet on this compressor. After a successful `compress()`
2103    /// the returned `FrameEmitInfo` describes the frame header range,
2104    /// every emitted block's offset / size / type, and the optional
2105    /// trailing content-checksum range — all in frame-absolute byte
2106    /// offsets matching the bytes written to the drain.
2107    ///
2108    /// Behind the `lsm` Cargo feature.
2109    #[cfg(feature = "lsm")]
2110    pub fn last_frame_emit_info(&self) -> Option<&crate::encoding::frame_emit_info::FrameEmitInfo> {
2111        self.frame_emit_info.as_ref()
2112    }
2113
2114    /// Opt in to per-block XXH64 checksum computation during
2115    /// [`compress`](Self::compress). Default off; zero cost when
2116    /// disabled. The captured digests are accessible via
2117    /// [`last_frame_block_checksums`](Self::last_frame_block_checksums).
2118    ///
2119    /// One checksum is emitted per physical FrameBlock written to
2120    /// the drain: 1:1 cardinality with
2121    /// [`last_frame_emit_info`](Self::last_frame_emit_info)'s
2122    /// `blocks` vector. On the post-split optimization path
2123    /// (Level 16-22 with large window) the per-partition decompressed
2124    /// range is hashed inside the partition loop so the digest count
2125    /// still matches the emitted block count. The decoder collects
2126    /// per-physical-block digests on the same granularity, so
2127    /// element-wise equality holds round-trip.
2128    ///
2129    /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
2130    /// primitive lives behind the `hash` feature, so this method only
2131    /// compiles when both are enabled.
2132    #[cfg(all(feature = "lsm", feature = "hash"))]
2133    pub fn enable_per_block_checksums(&mut self) {
2134        self.per_block_checksums_enabled = true;
2135    }
2136
2137    /// Per-block XXH64 (low 32 bits) digests captured during the most
2138    /// recent `compress()` call. `None` unless
2139    /// [`enable_per_block_checksums`](Self::enable_per_block_checksums)
2140    /// was called before `compress()`.
2141    ///
2142    /// Behind `all(feature = "lsm", feature = "hash")`.
2143    #[cfg(all(feature = "lsm", feature = "hash"))]
2144    pub fn last_frame_block_checksums(&self) -> Option<&[u32]> {
2145        self.block_checksums.as_deref()
2146    }
2147
2148    /// Get a mutable reference to the source
2149    pub fn source_mut(&mut self) -> Option<&mut R> {
2150        self.uncompressed_data.as_mut()
2151    }
2152
2153    /// Get a mutable reference to the drain
2154    pub fn drain_mut(&mut self) -> Option<&mut W> {
2155        self.compressed_data.as_mut()
2156    }
2157
2158    /// Get a reference to the source
2159    pub fn source(&self) -> Option<&R> {
2160        self.uncompressed_data.as_ref()
2161    }
2162
2163    /// Get a reference to the drain
2164    pub fn drain(&self) -> Option<&W> {
2165        self.compressed_data.as_ref()
2166    }
2167
2168    /// Retrieve the source
2169    pub fn take_source(&mut self) -> Option<R> {
2170        self.uncompressed_data.take()
2171    }
2172
2173    /// Retrieve the drain
2174    pub fn take_drain(&mut self) -> Option<W> {
2175        self.compressed_data.take()
2176    }
2177
2178    /// Before calling [FrameCompressor::compress] you can replace the matcher
2179    pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
2180        core::mem::swap(&mut match_generator, &mut self.state.matcher);
2181        match_generator
2182    }
2183
2184    /// Before calling [FrameCompressor::compress] you can replace the compression level.
2185    ///
2186    /// This also clears any fine-grained parameter overrides installed via
2187    /// [`set_parameters`](Self::set_parameters): reverting to a bare level
2188    /// means plain level-based tuning, not the previous frame's customized
2189    /// strategy / LDM / log overrides. To keep overriding, call
2190    /// [`set_parameters`](Self::set_parameters) again with the new base level.
2191    pub fn set_compression_level(
2192        &mut self,
2193        compression_level: CompressionLevel,
2194    ) -> CompressionLevel {
2195        let old = self.compression_level;
2196        self.compression_level = compression_level;
2197        // Drop sticky overrides so the level switch yields plain geometry.
2198        self.strategy_override = None;
2199        self.state.matcher.clear_param_overrides();
2200        old
2201    }
2202
2203    /// Get the current compression level
2204    pub fn compression_level(&self) -> CompressionLevel {
2205        self.compression_level
2206    }
2207
2208    /// Attach a pre-parsed dictionary to be used for subsequent compressions.
2209    ///
2210    /// In compressed modes, the dictionary id is written only when the active
2211    /// matcher supports dictionary priming.
2212    /// Uncompressed mode and non-priming matchers ignore the attached dictionary
2213    /// at encode time.
2214    pub fn set_dictionary(
2215        &mut self,
2216        dictionary: crate::decoding::Dictionary,
2217    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2218        self.attach_dictionary(EncoderDictionary::from_dictionary(dictionary))
2219    }
2220
2221    /// Parse and attach a serialized dictionary blob.
2222    ///
2223    /// Parses with the encoder-only path (skips the FSE/HUF decode lookup-table
2224    /// build the encoder never reads); the entropy ENCODER tables — and thus
2225    /// the emitted frame — are identical to a full parse.
2226    pub fn set_dictionary_from_bytes(
2227        &mut self,
2228        raw_dictionary: &[u8],
2229    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2230        self.attach_dictionary(EncoderDictionary::from_bytes(raw_dictionary)?)
2231    }
2232
2233    /// Attach an already-parsed [`EncoderDictionary`] without reparsing a raw
2234    /// blob.
2235    ///
2236    /// Accepts an `EncoderDictionary` produced once via
2237    /// [`EncoderDictionary::from_bytes`] / [`EncoderDictionary::from_dictionary`]
2238    /// or handed back by [`Self::clear_dictionary`] / the `set_dictionary*`
2239    /// return value, so callers can reattach or reuse a prepared dictionary
2240    /// across compressions without re-running the dictionary parse each time.
2241    /// Returns the previously-attached dictionary, if any.
2242    pub fn set_encoder_dictionary(
2243        &mut self,
2244        dictionary: EncoderDictionary,
2245    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2246        self.attach_dictionary(dictionary)
2247    }
2248
2249    /// Remove the attached dictionary, returning it as an [`EncoderDictionary`].
2250    pub fn clear_dictionary(&mut self) -> Option<EncoderDictionary> {
2251        self.dictionary_entropy_cache = None;
2252        // Drop the CDict prime snapshot — it is keyed to the dictionary
2253        // being removed and must not be restored against a different (or no)
2254        // dictionary on the next frame.
2255        self.state.matcher.invalidate_primed_dictionary();
2256        self.dictionary.take()
2257    }
2258
2259    /// Validate `enc`, build the encoder entropy cache from it, store it, and
2260    /// return the previously-attached dictionary. Shared by every public
2261    /// attach entry point: `set_dictionary`, `set_dictionary_from_bytes`, and
2262    /// `set_encoder_dictionary`.
2263    fn attach_dictionary(
2264        &mut self,
2265        enc: EncoderDictionary,
2266    ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2267        let dictionary = &enc.inner;
2268        if dictionary.id == 0 {
2269            return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
2270        }
2271        if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
2272            return Err(
2273                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
2274                    index: index as u8,
2275                },
2276            );
2277        }
2278        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(dictionary));
2279        // A previously-captured CDict prime snapshot belongs to the OLD
2280        // dictionary; drop it so the first frame with the new dictionary
2281        // re-primes (and re-captures) instead of restoring stale tables.
2282        self.state.matcher.invalidate_primed_dictionary();
2283        Ok(self.dictionary.replace(enc))
2284    }
2285}
2286
2287#[cfg(test)]
2288mod tests {
2289    // `format!` is used by ungated tests (e.g. the btlazy2 dict-reuse
2290    // byte-identity test), so the import must not be feature-gated — under
2291    // default features (no `dict_builder`) the gated form left `format!`
2292    // unresolved when the test module is compiled.
2293    use alloc::format;
2294    use alloc::vec;
2295
2296    use super::FrameCompressor;
2297    use crate::blocks::block::BlockType;
2298    use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
2299    use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
2300    use crate::encoding::{Matcher, Sequence};
2301    use alloc::vec::Vec;
2302
2303    fn generate_data(seed: u64, len: usize) -> Vec<u8> {
2304        let mut state = seed;
2305        let mut data = Vec::with_capacity(len);
2306        for _ in 0..len {
2307            state = state
2308                .wrapping_mul(6364136223846793005)
2309                .wrapping_add(1442695040888963407);
2310            data.push((state >> 33) as u8);
2311        }
2312        data
2313    }
2314
2315    fn first_block_type(frame: &[u8]) -> BlockType {
2316        let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
2317        let mut decoder = block_decoder::new();
2318        let (header, _) = decoder
2319            .read_block_header(&frame[header_size as usize..])
2320            .expect("block header should parse");
2321        header.block_type
2322    }
2323
2324    /// Frame content size is written correctly and C zstd can decompress the output.
2325    #[cfg(feature = "std")]
2326    #[test]
2327    fn fcs_header_written_and_c_zstd_compatible() {
2328        let levels = [
2329            crate::encoding::CompressionLevel::Uncompressed,
2330            crate::encoding::CompressionLevel::Fastest,
2331            crate::encoding::CompressionLevel::Default,
2332            crate::encoding::CompressionLevel::Better,
2333            crate::encoding::CompressionLevel::Best,
2334        ];
2335        let fcs_2byte = vec![0xCDu8; 300]; // 300 bytes → 2-byte FCS (256..=65791 range)
2336        let large = vec![0xABu8; 100_000];
2337        let inputs: [&[u8]; 5] = [
2338            &[],
2339            &[0x00],
2340            b"abcdefghijklmnopqrstuvwxy\n",
2341            &fcs_2byte,
2342            &large,
2343        ];
2344        for level in levels {
2345            for data in &inputs {
2346                let compressed = crate::encoding::compress_to_vec(*data, level);
2347                // Verify FCS is present and correct
2348                let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
2349                    .unwrap()
2350                    .0;
2351                assert_eq!(
2352                    header.frame_content_size(),
2353                    data.len() as u64,
2354                    "FCS mismatch for len={} level={:?}",
2355                    data.len(),
2356                    level,
2357                );
2358                // Confirm the FCS field is actually present in the header
2359                // (not just the decoder returning 0 for absent FCS).
2360                assert_ne!(
2361                    header.descriptor.frame_content_size_bytes().unwrap(),
2362                    0,
2363                    "FCS field must be present for len={} level={:?}",
2364                    data.len(),
2365                    level,
2366                );
2367                // Verify C zstd can decompress
2368                let mut decoded = Vec::new();
2369                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
2370                    |e| {
2371                        panic!(
2372                            "C zstd decode failed for len={} level={level:?}: {e}",
2373                            data.len()
2374                        )
2375                    },
2376                );
2377                assert_eq!(
2378                    decoded.as_slice(),
2379                    *data,
2380                    "C zstd roundtrip failed for len={}",
2381                    data.len()
2382                );
2383            }
2384        }
2385    }
2386
2387    #[cfg(feature = "std")]
2388    #[test]
2389    fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
2390        let data = vec![0xAB; 2047];
2391        let compressed = {
2392            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2393            compressor.set_source_size_hint(data.len() as u64);
2394            compressor.set_source(data.as_slice());
2395            let mut out = Vec::new();
2396            compressor.set_drain(&mut out);
2397            compressor.compress();
2398            out
2399        };
2400
2401        let mut decoded = Vec::new();
2402        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2403        assert_eq!(decoded, data);
2404    }
2405
2406    #[cfg(feature = "std")]
2407    #[test]
2408    fn small_hinted_default_frame_uses_single_segment_header() {
2409        let data = generate_data(0xD15E_A5ED, 1024);
2410        let compressed = {
2411            let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
2412            compressor.set_source_size_hint(data.len() as u64);
2413            compressor.set_source(data.as_slice());
2414            let mut out = Vec::new();
2415            compressor.set_drain(&mut out);
2416            compressor.compress();
2417            out
2418        };
2419
2420        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2421        assert!(
2422            frame_header.descriptor.single_segment_flag(),
2423            "small hinted default frames should use single-segment header for Rust/FFI parity"
2424        );
2425        assert_eq!(frame_header.frame_content_size(), data.len() as u64);
2426        let mut decoded = Vec::new();
2427        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
2428            .expect("ffi decoder must accept single-segment small hinted default frame");
2429        assert_eq!(decoded, data);
2430    }
2431
2432    #[cfg(feature = "std")]
2433    #[test]
2434    fn small_hinted_numeric_default_levels_use_single_segment_header() {
2435        let data = generate_data(0xA11C_E003, 1024);
2436        for level in [
2437            super::CompressionLevel::Level(0),
2438            super::CompressionLevel::Level(3),
2439        ] {
2440            let compressed = {
2441                let mut compressor = FrameCompressor::new(level);
2442                compressor.set_source_size_hint(data.len() as u64);
2443                compressor.set_source(data.as_slice());
2444                let mut out = Vec::new();
2445                compressor.set_drain(&mut out);
2446                compressor.compress();
2447                out
2448            };
2449
2450            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2451            assert!(
2452                frame_header.descriptor.single_segment_flag(),
2453                "small hinted numeric default level frames should use single-segment header (level={level:?})"
2454            );
2455            assert_eq!(frame_header.frame_content_size(), data.len() as u64);
2456            let mut decoded = Vec::new();
2457            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
2458                panic!(
2459                    "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
2460                )
2461            });
2462            assert_eq!(decoded, data);
2463        }
2464    }
2465
2466    #[cfg(feature = "std")]
2467    #[test]
2468    fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
2469        let levels = [
2470            super::CompressionLevel::Fastest,
2471            super::CompressionLevel::Default,
2472            super::CompressionLevel::Better,
2473            super::CompressionLevel::Best,
2474            super::CompressionLevel::Level(-1),
2475            super::CompressionLevel::Level(2),
2476            super::CompressionLevel::Level(3),
2477            super::CompressionLevel::Level(4),
2478            super::CompressionLevel::Level(11),
2479        ];
2480        let sizes = [
2481            511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
2482        ];
2483
2484        for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
2485            for &size in &sizes {
2486                let data = generate_data(seed + seed_idx as u64, size);
2487                for &level in &levels {
2488                    let compressed = {
2489                        let mut compressor = FrameCompressor::new(level);
2490                        compressor.set_source_size_hint(data.len() as u64);
2491                        compressor.set_source(data.as_slice());
2492                        let mut out = Vec::new();
2493                        compressor.set_drain(&mut out);
2494                        compressor.compress();
2495                        out
2496                    };
2497                    if matches!(size, 511 | 512) {
2498                        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2499                        assert_eq!(
2500                            frame_header.descriptor.single_segment_flag(),
2501                            size == 512,
2502                            "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
2503                        );
2504                    }
2505
2506                    let mut decoded = Vec::new();
2507                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
2508                        |e| {
2509                            panic!(
2510                                "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
2511                                seed + seed_idx as u64
2512                            )
2513                        },
2514                    );
2515                    assert_eq!(
2516                        decoded,
2517                        data,
2518                        "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
2519                        seed + seed_idx as u64
2520                    );
2521                }
2522            }
2523        }
2524    }
2525
2526    #[cfg(feature = "std")]
2527    #[test]
2528    fn hinted_levels_use_single_segment_header_symmetrically() {
2529        let levels = [
2530            super::CompressionLevel::Fastest,
2531            super::CompressionLevel::Default,
2532            super::CompressionLevel::Better,
2533            super::CompressionLevel::Best,
2534            super::CompressionLevel::Level(0),
2535            super::CompressionLevel::Level(2),
2536            super::CompressionLevel::Level(3),
2537            super::CompressionLevel::Level(4),
2538            super::CompressionLevel::Level(11),
2539        ];
2540        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
2541            let size = 1024 + seed_idx * 97;
2542            let data = generate_data(seed, size);
2543            for &level in &levels {
2544                let compressed = {
2545                    let mut compressor = FrameCompressor::new(level);
2546                    compressor.set_source_size_hint(data.len() as u64);
2547                    compressor.set_source(data.as_slice());
2548                    let mut out = Vec::new();
2549                    compressor.set_drain(&mut out);
2550                    compressor.compress();
2551                    out
2552                };
2553                let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2554                assert!(
2555                    frame_header.descriptor.single_segment_flag(),
2556                    "hinted frame should be single-segment for level={level:?} size={}",
2557                    data.len()
2558                );
2559                assert_eq!(frame_header.frame_content_size(), data.len() as u64);
2560                let mut decoded = Vec::new();
2561                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
2562                    panic!(
2563                        "ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
2564                        data.len()
2565                    )
2566                });
2567                assert_eq!(decoded, data);
2568            }
2569        }
2570    }
2571
2572    #[cfg(feature = "std")]
2573    #[test]
2574    fn hinted_levels_pin_511_512_single_segment_boundary() {
2575        let levels = [
2576            super::CompressionLevel::Fastest,
2577            super::CompressionLevel::Default,
2578            super::CompressionLevel::Better,
2579            super::CompressionLevel::Best,
2580            super::CompressionLevel::Level(0),
2581            super::CompressionLevel::Level(2),
2582            super::CompressionLevel::Level(3),
2583            super::CompressionLevel::Level(4),
2584            super::CompressionLevel::Level(11),
2585        ];
2586        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
2587            for &size in &[511usize, 512] {
2588                let data = generate_data(seed + seed_idx as u64, size);
2589                for &level in &levels {
2590                    let compressed = {
2591                        let mut compressor = FrameCompressor::new(level);
2592                        compressor.set_source_size_hint(data.len() as u64);
2593                        compressor.set_source(data.as_slice());
2594                        let mut out = Vec::new();
2595                        compressor.set_drain(&mut out);
2596                        compressor.compress();
2597                        out
2598                    };
2599                    let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2600                    assert_eq!(
2601                        frame_header.descriptor.single_segment_flag(),
2602                        size == 512,
2603                        "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
2604                    );
2605                    let mut decoded = Vec::new();
2606                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
2607                        |e| {
2608                            panic!(
2609                                "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
2610                                seed + seed_idx as u64
2611                            )
2612                        },
2613                    );
2614                    assert_eq!(decoded, data);
2615                }
2616            }
2617        }
2618    }
2619
2620    #[cfg(feature = "std")]
2621    #[test]
2622    fn fastest_random_block_uses_raw_fast_path() {
2623        let data = generate_data(0xC0FF_EE11, 10 * 1024);
2624        let compressed =
2625            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Fastest);
2626
2627        assert_eq!(first_block_type(&compressed), BlockType::Raw);
2628
2629        let mut decoded = Vec::new();
2630        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2631        assert_eq!(decoded, data);
2632    }
2633
2634    #[cfg(feature = "std")]
2635    #[test]
2636    fn default_random_block_uses_raw_fast_path() {
2637        let data = generate_data(0xD15E_A5ED, 10 * 1024);
2638        let compressed =
2639            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Default);
2640
2641        assert_eq!(first_block_type(&compressed), BlockType::Raw);
2642
2643        let mut decoded = Vec::new();
2644        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2645        assert_eq!(decoded, data);
2646    }
2647
2648    #[cfg(feature = "std")]
2649    #[test]
2650    fn best_random_block_uses_raw_fast_path() {
2651        let data = generate_data(0xB35C_AFE1, 10 * 1024);
2652        let compressed =
2653            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Best);
2654
2655        assert_eq!(first_block_type(&compressed), BlockType::Raw);
2656
2657        let mut decoded = Vec::new();
2658        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2659        assert_eq!(decoded, data);
2660    }
2661
2662    #[cfg(feature = "std")]
2663    #[test]
2664    fn level2_random_block_uses_raw_fast_path() {
2665        let data = generate_data(0xA11C_E222, 10 * 1024);
2666        let compressed =
2667            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Level(2));
2668
2669        assert_eq!(first_block_type(&compressed), BlockType::Raw);
2670
2671        let mut decoded = Vec::new();
2672        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2673        assert_eq!(decoded, data);
2674    }
2675
2676    #[cfg(feature = "std")]
2677    #[test]
2678    fn better_random_block_uses_raw_fast_path() {
2679        let data = generate_data(0xBE77_E111, 10 * 1024);
2680        let compressed =
2681            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Better);
2682
2683        assert_eq!(first_block_type(&compressed), BlockType::Raw);
2684
2685        let mut decoded = Vec::new();
2686        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2687        assert_eq!(decoded, data);
2688    }
2689
2690    #[cfg(feature = "std")]
2691    #[test]
2692    fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
2693        let mut data = Vec::with_capacity(16 * 1024);
2694        const LINE: &[u8] =
2695            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
2696        while data.len() < 16 * 1024 {
2697            let remaining = 16 * 1024 - data.len();
2698            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
2699        }
2700
2701        fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
2702            let compressed = crate::encoding::compress_to_vec(data, level);
2703            assert_ne!(first_block_type(&compressed), BlockType::Raw);
2704            assert!(
2705                compressed.len() < data.len(),
2706                "compressible input should remain compressible for level={level:?}"
2707            );
2708            let mut decoded = Vec::new();
2709            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
2710            assert_eq!(decoded, data);
2711        }
2712
2713        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Fastest);
2714        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Default);
2715        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Level(3));
2716        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Better);
2717        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Best);
2718    }
2719
2720    #[cfg(feature = "std")]
2721    #[test]
2722    fn hinted_small_compressible_frames_use_single_segment_across_levels() {
2723        let mut data = Vec::with_capacity(4 * 1024);
2724        const LINE: &[u8] =
2725            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
2726        while data.len() < 4 * 1024 {
2727            let remaining = 4 * 1024 - data.len();
2728            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
2729        }
2730
2731        for level in [
2732            super::CompressionLevel::Fastest,
2733            super::CompressionLevel::Default,
2734            super::CompressionLevel::Better,
2735            super::CompressionLevel::Best,
2736            super::CompressionLevel::Level(0),
2737            super::CompressionLevel::Level(3),
2738            super::CompressionLevel::Level(4),
2739            super::CompressionLevel::Level(11),
2740        ] {
2741            let compressed = {
2742                let mut compressor = FrameCompressor::new(level);
2743                compressor.set_source_size_hint(data.len() as u64);
2744                compressor.set_source(data.as_slice());
2745                let mut out = Vec::new();
2746                compressor.set_drain(&mut out);
2747                compressor.compress();
2748                out
2749            };
2750            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
2751            assert!(
2752                frame_header.descriptor.single_segment_flag(),
2753                "hinted small compressible frame should use single-segment (level={level:?})"
2754            );
2755            assert_ne!(
2756                first_block_type(&compressed),
2757                BlockType::Raw,
2758                "compressible hinted frame should stay off raw fast path (level={level:?})"
2759            );
2760            assert!(
2761                compressed.len() < data.len(),
2762                "compressible hinted frame should still shrink (level={level:?})"
2763            );
2764            let mut decoded = Vec::new();
2765            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
2766                .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
2767            assert_eq!(decoded, data);
2768        }
2769    }
2770
2771    struct NoDictionaryMatcher {
2772        last_space: Vec<u8>,
2773        window_size: u64,
2774    }
2775
2776    impl NoDictionaryMatcher {
2777        fn new(window_size: u64) -> Self {
2778            Self {
2779                last_space: Vec::new(),
2780                window_size,
2781            }
2782        }
2783    }
2784
2785    impl Matcher for NoDictionaryMatcher {
2786        fn get_next_space(&mut self) -> Vec<u8> {
2787            vec![0; self.window_size as usize]
2788        }
2789
2790        fn get_last_space(&mut self) -> &[u8] {
2791            self.last_space.as_slice()
2792        }
2793
2794        fn commit_space(&mut self, space: Vec<u8>) {
2795            self.last_space = space;
2796        }
2797
2798        fn skip_matching(&mut self) {}
2799
2800        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
2801            handle_sequence(Sequence::Literals {
2802                literals: self.last_space.as_slice(),
2803            });
2804        }
2805
2806        fn reset(&mut self, _level: super::CompressionLevel) {
2807            self.last_space.clear();
2808        }
2809
2810        fn window_size(&self) -> u64 {
2811            self.window_size
2812        }
2813    }
2814
2815    #[test]
2816    fn frame_starts_with_magic_num() {
2817        let mock_data = [1_u8, 2, 3].as_slice();
2818        let mut output: Vec<u8> = Vec::new();
2819        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2820        compressor.set_source(mock_data);
2821        compressor.set_drain(&mut output);
2822
2823        compressor.compress();
2824        assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
2825    }
2826
2827    #[test]
2828    fn very_simple_raw_compress() {
2829        let mock_data = [1_u8, 2, 3].as_slice();
2830        let mut output: Vec<u8> = Vec::new();
2831        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2832        compressor.set_source(mock_data);
2833        compressor.set_drain(&mut output);
2834
2835        compressor.compress();
2836    }
2837
2838    #[test]
2839    fn very_simple_compress() {
2840        let mut mock_data = vec![0; 1 << 17];
2841        mock_data.extend(vec![1; (1 << 17) - 1]);
2842        mock_data.extend(vec![2; (1 << 18) - 1]);
2843        mock_data.extend(vec![2; 1 << 17]);
2844        mock_data.extend(vec![3; (1 << 17) - 1]);
2845        let mut output: Vec<u8> = Vec::new();
2846        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2847        compressor.set_source(mock_data.as_slice());
2848        compressor.set_drain(&mut output);
2849
2850        compressor.compress();
2851
2852        let mut decoder = FrameDecoder::new();
2853        let mut decoded = Vec::with_capacity(mock_data.len());
2854        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2855        assert_eq!(mock_data, decoded);
2856
2857        let mut decoded = Vec::new();
2858        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
2859        assert_eq!(mock_data, decoded);
2860    }
2861
2862    #[test]
2863    fn rle_compress() {
2864        let mock_data = vec![0; 1 << 19];
2865        let mut output: Vec<u8> = Vec::new();
2866        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2867        compressor.set_source(mock_data.as_slice());
2868        compressor.set_drain(&mut output);
2869
2870        compressor.compress();
2871
2872        let mut decoder = FrameDecoder::new();
2873        let mut decoded = Vec::with_capacity(mock_data.len());
2874        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2875        assert_eq!(mock_data, decoded);
2876    }
2877
2878    #[test]
2879    fn aaa_compress() {
2880        let mock_data = vec![0, 1, 3, 4, 5];
2881        let mut output: Vec<u8> = Vec::new();
2882        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2883        compressor.set_source(mock_data.as_slice());
2884        compressor.set_drain(&mut output);
2885
2886        compressor.compress();
2887
2888        let mut decoder = FrameDecoder::new();
2889        let mut decoded = Vec::with_capacity(mock_data.len());
2890        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2891        assert_eq!(mock_data, decoded);
2892
2893        let mut decoded = Vec::new();
2894        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
2895        assert_eq!(mock_data, decoded);
2896    }
2897
2898    #[test]
2899    fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
2900        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2901        let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2902        let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2903
2904        let mut data = Vec::new();
2905        for _ in 0..8 {
2906            data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
2907        }
2908
2909        let mut with_dict = Vec::new();
2910        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2911        let previous = compressor
2912            .set_dictionary_from_bytes(dict_raw)
2913            .expect("dictionary bytes should parse");
2914        assert!(
2915            previous.is_none(),
2916            "first dictionary insert should return None"
2917        );
2918        assert_eq!(
2919            compressor
2920                .set_dictionary(dict_for_encoder)
2921                .expect("valid dictionary should attach")
2922                .expect("set_dictionary_from_bytes inserted previous dictionary")
2923                .id(),
2924            dict_for_decoder.id
2925        );
2926        compressor.set_source(data.as_slice());
2927        compressor.set_drain(&mut with_dict);
2928        compressor.compress();
2929
2930        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
2931            .expect("encoded stream should have a frame header");
2932        assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));
2933
2934        let mut decoder = FrameDecoder::new();
2935        let mut missing_dict_target = Vec::with_capacity(data.len());
2936        let err = decoder
2937            .decode_all_to_vec(&with_dict, &mut missing_dict_target)
2938            .unwrap_err();
2939        assert!(
2940            matches!(
2941                &err,
2942                crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
2943            ),
2944            "dict-compressed stream should require dictionary id, got: {err:?}"
2945        );
2946
2947        let mut decoder = FrameDecoder::new();
2948        decoder.add_dict(dict_for_decoder).unwrap();
2949        let mut decoded = Vec::with_capacity(data.len());
2950        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
2951        assert_eq!(decoded, data);
2952
2953        let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
2954        let mut ffi_decoded = Vec::with_capacity(data.len());
2955        let ffi_written = ffi_decoder
2956            .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
2957            .unwrap();
2958        assert_eq!(ffi_written, data.len());
2959        assert_eq!(ffi_decoded, data);
2960    }
2961
2962    #[cfg(all(feature = "dict_builder", feature = "std"))]
2963    #[test]
2964    fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
2965        use std::io::Cursor;
2966
2967        let mut training = Vec::new();
2968        for idx in 0..256u32 {
2969            training.extend_from_slice(
2970                format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
2971            );
2972        }
2973        let mut raw_dict = Vec::new();
2974        crate::dictionary::create_raw_dict_from_source(
2975            Cursor::new(training.as_slice()),
2976            training.len(),
2977            &mut raw_dict,
2978            4096,
2979        )
2980        .expect("dict_builder training should succeed");
2981        assert!(
2982            !raw_dict.is_empty(),
2983            "dict_builder produced an empty dictionary"
2984        );
2985
2986        let dict_id = 0xD1C7_0008;
2987        let encoder_dict =
2988            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
2989        let decoder_dict =
2990            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
2991
2992        let mut payload = Vec::new();
2993        for idx in 0..96u32 {
2994            payload.extend_from_slice(
2995                format!(
2996                    "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
2997                )
2998                .as_bytes(),
2999            );
3000        }
3001
3002        let mut without_dict = Vec::new();
3003        let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
3004        baseline.set_source(payload.as_slice());
3005        baseline.set_drain(&mut without_dict);
3006        baseline.compress();
3007
3008        let mut with_dict = Vec::new();
3009        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3010        compressor
3011            .set_dictionary(encoder_dict)
3012            .expect("valid dict_builder dictionary should attach");
3013        compressor.set_source(payload.as_slice());
3014        compressor.set_drain(&mut with_dict);
3015        compressor.compress();
3016
3017        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
3018            .expect("encoded stream should have a frame header");
3019        assert_eq!(frame_header.dictionary_id(), Some(dict_id));
3020        let mut decoder = FrameDecoder::new();
3021        decoder.add_dict(decoder_dict).unwrap();
3022        let mut decoded = Vec::with_capacity(payload.len());
3023        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
3024        assert_eq!(decoded, payload);
3025        assert!(
3026            with_dict.len() < without_dict.len(),
3027            "trained dictionary should improve compression for this small payload"
3028        );
3029    }
3030
3031    #[test]
3032    fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
3033        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3034        let mut output = Vec::new();
3035        let input = b"";
3036
3037        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3038        let previous = compressor
3039            .set_dictionary_from_bytes(dict_raw)
3040            .expect("dictionary bytes should parse");
3041        assert!(previous.is_none());
3042
3043        compressor.set_source(input.as_slice());
3044        compressor.set_drain(&mut output);
3045        compressor.compress();
3046
3047        assert!(
3048            compressor.state.last_huff_table.is_some(),
3049            "dictionary entropy should seed previous huffman table before first block"
3050        );
3051        assert!(
3052            compressor.state.fse_tables.ll_previous.is_some(),
3053            "dictionary entropy should seed previous ll table before first block"
3054        );
3055        assert!(
3056            compressor.state.fse_tables.ml_previous.is_some(),
3057            "dictionary entropy should seed previous ml table before first block"
3058        );
3059        assert!(
3060            compressor.state.fse_tables.of_previous.is_some(),
3061            "dictionary entropy should seed previous of table before first block"
3062        );
3063    }
3064
3065    // `set_content_size_flag(false)`: the header must omit the FCS field
3066    // (and the single-segment layout that requires it) while the frame
3067    // still round-trips through our decoder.
3068    #[test]
3069    fn content_size_flag_off_omits_fcs_and_roundtrips() {
3070        let payload = alloc::vec![0x42u8; 4096];
3071
3072        let mut compressor: FrameCompressor =
3073            FrameCompressor::new(super::CompressionLevel::Fastest);
3074        let mut with_fcs = Vec::new();
3075        compressor.compress_independent_frame_into(&payload, &mut with_fcs);
3076
3077        compressor.set_content_size_flag(false);
3078        let mut without_fcs = Vec::new();
3079        compressor.compress_independent_frame_into(&payload, &mut without_fcs);
3080
3081        let parsed_with = crate::decoding::frame::read_frame_header(with_fcs.as_slice())
3082            .expect("flag-on frame header must parse")
3083            .0;
3084        assert_eq!(parsed_with.frame_content_size(), 4096);
3085
3086        let parsed_without = crate::decoding::frame::read_frame_header(without_fcs.as_slice())
3087            .expect("flag-off frame header must parse")
3088            .0;
3089        // 0 is the decoder's "unknown content size" sentinel...
3090        assert_eq!(
3091            parsed_without.frame_content_size(),
3092            0,
3093            "FCS must be omitted with the content-size flag off"
3094        );
3095        // ...and the descriptor must confirm the field is ABSENT (0 bytes),
3096        // not present with an explicit zero value.
3097        assert_eq!(
3098            parsed_without
3099                .descriptor
3100                .frame_content_size_bytes()
3101                .expect("descriptor must parse"),
3102            0,
3103            "the FCS field itself must be omitted, not written as zero"
3104        );
3105
3106        let mut decoder = crate::decoding::FrameDecoder::new();
3107        // `decode_all_to_vec` fills existing capacity (no FCS to pre-size
3108        // from with the flag off), so reserve the expected payload upfront.
3109        let mut decoded = Vec::with_capacity(payload.len() + 64);
3110        decoder
3111            .decode_all_to_vec(&without_fcs, &mut decoded)
3112            .expect("flag-off frame must decode");
3113        assert_eq!(decoded, payload);
3114    }
3115
3116    // `set_dictionary_id_flag(false)`: a dict-compressed frame must omit
3117    // the dictionary ID and still decode when the dictionary is handed to
3118    // the decoder explicitly.
3119    #[test]
3120    fn dict_id_flag_off_omits_dictionary_id_and_roundtrips() {
3121        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3122        let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(8);
3123
3124        let mut compressor: FrameCompressor =
3125            FrameCompressor::new(super::CompressionLevel::Fastest);
3126        compressor
3127            .set_dictionary_from_bytes(dict_raw)
3128            .expect("dictionary bytes should parse");
3129        compressor.set_dictionary_id_flag(false);
3130        let mut frame = Vec::new();
3131        compressor.compress_independent_frame_into(&payload, &mut frame);
3132
3133        let parsed = crate::decoding::frame::read_frame_header(frame.as_slice())
3134            .expect("frame header must parse")
3135            .0;
3136        assert_eq!(
3137            parsed.dictionary_id(),
3138            None,
3139            "dictionary id must be omitted with the dict-id flag off"
3140        );
3141
3142        // With the ID omitted the decoder cannot look the dictionary up by
3143        // header; hand it explicitly (the `reset_with_dict_handle` path).
3144        let mut sd = crate::decoding::StreamingDecoder::new_with_dictionary_bytes(
3145            frame.as_slice(),
3146            dict_raw,
3147        )
3148        .expect("decoder must accept the dictionary");
3149        let mut dec = Vec::new();
3150        std::io::Read::read_to_end(&mut sd, &mut dec)
3151            .expect("frame must decode with the dictionary handed explicitly");
3152        assert_eq!(dec, payload);
3153    }
3154
3155    // The output reservation must track the observed compression ratio, not
3156    // the whole-input `compress_bound`: a multi-MiB compressible stream's
3157    // output buffer stays at output scale (the old up-front bound held an
3158    // input-sized allocation for the whole frame). Incompressible input may
3159    // still re-estimate to ~the full bound — that is the genuine worst case.
3160    #[test]
3161    fn compressible_stream_output_capacity_stays_at_output_scale() {
3162        // 4 MiB of highly repetitive log-like lines.
3163        let line = b"ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo\n";
3164        let mut input = Vec::with_capacity(4 << 20);
3165        while input.len() < (4 << 20) {
3166            let take = line.len().min((4 << 20) - input.len());
3167            input.extend_from_slice(&line[..take]);
3168        }
3169
3170        let mut compressor: FrameCompressor =
3171            FrameCompressor::new(super::CompressionLevel::Fastest);
3172        let mut out = Vec::new();
3173        compressor.compress_independent_frame_into(&input, &mut out);
3174
3175        assert!(!out.is_empty());
3176        assert!(
3177            out.capacity() < input.len() / 4,
3178            "capacity {} must stay at output scale (input {}, output {})",
3179            out.capacity(),
3180            input.len(),
3181            out.len()
3182        );
3183
3184        // Round-trip: the adaptive reservation must not affect the bytes.
3185        let mut decoder = crate::decoding::FrameDecoder::new();
3186        let mut decoded = Vec::with_capacity(input.len() + 64);
3187        decoder
3188            .decode_all_to_vec(&out, &mut decoded)
3189            .expect("frame must decode");
3190        assert_eq!(decoded, input);
3191    }
3192
3193    // A dictionary frame with a known content size that fits the window
3194    // must take the single-segment layout (reference parity): the
3195    // dictionary is decoder setup state, not part of the regenerated
3196    // segment, so it must not force the windowed multi-segment layout.
3197    #[test]
3198    fn dict_frame_with_known_size_is_single_segment() {
3199        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3200        let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(64);
3201
3202        let mut compressor: FrameCompressor =
3203            FrameCompressor::new(super::CompressionLevel::Fastest);
3204        compressor
3205            .set_dictionary_from_bytes(dict_raw)
3206            .expect("dictionary bytes should parse");
3207        let mut frame = Vec::new();
3208        compressor.compress_independent_frame_into(&payload, &mut frame);
3209
3210        let parsed = crate::decoding::frame::read_frame_header(frame.as_slice())
3211            .expect("frame header must parse")
3212            .0;
3213        assert!(
3214            parsed.descriptor.single_segment_flag(),
3215            "dict frame with known size <= window must be single-segment"
3216        );
3217        assert!(parsed.dictionary_id().is_some());
3218        assert_eq!(parsed.frame_content_size(), payload.len() as u64);
3219
3220        // Round-trip through our own decoder with the dictionary.
3221        let mut decoder = crate::decoding::FrameDecoder::new();
3222        decoder
3223            .add_dict_from_bytes(dict_raw)
3224            .expect("decoder must accept the dictionary");
3225        let mut decoded = Vec::with_capacity(payload.len() + 64);
3226        decoder
3227            .decode_all_to_vec(&frame, &mut decoded)
3228            .expect("single-segment dict frame must decode");
3229        assert_eq!(decoded, payload);
3230
3231        // Reference-decoder cross-check: our serializer and parser could
3232        // share a bug and still self-roundtrip, so the on-wire
3233        // single-segment + dictionary-ID layout must also decode through
3234        // the C implementation.
3235        let ffi_decoded = zstd::bulk::Decompressor::with_dictionary(dict_raw)
3236            .expect("reference decompressor must accept the dictionary")
3237            .decompress(&frame, payload.len() + 64)
3238            .expect("reference zstd must accept the single-segment dict frame");
3239        assert_eq!(ffi_decoded, payload);
3240    }
3241
3242    // Regression test: `heap_size()` must count the retained Huffman tables
3243    // (the active `last_huff_table` and the recycled `huff_table_spare`).
3244    // A reused context that parks a table would otherwise under-report its
3245    // footprint through the public size API.
3246    #[test]
3247    fn heap_size_counts_active_and_spare_huffman_tables() {
3248        let mut compressor: FrameCompressor =
3249            FrameCompressor::new(super::CompressionLevel::Fastest);
3250        let base = compressor.heap_size();
3251
3252        let active = crate::huff0::huff0_encoder::HuffmanTable::build_from_data(
3253            b"abacabadabacabaeabacabadabacaba",
3254        );
3255        let active_bytes = active.heap_size();
3256        assert!(active_bytes > 0, "built table must own heap buffers");
3257        compressor.state.last_huff_table = Some(active);
3258        assert_eq!(
3259            compressor.heap_size(),
3260            base + active_bytes,
3261            "heap_size must include the active last_huff_table"
3262        );
3263
3264        let spare = crate::huff0::huff0_encoder::HuffmanTable::build_from_data(
3265            b"the quick brown fox jumps over the lazy dog",
3266        );
3267        let spare_bytes = spare.heap_size();
3268        assert!(spare_bytes > 0, "built table must own heap buffers");
3269        compressor.state.huff_table_spare = Some(spare);
3270        assert_eq!(
3271            compressor.heap_size(),
3272            base + active_bytes + spare_bytes,
3273            "heap_size must include the parked huff_table_spare"
3274        );
3275    }
3276
3277    #[test]
3278    fn set_encoder_dictionary_reattaches_prepared_dict_without_reparse() {
3279        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3280        let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
3281              tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";
3282
3283        // Prepare the EncoderDictionary once, then attach it via the prepared-
3284        // dictionary API (no raw-blob reparse at attach time).
3285        let prepared =
3286            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
3287        let dict_id = prepared.id();
3288
3289        let mut with_dict = Vec::new();
3290        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3291        let previous = compressor
3292            .set_encoder_dictionary(prepared)
3293            .expect("prepared dictionary should attach");
3294        assert!(previous.is_none());
3295        compressor.set_source(payload.as_slice());
3296        compressor.set_drain(&mut with_dict);
3297        compressor.compress();
3298        // clear_dictionary hands the prepared dictionary back (last use of
3299        // `compressor`, so its `&mut with_dict` drain borrow ends here).
3300        let returned = compressor
3301            .clear_dictionary()
3302            .expect("dictionary was attached");
3303        assert_eq!(returned.id(), dict_id);
3304
3305        // The reattached dictionary drives the frame: its id is advertised and
3306        // the stream round-trips through a decoder primed with the same dict.
3307        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
3308            .expect("encoded stream should have a frame header");
3309        assert_eq!(frame_header.dictionary_id(), Some(dict_id));
3310        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
3311        let mut decoder = FrameDecoder::new();
3312        decoder.add_dict(decoder_dict).unwrap();
3313        let mut decoded = Vec::with_capacity(payload.len());
3314        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
3315        assert_eq!(decoded.as_slice(), payload.as_slice());
3316
3317        // The dictionary handed back by clear_dictionary reattaches to another
3318        // compressor without touching the raw bytes again, producing an
3319        // identical frame.
3320        let mut with_dict2 = Vec::new();
3321        let mut compressor2 = FrameCompressor::new(super::CompressionLevel::Fastest);
3322        compressor2
3323            .set_encoder_dictionary(returned)
3324            .expect("returned dictionary should reattach");
3325        compressor2.set_source(payload.as_slice());
3326        compressor2.set_drain(&mut with_dict2);
3327        compressor2.compress();
3328        assert_eq!(
3329            with_dict2, with_dict,
3330            "reattached prepared dict must produce an identical frame"
3331        );
3332    }
3333
3334    #[test]
3335    fn dict_primed_matcher_snapshot_reused_across_frames_is_byte_identical() {
3336        // CDict-equivalent: a compressor reused across frames with the same
3337        // dictionary restores the primed matcher snapshot on frames 2..N
3338        // (a table copy) instead of re-hashing the dictionary. The restored
3339        // state must reproduce the first-frame (freshly-primed) output
3340        // byte-for-byte, and every frame must round-trip through a
3341        // dict-primed decoder.
3342        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3343        // Source must exceed the Fast strategy's 8 KiB attach cutoff so the
3344        // copy-snapshot (restore) path is taken on frame 2 — at or below the
3345        // cutoff the donor attaches by reference and we fall back to re-prime,
3346        // which would not exercise restore.
3347        let mut payload = Vec::new();
3348        while payload.len() < 16 * 1024 {
3349            payload.extend_from_slice(
3350                b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n",
3351            );
3352        }
3353
3354        let prepared =
3355            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
3356        let dict_id = prepared.id();
3357        let mut compressor: FrameCompressor =
3358            FrameCompressor::new(super::CompressionLevel::Fastest);
3359        compressor
3360            .set_encoder_dictionary(prepared)
3361            .expect("prepared dictionary should attach");
3362
3363        // Frame 1 primes + captures the snapshot; frame 2 restores it.
3364        let frame1 = compressor.compress_independent_frame(payload.as_slice());
3365        let frame2 = compressor.compress_independent_frame(payload.as_slice());
3366        assert_eq!(
3367            frame1, frame2,
3368            "restored prime snapshot must reproduce the freshly-primed frame byte-for-byte"
3369        );
3370
3371        // Both frames advertise the dict id and round-trip through a
3372        // dict-primed decoder.
3373        for frame in [&frame1, &frame2] {
3374            let (hdr, _) =
3375                crate::decoding::frame::read_frame_header(frame.as_slice()).expect("frame header");
3376            assert_eq!(hdr.dictionary_id(), Some(dict_id));
3377            let mut decoder = FrameDecoder::new();
3378            decoder
3379                .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3380                .unwrap();
3381            let mut decoded = Vec::with_capacity(payload.len());
3382            decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
3383            assert_eq!(decoded.as_slice(), payload.as_slice());
3384        }
3385    }
3386
3387    #[test]
3388    fn dict_primed_matcher_cache_reused_across_small_attach_frames_is_byte_identical() {
3389        // CDict-equivalent ATTACH path (small source, at/below the Fast 8 KiB
3390        // attach cutoff): frames 2..N re-prime — re-committing the dict bytes
3391        // to history — but reuse the already-built dict table instead of
3392        // re-hashing it. The cached-table frame must reproduce the
3393        // freshly-primed first frame byte-for-byte, and a fresh single-frame
3394        // compressor (no prior dict cache) must produce the identical bytes
3395        // too, proving the cache changes timing, not output.
3396        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3397        // Stay under the 8 KiB cutoff so the attach (re-prime) path is taken
3398        // every frame rather than the copy-snapshot restore.
3399        let mut payload = Vec::new();
3400        while payload.len() < 2 * 1024 {
3401            payload.extend_from_slice(b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n");
3402        }
3403
3404        let prepared =
3405            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
3406        let dict_id = prepared.id();
3407        let mut compressor: FrameCompressor =
3408            FrameCompressor::new(super::CompressionLevel::Fastest);
3409        compressor
3410            .set_encoder_dictionary(prepared)
3411            .expect("prepared dictionary should attach");
3412
3413        // Frame 1 builds + marks the dict table; frame 2 reuses it.
3414        let frame1 = compressor.compress_independent_frame(payload.as_slice());
3415        let frame2 = compressor.compress_independent_frame(payload.as_slice());
3416        assert_eq!(
3417            frame1, frame2,
3418            "reused dict table (attach path) must reproduce the freshly-built frame byte-for-byte"
3419        );
3420
3421        // A fresh compressor (cold dict cache) must emit the same bytes — the
3422        // cache is a timing optimization, never a content change.
3423        let fresh_prepared =
3424            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
3425        let mut fresh: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3426        fresh
3427            .set_encoder_dictionary(fresh_prepared)
3428            .expect("prepared dictionary should attach");
3429        let fresh_frame = fresh.compress_independent_frame(payload.as_slice());
3430        assert_eq!(
3431            fresh_frame, frame1,
3432            "cold-cache compressor must match the warm-cache frame byte-for-byte"
3433        );
3434
3435        for frame in [&frame1, &frame2] {
3436            let (hdr, _) =
3437                crate::decoding::frame::read_frame_header(frame.as_slice()).expect("frame header");
3438            assert_eq!(hdr.dictionary_id(), Some(dict_id));
3439            let mut decoder = FrameDecoder::new();
3440            decoder
3441                .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3442                .unwrap();
3443            let mut decoded = Vec::with_capacity(payload.len());
3444            decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
3445            assert_eq!(decoded.as_slice(), payload.as_slice());
3446        }
3447    }
3448
3449    #[test]
3450    fn dict_fast_epoch_reset_many_frames_and_attach_copy_alternation_byte_identical() {
3451        // The Fast attach path invalidates the main hash table between
3452        // frames with an epoch-bias advance instead of a memset. Two things
3453        // need proving against a fresh-compressor reference:
3454        // 1. the bias accumulates across MANY reused frames without ever
3455        //    letting a stale entry through (every frame byte-identical);
3456        // 2. crossing the 8 KiB attach/copy cutoff in both directions
3457        //    (attach → copy clears the bias for the raw-slice kernel,
3458        //    copy → attach re-enters epoch mode) stays byte-identical.
3459        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3460        let mut small = Vec::new();
3461        while small.len() < 2 * 1024 {
3462            small.extend_from_slice(b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n");
3463        }
3464        // Over the Fast 8 KiB attach cutoff → copy-mode frame.
3465        let mut large = Vec::new();
3466        while large.len() < 64 * 1024 {
3467            large.extend_from_slice(b"tenant=demo op=scan range=[k0,k9) limit=500 order=asc\n");
3468        }
3469
3470        let mut reused: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3471        reused
3472            .set_encoder_dictionary(
3473                super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse"),
3474            )
3475            .expect("prepared dictionary should attach");
3476
3477        let reference = |payload: &[u8]| -> alloc::vec::Vec<u8> {
3478            let mut fresh: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3479            fresh
3480                .set_encoder_dictionary(
3481                    super::EncoderDictionary::from_bytes(dict_raw)
3482                        .expect("dict bytes should parse"),
3483                )
3484                .expect("prepared dictionary should attach");
3485            fresh.compress_independent_frame(payload)
3486        };
3487
3488        let small_expected = reference(&small);
3489        let large_expected = reference(&large);
3490
3491        // 1. Long attach-only run: every frame advances the epoch bias.
3492        for i in 0..32 {
3493            let frame = reused.compress_independent_frame(small.as_slice());
3494            assert_eq!(
3495                frame, small_expected,
3496                "attach frame {i} diverged from the fresh-compressor reference"
3497            );
3498        }
3499        // 2. Cutoff alternation: attach → copy → attach → copy.
3500        for i in 0..4 {
3501            let frame = reused.compress_independent_frame(large.as_slice());
3502            assert_eq!(
3503                frame, large_expected,
3504                "copy frame {i} diverged from the fresh-compressor reference"
3505            );
3506            let frame = reused.compress_independent_frame(small.as_slice());
3507            assert_eq!(
3508                frame, small_expected,
3509                "attach frame after copy {i} diverged from the fresh-compressor reference"
3510            );
3511        }
3512    }
3513
3514    #[test]
3515    fn dict_primed_btlazy2_reused_across_attach_and_copy_boundary_is_byte_identical() {
3516        // Btlazy2 (Level 15) uses the 32 KiB dict attach/copy cutoff in
3517        // prepare_frame. Exercise BOTH sides of that boundary on a reused
3518        // compressor: a sub-cutoff payload (re-prime/attach path) and an
3519        // over-cutoff payload (copy-snapshot restore path). In each case the
3520        // warm-cache second frame must reproduce the cold-cache first frame
3521        // byte-for-byte (the dict cache is a timing optimization, never a
3522        // content change), and every frame must round-trip.
3523        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3524        let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
3525            .expect("dict bytes should parse")
3526            .id();
3527        // Distinct lines so the payload does not trivially self-compress; the
3528        // BT finder + dict dual-probe both get exercised.
3529        let make_payload = |target: usize| {
3530            let mut p = Vec::with_capacity(target);
3531            let mut i = 0u64;
3532            while p.len() < target {
3533                p.extend_from_slice(
3534                    format!(
3535                        "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
3536                        i % 97
3537                    )
3538                    .as_bytes(),
3539                );
3540                i += 1;
3541            }
3542            p
3543        };
3544        // Below the 32 KiB cutoff (attach/re-prime) and above it (copy-snapshot).
3545        for target in [16 * 1024usize, 64 * 1024usize] {
3546            let payload = make_payload(target);
3547            let mut warm: FrameCompressor =
3548                FrameCompressor::new(super::CompressionLevel::Level(15));
3549            warm.set_encoder_dictionary(
3550                super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3551            )
3552            .expect("dict attach");
3553            // Frame 1 builds + marks the dict tables; frame 2 reuses them.
3554            let frame1 = warm.compress_independent_frame(payload.as_slice());
3555            let frame2 = warm.compress_independent_frame(payload.as_slice());
3556            assert_eq!(
3557                frame1, frame2,
3558                "reused dict cache must reproduce the freshly-primed frame byte-for-byte \
3559                 (Level 15, target={target})"
3560            );
3561            // Cold-cache compressor: must match the warm-cache bytes.
3562            let mut cold: FrameCompressor =
3563                FrameCompressor::new(super::CompressionLevel::Level(15));
3564            cold.set_encoder_dictionary(
3565                super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3566            )
3567            .expect("dict attach");
3568            let cold_frame = cold.compress_independent_frame(payload.as_slice());
3569            assert_eq!(
3570                cold_frame, frame1,
3571                "cold-cache compressor must match warm-cache frame (Level 15, target={target})"
3572            );
3573            // Round-trip through a decoder primed with the same dict.
3574            for frame in [&frame1, &frame2] {
3575                let (hdr, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
3576                    .expect("frame header");
3577                assert_eq!(hdr.dictionary_id(), Some(dict_id));
3578                let mut decoder = FrameDecoder::new();
3579                decoder
3580                    .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3581                    .unwrap();
3582                let mut decoded = Vec::with_capacity(payload.len());
3583                decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
3584                assert_eq!(decoded.as_slice(), payload.as_slice());
3585            }
3586        }
3587    }
3588
3589    #[test]
3590    fn dict_primed_btultra2_restore_is_floor_safe_and_byte_identical() {
3591        // Regression guard for the dictionary primed-snapshot RESTORE path on
3592        // the binary-tree (btultra2 / Level 22) backend — the path a minimal /
3593        // decoupled prepared-dict refactor rewrites.
3594        //
3595        // The trap it pins: a reused compressor compresses frame A (which fills
3596        // the live hash/chain tables with frame-A positions and advances the
3597        // window floor), then frame B of the SAME resolved shape (same size →
3598        // same PrimedKey → the snapshot RESTORE path) but DIFFERENT content. The
3599        // restore must reinstate the clean post-prime dict state with NO live
3600        // frame-A entries surviving above the restored floor; a restore that
3601        // leaks stale frame-A positions would surface FALSE matches and produce
3602        // a different (or undecodable) frame B. The invariant: a snapshot
3603        // restore is a pure timing optimization and MUST be byte-identical to a
3604        // cold compressor compressing frame B from scratch, and must round-trip.
3605        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3606        let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
3607            .expect("dict bytes should parse")
3608            .id();
3609        // 48 KiB > the btultra2 8 KiB attach cutoff → the copy-snapshot
3610        // capture/restore path. Two distinct payloads of the SAME size so frame
3611        // B resolves to frame A's snapshot key and takes the restore path.
3612        let make_payload = |seed: u64, target: usize| {
3613            let mut p = Vec::with_capacity(target);
3614            let mut i = seed;
3615            while p.len() < target {
3616                p.extend_from_slice(
3617                    format!(
3618                        "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
3619                        i % 89
3620                    )
3621                    .as_bytes(),
3622                );
3623                i = i.wrapping_add(1);
3624            }
3625            p.truncate(target);
3626            p
3627        };
3628        let size = 48 * 1024usize;
3629        let frame_a = make_payload(0, size);
3630        let frame_b = make_payload(1_000_000, size);
3631
3632        let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3633        warm.set_encoder_dictionary(
3634            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3635        )
3636        .expect("dict attach");
3637        // Frame A: cold cache — primes the dict + captures the snapshot, and
3638        // fills the live tables with frame-A positions.
3639        let _wa = warm.compress_independent_frame(frame_a.as_slice());
3640        // Frame B: warm cache — takes the snapshot RESTORE path (same size).
3641        let warm_b = warm.compress_independent_frame(frame_b.as_slice());
3642
3643        // Cold compressor compressing frame B from scratch: the ground truth.
3644        let mut cold: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3645        cold.set_encoder_dictionary(
3646            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3647        )
3648        .expect("dict attach");
3649        let cold_b = cold.compress_independent_frame(frame_b.as_slice());
3650
3651        assert_eq!(
3652            warm_b, cold_b,
3653            "frame B via snapshot restore must be byte-identical to a cold compress \
3654             (a restore that leaks frame-A live-table entries would diverge here)"
3655        );
3656
3657        // Round-trip frame B through a dict-primed decoder.
3658        let (hdr, _) =
3659            crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
3660        assert_eq!(hdr.dictionary_id(), Some(dict_id));
3661        let mut decoder = FrameDecoder::new();
3662        decoder
3663            .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3664            .unwrap();
3665        let mut decoded = Vec::with_capacity(frame_b.len());
3666        decoder
3667            .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
3668            .unwrap();
3669        assert_eq!(decoded.as_slice(), frame_b.as_slice());
3670    }
3671
3672    #[test]
3673    fn dict_primed_btultra2_ldm_restore_is_byte_identical() {
3674        // Same restore-path byte-identity guard as
3675        // `dict_primed_btultra2_restore_is_floor_safe_and_byte_identical`, but
3676        // with long-distance matching ENABLED. The BtMatcher's LDM producer is
3677        // part of the snapshot; a refactor that decouples it (so the snapshot
3678        // does not retain the empty LDM table) must reinstate an equivalent
3679        // empty producer on restore. This pins that the warm-cache (restore)
3680        // frame stays byte-identical to a cold compress when LDM is on.
3681        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3682        let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
3683            .expect("dict bytes should parse")
3684            .id();
3685        let make_payload = |seed: u64, target: usize| {
3686            let mut p = Vec::with_capacity(target);
3687            let mut i = seed;
3688            while p.len() < target {
3689                p.extend_from_slice(
3690                    format!(
3691                        "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
3692                        i % 89
3693                    )
3694                    .as_bytes(),
3695                );
3696                i = i.wrapping_add(1);
3697            }
3698            p.truncate(target);
3699            p
3700        };
3701        let ldm_params =
3702            crate::encoding::CompressionParameters::builder(super::CompressionLevel::Level(22))
3703                .enable_long_distance_matching(true)
3704                .build()
3705                .expect("LDM-only params build");
3706        let size = 48 * 1024usize;
3707        let frame_a = make_payload(0, size);
3708        let frame_b = make_payload(1_000_000, size);
3709
3710        let mut warm: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3711        warm.set_parameters(&ldm_params);
3712        warm.set_encoder_dictionary(
3713            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3714        )
3715        .expect("dict attach");
3716        let _wa = warm.compress_independent_frame(frame_a.as_slice());
3717        let warm_b = warm.compress_independent_frame(frame_b.as_slice());
3718
3719        let mut cold: FrameCompressor = FrameCompressor::new(super::CompressionLevel::Level(22));
3720        cold.set_parameters(&ldm_params);
3721        cold.set_encoder_dictionary(
3722            super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
3723        )
3724        .expect("dict attach");
3725        let cold_b = cold.compress_independent_frame(frame_b.as_slice());
3726
3727        assert_eq!(
3728            warm_b, cold_b,
3729            "LDM-on frame B via snapshot restore must be byte-identical to a cold compress"
3730        );
3731
3732        let (hdr, _) =
3733            crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
3734        assert_eq!(hdr.dictionary_id(), Some(dict_id));
3735        let mut decoder = FrameDecoder::new();
3736        decoder
3737            .add_dict(crate::decoding::Dictionary::decode_dict(dict_raw).unwrap())
3738            .unwrap();
3739        let mut decoded = Vec::with_capacity(frame_b.len());
3740        decoder
3741            .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
3742            .unwrap();
3743        assert_eq!(decoded.as_slice(), frame_b.as_slice());
3744    }
3745
3746    #[test]
3747    fn set_dictionary_from_bytes_matches_full_decode_byte_for_byte() {
3748        // The encoder-only dict parse (`decode_dict_for_encoding`, used by
3749        // `set_dictionary_from_bytes`) skips the FSE/HUF decoder-table build and
3750        // the enrich passes. The encoder entropy tables are derived purely from
3751        // the symbol probabilities / Huffman weights, so the compressed output
3752        // MUST be byte-identical to the full-decode path. This pins the
3753        // load-bearing equivalence so a future FSE/HUF parsing refactor that
3754        // still round-trips but silently diverges on the probabilities/weights
3755        // fails loudly here instead of producing a different (but valid) frame.
3756        let dict_raw = include_bytes!("../../dict_tests/dictionary");
3757        let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
3758              tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";
3759
3760        // Path A: encoder-only parse straight from the raw blob.
3761        let mut from_bytes_out = Vec::new();
3762        {
3763            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3764            compressor
3765                .set_dictionary_from_bytes(dict_raw)
3766                .expect("dictionary bytes should parse");
3767            compressor.set_source(payload.as_slice());
3768            compressor.set_drain(&mut from_bytes_out);
3769            compressor.compress();
3770        }
3771
3772        // Path B: full decode (builds the decoder tables too), then attach for
3773        // encoding via the `Dictionary` setter.
3774        let full_decode = crate::decoding::Dictionary::decode_dict(dict_raw)
3775            .expect("dictionary bytes should fully decode");
3776        let mut full_decode_out = Vec::new();
3777        {
3778            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
3779            compressor
3780                .set_dictionary(full_decode)
3781                .expect("full-decode dictionary should attach");
3782            compressor.set_source(payload.as_slice());
3783            compressor.set_drain(&mut full_decode_out);
3784            compressor.compress();
3785        }
3786
3787        assert_eq!(
3788            from_bytes_out, full_decode_out,
3789            "encoder-only dict parse must produce byte-identical output to the full decode"
3790        );
3791    }
3792
3793    #[test]
3794    fn set_dictionary_rejects_zero_dictionary_id() {
3795        let invalid = crate::decoding::Dictionary {
3796            id: 0,
3797            fse: crate::decoding::scratch::FSEScratch::new(),
3798            huf: crate::decoding::scratch::HuffmanScratch::new(),
3799            dict_content: vec![1, 2, 3],
3800            offset_hist: [1, 4, 8],
3801        };
3802
3803        let mut compressor: FrameCompressor<
3804            &[u8],
3805            Vec<u8>,
3806            crate::encoding::match_generator::MatchGeneratorDriver,
3807        > = FrameCompressor::new(super::CompressionLevel::Fastest);
3808        let result = compressor.set_dictionary(invalid);
3809        assert!(matches!(
3810            result,
3811            Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
3812        ));
3813    }
3814
3815    #[test]
3816    fn set_dictionary_rejects_zero_repeat_offsets() {
3817        let invalid = crate::decoding::Dictionary {
3818            id: 1,
3819            fse: crate::decoding::scratch::FSEScratch::new(),
3820            huf: crate::decoding::scratch::HuffmanScratch::new(),
3821            dict_content: vec![1, 2, 3],
3822            offset_hist: [0, 4, 8],
3823        };
3824
3825        let mut compressor: FrameCompressor<
3826            &[u8],
3827            Vec<u8>,
3828            crate::encoding::match_generator::MatchGeneratorDriver,
3829        > = FrameCompressor::new(super::CompressionLevel::Fastest);
3830        let result = compressor.set_dictionary(invalid);
3831        assert!(matches!(
3832            result,
3833            Err(
3834                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
3835                    index: 0
3836                }
3837            )
3838        ));
3839    }
3840
3841    #[test]
3842    fn uncompressed_mode_does_not_require_dictionary() {
3843        let dict_id = 0xABCD_0001;
3844        let dict =
3845            crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
3846                .expect("raw dictionary should be valid");
3847
3848        let payload = b"plain-bytes-that-should-stay-raw";
3849        let mut output = Vec::new();
3850        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
3851        compressor
3852            .set_dictionary(dict)
3853            .expect("dictionary should attach in uncompressed mode");
3854        compressor.set_source(payload.as_slice());
3855        compressor.set_drain(&mut output);
3856        compressor.compress();
3857
3858        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
3859            .expect("encoded frame should have a header");
3860        assert_eq!(
3861            frame_header.dictionary_id(),
3862            None,
3863            "raw/uncompressed frames must not advertise dictionary dependency"
3864        );
3865
3866        let mut decoder = FrameDecoder::new();
3867        let mut decoded = Vec::with_capacity(payload.len());
3868        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
3869        assert_eq!(decoded, payload);
3870    }
3871
3872    #[test]
3873    fn default_level_tiny_raw_dict_compresses_cleanly() {
3874        // Coverage for the dfast dict-attach fast path with a
3875        // sub-min-match raw-content dictionary: the dict-table probe in
3876        // `start_matching_fast_loop` is gated on the dict table actually
3877        // existing (`table().is_some()`), not merely on `is_attached()`,
3878        // so a dictionary whose hashable region is shorter than the
3879        // short-hash lookahead (where `prime_dict_tables_for_range`
3880        // returns before allocating the tables) never dereferences a
3881        // null dict pointer. Compressing at the default (dfast) level
3882        // with such a dict must succeed.
3883        let dict_id = 0xABCD_0009;
3884        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
3885            .expect("raw dictionary should be valid");
3886        let payload = b"the quick brown fox jumps over the lazy dog, repeatedly and at length";
3887        let mut output = Vec::new();
3888        let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
3889        compressor
3890            .set_dictionary(dict)
3891            .expect("tiny raw dictionary should attach");
3892        compressor.set_source(payload.as_slice());
3893        compressor.set_drain(&mut output);
3894        compressor.compress();
3895        assert!(!output.is_empty(), "compression should produce a frame");
3896
3897        // The emitted frame must advertise the attached dictionary id, proving
3898        // the tiny-dict path stayed active (the payload round-trips either way,
3899        // so without this the test would also pass on a silent no-dict frame).
3900        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
3901            .expect("encoded frame should have a readable header");
3902        assert_eq!(
3903            frame_header.dictionary_id(),
3904            Some(dict_id),
3905            "tiny raw dict frame should still advertise its dictionary id",
3906        );
3907
3908        // Full roundtrip: decode the dict-compressed frame with the SAME
3909        // dictionary attached and confirm byte-exact recovery — proves the
3910        // tiny-dict fast path produces a correct frame, not just a non-empty
3911        // one.
3912        let decode_dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
3913            .expect("raw dictionary should be valid");
3914        let mut decoder = FrameDecoder::new();
3915        decoder
3916            .add_dict(decode_dict)
3917            .expect("decoder dict should attach");
3918        let mut decoded = Vec::with_capacity(payload.len());
3919        decoder
3920            .decode_all_to_vec(&output, &mut decoded)
3921            .expect("dict roundtrip should decode");
3922        assert_eq!(decoded, payload, "tiny-dict roundtrip mismatch");
3923    }
3924
3925    /// Exercises the dictionary dual-probe (live + immutable dict tables)
3926    /// in the Fast / dfast / Row match finders with a dict whose content
3927    /// the payload actually reuses, so each backend's dict long/short
3928    /// probe (and the dfast `ip+1` dict-long retry) is reached and the
3929    /// dict-compressed frame round-trips through a decoder primed with the
3930    /// same dict. The 3-byte-dict test above only proves the null-table
3931    /// guard; this proves the full attach path produces correct frames.
3932    #[test]
3933    fn dict_attach_roundtrips_across_backends_with_matching_payload() {
3934        let dict_id = 0xD1C7_0001;
3935        // Distinct lines so the payload does NOT self-compress: each line
3936        // appears exactly once in the payload, so without the dictionary there
3937        // are no in-frame back-references to exploit. The dictionary holds the
3938        // SAME lines, so the only way the output shrinks is if the dict probe
3939        // actually fires. A no-dict baseline below pins that the dict path ran
3940        // (self-compressible payloads would round-trip + stay small via
3941        // in-frame matches alone, proving nothing).
3942        let line = |i: u32| {
3943            alloc::format!(
3944                "ts=2026-03-26T21:{:02}:{:02}Z level=INFO msg=\"event {i:05}\" tenant=t{i} region=eu\n",
3945                i / 60 % 60,
3946                i % 60,
3947            )
3948            .into_bytes()
3949        };
3950        let mut dict_content = Vec::new();
3951        for i in 0..256u32 {
3952            dict_content.extend_from_slice(&line(i));
3953        }
3954        // Payload = the same distinct lines in a different (stride) order, each
3955        // once → no self-repeats, every line is a dictionary match.
3956        let mut payload = Vec::new();
3957        let mut i = 0u32;
3958        for _ in 0..256u32 {
3959            payload.extend_from_slice(&line(i));
3960            i = (i + 97) % 256; // coprime stride → permutation, no adjacency
3961        }
3962
3963        let compress_at = |level, dict: Option<Vec<u8>>| -> Vec<u8> {
3964            let mut compressor = FrameCompressor::new(level);
3965            if let Some(bytes) = dict {
3966                let d = crate::decoding::Dictionary::from_raw_content(dict_id, bytes)
3967                    .expect("raw dictionary should be valid");
3968                compressor
3969                    .set_dictionary(d)
3970                    .expect("dictionary should attach");
3971            }
3972            let mut out = Vec::new();
3973            compressor.set_source(payload.as_slice());
3974            compressor.set_drain(&mut out);
3975            compressor.compress();
3976            out
3977        };
3978
3979        for level in [
3980            super::CompressionLevel::Level(-5), // Fast (negative)
3981            super::CompressionLevel::Level(1),  // Fast
3982            super::CompressionLevel::Default,   // dfast (L3)
3983            super::CompressionLevel::Level(8),  // Row-backed lazy2
3984        ] {
3985            let out = compress_at(level, Some(dict_content.clone()));
3986            let no_dict = compress_at(level, None);
3987            // The dict path MUST measurably beat no-dict on this
3988            // non-self-compressible payload — otherwise the dict probe never
3989            // fired and the roundtrip below would prove nothing.
3990            assert!(
3991                out.len() < no_dict.len(),
3992                "level {level:?}: dict-primed output ({}) must beat no-dict ({}) — dict probe did not fire",
3993                out.len(),
3994                no_dict.len(),
3995            );
3996
3997            let ddict =
3998                crate::decoding::Dictionary::from_raw_content(dict_id, dict_content.clone())
3999                    .expect("raw dictionary should be valid");
4000            let mut decoder = FrameDecoder::new();
4001            decoder.add_dict(ddict).expect("decoder dict should attach");
4002            let mut decoded = Vec::with_capacity(payload.len());
4003            decoder
4004                .decode_all_to_vec(&out, &mut decoded)
4005                .unwrap_or_else(|e| panic!("level {level:?}: dict roundtrip decode failed: {e:?}"));
4006            assert_eq!(decoded, payload, "level {level:?}: dict roundtrip mismatch");
4007        }
4008    }
4009
4010    /// Reusing one compressor across independent frames with DIFFERENT
4011    /// dictionaries must drop the per-backend dict cache on each swap
4012    /// (Simple/Dfast/Row keep the attach index across frames). Without the
4013    /// invalidation a later frame would reuse the previous dict's rows.
4014    /// Each frame round-trips through a decoder primed with its own dict.
4015    #[test]
4016    fn dict_swap_across_reused_compressor_roundtrips() {
4017        // Distinct lines per dict (not a single repeated line) so payloads do
4018        // NOT self-compress: each line appears once, so a frame only shrinks if
4019        // the dict probe fires, and — crucially for the invalidation check — if
4020        // frame B reused dict A's stale rows it would emit offsets into A's
4021        // distinct content, which decode under dict B reconstructs as WRONG
4022        // bytes (caught by the roundtrip). A single repeated line would hide
4023        // pollution behind in-frame matches.
4024        let lines = |tag: &str| -> (Vec<u8>, Vec<u8>) {
4025            let line =
4026                |i: u32| alloc::format!("{tag} record {i:05} field=value{i} end\n").into_bytes();
4027            let mut dict = Vec::new();
4028            for i in 0..256u32 {
4029                dict.extend_from_slice(&line(i));
4030            }
4031            let mut payload = Vec::new();
4032            let mut i = 0u32;
4033            for _ in 0..256u32 {
4034                payload.extend_from_slice(&line(i));
4035                i = (i + 97) % 256;
4036            }
4037            (dict, payload)
4038        };
4039        let (dict_a, payload_a) = lines("alpha");
4040        let (dict_b, payload_b) = lines("bravo");
4041
4042        for level in [
4043            super::CompressionLevel::Default,
4044            super::CompressionLevel::Level(8),
4045        ] {
4046            let no_dict = |payload: &[u8]| -> usize {
4047                let mut c: FrameCompressor = FrameCompressor::new(level);
4048                c.compress_independent_frame(payload).len()
4049            };
4050            let no_dict_a = no_dict(&payload_a);
4051            let no_dict_b = no_dict(&payload_b);
4052
4053            let mut compressor: FrameCompressor = FrameCompressor::new(level);
4054            for (dict_bytes, payload, no_dict_len) in [
4055                (&dict_a, &payload_a, no_dict_a),
4056                (&dict_b, &payload_b, no_dict_b),
4057            ] {
4058                let dict =
4059                    crate::decoding::Dictionary::from_raw_content(0xD1C7_0002, dict_bytes.clone())
4060                        .expect("raw dictionary should be valid");
4061                compressor
4062                    .set_dictionary(dict)
4063                    .expect("dictionary should attach");
4064                let out = compressor.compress_independent_frame(payload.as_slice());
4065                assert!(
4066                    out.len() < no_dict_len,
4067                    "level {level:?}: dict frame ({}) must beat no-dict ({}) — dict probe did not fire",
4068                    out.len(),
4069                    no_dict_len,
4070                );
4071
4072                let ddict =
4073                    crate::decoding::Dictionary::from_raw_content(0xD1C7_0002, dict_bytes.clone())
4074                        .expect("raw dictionary should be valid");
4075                let mut decoder = FrameDecoder::new();
4076                decoder.add_dict(ddict).expect("decoder dict should attach");
4077                let mut decoded = Vec::with_capacity(payload.len());
4078                decoder
4079                    .decode_all_to_vec(&out, &mut decoded)
4080                    .unwrap_or_else(|e| panic!("level {level:?}: dict-swap decode failed: {e:?}"));
4081                assert_eq!(
4082                    decoded, *payload,
4083                    "level {level:?}: dict-swap roundtrip mismatch (stale dict rows?)"
4084                );
4085            }
4086        }
4087    }
4088
4089    #[test]
4090    fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
4091        use crate::encoding::match_generator::MatchGeneratorDriver;
4092
4093        let dict_id = 0xABCD_0002;
4094        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
4095            .expect("raw dictionary should be valid");
4096        let dict_for_decoder =
4097            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
4098                .expect("raw dictionary should be valid");
4099
4100        // Payload must exceed the encoder's advertised window (512 KiB
4101        // for Fastest after `window_log = 19` alignment with donor's
4102        // L1 fast row in `clevels.h`) so the test actually exercises
4103        // cross-window-boundary behavior.
4104        let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);
4105        let matcher = MatchGeneratorDriver::new(1024, 1);
4106
4107        let mut no_dict_output = Vec::new();
4108        let mut no_dict_compressor =
4109            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
4110        no_dict_compressor.set_source(payload.as_slice());
4111        no_dict_compressor.set_drain(&mut no_dict_output);
4112        no_dict_compressor.compress();
4113        let (no_dict_frame_header, _) =
4114            crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
4115                .expect("baseline frame should have a header");
4116        let no_dict_window = no_dict_frame_header
4117            .window_size()
4118            .expect("window size should be present");
4119
4120        let mut output = Vec::new();
4121        let matcher = MatchGeneratorDriver::new(1024, 1);
4122        let mut compressor =
4123            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
4124        compressor
4125            .set_dictionary(dict)
4126            .expect("dictionary should attach");
4127        compressor.set_source(payload.as_slice());
4128        compressor.set_drain(&mut output);
4129        compressor.compress();
4130
4131        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
4132            .expect("encoded frame should have a header");
4133        let advertised_window = frame_header
4134            .window_size()
4135            .expect("window size should be present");
4136        assert_eq!(
4137            advertised_window, no_dict_window,
4138            "dictionary priming must not inflate advertised window size"
4139        );
4140        assert!(
4141            payload.len() > advertised_window as usize,
4142            "test must cross the advertised window boundary"
4143        );
4144
4145        let mut decoder = FrameDecoder::new();
4146        decoder.add_dict(dict_for_decoder).unwrap();
4147        let mut decoded = Vec::with_capacity(payload.len());
4148        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
4149        assert_eq!(decoded, payload);
4150    }
4151
4152    #[test]
4153    fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
4154        let dict_id = 0xABCD_0004;
4155        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
4156        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
4157        let dict_for_decoder =
4158            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
4159        let payload = b"abcdabcdabcdabcd".repeat(128);
4160
4161        let mut hinted_output = Vec::new();
4162        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
4163        hinted.set_dictionary(dict).unwrap();
4164        hinted.set_source_size_hint(1);
4165        hinted.set_source(payload.as_slice());
4166        hinted.set_drain(&mut hinted_output);
4167        hinted.compress();
4168
4169        let mut no_hint_output = Vec::new();
4170        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
4171        no_hint
4172            .set_dictionary(
4173                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
4174                    .unwrap(),
4175            )
4176            .unwrap();
4177        no_hint.set_source(payload.as_slice());
4178        no_hint.set_drain(&mut no_hint_output);
4179        no_hint.compress();
4180
4181        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
4182            .expect("encoded frame should have a header")
4183            .0
4184            .window_size()
4185            .expect("window size should be present");
4186        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
4187            .expect("encoded frame should have a header")
4188            .0
4189            .window_size()
4190            .expect("window size should be present");
4191        assert!(
4192            hinted_window <= no_hint_window,
4193            "source-size hint should not increase advertised window with dictionary priming",
4194        );
4195
4196        let mut decoder = FrameDecoder::new();
4197        decoder.add_dict(dict_for_decoder).unwrap();
4198        let mut decoded = Vec::with_capacity(payload.len());
4199        decoder
4200            .decode_all_to_vec(&hinted_output, &mut decoded)
4201            .unwrap();
4202        assert_eq!(decoded, payload);
4203    }
4204
4205    /// A dictionary segment embedded ONCE in otherwise-incompressible
4206    /// input must be matched against the dictionary. Before the fix the
4207    /// raw-fast-path (which skips matching) fired on the
4208    /// incompressible-looking block and the dictionary was never searched,
4209    /// so `with_dict` came out the same size as `no_dict` (the embedded
4210    /// match was lost). Now the block compresses against the dict.
4211    #[test]
4212    fn dictionary_segment_in_incompressible_input_is_matched() {
4213        // Deterministic LCG bytes: high-entropy, so the only compressible
4214        // content is the embedded dictionary segment.
4215        fn lcg(seed: u64, n: usize) -> alloc::vec::Vec<u8> {
4216            let mut s = seed;
4217            (0..n)
4218                .map(|_| {
4219                    s = s
4220                        .wrapping_mul(6364136223846793005)
4221                        .wrapping_add(1442695040888963407);
4222                    (s >> 56) as u8
4223                })
4224                .collect()
4225        }
4226        let dict_id = 0x00DC_7777;
4227        let r = lcg(1, 512); // the dictionary content
4228        let mut payload = lcg(2, 2000); // incompressible filler before
4229        payload.extend_from_slice(&r); // the single dict-matchable segment
4230        payload.extend_from_slice(&lcg(3, 1500)); // filler after
4231
4232        // Precondition: the payload must actually look incompressible so
4233        // that the raw-fast-path WOULD fire (and skip matching) without
4234        // the fix. If the heuristic ever changes and this no longer holds,
4235        // the test below would pass vacuously — assert it up front.
4236        assert!(
4237            crate::encoding::incompressible::block_looks_incompressible(&payload),
4238            "test payload must look incompressible to exercise the raw-fast-path",
4239        );
4240
4241        let compress =
4242            |level: super::CompressionLevel, dict: Option<&[u8]>| -> alloc::vec::Vec<u8> {
4243                let mut out = alloc::vec::Vec::new();
4244                let mut c = FrameCompressor::new(level);
4245                if let Some(d) = dict {
4246                    c.set_dictionary(
4247                        crate::decoding::Dictionary::from_raw_content(dict_id, d.to_vec()).unwrap(),
4248                    )
4249                    .unwrap();
4250                }
4251                c.set_source(payload.as_slice());
4252                c.set_drain(&mut out);
4253                c.compress();
4254                out
4255            };
4256
4257        for lvl in [
4258            super::CompressionLevel::Level(2),
4259            super::CompressionLevel::Level(6),
4260            super::CompressionLevel::Level(19),
4261        ] {
4262            let with_dict = compress(lvl, Some(&r));
4263            let no_dict = compress(lvl, None);
4264            // The 512-byte dict segment should be matched, saving most of
4265            // its length (generous slack for sequence/header coding).
4266            assert!(
4267                with_dict.len() + 300 < no_dict.len(),
4268                "{lvl:?}: dict segment not matched (with_dict={}, no_dict={})",
4269                with_dict.len(),
4270                no_dict.len(),
4271            );
4272            // The dict-compressed frame must round-trip through the decoder.
4273            let mut decoder = FrameDecoder::new();
4274            decoder
4275                .add_dict(
4276                    crate::decoding::Dictionary::from_raw_content(dict_id, r.clone()).unwrap(),
4277                )
4278                .unwrap();
4279            let mut decoded = Vec::with_capacity(payload.len());
4280            decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
4281            assert_eq!(decoded, payload, "{lvl:?}: dict round-trip mismatch");
4282
4283            // A dictionary that does NOT appear in the input must not make
4284            // the output larger than the no-dict (raw) encoding: the
4285            // post-compress raw fallback covers incompressible-with-dict.
4286            let unrelated = lcg(99, 512);
4287            let with_bad_dict = compress(lvl, Some(&unrelated));
4288            assert!(
4289                with_bad_dict.len() <= no_dict.len() + 16,
4290                "{lvl:?}: unhelpful dict expanded output (with={}, no_dict={})",
4291                with_bad_dict.len(),
4292                no_dict.len(),
4293            );
4294        }
4295    }
4296
4297    #[test]
4298    fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
4299        let dict_id = 0xABCD_0005;
4300        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
4301        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
4302        let dict_for_decoder =
4303            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
4304        let payload = b"abcd".repeat(1024); // 4 KiB payload
4305        let payload_len = payload.len() as u64;
4306
4307        let mut hinted_output = Vec::new();
4308        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
4309        hinted.set_dictionary(dict).unwrap();
4310        hinted.set_source_size_hint(payload_len);
4311        hinted.set_source(payload.as_slice());
4312        hinted.set_drain(&mut hinted_output);
4313        hinted.compress();
4314
4315        let mut no_hint_output = Vec::new();
4316        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
4317        no_hint
4318            .set_dictionary(
4319                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
4320                    .unwrap(),
4321            )
4322            .unwrap();
4323        no_hint.set_source(payload.as_slice());
4324        no_hint.set_drain(&mut no_hint_output);
4325        no_hint.compress();
4326
4327        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
4328            .expect("encoded frame should have a header")
4329            .0
4330            .window_size()
4331            .expect("window size should be present");
4332        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
4333            .expect("encoded frame should have a header")
4334            .0
4335            .window_size()
4336            .expect("window size should be present");
4337        assert!(
4338            hinted_window <= no_hint_window,
4339            "source-size hint should not increase advertised window with dictionary priming",
4340        );
4341
4342        let mut decoder = FrameDecoder::new();
4343        decoder.add_dict(dict_for_decoder).unwrap();
4344        let mut decoded = Vec::with_capacity(payload.len());
4345        decoder
4346            .decode_all_to_vec(&hinted_output, &mut decoded)
4347            .unwrap();
4348        assert_eq!(decoded, payload);
4349    }
4350
4351    #[test]
4352    fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
4353        let dict_id = 0xABCD_0003;
4354        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
4355            .expect("raw dictionary should be valid");
4356        let payload = b"abcdefghabcdefgh";
4357
4358        let mut output = Vec::new();
4359        let matcher = NoDictionaryMatcher::new(64);
4360        let mut compressor =
4361            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
4362        compressor
4363            .set_dictionary(dict)
4364            .expect("dictionary should attach");
4365        compressor.set_source(payload.as_slice());
4366        compressor.set_drain(&mut output);
4367        compressor.compress();
4368
4369        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
4370            .expect("encoded frame should have a header");
4371        assert_eq!(
4372            frame_header.dictionary_id(),
4373            None,
4374            "matchers that do not support dictionary priming must not advertise dictionary dependency"
4375        );
4376
4377        let mut decoder = FrameDecoder::new();
4378        let mut decoded = Vec::with_capacity(payload.len());
4379        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
4380        assert_eq!(decoded, payload);
4381    }
4382
4383    #[cfg(feature = "hash")]
4384    #[test]
4385    fn checksum_two_frames_reused_compressor() {
4386        // Compress the same data twice using the same compressor and verify that:
4387        // 1. The checksum written in each frame matches what the decoder calculates.
4388        // 2. The hasher is correctly reset between frames (no cross-contamination).
4389        //    If the hasher were NOT reset, the second frame's calculated checksum
4390        //    would differ from the one stored in the frame data, causing assert_eq to fail.
4391        let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();
4392
4393        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
4394
4395        // --- Frame 1 ---
4396        let mut compressed1 = Vec::new();
4397        compressor.set_source(data.as_slice());
4398        compressor.set_drain(&mut compressed1);
4399        compressor.compress();
4400
4401        // --- Frame 2 (reuse the same compressor) ---
4402        let mut compressed2 = Vec::new();
4403        compressor.set_source(data.as_slice());
4404        compressor.set_drain(&mut compressed2);
4405        compressor.compress();
4406
4407        fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
4408            let mut decoder = FrameDecoder::new();
4409            let mut source = compressed;
4410            decoder.reset(&mut source).unwrap();
4411            while !decoder.is_finished() {
4412                decoder
4413                    .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
4414                    .unwrap();
4415            }
4416            let mut decoded = Vec::new();
4417            decoder.collect_to_writer(&mut decoded).unwrap();
4418            (
4419                decoded,
4420                decoder.get_checksum_from_data(),
4421                decoder.get_calculated_checksum(),
4422            )
4423        }
4424
4425        let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
4426        assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
4427        assert_eq!(
4428            chksum_from_data1, chksum_calculated1,
4429            "frame 1: checksum mismatch"
4430        );
4431
4432        let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
4433        assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
4434        assert_eq!(
4435            chksum_from_data2, chksum_calculated2,
4436            "frame 2: checksum mismatch"
4437        );
4438
4439        // Same data compressed twice must produce the same checksum.
4440        // If state leaked across frames, the second calculated checksum would differ.
4441        assert_eq!(
4442            chksum_from_data1, chksum_from_data2,
4443            "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
4444        );
4445    }
4446
    /// Checks that the per-block emit info recorded for the last frame
    /// (`last_frame_emit_info`) describes the decoded output exactly on the
    /// owned/streaming `compress()` path, at both Default and Level(22).
    ///
    /// For each block it checks three things:
    /// - its `decompressed_byte_range` is contiguous with the previous one;
    /// - its width equals the recorded `decompressed_size`;
    /// - decoding that block alone reproduces the matching slice of the
    ///   full decode.
    #[cfg(feature = "lsm")]
    #[test]
    fn frame_emit_info_decompressed_ranges_match_decoded_output() {
        // Part A correctness: the per-block `decompressed_size` captured during
        // encode (and the `decompressed_byte_range` prefix sum derived from it)
        // must describe the real decoded output exactly — one entry per
        // physical block, contiguous, summing to the full decompressed length.
        // A multi-block compressible payload exercises the Compressed-block
        // path (whose regenerated size is NOT on the wire, so it relies on the
        // encode-side capture this test guards).
        let data = emit_info_fixture_data();

        // Cover both the single-block-per-chunk path (Default) and the
        // Level(16..=22) post-split path (multiple physical partitions per
        // input chunk), since lsm-tree compresses at zstd:22 and post-split
        // is the riskiest capture site (per-partition `src_size`).
        for level in [
            super::CompressionLevel::Default,
            super::CompressionLevel::Level(22),
        ] {
            let mut compressed = Vec::new();
            let mut compressor = FrameCompressor::new(level);
            // Pledge the source size so the high-level (22) window shrinks to
            // fit the payload, keeping the frame compact (no oversized window
            // descriptor for a small input). Still >= 128 KiB, so post-split
            // eligibility is preserved.
            compressor.set_source_size_hint(data.len() as u64);
            compressor.set_source(data.as_slice());
            compressor.set_drain(&mut compressed);
            compressor.compress();

            // Take an owned copy of the emit info for the checks below
            // (presumably `last_frame_emit_info` hands out a borrow — confirm).
            let info = compressor
                .last_frame_emit_info()
                .expect("emit info populated after compress")
                .clone();

            // Reference: full decode of the same frame.
            let mut decoder = FrameDecoder::new();
            let mut source = compressed.as_slice();
            decoder.reset(&mut source).unwrap();
            while !decoder.is_finished() {
                decoder
                    .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                    .unwrap();
            }
            let mut decoded = Vec::new();
            decoder.collect_to_writer(&mut decoded).unwrap();
            assert_eq!(decoded, data, "sanity: frame must round-trip ({level:?})");

            assert!(
                info.blocks.len() >= 2,
                "fixture must span multiple blocks to exercise the mapping ({level:?}, got {})",
                info.blocks.len()
            );
            assert!(
                info.blocks.last().unwrap().last_block,
                "final block must carry last_block ({level:?})"
            );

            // Pin the Level(22) post-split path: the owned loop feeds the
            // encoder MAX_BLOCK_SIZE input chunks, so without post-split the
            // block count cannot exceed the chunk count. More blocks than
            // chunks proves at least one chunk was split into multiple physical
            // partitions (the per-partition `src_size` capture under test).
            if matches!(level, super::CompressionLevel::Level(22)) {
                let max_block = crate::common::MAX_BLOCK_SIZE as usize;
                let n_chunks = data.len().div_ceil(max_block);
                assert!(
                    info.blocks.len() > n_chunks,
                    "Level(22) must exercise post-split: {} blocks for {} input chunks",
                    info.blocks.len(),
                    n_chunks
                );
            }

            // Per-block ranges: contiguous, zero-based, summing to the full output.
            let mut expected_start = 0u64;
            for i in 0..info.blocks.len() {
                let range = info
                    .decompressed_byte_range(i)
                    .expect("in-bounds block has a range");
                assert_eq!(
                    range.start, expected_start,
                    "block {i} range must start where the previous ended ({level:?})"
                );
                assert_eq!(
                    u64::from(info.blocks[i].decompressed_size),
                    range.end - range.start,
                    "block {i} decompressed_size must equal its range width ({level:?})"
                );
                // Validate the mapping against REAL per-block bytes, not just
                // prefix-sum consistency: decode block `i` alone and require it
                // to equal the corresponding slice of the full decode. A
                // sidecar that swapped sizes between adjacent blocks (same sum,
                // same contiguity) would fail here.
                // A fresh decoder is reset on the frame for every block so each
                // partial decode starts from the frame header.
                let mut psrc = compressed.as_slice();
                let mut pdec = FrameDecoder::new();
                pdec.reset(&mut psrc).unwrap();
                // NOTE(review): the bounds `i, i + 1` presumably select the
                // half-open block range [i, i + 1); the meaning of the trailing
                // `None` / `false` options is not visible here — confirm.
                let pd = pdec
                    .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
                    .unwrap();
                assert!(
                    pd.stopped_at.is_none(),
                    "block {i} must decode cleanly ({level:?})"
                );
                assert_eq!(
                    pd.data.as_slice(),
                    &decoded[range.start as usize..range.end as usize],
                    "block {i} partial-decode bytes must equal the full-decode slice ({level:?})"
                );
                expected_start = range.end;
            }
            assert_eq!(
                expected_start,
                decoded.len() as u64,
                "block decompressed sizes must sum to the full decoded length ({level:?})"
            );
            // One past the last block must not produce a range.
            assert_eq!(
                info.decompressed_byte_range(info.blocks.len()),
                None,
                "out-of-range index yields None ({level:?})"
            );
        }
    }
4571
4572    /// ~400 KiB semi-repetitive payload (long runs interleaved with a stride
4573    /// phrase) that compresses into several multi-block frames across levels.
4574    #[cfg(feature = "lsm")]
4575    fn emit_info_fixture_data() -> Vec<u8> {
4576        let mut data: Vec<u8> = Vec::with_capacity(400 * 1024);
4577        let mut x = 0x9E37_79B9u32;
4578        while data.len() < 400 * 1024 {
4579            x ^= x << 13;
4580            x ^= x >> 17;
4581            x ^= x << 5;
4582            let run = 16 + (x as usize % 48);
4583            let byte = (x >> 24) as u8;
4584            for _ in 0..run {
4585                data.push(byte);
4586            }
4587            data.extend_from_slice(b"the quick brown fox jumps over the lazy dog\n");
4588        }
4589        data
4590    }
4591
4592    #[cfg(feature = "lsm")]
4593    #[test]
4594    fn frame_emit_info_decompressed_ranges_match_on_borrowed_oneshot_path() {
4595        // The borrowed one-shot path (`compress_independent_frame` ->
4596        // `run_borrowed_block_loop` -> `compress_block_encoded_borrowed`)
4597        // threads the decompressed-size sidecar through a DIFFERENT emit site
4598        // than the owned/streaming loop, so it needs its own per-block mapping
4599        // check. A Fast level keeps the encoder on the borrowed-eligible
4600        // (Simple matcher) path.
4601        let data = emit_info_fixture_data();
4602
4603        let mut compressor: FrameCompressor =
4604            FrameCompressor::new(super::CompressionLevel::Fastest);
4605        let compressed = compressor.compress_independent_frame(data.as_slice());
4606        let info = compressor
4607            .last_frame_emit_info()
4608            .expect("emit info populated after compress_independent_frame")
4609            .clone();
4610        // Pin the compressed-block path: without this the fixture could regress
4611        // into the raw-fast fallback and still pass via the Raw wire-size
4612        // fallback in populate_frame_emit_info, never exercising the borrowed
4613        // compressed-block sidecar capture this test targets.
4614        assert!(
4615            info.blocks
4616                .iter()
4617                .any(|b| matches!(b.block_type, crate::blocks::block::BlockType::Compressed)),
4618            "borrowed-path fixture must emit at least one compressed block"
4619        );
4620        assert!(
4621            info.blocks.len() >= 2,
4622            "borrowed fixture must span multiple blocks (got {})",
4623            info.blocks.len()
4624        );
4625        assert!(info.blocks.last().unwrap().last_block);
4626
4627        // Full decode reference.
4628        let mut decoder = FrameDecoder::new();
4629        let mut source = compressed.as_slice();
4630        decoder.reset(&mut source).unwrap();
4631        while !decoder.is_finished() {
4632            decoder
4633                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
4634                .unwrap();
4635        }
4636        let mut decoded = Vec::new();
4637        decoder.collect_to_writer(&mut decoded).unwrap();
4638        assert_eq!(decoded, data, "borrowed one-shot frame must round-trip");
4639
4640        // Each block's mapping must match real per-block bytes.
4641        let mut expected_start = 0u64;
4642        for i in 0..info.blocks.len() {
4643            let range = info.decompressed_byte_range(i).unwrap();
4644            assert_eq!(range.start, expected_start, "block {i} range contiguity");
4645            let mut psrc = compressed.as_slice();
4646            let mut pdec = FrameDecoder::new();
4647            pdec.reset(&mut psrc).unwrap();
4648            let pd = pdec
4649                .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
4650                .unwrap();
4651            assert!(pd.stopped_at.is_none(), "block {i} must decode cleanly");
4652            assert_eq!(
4653                pd.data.as_slice(),
4654                &decoded[range.start as usize..range.end as usize],
4655                "borrowed block {i} partial-decode bytes must equal the full-decode slice"
4656            );
4657            expected_start = range.end;
4658        }
4659        assert_eq!(
4660            expected_start,
4661            decoded.len() as u64,
4662            "ranges sum to full length"
4663        );
4664    }
4665
4666    #[cfg(feature = "std")]
4667    #[test]
4668    fn fuzz_targets() {
4669        use std::io::Read;
4670        fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
4671            let mut decoder = crate::decoding::StreamingDecoder::new(data).unwrap();
4672            let mut result: Vec<u8> = Vec::new();
4673            decoder.read_to_end(&mut result).expect("Decoding failed");
4674            result
4675        }
4676
4677        fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
4678            let mut decoder = crate::decoding::FrameDecoder::new();
4679            decoder.reset(&mut data).unwrap();
4680            let mut result = vec![];
4681            while !decoder.is_finished() || decoder.can_collect() > 0 {
4682                decoder
4683                    .decode_blocks(
4684                        &mut data,
4685                        crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
4686                    )
4687                    .unwrap();
4688                decoder.collect_to_writer(&mut result).unwrap();
4689            }
4690            result
4691        }
4692
4693        fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
4694            zstd::stream::encode_all(std::io::Cursor::new(data), 3)
4695        }
4696
4697        fn encode_szstd_uncompressed(data: &mut dyn std::io::Read) -> Vec<u8> {
4698            let mut input = Vec::new();
4699            data.read_to_end(&mut input).unwrap();
4700
4701            crate::encoding::compress_to_vec(
4702                input.as_slice(),
4703                crate::encoding::CompressionLevel::Uncompressed,
4704            )
4705        }
4706
4707        fn encode_szstd_compressed(data: &mut dyn std::io::Read) -> Vec<u8> {
4708            let mut input = Vec::new();
4709            data.read_to_end(&mut input).unwrap();
4710
4711            crate::encoding::compress_to_vec(
4712                input.as_slice(),
4713                crate::encoding::CompressionLevel::Fastest,
4714            )
4715        }
4716
4717        fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
4718            let mut output = Vec::new();
4719            zstd::stream::copy_decode(data, &mut output)?;
4720            Ok(output)
4721        }
4722        if std::fs::exists("fuzz/artifacts/interop").unwrap_or(false) {
4723            for file in std::fs::read_dir("fuzz/artifacts/interop").unwrap() {
4724                if file.as_ref().unwrap().file_type().unwrap().is_file() {
4725                    let data = std::fs::read(file.unwrap().path()).unwrap();
4726                    let data = data.as_slice();
4727                    // Decoding
4728                    let compressed = encode_zstd(data).unwrap();
4729                    let decoded = decode_szstd(&mut compressed.as_slice());
4730                    let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
4731                    assert!(
4732                        decoded == data,
4733                        "Decoded data did not match the original input during decompression"
4734                    );
4735                    assert_eq!(
4736                        decoded2, data,
4737                        "Decoded data did not match the original input during decompression"
4738                    );
4739
4740                    // Encoding
4741                    // Uncompressed encoding
4742                    let mut input = data;
4743                    let compressed = encode_szstd_uncompressed(&mut input);
4744                    let decoded = decode_zstd(&compressed).unwrap();
4745                    assert_eq!(
4746                        decoded, data,
4747                        "Decoded data did not match the original input during compression"
4748                    );
4749                    // Compressed encoding
4750                    let mut input = data;
4751                    let compressed = encode_szstd_compressed(&mut input);
4752                    let decoded = decode_zstd(&compressed).unwrap();
4753                    assert_eq!(
4754                        decoded, data,
4755                        "Decoded data did not match the original input during compression"
4756                    );
4757                }
4758            }
4759        }
4760    }
4761
4762    /// Homogeneous input — every byte the same — must NOT be split:
4763    /// both border histograms are identical (all 512 hits on a single
4764    /// slot), so `presplit_fingerprints_differ` returns `false` and the
4765    /// function takes the early-return path at
4766    /// `zstd_preSplit.c:214` returning `blockSize`.
4767    #[test]
4768    fn split_block_from_borders_keeps_homogeneous_block() {
4769        let block = vec![0xAAu8; MAX_BLOCK_SIZE as usize];
4770        let split = super::split_block_from_borders(&block);
4771        assert_eq!(split, MAX_BLOCK_SIZE as usize);
4772    }
4773
4774    /// Heterogeneous input — first half all zeros, second half a
4775    /// counter sequence — has clearly distinguishable border
4776    /// histograms, so the borders heuristic decides to split.
4777    ///
4778    /// The transition sits at exactly the block midpoint, so the
4779    /// middle 512-byte sample (`block[mid-256..mid+256]`) is half
4780    /// zeros + half counter values. That makes it roughly
4781    /// equidistant from both border fingerprints — the
4782    /// `abs_diff(dist_from_begin, dist_from_end) < min_distance`
4783    /// branch fires and the heuristic returns the midpoint (64 KiB)
4784    /// per `zstd_preSplit.c:222`. The test asserts the exact value
4785    /// rather than just "one of {32K, 64K, 96K}" so a regression
4786    /// to a different quantised arm cannot silently slip through.
4787    #[test]
4788    fn split_block_from_borders_returns_midpoint_for_centred_transition() {
4789        let mut block = vec![0u8; MAX_BLOCK_SIZE as usize];
4790        for (i, byte) in block
4791            .iter_mut()
4792            .enumerate()
4793            .skip(MAX_BLOCK_SIZE as usize / 2)
4794        {
4795            *byte = (i % 251 + 1) as u8;
4796        }
4797        let split = super::split_block_from_borders(&block);
4798        assert_eq!(
4799            split,
4800            64 * 1024,
4801            "centred-transition fixture must take the symmetric \
4802             midpoint arm (`abs_diff < min_distance`), got {split}"
4803        );
4804    }
4805
4806    /// `level_pre_split` resolves the per-level split knob through the
4807    /// `LevelParams` table, mirroring the donor `splitLevels[]` by strategy
4808    /// (`ZSTD_optimalBlockSize`): fast → 0 (from-borders), dfast → 1,
4809    /// greedy/lazy → 2, lazy2/btlazy2 (Lazy tag at depth 2) → 3,
4810    /// btopt/btultra/btultra2 → 4. `Uncompressed` has no numeric level so it
4811    /// stays `None`.
4812    #[test]
4813    fn pre_split_level_dispatches_by_compression_level() {
4814        use crate::encoding::CompressionLevel;
4815        use crate::encoding::match_generator::level_pre_split;
4816        assert_eq!(level_pre_split(CompressionLevel::Uncompressed), None);
4817        // Fastest = level 1 (fast) → 0 (from-borders).
4818        assert_eq!(level_pre_split(CompressionLevel::Fastest), Some(0));
4819        // Default = level 3 (dfast) → 1.
4820        assert_eq!(level_pre_split(CompressionLevel::Default), Some(1));
4821        // Better is a pure alias for level 7 (lazy): same as Level(7).
4822        assert_eq!(
4823            level_pre_split(CompressionLevel::Better),
4824            level_pre_split(CompressionLevel::Level(7)),
4825        );
4826        // Best resolves to the level-13 table row (btlazy2): pin it to that
4827        // numeric route so the named path can't drift from the pre-split
4828        // table.
4829        assert_eq!(
4830            level_pre_split(CompressionLevel::Best),
4831            level_pre_split(CompressionLevel::Level(13)),
4832        );
4833        assert_eq!(level_pre_split(CompressionLevel::Level(2)), Some(0)); // fast
4834        assert_eq!(level_pre_split(CompressionLevel::Level(4)), Some(1)); // dfast
4835        assert_eq!(level_pre_split(CompressionLevel::Level(5)), Some(2)); // greedy
4836        assert_eq!(level_pre_split(CompressionLevel::Level(7)), Some(2)); // lazy (depth 1)
4837        // lazy2 / btlazy2 use the rate-1 full-scan splitter (4), not the
4838        // rate-5 sampler (3): the sampler phantom-splits homogeneous periodic
4839        // input (see `pre_split` comment + `periodic_stream_not_oversplit`).
4840        assert_eq!(level_pre_split(CompressionLevel::Level(8)), Some(4)); // lazy2 lower bound
4841        assert_eq!(level_pre_split(CompressionLevel::Level(11)), Some(4)); // lazy2 (depth 2)
4842        assert_eq!(level_pre_split(CompressionLevel::Level(12)), Some(4)); // lazy2 upper bound
4843        assert_eq!(level_pre_split(CompressionLevel::Level(13)), Some(4)); // btlazy2 lower bound
4844        assert_eq!(level_pre_split(CompressionLevel::Level(15)), Some(4)); // btlazy2 (depth 2)
4845        assert_eq!(level_pre_split(CompressionLevel::Level(16)), Some(4)); // btopt
4846        assert_eq!(level_pre_split(CompressionLevel::Level(22)), Some(4)); // btultra2
4847    }
4848
4849    /// Regression: a homogeneous but periodic multi-block stream must not be
4850    /// pre-split into tiny blocks at the lazy2 / btlazy2 levels. The rate-5
4851    /// chunk sampler used to phantom-split such input at every 8 KB chunk,
4852    /// cascading a large stream into hundreds of tiny blocks whose per-block
4853    /// headers ballooned the output (~5x vs the lazy level next door). With
4854    /// the rate-1 full-scan splitter the periodic stream is seen as uniform
4855    /// and stays a few full blocks. We assert the lazy2 (L8) and btlazy2 (L15)
4856    /// outputs stay within 2x of the lazy (L7) output on the same input, and
4857    /// that every output round-trips.
4858    #[test]
4859    fn periodic_stream_not_oversplit() {
4860        use crate::encoding::{CompressionLevel, compress_slice_to_vec};
4861        const LINES: &[&str] = &[
4862            "ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo table=orders region=eu-west\n",
4863            "ts=2026-03-26T21:39:29Z level=INFO msg=\"rotate segment\" tenant=demo table=orders region=eu-west\n",
4864            "ts=2026-03-26T21:39:30Z level=INFO msg=\"compact level\" tenant=demo table=orders region=eu-west\n",
4865            "ts=2026-03-26T21:39:31Z level=INFO msg=\"write block\" tenant=demo table=orders region=eu-west\n",
4866        ];
4867        // 512 KB = 4 donor blocks, enough for the cascade to manifest.
4868        let target = 512 * 1024usize;
4869        let mut data = Vec::with_capacity(target);
4870        let mut i = 0;
4871        while data.len() < target {
4872            let line = LINES[i % LINES.len()].as_bytes();
4873            let take = line.len().min(target - data.len());
4874            data.extend_from_slice(&line[..take]);
4875            i += 1;
4876        }
4877        let l7 = compress_slice_to_vec(&data, CompressionLevel::Level(7)); // lazy depth1
4878        let l8 = compress_slice_to_vec(&data, CompressionLevel::Level(8)); // lazy2
4879        let l15 = compress_slice_to_vec(&data, CompressionLevel::Level(15)); // btlazy2
4880        assert!(
4881            l8.len() < l7.len() * 2,
4882            "lazy2 over-split periodic stream: l7={} l8={}",
4883            l7.len(),
4884            l8.len()
4885        );
4886        assert!(
4887            l15.len() < l7.len() * 2,
4888            "btlazy2 over-split periodic stream: l7={} l15={}",
4889            l7.len(),
4890            l15.len()
4891        );
4892        for out in [&l7, &l8, &l15] {
4893            let mut decoder = FrameDecoder::new();
4894            let mut round = Vec::with_capacity(data.len());
4895            decoder
4896                .decode_all_to_vec(out, &mut round)
4897                .expect("decode periodic stream");
4898            assert_eq!(round, data, "periodic stream roundtrip mismatch");
4899        }
4900    }
4901
4902    /// End-to-end: a 256 KB payload whose SECOND 128 KB donor block carries
4903    /// an intra-block fingerprint transition, compressed at Level(5)
4904    /// (greedy, the pre-split path this revision routes through the cheap
4905    /// chunk splitter), round-trips through the crate's own decoder.
4906    ///
4907    /// The transition lives in the second block on purpose: the donor
4908    /// `savings < 3` gate skips splitting the first block (savings start at
4909    /// 0), so the first block is a homogeneous compressible run that banks
4910    /// savings, and the second block is the one whose intra-block transition
4911    /// `split_block_by_chunks()` resolves into a sub-block boundary (the
4912    /// `pending_input.split_off(...)` path). The test asserts that split
4913    /// decision directly so it cannot silently stop exercising the path if
4914    /// the fixture or params drift, then proves the emitted split frame
4915    /// round-trips. Level 13 (lazy) no longer pre-splits, hence Level 5.
4916    #[test]
4917    fn greedy_chunk_split_roundtrips_through_own_decoder() {
4918        use crate::encoding::CompressionLevel;
4919        let mut data = vec![0u8; 256 * 1024];
4920        // First 128 KB: homogeneous low-entropy run (compressible, banks
4921        // the savings the donor gate needs). Second 128 KB: low-entropy run
4922        // for its first half, then a counter sequence: a clear intra-block
4923        // fingerprint transition at the 192 KB midpoint for the chunk
4924        // splitter to find.
4925        for (i, byte) in data.iter_mut().enumerate() {
4926            *byte = if i < 192 * 1024 {
4927                (i & 0x07) as u8
4928            } else {
4929                (i % 251 + 1) as u8
4930            };
4931        }
4932
4933        // Directly assert the chunk splitter resolves the second block's
4934        // intra-block transition into a sub-block boundary once savings have
4935        // accrued (the compressible first block banks well over the gate).
4936        let second_block = &data[128 * 1024..];
4937        let split = super::optimal_block_size(
4938            CompressionLevel::Level(5),
4939            second_block,
4940            second_block.len(),
4941            MAX_BLOCK_SIZE as usize,
4942            100,
4943        );
4944        assert!(
4945            split < MAX_BLOCK_SIZE as usize,
4946            "second donor block must chunk-split at its intra-block transition, got {split}",
4947        );
4948
4949        let mut compressed = Vec::new();
4950        let mut compressor = FrameCompressor::new(CompressionLevel::Level(5));
4951        compressor.set_source(data.as_slice());
4952        compressor.set_drain(&mut compressed);
4953        compressor.compress();
4954
4955        let mut decoder = FrameDecoder::new();
4956        let mut source = compressed.as_slice();
4957        decoder
4958            .reset(&mut source)
4959            .expect("frame header should parse");
4960        while !decoder.is_finished() {
4961            decoder
4962                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
4963                .expect("decode should succeed");
4964        }
4965        let mut decoded = Vec::with_capacity(data.len());
4966        decoder.collect_to_writer(&mut decoded).unwrap();
4967        assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
4968    }
4969
4970    /// Outside-diff coverage for the FAST one-shot path.
4971    /// `compress_slice_to_vec` / `compress_independent_frame` on a Fast level
4972    /// routes through `run_borrowed_block_loop` (not the owned loop the test
4973    /// above covers), which must honour `optimal_block_size` and emit a
4974    /// sub-`MAX_BLOCK_SIZE` boundary rather than fixed 128 KiB blocks. A
4975    /// 256 KiB input is two 128 KiB blocks when unsplit; a chunk boundary in
4976    /// the second block yields >= 3 decoded blocks, asserted on the round-trip.
4977    #[test]
4978    fn fast_oneshot_borrowed_split_emits_subblock() {
4979        use crate::encoding::CompressionLevel;
4980        // First 192 KiB: homogeneous zero run (banks the savings the split
4981        // gate needs). The second 128 KiB block flips to a counter sequence
4982        // at its 64 KiB midpoint (the 192 KiB mark) — a fingerprint
4983        // transition the Fast from-borders splitter (split level 0) resolves
4984        // into a sub-block boundary.
4985        let mut data = vec![0u8; 256 * 1024];
4986        for (i, byte) in data.iter_mut().enumerate() {
4987            if i >= 192 * 1024 {
4988                *byte = (i % 251 + 1) as u8;
4989            }
4990        }
4991
4992        // Pin the splitter decision for the Fast path directly (mirrors the
4993        // greedy test): the second donor block must resolve to a sub-block
4994        // boundary, so the >= 3 block count below cannot pass vacuously.
4995        let second_block = &data[128 * 1024..];
4996        assert!(
4997            super::optimal_block_size(
4998                CompressionLevel::Fastest,
4999                second_block,
5000                second_block.len(),
5001                MAX_BLOCK_SIZE as usize,
5002                100,
5003            ) < MAX_BLOCK_SIZE as usize,
5004            "fixture must resolve to a sub-block split in the second donor block",
5005        );
5006
5007        // Drive the borrowed one-shot route explicitly (Fast level ->
5008        // run_borrowed_block_loop via compress_independent_frame).
5009        let mut compressor: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
5010        let frame = compressor.compress_independent_frame(&data);
5011
5012        let mut decoder = FrameDecoder::new();
5013        let mut source = frame.as_slice();
5014        decoder
5015            .reset(&mut source)
5016            .expect("frame header should parse");
5017        while !decoder.is_finished() {
5018            decoder
5019                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
5020                .expect("decode should succeed");
5021        }
5022        let mut decoded = Vec::with_capacity(data.len());
5023        decoder.collect_to_writer(&mut decoded).unwrap();
5024        assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
5025        assert!(
5026            decoder.blocks_decoded() >= 3,
5027            "fast one-shot borrowed path must split the second donor block \
5028             (256 KiB unsplit = 2 blocks), got {} blocks",
5029            decoder.blocks_decoded(),
5030        );
5031    }
5032
5033    /// Regression: `set_compression_level` followed by `compress()` must
5034    /// refresh `state.strategy_tag` through the reset-time sync so the
5035    /// literal-compression gates (`min_literals_to_compress`,
5036    /// `min_gain`) use the NEW level's strategy. Picks a level pair
5037    /// that genuinely crosses strategy bands — `Fastest` resolves to
5038    /// `Fast`, `Level(20)` resolves to `BtUltra2` — so a missed sync
5039    /// would leave the construction-time tag visible and trip the
5040    /// assertion. `CompressionLevel::Best` would also pass type-wise
5041    /// but resolves to `Lazy` today, which keeps `min_literals_to_compress`
5042    /// in the same `shift=3 → 64-byte` band as `Fast` and weakens the
5043    /// signal that the gate floor actually moved.
5044    #[cfg(feature = "std")]
5045    #[test]
5046    fn set_compression_level_then_compress_refreshes_strategy_tag() {
5047        use super::CompressionLevel;
5048        use crate::encoding::strategy::StrategyTag;
5049
5050        let data = vec![0xABu8; 256];
5051        let mut out = Vec::new();
5052        let mut compressor = FrameCompressor::new(CompressionLevel::Fastest);
5053        let initial_tag = compressor.state.strategy_tag;
5054        assert_eq!(
5055            initial_tag,
5056            StrategyTag::for_compression_level(CompressionLevel::Fastest),
5057            "construction-time strategy_tag must reflect initial level",
5058        );
5059
5060        // Switch to a level whose resolved strategy lives in a different
5061        // band, then run a full compress cycle — the matcher.reset()
5062        // inside `compress` is the only site that can refresh the tag.
5063        let new_level = CompressionLevel::Level(20);
5064        compressor.set_compression_level(new_level);
5065        compressor.set_source(data.as_slice());
5066        compressor.set_drain(&mut out);
5067        compressor.compress();
5068
5069        let new_tag = compressor.state.strategy_tag;
5070        let expected = StrategyTag::for_compression_level(new_level);
5071        assert_eq!(
5072            new_tag, expected,
5073            "strategy_tag must follow set_compression_level → compress, \
5074             got {new_tag:?} expected {expected:?}",
5075        );
5076        assert_eq!(
5077            expected,
5078            StrategyTag::BtUltra2,
5079            "test fixture invariant: Level(20) must resolve to BtUltra2 \
5080             so the post-switch tag visibly crosses the band boundary",
5081        );
5082        assert_ne!(
5083            new_tag, initial_tag,
5084            "test fixture invariant: chosen levels must resolve to \
5085             different StrategyTag variants",
5086        );
5087    }
5088
5089    /// Magicless mode (`ZSTD_f_zstd1_magicless`): encoded frame
5090    /// MUST NOT start with the 4-byte magic prefix, AND must
5091    /// round-trip through a magicless-aware decoder.
5092    #[test]
5093    fn magicless_frame_omits_magic_and_roundtrips() {
5094        use crate::common::MAGIC_NUM;
5095        let input: alloc::vec::Vec<u8> = (0..512u32).map(|i| (i ^ 0xA5) as u8).collect();
5096
5097        // Encode with magicless = true.
5098        let mut output: Vec<u8> = Vec::new();
5099        let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
5100        compressor.set_magicless(true);
5101        compressor.set_source(input.as_slice());
5102        compressor.set_drain(&mut output);
5103        compressor.compress();
5104
5105        // 1. Encoded output must NOT begin with the zstd magic number.
5106        assert!(
5107            !output.starts_with(&MAGIC_NUM.to_le_bytes()),
5108            "magicless frame must omit the 4-byte magic prefix",
5109        );
5110
5111        // 2. A magicless-aware decoder must round-trip the payload.
5112        let mut decoder = crate::decoding::FrameDecoder::new();
5113        decoder.set_magicless(true);
5114        let mut cursor: &[u8] = output.as_slice();
5115        decoder.init(&mut cursor).expect("magicless init");
5116        decoder
5117            .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
5118            .expect("decode_blocks");
5119        let mut decoded: Vec<u8> = Vec::new();
5120        decoder
5121            .collect_to_writer(&mut decoded)
5122            .expect("collect_to_writer");
5123        assert_eq!(decoded, input, "magicless roundtrip must preserve bytes");
5124
5125        // 3. A standard (magicful) decoder MUST reject a magicless
5126        //    frame at the header-read step — the first 4 bytes are
5127        //    the frame-header descriptor + window / dictionary / FCS
5128        //    metadata, not the magic. We accept either
5129        //    `BadMagicNumber` (typical case: first 4 bytes don't
5130        //    match `MAGIC_NUM` and don't fall in the skippable-frame
5131        //    magic range) or `SkipFrame` (rare: the first 4 bytes
5132        //    coincidentally land in `0x184D2A50..=0x184D2A5F`). Both
5133        //    prove the standard decoder did not treat the bytes as a
5134        //    real magicful frame.
5135        use crate::decoding::errors::{FrameDecoderError, ReadFrameHeaderError};
5136        let mut std_decoder = crate::decoding::FrameDecoder::new();
5137        let std_init = std_decoder.init(output.as_slice());
5138        match std_init {
5139            Err(FrameDecoderError::ReadFrameHeaderError(
5140                ReadFrameHeaderError::BadMagicNumber(_) | ReadFrameHeaderError::SkipFrame { .. },
5141            )) => {}
5142            other => panic!(
5143                "standard decoder must reject a magicless frame with \
5144                 ReadFrameHeaderError::BadMagicNumber or SkipFrame, got {other:?}",
5145            ),
5146        }
5147    }
5148
5149    /// A reused `FrameCompressor` must emit byte-identical frames to a
5150    /// fresh compressor per input across both the borrowed (Fast) and
5151    /// owned (Dfast/Lazy/Greedy/Uncompressed) backends. This proves
5152    /// `prepare_frame` fully resets the per-frame state (matcher window,
5153    /// content hasher, FSE/Huffman seeds) between independent frames; a
5154    /// missed reset would corrupt frame N>=2's header checksum or matches.
5155    /// Each emitted frame must also round-trip.
5156    #[test]
5157    fn compress_independent_frame_reuse_matches_fresh_and_roundtrips() {
5158        use crate::encoding::{CompressionLevel, compress_slice_to_vec};
5159        let levels = [
5160            CompressionLevel::Uncompressed,
5161            CompressionLevel::Fastest,
5162            CompressionLevel::Default,
5163            CompressionLevel::Better,
5164            CompressionLevel::Best,
5165            CompressionLevel::Level(5),
5166        ];
5167        let inputs: Vec<Vec<u8>> = vec![
5168            Vec::new(),
5169            vec![0x00],
5170            b"the quick brown fox jumps over the lazy dog\n".to_vec(),
5171            vec![0x7Eu8; 50_000],          // highly compressible
5172            generate_data(0xABCD, 70_000), // pseudo-random
5173            generate_data(0x1234, 200_000),
5174        ];
5175        for level in levels {
5176            let mut cctx: FrameCompressor = FrameCompressor::new(level);
5177            for data in &inputs {
5178                let reused = cctx.compress_independent_frame(data);
5179                let fresh = compress_slice_to_vec(data, level);
5180                assert_eq!(
5181                    reused,
5182                    fresh,
5183                    "reused frame != fresh frame for len={} level={:?}",
5184                    data.len(),
5185                    level,
5186                );
5187                let mut decoder = FrameDecoder::new();
5188                let mut decoded = Vec::with_capacity(data.len());
5189                decoder.decode_all_to_vec(&reused, &mut decoded).unwrap();
5190                assert_eq!(
5191                    decoded,
5192                    *data,
5193                    "roundtrip failed for len={} level={:?}",
5194                    data.len(),
5195                    level,
5196                );
5197            }
5198        }
5199    }
5200
5201    /// `compress_independent_frame_into` must replace (not append to) the
5202    /// caller's buffer each call, so a smaller frame after a larger one
5203    /// yields exactly the smaller frame, and the reused buffer's content
5204    /// matches a fresh compression of the same input.
5205    #[test]
5206    fn compress_independent_frame_into_replaces_buffer_contents() {
5207        use crate::encoding::{CompressionLevel, compress_slice_to_vec};
5208        let large = vec![0x11u8; 40_000];
5209        let small = b"short payload".to_vec();
5210        let mut cctx: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
5211        let mut out = Vec::new();
5212        cctx.compress_independent_frame_into(&large, &mut out);
5213        let frame_large = out.clone();
5214        // Reusing the same buffer for a smaller frame must clear it first.
5215        cctx.compress_independent_frame_into(&small, &mut out);
5216        assert_eq!(
5217            out,
5218            compress_slice_to_vec(&small, CompressionLevel::Default),
5219            "reused buffer must hold exactly the second frame",
5220        );
5221        // The first frame, captured before reuse, still round-trips.
5222        let mut decoder = FrameDecoder::new();
5223        let mut decoded = Vec::with_capacity(large.len());
5224        decoder
5225            .decode_all_to_vec(&frame_large, &mut decoded)
5226            .unwrap();
5227        assert_eq!(decoded, large);
5228    }
5229
5230    /// A sticky dictionary set once on a reused compressor must be primed
5231    /// into every independent frame (mirroring `ZSTD_CCtx_loadDictionary`):
5232    /// each frame decodes with the dictionary and is byte-identical to a
5233    /// fresh compressor carrying the same dictionary. This proves
5234    /// `prepare_frame` re-primes the dictionary (matcher content + offset
5235    /// history + entropy seed) every call rather than only on the first.
5236    #[test]
5237    fn compress_independent_frame_reuses_sticky_dictionary() {
5238        use crate::encoding::CompressionLevel;
5239        let dict_raw = include_bytes!("../../dict_tests/dictionary");
5240        let dict_content = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
5241        let mut payload_a = Vec::new();
5242        for _ in 0..8 {
5243            payload_a.extend_from_slice(&dict_content.dict_content[..2048]);
5244        }
5245        let payload_b = b"a different second frame payload, still dict-attached".to_vec();
5246        let inputs = [payload_a, payload_b];
5247
5248        let mut cctx: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
5249        cctx.set_dictionary_from_bytes(dict_raw)
5250            .expect("dictionary bytes should parse");
5251
5252        for data in &inputs {
5253            let reused = cctx.compress_independent_frame(data);
5254            // Fresh compressor carrying the same sticky dictionary.
5255            let mut fresh_enc: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
5256            fresh_enc
5257                .set_dictionary_from_bytes(dict_raw)
5258                .expect("dictionary bytes should parse");
5259            let fresh = fresh_enc.compress_independent_frame(data);
5260            assert_eq!(
5261                reused,
5262                fresh,
5263                "reused dict frame != fresh dict frame, len={}",
5264                data.len(),
5265            );
5266            // Round-trip with the dictionary on the decode side.
5267            let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
5268            let mut decoder = FrameDecoder::new();
5269            decoder.add_dict(dict_for_decoder).unwrap();
5270            let mut decoded = Vec::with_capacity(data.len());
5271            decoder.decode_all_to_vec(&reused, &mut decoded).unwrap();
5272            assert_eq!(&decoded, data, "dict roundtrip failed, len={}", data.len());
5273        }
5274    }
5275}