structured_zstd/encoding/frame_compressor.rs
1//! Utilities and interfaces for encoding an entire frame. Allows reusing resources
2
3use alloc::vec::Vec;
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12 CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13 match_generator::MatchGeneratorDriver,
14};
15use crate::common::MAX_BLOCK_SIZE;
16use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
17
18use crate::io::{Read, Write};
19
20/// A dictionary prepared for the ENCODER side, analogous to zstd's `CDict`
21/// (vs the decoder's [`Dictionary`](crate::decoding::Dictionary) / `DDict`).
22///
23/// It carries the entropy tables, content, and repeat-offset history the
24/// compressor needs, but is a distinct type with **no decode path**: there is
25/// no way to turn it into a [`DictionaryHandle`](crate::decoding::DictionaryHandle)
26/// or feed it to a [`FrameDecoder`](crate::decoding::FrameDecoder). That keeps
27/// the compress-only state (which may have been parsed without building the
28/// decode lookup tables, see
29/// [`set_dictionary_from_bytes`](FrameCompressor::set_dictionary_from_bytes))
30/// from ever reaching the decode side — the encoder/decoder dictionary split
31/// mirrors C zstd's `CDict` / `DDict`.
32#[derive(Clone)]
33pub struct EncoderDictionary {
34 pub(crate) inner: crate::decoding::Dictionary,
35}
36
37impl EncoderDictionary {
38 /// Wrap an already-parsed [`Dictionary`](crate::decoding::Dictionary) for
39 /// encoder use. A fully-decoded dictionary is valid here; only the encoder
40 /// entropy tables, content, and offset history are read.
41 pub fn from_dictionary(dictionary: crate::decoding::Dictionary) -> Self {
42 Self { inner: dictionary }
43 }
44
45 /// Parse a serialized dictionary blob for encoder use, skipping the decode
46 /// lookup-table build the encoder never reads (see
47 /// `Dictionary::decode_dict_for_encoding`). The encoder entropy tables — and
48 /// thus the emitted frame — are identical to a full parse.
49 pub fn from_bytes(
50 raw_dictionary: &[u8],
51 ) -> Result<Self, crate::decoding::errors::DictionaryDecodeError> {
52 Ok(Self {
53 inner: crate::decoding::Dictionary::decode_dict_for_encoding(raw_dictionary)?,
54 })
55 }
56
57 /// The dictionary id.
58 ///
59 /// A dictionary attached for encoding always has a non-zero id (the
60 /// `set_dictionary*` / `set_encoder_dictionary` attach path rejects a
61 /// zero id). This getter, however, reflects the wrapped dictionary as-is:
62 /// an `EncoderDictionary` built via [`Self::from_dictionary`] from a raw
63 /// `Dictionary` with `id == 0` reports `0` here until it is attached.
64 pub fn id(&self) -> u32 {
65 self.inner.id
66 }
67}
68
69/// An interface for compressing arbitrary data with the ZStandard compression algorithm.
70///
71/// `FrameCompressor` will generally be used by:
72/// 1. Initializing a compressor by providing a buffer of data using `FrameCompressor::new()`
73/// 2. Starting compression and writing that compression into a vec using `FrameCompressor::begin`
74///
75/// # Examples
76/// ```
77/// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
78/// let mock_data: &[_] = &[0x1, 0x2, 0x3, 0x4];
79/// let mut output = std::vec::Vec::new();
80/// // Initialize a compressor.
81/// let mut compressor = FrameCompressor::new(CompressionLevel::Uncompressed);
82/// compressor.set_source(mock_data);
83/// compressor.set_drain(&mut output);
84///
85/// // `compress` writes the compressed output into the provided buffer.
86/// compressor.compress();
87/// ```
88pub struct FrameCompressor<
89 R: Read = &'static [u8],
90 W: Write = Vec<u8>,
91 M: Matcher = MatchGeneratorDriver,
92> {
93 uncompressed_data: Option<R>,
94 compressed_data: Option<W>,
95 compression_level: CompressionLevel,
96 dictionary: Option<EncoderDictionary>,
97 dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
98 source_size_hint: Option<u64>,
99 state: CompressState<M>,
100 /// When true, emitted frames omit the 4-byte magic number prefix
101 /// (`ZSTD_f_zstd1_magicless`). Default false. The caller is
102 /// responsible for ensuring the decoder is configured for the
103 /// matching format — wire-format only round-trips with a
104 /// magicless-aware decoder.
105 magicless: bool,
106 /// Whether to emit a trailing XXH64 content checksum and set the frame
107 /// header's `Content_Checksum_flag` (semantics of upstream
108 /// `ZSTD_c_checksumFlag`). Default `false`, matching the upstream
109 /// library default; combined with the `hash` feature at frame-build
110 /// time, so without `hash` no checksum is emitted regardless. Set via
111 /// [`Self::set_content_checksum`].
112 content_checksum: bool,
113 /// Whether to record `Frame_Content_Size` in the frame header when the
114 /// total size is known (semantics of upstream `ZSTD_c_contentSizeFlag`).
115 /// Default `true`, matching upstream. With the flag off the header
116 /// carries a window descriptor instead (single-segment requires an FCS,
117 /// so it is disabled too). Set via [`Self::set_content_size_flag`].
118 content_size_flag: bool,
119 /// Whether to record the dictionary ID in the frame header when a
120 /// dictionary is attached (semantics of upstream `ZSTD_c_dictIDFlag`).
121 /// Default `true`, matching upstream. Decoders can still decode the
122 /// frame by being handed the right dictionary explicitly. Set via
123 /// [`Self::set_dictionary_id_flag`].
124 dict_id_flag: bool,
125 /// Upper bound on emitted block sizes (semantics of upstream
126 /// `ZSTD_c_targetCBlockSize`): capping the RAW block length at the
127 /// target bounds every physical block's compressed payload at the
128 /// target too (a compressed block never exceeds its raw input — the
129 /// raw-block fallback fires otherwise), so blocks land at or under
130 /// `target + 3` header bytes on the wire. `None` = no target (full
131 /// 128 KiB blocks). Set via [`Self::set_target_block_size`].
132 target_block_size: Option<u32>,
133 #[cfg(feature = "hash")]
134 hasher: XxHash64,
135 /// Block-layout introspection populated at the end of every
136 /// successful `compress()`. `None` until the first call.
137 /// Behind the `lsm` feature gate.
138 #[cfg(feature = "lsm")]
139 frame_emit_info: Option<crate::encoding::frame_emit_info::FrameEmitInfo>,
140 /// When `true`, `compress()` XXH64-hashes each block's
141 /// uncompressed bytes and appends the low-32-bit digest to
142 /// `block_checksums`. Default `false` (zero cost). Gated on
143 /// `all(lsm, hash)` because XXH64 lives behind the `hash`
144 /// feature; an `lsm`-only build has no way to compute digests.
145 #[cfg(all(feature = "lsm", feature = "hash"))]
146 per_block_checksums_enabled: bool,
147 /// Per-block XXH64 (low 32 bits) digests captured during
148 /// `compress()` when `per_block_checksums_enabled` is set. Ordered
149 /// by block-emit order. `None` until the first call after enabling.
150 /// Gated on `all(lsm, hash)` (see `per_block_checksums_enabled`).
151 #[cfg(all(feature = "lsm", feature = "hash"))]
152 block_checksums: Option<alloc::vec::Vec<u32>>,
153 /// Per-physical-block decompressed (regenerated) sizes captured
154 /// during `compress()`, in block-emit order (1:1 with
155 /// `frame_emit_info.blocks`). Always captured under `lsm` (no
156 /// opt-in, unlike `block_checksums`) because `FrameEmitInfo` is
157 /// always built under `lsm` and `decompressed_byte_range` needs
158 /// the per-block sizes. Cleared and refilled per frame.
159 #[cfg(feature = "lsm")]
160 block_decompressed_sizes: alloc::vec::Vec<u32>,
161 /// Effective strategy tag when a public-parameter
162 /// [`Strategy`](crate::encoding::Strategy) override (#27) is active.
163 /// `Some` overrides the level-derived `state.strategy_tag` so the
164 /// literal-compression gates and dict-attach cutoff see the strategy
165 /// the matcher actually runs, not the base level's. `None` keeps the
166 /// level-derived tag.
167 strategy_override: Option<crate::encoding::strategy::StrategyTag>,
168}
169
170#[derive(Clone, Default)]
171pub(crate) struct CachedDictionaryEntropy {
172 pub(crate) huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
173 pub(crate) ll_previous: Option<PreviousFseTable>,
174 pub(crate) ml_previous: Option<PreviousFseTable>,
175 pub(crate) of_previous: Option<PreviousFseTable>,
176}
177
178impl CachedDictionaryEntropy {
179 /// Heap bytes the cached dictionary entropy holds: the literals Huffman
180 /// table plus any `Custom` LL/ML/OF FSE tables (the `Arc`-boxed `FSETable`
181 /// payload and its flat state array). `Default` / `Rle` variants own no heap.
182 pub(crate) fn heap_size(&self) -> usize {
183 let mut total = self.huff.as_ref().map_or(0, |h| h.heap_size());
184 for prev in [&self.ll_previous, &self.ml_previous, &self.of_previous] {
185 if let Some(PreviousFseTable::Custom(table)) = prev {
186 total +=
187 core::mem::size_of::<crate::fse::fse_encoder::FSETable>() + table.heap_size();
188 }
189 }
190 total
191 }
192
193 /// Derive the encoder-side entropy tables a dictionary seeds for the first
194 /// block of each frame (the upstream zstd `cdict->cBlockState`): the literals
195 /// Huffman table plus the literal-length / match-length / offset FSE
196 /// "previous" tables. Shared by [`FrameCompressor`] and
197 /// [`crate::encoding::StreamingEncoder`] so both seed identically.
198 pub(crate) fn from_dictionary(dictionary: &crate::decoding::Dictionary) -> Self {
199 Self {
200 huff: dictionary.huf.table.to_encoder_table(),
201 ll_previous: dictionary
202 .fse
203 .literal_lengths
204 .to_encoder_table()
205 .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
206 ml_previous: dictionary
207 .fse
208 .match_lengths
209 .to_encoder_table()
210 .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
211 of_previous: dictionary
212 .fse
213 .offsets
214 .to_encoder_table()
215 .map(|table| PreviousFseTable::Custom(SharedFseTable::new(table))),
216 }
217 }
218}
219
220/// Shared owner for a custom "previous" FSE encoder table. `Arc` on
221/// atomic-pointer targets, `Rc` otherwise (keeps `no_std` no-atomics
222/// builds compiling, single-thread there anyway), mirroring
223/// `decoding::dictionary::SharedDictionary`. Cloning the cached
224/// dictionary entropy into the per-frame state is then a refcount bump,
225/// not a full `FSETable` copy — the upstream zstd references `cdict->cBlockState`
226/// instead of rebuilding it per frame.
227#[cfg(target_has_atomic = "ptr")]
228pub(crate) type SharedFseTable = alloc::sync::Arc<FSETable>;
229#[cfg(not(target_has_atomic = "ptr"))]
230pub(crate) type SharedFseTable = alloc::rc::Rc<FSETable>;
231
232#[derive(Clone)]
233pub(crate) enum PreviousFseTable {
234 // Default tables are immutable and already stored alongside the state, so
235 // repeating them only needs a lightweight marker instead of cloning FSETable.
236 Default,
237 // Shared handle: cloning (per-frame dictionary entropy seed) is a refcount
238 // bump. The table is only ever read or REPLACED wholesale (a block that
239 // builds a new table swaps in a fresh `SharedFseTable`), never mutated in
240 // place, so sharing is sound.
241 Custom(SharedFseTable),
242 Rle(u8),
243}
244
245impl PreviousFseTable {
246 pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
247 match self {
248 Self::Default => Some(default),
249 Self::Custom(table) => Some(table),
250 Self::Rle(_) => None,
251 }
252 }
253}
254
255pub(crate) struct FseTables {
256 /// The three predefined LL/ML/OF tables are functions of
257 /// compile-time-constant distributions. The
258 /// [`fse_encoder::FseDefaultTable`] type alias resolves to
259 /// `&'static FSETable` when a process-wide cache is available
260 /// (atomic-pointer targets, or no-atomic targets with the
261 /// `critical-section` feature) and to `Box<FSETable>` on the
262 /// cache-less no-atomic path (one per-frame allocation, dropped
263 /// with the compressor — no `Box::leak`, no unbounded growth).
264 /// Both arms `Deref` to `FSETable`, so consumers in
265 /// `encoding/blocks/compressed.rs` borrow through `&` uniformly
266 /// without seeing the per-target divergence.
267 pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
268 pub(crate) ll_previous: Option<PreviousFseTable>,
269 pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
270 pub(crate) ml_previous: Option<PreviousFseTable>,
271 pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
272 pub(crate) of_previous: Option<PreviousFseTable>,
273}
274
275impl FseTables {
276 pub fn new() -> Self {
277 Self {
278 ll_default: default_ll_table(),
279 ll_previous: None,
280 ml_default: default_ml_table(),
281 ml_previous: None,
282 of_default: default_of_table(),
283 of_previous: None,
284 }
285 }
286
287 /// Borrow the LL default table as `&FSETable`. Abstracts the cfg
288 /// split in [`crate::fse::fse_encoder::FseDefaultTable`] —
289 /// `&'static FSETable` (atomic / `critical-section`) auto-derefs
290 /// directly; `Box<FSETable>` (cache-less no-atomic) derefs
291 /// through `Box`. Both arms yield `&FSETable` uniformly so
292 /// downstream consumers can stay cfg-agnostic.
293 #[inline]
294 #[allow(clippy::borrow_deref_ref)]
295 pub(crate) fn ll_default_ref(&self) -> &FSETable {
296 &*self.ll_default
297 }
298
299 /// Borrow the ML default table as `&FSETable`. See [`Self::ll_default_ref`].
300 #[inline]
301 #[allow(clippy::borrow_deref_ref)]
302 pub(crate) fn ml_default_ref(&self) -> &FSETable {
303 &*self.ml_default
304 }
305
306 /// Borrow the OF default table as `&FSETable`. See [`Self::ll_default_ref`].
307 #[inline]
308 #[allow(clippy::borrow_deref_ref)]
309 pub(crate) fn of_default_ref(&self) -> &FSETable {
310 &*self.of_default
311 }
312}
313
/// Smallest block size a pre-split decision may produce.
const PRESPLIT_BLOCK_MIN: usize = 3500;
/// Denominator of the fingerprint-deviation threshold fraction.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
/// Base numerator of the threshold fraction; the current penalty is added on top.
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
/// Penalty the chunk-based splitter starts with.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
/// Chunk size (8 KiB) the chunk-based splitter fingerprints at a time.
const PRESPLIT_CHUNK_SIZE: usize = 8 * 1024;
/// Largest supported fingerprint hash log.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
/// Number of event slots in a fingerprint (`2^PRESPLIT_HASH_LOG_MAX`).
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
/// Multiplier of the 2-byte multiplicative hash.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
/// Upstream zstd `SEGMENT_SIZE` in `ZSTD_splitBlock_fromBorders`
/// (`zstd_preSplit.c:201`). The cheap border heuristic compares two
/// fingerprints of this many bytes — one taken at the start of the block, one
/// at the end — and a third, taken from the middle, disambiguates where in
/// the block the transition sits.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
327
328#[derive(Clone)]
329struct PreSplitFingerprint {
330 events: [u32; PRESPLIT_HASH_TABLE_SIZE],
331 nb_events: usize,
332}
333
334impl Default for PreSplitFingerprint {
335 fn default() -> Self {
336 Self {
337 events: [0; PRESPLIT_HASH_TABLE_SIZE],
338 nb_events: 0,
339 }
340 }
341}
342
/// Make room in `out` for the next block before it is emitted, so block
/// emission never hits an amortized-doubling reallocation mid-frame (its
/// transient old+new copy spikes peak memory to ~3x the output). The
/// reservation is sized from the compression ratio observed so far rather
/// than from the whole-input worst case.
///
/// * `blocks_start` — offset in `out` where this frame's blocks begin.
/// * `consumed` — input bytes already emitted as blocks.
/// * `remaining` — input bytes still to compress. An estimate is fine: a low
///   one only means one more re-estimate later.
/// * `block_capacity` — the active block-size cap
///   (`FrameCompressor::block_capacity`), so a small `targetCBlockSize`
///   neither keeps a 128 KiB floor in the buffer nor undercounts header
///   density.
///
/// Incompressible input re-estimates to roughly the full `compress_bound`
/// after the first block (the worst case of the old up-front policy), while
/// compressible input stays at output scale.
fn reserve_for_next_block(
    out: &mut Vec<u8>,
    blocks_start: usize,
    consumed: u64,
    remaining: usize,
    block_capacity: usize,
) {
    // Worst case for one block: raw payload behind a 3-byte header, plus 16
    // bytes of slack covering the 4-byte frame checksum trailer and a few
    // extra sub-block headers from the post-split emitters.
    let block_bound = remaining.min(block_capacity) + 3 + 16;
    let spare = out.capacity() - out.len();
    if spare >= block_bound {
        return;
    }

    let produced = (out.len() - blocks_start) as u64;
    let estimate = match consumed {
        // No ratio signal yet (capacity ran out before the first block, which
        // needs a caller-shrunk `out`): fall back to one block's bound.
        0 => block_bound,
        _ => {
            // remaining * observed ratio, computed in u128 so the product is
            // exact for multi-GiB frames.
            let scaled =
                (remaining as u128 * u128::from(produced) / u128::from(consumed)) as u64;
            // One 3-byte header per remaining block (rounded up by one).
            let blocks_left = remaining as u64 / block_capacity.max(1) as u64 + 1;
            let headers = blocks_left * 3;
            // 1/16 slack so a slightly-worsening tail does not force a
            // reallocation per block, plus a small constant.
            let total = scaled + scaled / 16 + headers + 64;
            usize::try_from(total).unwrap_or(usize::MAX)
        }
    };

    // The estimate carries its own slack, and whole-buffer doubling is the
    // very policy this function avoids, hence `reserve_exact`. The
    // `produced`-sized floor keeps growth geometric when the ratio estimate
    // falls BELOW one block's bound (highly compressible input): without it
    // every block would reallocate by one block (O(blocks) buffer copies);
    // with it the produced span at least doubles per reallocation (O(log)
    // copies) and the peak stays at output scale.
    let floor = block_bound + produced as usize;
    out.reserve_exact(estimate.max(floor));
}
394
395fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
396 debug_assert!(hash_log >= 8);
397 if hash_log == 8 {
398 return bytes[0] as usize;
399 }
400 debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
401 let value = u16::from_le_bytes([bytes[0], bytes[1]]) as u32;
402 (value.wrapping_mul(PRESPLIT_KNUTH) >> (32 - hash_log)) as usize
403}
404
405fn presplit_record_fingerprint(
406 fp: &mut PreSplitFingerprint,
407 src: &[u8],
408 sampling_rate: usize,
409 hash_log: usize,
410) {
411 fp.events.fill(0);
412 fp.nb_events = 0;
413 if src.len() < 2 {
414 return;
415 }
416 let limit = src.len() - 1;
417 let mut n = 0usize;
418 while n < limit {
419 fp.events[presplit_hash2(&src[n..], hash_log)] += 1;
420 n += sampling_rate;
421 }
422 // Upstream zstd parity: zstd_preSplit.c records the integer division, not the
423 // rounded-up number of sampled events from the loop above.
424 fp.nb_events += limit / sampling_rate;
425}
426
427/// Single-byte histogram pass — matches upstream zstd `HIST_add` over a small
428/// segment with `hashLog == 8` (the `hash2` shortcut at
429/// `zstd_preSplit.c:36` returns the raw byte). The byChunks path uses
430/// 2-byte hashing for `hashLog >= 9`; this helper exists so the borders
431/// heuristic doesn't pay for that wider hash on its 512-byte windows.
432fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
433 fp.events.fill(0);
434 for &b in src {
435 fp.events[b as usize] += 1;
436 }
437 // Upstream zstd `HIST_add` returns the maximum symbol; the caller then sets
438 // `nbEvents = SEGMENT_SIZE` explicitly (see `zstd_preSplit.c:213`).
439 fp.nb_events = src.len();
440}
441
442fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
443 let slots = 1usize << hash_log;
444 let mut distance = 0u64;
445 for idx in 0..slots {
446 let left = lhs.events[idx] as i128 * rhs.nb_events as i128;
447 let right = rhs.events[idx] as i128 * lhs.nb_events as i128;
448 // Plain `+`: events/nb_events are per-block sample counts (<= block
449 // size), so each |left-right| <= (2^17)^2 and the sum over <= 2^hash_log
450 // slots stays far under u64::MAX — no overflow.
451 distance += left.abs_diff(right) as u64;
452 }
453 distance
454}
455
456fn presplit_fingerprints_differ(
457 reference: &PreSplitFingerprint,
458 new_fp: &PreSplitFingerprint,
459 penalty: i32,
460 hash_log: usize,
461) -> bool {
462 debug_assert!(reference.nb_events > 0);
463 debug_assert!(new_fp.nb_events > 0);
464 let p50 = reference.nb_events as u64 * new_fp.nb_events as u64;
465 let deviation = presplit_distance(reference, new_fp, hash_log);
466 // Plain `*`: p50 <= (block-sample-count)^2 and the (base+penalty) factor is
467 // a small constant, so the product stays well under u64::MAX.
468 let threshold =
469 p50 * (PRESPLIT_THRESHOLD_BASE + penalty as u64) / PRESPLIT_THRESHOLD_PENALTY_RATE;
470 deviation >= threshold
471}
472
473fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
474 // Plain `+`: `acc` accumulates only the chunks of a single block (caller
475 // loops within one block, <= MAX_BLOCK_SIZE), so the merged sample counts
476 // stay far under u32 / usize bounds — no overflow.
477 for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
478 acc.events[idx] += new_fp.events[idx];
479 }
480 acc.nb_events += new_fp.nb_events;
481}
482
483fn split_block_by_chunks(block: &[u8], level: usize) -> usize {
484 debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
485 debug_assert!((1..=4).contains(&level));
486 let (sampling_rate, hash_log) = match level - 1 {
487 0 => (43, 8),
488 1 => (11, 9),
489 2 => (5, 10),
490 _ => (1, 10),
491 };
492
493 let mut past = PreSplitFingerprint::default();
494 let mut new_events = PreSplitFingerprint::default();
495 let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
496 presplit_record_fingerprint(
497 &mut past,
498 &block[..PRESPLIT_CHUNK_SIZE],
499 sampling_rate,
500 hash_log,
501 );
502 let mut pos = PRESPLIT_CHUNK_SIZE;
503 while pos <= block.len() - PRESPLIT_CHUNK_SIZE {
504 presplit_record_fingerprint(
505 &mut new_events,
506 &block[pos..pos + PRESPLIT_CHUNK_SIZE],
507 sampling_rate,
508 hash_log,
509 );
510 if presplit_fingerprints_differ(&past, &new_events, penalty, hash_log) {
511 return pos;
512 }
513 presplit_merge_events(&mut past, &new_events);
514 if penalty > 0 {
515 penalty -= 1;
516 }
517 pos += PRESPLIT_CHUNK_SIZE;
518 }
519 block.len()
520}
521
522/// Upstream zstd port of `ZSTD_splitBlock_fromBorders` (`zstd_preSplit.c:198`).
523/// Records two 512-byte byte-histograms — one from each end of a 128 KB
524/// block — and a third from the middle as a tie-breaker; returns either
525/// a quantised split point (32 KB / 64 KB / 96 KB) or the full block
526/// size when the two ends look indistinguishable. Cheaper than the
527/// chunk-based path because it touches at most 1.5 KB of input
528/// regardless of block size.
529fn split_block_from_borders(block: &[u8]) -> usize {
530 debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
531 let block_size = block.len();
532 let mut past = PreSplitFingerprint::default();
533 let mut new_fp = PreSplitFingerprint::default();
534 presplit_record_byte_histogram(&mut past, &block[..PRESPLIT_BORDERS_SEGMENT]);
535 presplit_record_byte_histogram(&mut new_fp, &block[block_size - PRESPLIT_BORDERS_SEGMENT..]);
536 // Upstream zstd uses `penalty = 0, hash_log = 8` — i.e. raw byte histogram
537 // distance with no threshold padding (`zstd_preSplit.c:214`).
538 if !presplit_fingerprints_differ(&past, &new_fp, 0, 8) {
539 return block_size;
540 }
541
542 let mut middle = PreSplitFingerprint::default();
543 let mid_start = block_size / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
544 presplit_record_byte_histogram(
545 &mut middle,
546 &block[mid_start..mid_start + PRESPLIT_BORDERS_SEGMENT],
547 );
548
549 let dist_from_begin = presplit_distance(&past, &middle, 8);
550 let dist_from_end = presplit_distance(&new_fp, &middle, 8);
551 // Upstream zstd `SEGMENT_SIZE * SEGMENT_SIZE / 3` (`zstd_preSplit.c:221`):
552 // if the middle is roughly equidistant from both ends, the change
553 // sits near the centre — split at the midpoint.
554 let min_distance = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;
555 if dist_from_begin.abs_diff(dist_from_end) < min_distance {
556 return 64 * 1024;
557 }
558 // Larger `dist_from_begin` (i.e. `middle` farther from the head
559 // fingerprint, equivalently closer to the tail) means the new
560 // statistics already dominate the centre — the transition
561 // happened EARLY → emit a small 32 KB head and let the 96 KB
562 // tail absorb the rest. Inverse case: `dist_from_end` larger
563 // (middle still resembles the head) means the transition is
564 // LATE → emit a 96 KB head so the trailing 32 KB carries the
565 // new statistics alone.
566 if dist_from_begin > dist_from_end {
567 32 * 1024
568 } else {
569 96 * 1024
570 }
571}
572
/// Low 32 bits of the XXH64 digest (seed 0) of `data`.
///
/// Shared helper for the per-physical-block checksum sidecar, so that
/// encoder and decoder hash exactly the same byte ranges with exactly the
/// same parameters. It is gated at `all(lsm, hash)` because its only
/// consumer is the lsm-side `block_checksums` sidecar; non-lsm builds carry
/// no reference to it at all.
#[cfg(all(feature = "lsm", feature = "hash"))]
#[inline]
pub(crate) fn xxh64_block_low32(data: &[u8]) -> u32 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(data);
    let digest: u64 = hasher.finish();
    // Truncating cast keeps the low 32 bits.
    digest as u32
}
586
/// Bench-only entry point for the upstream zstd-parity comparator test in
/// `tests/block_splitter_parity.rs`.
///
/// Routes to the same ports `optimal_block_size` uses: `_from_borders` for
/// `split_level == 0` and `_by_chunks` for `split_level` in `1..=4`.
///
/// # Panics
///
/// Panics unless `block` holds exactly `MAX_BLOCK_SIZE` bytes (the upstream
/// zstd `ZSTD_splitBlock` contract — "@blockSize must be == 128 KB" in
/// `zstd_preSplit.h`) and `split_level` is at most 4.
#[cfg(feature = "bench_internals")]
pub(crate) fn block_splitter_decision_for_bench(block: &[u8], split_level: usize) -> usize {
    assert_eq!(
        block.len(),
        MAX_BLOCK_SIZE as usize,
        "block_splitter_decision_for_bench expects exactly MAX_BLOCK_SIZE bytes"
    );
    assert!(
        split_level <= 4,
        "block_splitter_decision_for_bench: split_level must be in 0..=4, got {split_level}"
    );
    match split_level {
        0 => split_block_from_borders(block),
        chunk_level => split_block_by_chunks(block, chunk_level),
    }
}
611
/// Touch a pre-split window with one sequential pass so it is in cache
/// before the strided fingerprint histogram and the match scan read it.
///
/// Rationale: the borrowed (no-copy) over-window path matches in place on
/// the caller's input, which makes the pre-split fingerprint the FIRST touch
/// of that 128 KiB region — a cache-cold read. `presplit_record_fingerprint`
/// reads with a `sampling_rate` stride while writing randomly into the 1 KiB
/// events table, a latency-bound pattern that pays full DRAM miss latency
/// per line (measured at ~3x the cost of an ERMS streaming read of the same
/// bytes). The owned path does not suffer from this because its
/// history-mirror copy has already warmed the bytes; this function restores
/// that warmth without the copy's write half. Reading one byte per 64-byte
/// line streams under the hardware prefetcher, so the cold read is paid once
/// at memory bandwidth and the later strided samples land in L1/L2.
#[inline]
fn warm_presplit_window(window: &[u8]) {
    let folded = window
        .iter()
        .step_by(64)
        .fold(0u8, |acc, &byte| acc ^ byte);
    // `black_box` keeps the pass from being optimized away as a dead read.
    core::hint::black_box(folded);
}
637
638pub(crate) fn optimal_block_size(
639 level: CompressionLevel,
640 block: &[u8],
641 remaining_src_size: usize,
642 block_size_max: usize,
643 savings: i64,
644) -> usize {
645 let Some(split_level) = crate::encoding::match_generator::level_pre_split(level) else {
646 return remaining_src_size.min(block_size_max);
647 };
648 if remaining_src_size < MAX_BLOCK_SIZE as usize || block_size_max < MAX_BLOCK_SIZE as usize {
649 return remaining_src_size.min(block_size_max);
650 }
651 if savings < 3 {
652 return MAX_BLOCK_SIZE as usize;
653 }
654 if block.len() < MAX_BLOCK_SIZE as usize {
655 return remaining_src_size.min(block_size_max);
656 }
657 // Upstream zstd `ZSTD_splitBlock` dispatch (`zstd_preSplit.c:234`):
658 // `split_level == 0` → cheap borders heuristic;
659 // `split_level == 1..=4` → byChunks with internal sampling level
660 // `split_level - 1`.
661 let raw_split = if split_level == 0 {
662 split_block_from_borders(&block[..MAX_BLOCK_SIZE as usize])
663 } else {
664 split_block_by_chunks(&block[..MAX_BLOCK_SIZE as usize], split_level)
665 };
666 raw_split
667 .max(PRESPLIT_BLOCK_MIN)
668 .min(MAX_BLOCK_SIZE as usize)
669}
670
671pub(crate) struct CompressState<M: Matcher> {
672 pub(crate) matcher: M,
673 pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
674 /// Recycled `HuffmanTable` buffers: when a block clears or replaces
675 /// `last_huff_table`, the old table parks here instead of dropping, so
676 /// the next frame's dictionary entropy seed `clone_from`s into existing
677 /// allocations. Without this, every dict-seeded frame whose last block
678 /// ended raw/RLE paid a fresh two-Vec table clone per frame.
679 pub(crate) huff_table_spare: Option<crate::huff0::huff0_encoder::HuffmanTable>,
680 pub(crate) fse_tables: FseTables,
681 pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
682 /// Offset history for repeat offset encoding: [rep0, rep1, rep2].
683 /// Initialized to [1, 4, 8] per RFC 8878 §3.1.2.5.
684 pub(crate) offset_hist: [u32; 3],
685 /// Strategy tag resolved from the current `CompressionLevel` at every
686 /// `matcher.reset()` call. Used by the literal-compression gates
687 /// (`min_literals_to_compress`, `min_gain`) in
688 /// `encoding::blocks::compressed` to mirror upstream zstd's strategy-aware
689 /// thresholds (`zstd_compress_literals.c:114-127, 187-188`).
690 ///
691 /// **Invariant (required of every construction site):** must be
692 /// initialized from the active `CompressionLevel` via
693 /// `StrategyTag::for_compression_level`, and re-synced from the
694 /// active level alongside every `matcher.reset()` call so the
695 /// level-aware gates stay correct after a level change. The two
696 /// reset sites that own this sync are `FrameCompressor::compress`
697 /// and `StreamingEncoder::ensure_frame_started`. There is no
698 /// `Default` impl — production constructors
699 /// (`FrameCompressor::new`, `new_with_matcher`, the streaming
700 /// encoder constructor) plumb this explicitly. Tests that build
701 /// `CompressState` by hand must also supply a value.
702 pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
703 /// Whether the HUF literal table build runs the #167 table-log search
704 /// (`true`) or the cheap single-build (`false`). For the Fast strategy the
705 /// search is a clean ratio win over upstream zstd but costs ~1.5 us per
706 /// literal section — negligible on large inputs, ~20% on small ones — and
707 /// the Fast matcher is byte-faithful to upstream zstd (so the cheap path
708 /// ties it). So it is gated ON only for large Fast frames; non-Fast
709 /// strategies always keep it (their matchers diverge, making the search
710 /// load-bearing for ratio — gating those is deferred). Set per frame
711 /// alongside `strategy_tag` via [`fast_huf_search_enabled`].
712 pub(crate) huf_optimal_search: bool,
713}
714
715/// Whether the Fast-strategy HUF literal build should run the #167 table-log
716/// search for a frame of `source_size` bytes (see
717/// [`CompressState::huf_optimal_search`]). Non-Fast strategies always search.
718/// For Fast, enable the search only above 128 KiB (one full block) — below
719/// that the per-frame setup dominates and the search's ~1.5 us is a large
720/// relative cost for a ratio gain the cheap path forgoes while still tying
721/// upstream zstd. Unknown size (streaming) keeps the search for conservative
722/// ratio.
723pub(crate) fn fast_huf_search_enabled(
724 strategy: crate::encoding::strategy::StrategyTag,
725 source_size: Option<u64>,
726) -> bool {
727 if strategy != crate::encoding::strategy::StrategyTag::Fast {
728 return true;
729 }
730 match source_size {
731 Some(size) => size > 128 * 1024,
732 None => true,
733 }
734}
735
736impl<M: Matcher> CompressState<M> {
737 /// Clears `last_huff_table`, parking the table's buffers in
738 /// `huff_table_spare` for reuse instead of dropping them.
739 #[inline]
740 pub(crate) fn clear_huff_table(&mut self) {
741 if let Some(table) = self.last_huff_table.take() {
742 self.huff_table_spare = Some(table);
743 }
744 }
745
746 /// Replaces `last_huff_table` with `table`, parking any displaced table
747 /// in `huff_table_spare` for reuse.
748 #[inline]
749 pub(crate) fn replace_huff_table(&mut self, table: crate::huff0::huff0_encoder::HuffmanTable) {
750 if let Some(old) = self.last_huff_table.replace(table) {
751 self.huff_table_spare = Some(old);
752 }
753 }
754}
755
/// Per-frame setup resolved once by [`FrameCompressor::prepare_frame`] and
/// consumed by the block loop + [`FrameCompressor::finish_frame`]. Lets the
/// owned `compress()` and the borrowed one-shot path share identical
/// reset / dict-prime / entropy-seed setup and frame-tail emission.
///
/// Holds only plain scalar values (no buffers) and is handed to the
/// per-frame helpers by shared reference.
struct FramePrep {
    /// NOTE(review): presumably the window size resolved for this frame (the
    /// value the frame header advertises) — confirm in `prepare_frame`.
    window_size: u64,
    /// When `true`, `borrowed_eligible` applies the dictionary-frame rules:
    /// dictionary + input must fit the `u32` position space and the matcher
    /// must report `borrowed_dict_supported`.
    use_dictionary_state: bool,
    /// NOTE(review): presumably whether a source-size hint was available when
    /// the frame was prepared — confirm in `prepare_frame`.
    source_size_hint_known: bool,
    /// Size hint forwarded to `run_owned_block_loop` by `run_one_frame`.
    initial_size_hint: Option<u64>,
}
766
/// Picks the starting capacity of the `all_blocks` accumulator from the
/// source-size hint.
///
/// The frame header is written only after all input is read (so
/// Frame_Content_Size is known), which means compressed blocks accumulate in
/// memory first. The seed is chosen from three tiers (names mirror upstream
/// zstd `ZSTD_CStreamOutSize`):
/// - tiny (hint `<= 4 KiB`): a 4 KiB payload-bound seed, `>=` anything a tiny
///   input's compressed output could need.
/// - small (hint `<= 64 KiB`): a 16 KiB seed that absorbs one or two
///   `Vec::extend` doublings without over-allocating.
/// - default (larger or missing hint): `130 KiB`, one upstream zstd block —
///   the value the rest of the encoder is sized around; larger inputs amortise
///   the first doublings cheaply and the residue is dominated by internal
///   `compress_block_encoded` buffers.
///
/// Shared by the owned (`run_owned_block_loop`) and borrowed
/// (`run_borrowed_block_loop`) paths so the tier table can't drift between
/// them.
///
/// Every tier is additionally bounded by `block_capacity + 3 + 16`, where
/// `block_capacity` is the active `targetCBlockSize` cap or the 128 KiB format
/// ceiling: with a small target the first allocation tracks one capped block
/// plus header/checksum slack instead of keeping the upstream zstd-sized floor
/// that only later growth respects.
fn initial_all_blocks_cap(initial_size_hint: Option<u64>, block_capacity: usize) -> usize {
    const KIB: usize = 1024;

    // One capped block plus header/checksum slack.
    let block_bound = block_capacity + 3 + 16;
    let tier_seed = match initial_size_hint {
        Some(hint) if hint <= 4 * 1024 => 4 * KIB,
        Some(hint) if hint <= 64 * 1024 => 16 * KIB,
        Some(_) | None => 130 * KIB,
    };
    tier_seed.min(block_bound)
}
799
/// Per-block feeder for `run_owned_block_loop`.
///
/// `fill_block` appends source bytes to `buf` (which already holds any
/// carried pre-split suffix) until `buf.len() == block_capacity` or the
/// source is exhausted, returning `(bytes_appended, reached_eof)`.
/// `reached_eof` is true iff the block could NOT be filled to
/// `block_capacity` — the boundary the historical `Read`-loop produced (an
/// input that is an exact multiple of the block size still yields a
/// trailing empty last block on the next iteration).
///
/// The slice impl exists so the slice entry points
/// (`compress_independent_frame_into`, `compress_oneshot_*` fallbacks)
/// append with one `extend_from_slice` — the generic reader impl must
/// `resize` an initialized target region before `Read::read` can fill it,
/// which costs a zero-fill memset of the whole block on every frame.
pub(crate) trait OwnedBlockSource {
    /// Appends source bytes to `buf` for one block.
    ///
    /// * `buf` — block buffer; bytes already present (a carried pre-split
    ///   suffix) are kept and new bytes are appended after them.
    /// * `block_capacity` — the `buf.len()` at which the block counts as full.
    /// * `size_hint_remaining` — optional byte count the source-size hint
    ///   still expects; the reader adapter uses it to size its read region,
    ///   the slice impl ignores it.
    ///
    /// Returns `(bytes_appended, reached_eof)` as described on the trait.
    fn fill_block(
        &mut self,
        buf: &mut Vec<u8>,
        block_capacity: usize,
        size_hint_remaining: Option<u64>,
    ) -> (usize, bool);
}
823
824impl OwnedBlockSource for &[u8] {
825 fn fill_block(
826 &mut self,
827 buf: &mut Vec<u8>,
828 block_capacity: usize,
829 _size_hint_remaining: Option<u64>,
830 ) -> (usize, bool) {
831 let want = block_capacity - buf.len();
832 let take = want.min(self.len());
833 buf.extend_from_slice(&self[..take]);
834 *self = &self[take..];
835 (take, take < want)
836 }
837}
838
839/// Adapter routing a generic [`Read`] source through [`OwnedBlockSource`]:
840/// preserves the historical sizing behaviour — an initialized target region
841/// bounded by the source-size hint, grown (doubling, capped) only when the
842/// hint under-counted.
843pub(crate) struct ReaderBlockSource<Rd>(pub(crate) Rd);
844
845impl<Rd: Read> OwnedBlockSource for ReaderBlockSource<Rd> {
846 fn fill_block(
847 &mut self,
848 buf: &mut Vec<u8>,
849 block_capacity: usize,
850 size_hint_remaining: Option<u64>,
851 ) -> (usize, bool) {
852 let start = buf.len();
853 let mut filled = start;
854 let mut reached_eof = false;
855 // Size the read buffer to the bytes this block actually expects
856 // rather than always zero-filling a full MAX_BLOCK_SIZE: a small
857 // frame otherwise pays a 128 KiB `resize(_, 0)` memset per block
858 // just to read a few KiB (the zero-fill past `filled` is then
859 // truncated away).
860 //
861 // Overflow-free by construction (no `saturating_*` masking):
862 // `filled <= block_capacity` always (the read only ever targets
863 // `[filled..len]` with `len <= block_capacity`, and a carried-over
864 // pre-split suffix is a `split_off` below `block_capacity`), so
865 // `block_capacity - filled` never underflows; pinning `remaining`
866 // to `block_capacity` before the `usize` cast keeps the cast and
867 // the final add within `usize` on every target.
868 let initial_target = match size_hint_remaining {
869 Some(remaining) => {
870 let remaining = remaining.min(block_capacity as u64) as usize;
871 filled + remaining.min(block_capacity - filled)
872 }
873 // Unknown hint, or an inexact hint already met by prior blocks:
874 // read against the full block window.
875 None => block_capacity,
876 };
877 if buf.len() < initial_target {
878 buf.resize(initial_target, 0);
879 }
880 loop {
881 if reached_eof || filled == block_capacity {
882 break;
883 }
884 if filled == buf.len() {
885 // Hint under-counted the block; grow toward block_capacity
886 // (doubling, capped) so reading continues without paying a
887 // full-buffer zero up front. `len <= block_capacity` so the
888 // double stays well within `usize`; `filled < block_capacity`
889 // here (the `== block_capacity` break fired otherwise), so
890 // `filled + 1 <= block_capacity`.
891 let grow_to = (buf.len() * 2).clamp(filled + 1, block_capacity);
892 buf.resize(grow_to, 0);
893 }
894 let read_end = buf.len();
895 let new_bytes = self.0.read(&mut buf[filled..read_end]).unwrap();
896 if new_bytes == 0 {
897 reached_eof = true;
898 break;
899 }
900 filled += new_bytes;
901 }
902 buf.truncate(filled);
903 (filled - start, reached_eof)
904 }
905}
906
907impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
908 /// Create a new `FrameCompressor`
909 pub fn new(compression_level: CompressionLevel) -> Self {
910 Self {
911 uncompressed_data: None,
912 compressed_data: None,
913 compression_level,
914 dictionary: None,
915 dictionary_entropy_cache: None,
916 source_size_hint: None,
917 state: CompressState {
918 matcher: MatchGeneratorDriver::new(1024 * 128, 1),
919 last_huff_table: None,
920 huff_table_spare: None,
921 fse_tables: FseTables::new(),
922 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
923 offset_hist: [1, 4, 8],
924 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
925 compression_level,
926 ),
927 huf_optimal_search: true,
928 },
929 magicless: false,
930 content_checksum: false,
931 content_size_flag: true,
932 dict_id_flag: true,
933 target_block_size: None,
934 #[cfg(feature = "hash")]
935 hasher: XxHash64::with_seed(0),
936 #[cfg(feature = "lsm")]
937 frame_emit_info: None,
938 #[cfg(all(feature = "lsm", feature = "hash"))]
939 per_block_checksums_enabled: false,
940 #[cfg(all(feature = "lsm", feature = "hash"))]
941 block_checksums: None,
942 #[cfg(feature = "lsm")]
943 block_decompressed_sizes: alloc::vec::Vec::new(),
944 strategy_override: None,
945 }
946 }
947
948 /// Configure fine-grained compression parameters (#27).
949 ///
950 /// Resets the base [`CompressionLevel`](crate::encoding::CompressionLevel)
951 /// to the parameters' level and installs the per-knob overrides
952 /// (window/hash/chain/search logs, strategy, LDM) applied at the next
953 /// frame. Pass `None`-equivalent (a builder that overrides nothing)
954 /// to fall back to plain level-based compression.
955 ///
956 /// ```rust
957 /// use structured_zstd::encoding::{
958 /// CompressionLevel, CompressionParameters, FrameCompressor, Strategy,
959 /// };
960 /// let params = CompressionParameters::builder(CompressionLevel::Level(19))
961 /// .strategy(Strategy::Btultra2)
962 /// .enable_long_distance_matching(true)
963 /// .build()
964 /// .unwrap();
965 /// let mut compressor: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
966 /// compressor.set_parameters(¶ms);
967 /// let compressed = compressor.compress_independent_frame(b"some data to compress");
968 /// assert!(!compressed.is_empty());
969 /// ```
970 pub fn set_parameters(&mut self, params: &crate::encoding::CompressionParameters) {
971 self.compression_level = params.level();
972 let overrides = params.overrides();
973 self.strategy_override = overrides.strategy.map(|s| s.tag());
974 // Keep `state.strategy_tag` consistent immediately so the borrowed
975 // one-shot eligibility gate (`borrowed_eligible`) and literal gates
976 // are correct even before the next `compress()` re-sync.
977 self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
978 crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
979 });
980 self.state.huf_optimal_search =
981 fast_huf_search_enabled(self.state.strategy_tag, self.source_size_hint);
982 self.state.matcher.set_param_overrides(Some(overrides));
983 }
984
985 /// Whether the borrowed (no per-block history copy) one-shot loop is
986 /// valid for an `input_len`-byte slice under the resolved `prep`.
987 ///
988 /// `Uncompressed` resolves to `StrategyTag::Fast` but must emit stored
989 /// Raw blocks, which the borrowed loop's
990 /// `compress_block_encoded_borrowed` (RLE/raw-fast/compressed) does NOT
991 /// do, so exclude it; it then takes the owned path's dedicated
992 /// Uncompressed arm.
993 ///
994 /// No window-size gate: over-window inputs are handled too. The owned
995 /// path bounds matches to the last `advertised_window` bytes via
996 /// `window_low` and evicts/rehashes its history; the borrowed path
997 /// computes the identical `window_low = block_end - advertised_window`
998 /// and the kernel rejects any hash candidate below it, while the
999 /// per-position `put` during the scan keeps in-window slots current,
1000 /// so it produces byte-identical output to the owned (evicting) path
1001 /// without ever copying the input into `history`, even when the input
1002 /// far exceeds the window.
1003 ///
1004 /// BUT gate on `input_len <= u32::MAX`: the Fast kernel stores ABSOLUTE
1005 /// positions in a `u32` hash table, and the borrowed scan walks
1006 /// absolute input offsets up to `block_end == input.len()`. Past 4 GiB
1007 /// those offsets truncate / overflow the `u32` position math
1008 /// (`base_off + ip0 as u32`, `window_low`), panicking or corrupting.
1009 /// The owned/evicting path keeps the scanned window bounded (positions
1010 /// stay small), so >4 GiB inputs fall back to it.
1011 fn borrowed_eligible(&self, input_len: usize, prep: &FramePrep) -> bool {
1012 if matches!(self.compression_level, CompressionLevel::Uncompressed)
1013 || input_len > u32::MAX as usize
1014 {
1015 return false;
1016 }
1017 if prep.use_dictionary_state {
1018 // The borrowed dict scan runs in VIRTUAL `[dict][input]` coordinates,
1019 // so the position space is `dict_content.len() + input_len`, not just
1020 // `input_len`. A large attached dictionary plus an otherwise-allowed
1021 // input can exceed the `u32` floor the kernel asserts — fall back to
1022 // the owned (copy) path in that case.
1023 let fits_u32 = self
1024 .dictionary
1025 .as_ref()
1026 .and_then(|dict| dict.inner.dict_content.len().checked_add(input_len))
1027 .is_some_and(|virtual_len| virtual_len <= u32::MAX as usize);
1028 if !fits_u32 {
1029 return false;
1030 }
1031 // Dictionary frames: only the Simple (Fast) backend in attach mode
1032 // has a borrowed (no input copy) dict scan. Copy-mode dict frames
1033 // and the other backends still take the owned path.
1034 return self.state.matcher.borrowed_dict_supported();
1035 }
1036 // The borrowed (no-copy, in-place over-window) scan exists for the
1037 // Simple (Fast), Dfast, and Row backends, and for the HashChain
1038 // backend's lazy CHAIN parser; BT/optimal (BinaryTree search) stay on
1039 // the owned path. Every borrowed scan applies the per-position
1040 // `window_low = abs_ip - advertised_window` offset cap so over-window
1041 // inputs are matched in place (no input->history copy), matching C's
1042 // continuous-index + windowLow one-shot behaviour.
1043 self.state.matcher.borrowed_supported()
1044 }
1045
1046 /// Compress `input` as one frame's worth of blocks into `out` (appended
1047 /// from its current end): the borrowed in-place loop when
1048 /// [`Self::borrowed_eligible`], else the owned (history-copying) loop fed
1049 /// an in-place `&[u8]` cursor. Returns `total_uncompressed`; the caller
1050 /// emits the frame header (before this call, when the content size is
1051 /// known) or the drain tail.
1052 fn run_one_frame(&mut self, input: &[u8], prep: &FramePrep, out: &mut Vec<u8>) -> u64 {
1053 if self.borrowed_eligible(input.len(), prep) {
1054 self.run_borrowed_block_loop(input, out)
1055 } else {
1056 let mut cursor: &[u8] = input;
1057 self.run_owned_block_loop(&mut cursor, prep.initial_size_hint, true, out)
1058 }
1059 }
1060
1061 /// Compress one contiguous `&[u8]` as a single independent Zstd frame,
1062 /// writing the frame bytes into `out` (its previous contents are
1063 /// replaced and its allocation reused), reusing this compressor's heavy
1064 /// state across calls.
1065 ///
1066 /// This is the reusable-compression-context (CCtx-equivalent) entry
1067 /// point, mirroring C `ZSTD_compress2` over a reused `ZSTD_CCtx`:
1068 /// construct ONE `FrameCompressor` and call this in a loop to emit N
1069 /// independent, self-describing frames (each carrying its own header,
1070 /// blocks, and checksum, decodable in isolation, with no cross-frame
1071 /// match history). Every call resets the per-frame state via
1072 /// [`Self::prepare_frame`]: only the allocations are kept, so the
1073 /// dominant per-frame setup cost (table allocation + dictionary prime)
1074 /// is paid once instead of N times. Passing the same `out` buffer each
1075 /// call additionally reuses the output allocation, matching C's
1076 /// caller-owned `dst` buffer (no per-frame output allocation).
1077 ///
1078 /// Reusing the context + `out` across many small frames (the typical
1079 /// per-block-frame workload) is far cheaper than a fresh
1080 /// [`compress_slice_to_vec`](crate::encoding::compress_slice_to_vec)
1081 /// per block, which allocates and primes from scratch each time.
1082 ///
1083 /// The input is read in place: no [`Self::set_source`] /
1084 /// [`Self::set_drain`] setup is required, and the input lifetime is not
1085 /// baked into the compressor type, so successive calls may pass slices
1086 /// with unrelated lifetimes. When the Fast (Simple) backend is active
1087 /// and no dictionary is set, the matcher references the input directly
1088 /// (no per-block history copy); other backends / dictionary use copy
1089 /// each block into history exactly as the streaming
1090 /// [`compress`](Self::compress) path does. The source-size hint is
1091 /// derived from the input length on every call, so per-frame table
1092 /// sizing tracks each frame's actual size regardless of any earlier
1093 /// hint.
1094 ///
1095 /// A sticky dictionary set via
1096 /// [`set_dictionary`](Self::set_dictionary) (or its variants) is primed
1097 /// into every frame, mirroring `ZSTD_CCtx_loadDictionary` /
1098 /// `ZSTD_CCtx_refCDict`.
1099 ///
1100 /// # Panics
1101 ///
1102 /// Panics on encoder error, matching [`Self::compress`] and
1103 /// [`compress_slice_to_vec`](crate::encoding::compress_slice_to_vec).
1104 pub fn compress_independent_frame_into(&mut self, input: &[u8], out: &mut Vec<u8>) {
1105 // Size the next frame from the actual payload, not a stale hint a
1106 // previous call may have left behind (a wrong hint would change the
1107 // resolved window/header and could flip borrowed eligibility).
1108 self.source_size_hint = Some(input.len() as u64);
1109 let prep = self.prepare_frame();
1110 // Content size is known up front (one-shot), so write the frame
1111 // header FIRST and emit blocks STRAIGHT into `out` — no separate
1112 // `all_blocks` accumulator and no header+blocks copy (which was the
1113 // dominant per-frame memmove + the only un-amortized per-frame alloc
1114 // even when the compressor is reused).
1115 let total_uncompressed = input.len() as u64;
1116 let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
1117 let checksum_len = if emit_checksum { 4 } else { 0 };
1118 out.clear();
1119 // Reserve the header plus ONE block's worst case up front; the block
1120 // loops then grow `out` from the compression ratio observed so far
1121 // (`reserve_for_next_block`). Reserving `compress_bound(input_len)`
1122 // here held a whole-input-sized allocation for the entire frame —
1123 // ~100 MiB peak on a 100 MiB stream whose compressed output is a few
1124 // MiB, where the reference implementation's context peaks at
1125 // window-sized state. Small frames (<= one block) still get their
1126 // full bound in one shot, so the reused-`out` steady state is
1127 // unchanged. 18 = max frame header (magic 4 + descriptor 1 + window
1128 // 1 + dict id 4 + FCS 8).
1129 let first_block_bound = input.len().min(self.block_capacity()) + 3;
1130 out.reserve(18 + first_block_bound + checksum_len);
1131 self.append_frame_header(total_uncompressed, &prep, out);
1132 let header_len = out.len();
1133 let _ = self.run_one_frame(input, &prep, out);
1134 #[cfg(feature = "hash")]
1135 if self.content_checksum {
1136 out.extend_from_slice(&(self.hasher.finish() as u32).to_le_bytes());
1137 }
1138 #[cfg(feature = "lsm")]
1139 {
1140 let blocks_end = out.len() - checksum_len;
1141 self.populate_frame_emit_info(header_len, &out[header_len..blocks_end], emit_checksum);
1142 }
1143 #[cfg(not(feature = "lsm"))]
1144 let _ = header_len;
1145 }
1146
1147 /// Convenience wrapper over [`Self::compress_independent_frame_into`]
1148 /// that allocates and returns a fresh `Vec` per call. Prefer the
1149 /// `_into` form in tight per-block-frame loops to reuse one output
1150 /// buffer across frames (the CCtx-equivalent zero-per-call-alloc
1151 /// output, matching C's caller-owned `dst`).
1152 ///
1153 /// ```rust
1154 /// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
1155 /// let mut cctx: FrameCompressor = FrameCompressor::new(CompressionLevel::Default);
1156 /// let frame_a = cctx.compress_independent_frame(b"first block payload");
1157 /// let frame_b = cctx.compress_independent_frame(b"second block payload");
1158 /// assert!(!frame_a.is_empty() && !frame_b.is_empty());
1159 /// ```
1160 pub fn compress_independent_frame(&mut self, input: &[u8]) -> Vec<u8> {
1161 let mut out = Vec::new();
1162 self.compress_independent_frame_into(input, &mut out);
1163 out
1164 }
1165
    /// Borrowed one-shot block loop: compresses `input` block by block,
    /// scanning each block range in place against the borrowed window via
    /// `compress_block_encoded_borrowed` — no per-block `commit_space` copy.
    ///
    /// Each block's length comes from `optimal_block_size`, given the active
    /// [`Self::block_capacity`] and the savings observed so far, and is
    /// clamped to the end of `input`. An empty `input` produces a single empty
    /// last Raw block.
    ///
    /// Blocks are appended to `out` after whatever it already holds (for
    /// example a frame header). Returns `total_uncompressed`, i.e.
    /// `input.len()` as `u64`.
    ///
    /// The visible caller, [`Self::run_one_frame`], takes this path only when
    /// [`Self::borrowed_eligible`] approves it. Over-window inputs are fine
    /// (matches are bounded by `window_low` exactly as the owned evicting
    /// path).
    fn run_borrowed_block_loop(&mut self, input: &[u8], out: &mut Vec<u8>) -> u64 {
        // Blocks are appended to `out` starting here. `out` may already hold
        // the frame header (the one-shot compress-into-Vec path writes it
        // first, since the content size is known up front, and the loop
        // emits blocks straight after it — no separate `all_blocks` Vec and
        // no header+blocks copy). Output-size reads below are taken RELATIVE
        // to `blocks_start` so a header prefix never skews the upstream zstd split
        // `savings` gate (which would change block boundaries / wire output).
        let blocks_start = out.len();
        let total_uncompressed = input.len() as u64;
        // Empty input: emit a single empty last Raw block (mirrors the
        // owned loop's empty-file special case).
        if input.is_empty() {
            let header = BlockHeader {
                last_block: true,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(out);
            // The sidecar vectors still get one entry for the empty block.
            #[cfg(feature = "lsm")]
            self.block_decompressed_sizes.push(0);
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(checksums) = self.block_checksums.as_mut() {
                checksums.push(xxh64_block_low32(&[]));
            }
            return total_uncompressed;
        }
        // SAFETY: `input` outlives this call (held by the caller across
        // the call) and is not mutated.
        // NOTE(review): the earlier note here said only the Simple backend is
        // active (gated by `compress_oneshot_borrowed`); the visible caller
        // `run_one_frame` gates on `borrowed_eligible`, which also admits other
        // backends — confirm the `set_borrowed_window` contract covers them.
        unsafe {
            self.state.matcher.set_borrowed_window(input);
        }
        // Panic-safety: clear the borrowed `(ptr, len)` on EVERY exit,
        // including an unwind from an `assert!` inside the block loop, so
        // a caught-and-reused compressor never retains a dangling window.
        // (The next frame's `reset()` also clears it before any read, but
        // this guard makes the invariant local and unwind-proof.)
        struct ClearBorrowedOnDrop(*mut MatchGeneratorDriver);
        impl Drop for ClearBorrowedOnDrop {
            fn drop(&mut self) {
                // SAFETY: at drop (normal return or unwind) the loop's
                // borrows of the matcher have ended, so this is the only
                // access. `addr_of_mut!` produced this pointer without an
                // intermediate `&mut`, so the interleaved `&mut` uses in
                // the loop did not invalidate it.
                unsafe { (*self.0).clear_borrowed_window() };
            }
        }
        let _clear_guard = ClearBorrowedOnDrop(core::ptr::addr_of_mut!(self.state.matcher));
        let block_capacity = self.block_capacity();
        // `start` is the absolute offset of the next block within `input`.
        let mut start = 0usize;
        while start < input.len() {
            reserve_for_next_block(
                out,
                blocks_start,
                start as u64,
                input.len() - start,
                block_capacity,
            );
            // Upstream zstd `ZSTD_compress_frameChunk`: size each block via the cheap
            // fingerprint pre-splitter so a full 128 KiB block is cut at a
            // statistical boundary when it pays. `savings = consumed -
            // produced` mirrors the upstream zstd gate (the first block and
            // incompressible input keep the full 128 KiB). The borrowed window
            // already spans the whole input, so a smaller block is just a
            // narrower `(block_start, block_end)` range into it.
            let savings = start as i64 - (out.len() - blocks_start) as i64;
            // Borrowed path only: warm the pre-split window before the
            // cache-cold strided fingerprint read. Gated to exactly the
            // conditions under which `optimal_block_size` reads `block`
            // (a pre-split level, a full 128 KiB block remaining, the
            // block-size cap admits a full block, and `savings >= 3` so the
            // splitter actually runs) — so non-pre-split levels, the first
            // block, and the trailing partial block pay nothing. See
            // `warm_presplit_window`.
            if savings >= 3
                && input.len() - start >= MAX_BLOCK_SIZE as usize
                && block_capacity >= MAX_BLOCK_SIZE as usize
                && crate::encoding::match_generator::level_pre_split(self.compression_level)
                    .is_some()
            {
                warm_presplit_window(&input[start..start + MAX_BLOCK_SIZE as usize]);
            }
            let block_len = optimal_block_size(
                self.compression_level,
                &input[start..],
                input.len() - start,
                block_capacity,
                savings,
            );
            // Clamp to the end of the input; the block that reaches it is the
            // frame's last block.
            let end = (start + block_len).min(input.len());
            let block = &input[start..end];
            let last_block = end == input.len();
            // The content checksum is fed the uncompressed block bytes.
            #[cfg(feature = "hash")]
            if self.content_checksum {
                self.hasher.write(block);
            }
            let dict_active =
                self.dictionary.is_some() && self.state.matcher.supports_dictionary_priming();
            crate::encoding::levels::compress_block_encoded_borrowed(
                &mut self.state,
                self.compression_level,
                last_block,
                block,
                start,
                end,
                dict_active,
                out,
                #[cfg(feature = "lsm")]
                Some(&mut self.block_decompressed_sizes),
                #[cfg(all(feature = "lsm", feature = "hash"))]
                self.block_checksums.as_mut(),
            );
            start = end;
        }
        // `_clear_guard` drops here, clearing the borrowed window.
        total_uncompressed
    }
1293}
1294
1295impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
1296 /// Create a new `FrameCompressor` with a custom matching algorithm implementation
1297 pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
1298 Self {
1299 uncompressed_data: None,
1300 compressed_data: None,
1301 dictionary: None,
1302 dictionary_entropy_cache: None,
1303 source_size_hint: None,
1304 state: CompressState {
1305 matcher,
1306 last_huff_table: None,
1307 huff_table_spare: None,
1308 fse_tables: FseTables::new(),
1309 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
1310 offset_hist: [1, 4, 8],
1311 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
1312 compression_level,
1313 ),
1314 huf_optimal_search: true,
1315 },
1316 compression_level,
1317 magicless: false,
1318 content_checksum: false,
1319 content_size_flag: true,
1320 dict_id_flag: true,
1321 target_block_size: None,
1322 #[cfg(feature = "hash")]
1323 hasher: XxHash64::with_seed(0),
1324 #[cfg(feature = "lsm")]
1325 frame_emit_info: None,
1326 #[cfg(all(feature = "lsm", feature = "hash"))]
1327 per_block_checksums_enabled: false,
1328 #[cfg(all(feature = "lsm", feature = "hash"))]
1329 block_checksums: None,
1330 #[cfg(feature = "lsm")]
1331 block_decompressed_sizes: alloc::vec::Vec::new(),
1332 strategy_override: None,
1333 }
1334 }
1335
    /// Enable or disable magicless frame format (`ZSTD_f_zstd1_magicless`).
    ///
    /// When set to `true`, emitted frames omit the 4-byte magic number
    /// prefix. The matching decoder must be configured to expect a
    /// magicless stream — wire-format only round-trips with a
    /// magicless-aware decoder.
    ///
    /// This call only records the flag on the compressor; it writes no output
    /// itself.
    pub fn set_magicless(&mut self, magicless: bool) {
        self.magicless = magicless;
    }
1345
    /// Enable or disable the trailing XXH64 content checksum
    /// (semantics of upstream `ZSTD_c_checksumFlag`). Default `false`,
    /// matching the upstream library default (`ZSTD_c_checksumFlag = 0`)
    /// so out-of-the-box frames carry the same layout and pay the same
    /// costs as the reference implementation.
    ///
    /// When `false`, emitted frames set `Content_Checksum_flag = 0` and carry
    /// no trailing digest; such frames are valid (RFC 8878) and decode
    /// correctly in any [`ContentChecksum`](crate::decoding::ContentChecksum)
    /// mode. Without the `hash` feature no checksum is emitted regardless of
    /// this setting.
    ///
    /// This call only records the flag; the block loops and frame tail read it
    /// when a frame is compressed.
    pub fn set_content_checksum(&mut self, emit: bool) {
        self.content_checksum = emit;
    }
1360
    /// Enable or disable recording `Frame_Content_Size` in the frame header
    /// when the total size is known (semantics of upstream
    /// `ZSTD_c_contentSizeFlag`). Default `true`, matching upstream. With
    /// the flag off the header carries a window descriptor instead (and the
    /// single-segment layout, which requires an FCS, is disabled).
    ///
    /// This call only records the flag on the compressor; it writes no output
    /// itself.
    pub fn set_content_size_flag(&mut self, emit: bool) {
        self.content_size_flag = emit;
    }
1369
    /// Enable or disable recording the dictionary ID in the frame header
    /// when a dictionary is attached (semantics of upstream
    /// `ZSTD_c_dictIDFlag`). Default `true`, matching upstream. Frames
    /// emitted with the flag off still decode when the decoder is handed
    /// the dictionary explicitly.
    ///
    /// This call only records the flag on the compressor; it writes no output
    /// itself.
    pub fn set_dictionary_id_flag(&mut self, emit: bool) {
        self.dict_id_flag = emit;
    }
1378
1379 /// Set an upper bound on emitted block sizes (semantics of upstream
1380 /// `ZSTD_c_targetCBlockSize`): every physical block's payload is capped
1381 /// at `target` bytes (+3-byte block header on the wire), trading some
1382 /// ratio for bounded per-block latency. The value is clamped to
1383 /// `[MIN_TARGET_BLOCK_SIZE, MAX_BLOCK_SIZE]` (the upstream bounds).
1384 /// `None` removes the target.
1385 pub fn set_target_block_size(&mut self, target: Option<u32>) {
1386 self.target_block_size = target.map(|t| {
1387 t.clamp(
1388 crate::common::MIN_TARGET_BLOCK_SIZE,
1389 crate::common::MAX_BLOCK_SIZE,
1390 )
1391 });
1392 }
1393
1394 /// The active block-size cap: the configured target, or the format's
1395 /// 128 KiB block ceiling.
1396 fn block_capacity(&self) -> usize {
1397 self.target_block_size
1398 .map_or(crate::common::MAX_BLOCK_SIZE as usize, |t| t as usize)
1399 }
1400
1401 /// Before calling [FrameCompressor::compress] you need to set the source.
1402 ///
1403 /// This is the data that is compressed and written into the drain.
1404 pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
1405 self.uncompressed_data.replace(uncompressed_data)
1406 }
1407
1408 /// Before calling [FrameCompressor::compress] you need to set the drain.
1409 ///
1410 /// As the compressor compresses data, the drain serves as a place for the output to be writte.
1411 pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
1412 self.compressed_data.replace(compressed_data)
1413 }
1414
    /// Provide a hint about the total uncompressed size for the next frame.
    ///
    /// When set, the encoder selects smaller hash tables and windows for
    /// small inputs, matching the C zstd source-size-class behavior.
    ///
    /// This hint applies only to frame payload bytes (`size`). Dictionary
    /// history is primed separately and does not inflate the hinted size or
    /// advertised frame window.
    /// Must be called before [`compress`](Self::compress).
    ///
    /// The value is stored as `Some(size)`. The one-shot
    /// `compress_independent_frame_into` entry point overwrites it with the
    /// input length on every call, so a hint set here does not carry into
    /// that path.
    pub fn set_source_size_hint(&mut self, size: u64) {
        self.source_size_hint = Some(size);
    }
1427
1428 /// Total heap bytes this compressor's allocations hold, excluding the
1429 /// inline struct: the match-finder tables / history / recycled buffers and
1430 /// the primed-dictionary snapshot (via the matcher), the retained
1431 /// Huffman tables (active + recycled spare), the retained dictionary
1432 /// content, the cached dictionary entropy tables (literals Huffman +
1433 /// LL/ML/OF FSE), and the per-block sidecar buffers. Lets a context
1434 /// report its true footprint through `ZSTD_sizeof_CCtx`.
1435 pub fn heap_size(&self) -> usize {
1436 let mut total = self.state.matcher.heap_size();
1437 total += self
1438 .state
1439 .last_huff_table
1440 .as_ref()
1441 .map_or(0, |table| table.heap_size());
1442 total += self
1443 .state
1444 .huff_table_spare
1445 .as_ref()
1446 .map_or(0, |table| table.heap_size());
1447 total += self
1448 .dictionary
1449 .as_ref()
1450 .map_or(0, |d| d.inner.dict_content.capacity());
1451 total += self
1452 .dictionary_entropy_cache
1453 .as_ref()
1454 .map_or(0, CachedDictionaryEntropy::heap_size);
1455 #[cfg(all(feature = "lsm", feature = "hash"))]
1456 {
1457 total += self
1458 .block_checksums
1459 .as_ref()
1460 .map_or(0, |v| v.capacity() * core::mem::size_of::<u32>());
1461 }
1462 #[cfg(feature = "lsm")]
1463 {
1464 total += self.block_decompressed_sizes.capacity() * core::mem::size_of::<u32>();
1465 }
1466 total
1467 }
1468
1469 /// Compress the uncompressed data from the provided source as one Zstd frame and write it to the provided drain
1470 ///
1471 /// This will repeatedly call [Read::read] on the source to fill up blocks until the source returns 0 on the read call.
1472 /// All compressed blocks are buffered in memory so that the frame header can include the
1473 /// `Frame_Content_Size` field (which requires knowing the total uncompressed size). The
1474 /// entire frame — header, blocks, and optional checksum — is then written to the drain
1475 /// at the end. This means peak memory usage is O(compressed_size).
1476 ///
1477 /// To avoid endlessly encoding from a potentially endless source (like a network socket) you can use the
1478 /// [Read::take] function
1479 /// Per-frame setup values resolved by [`Self::prepare_frame`] and
1480 /// consumed by the block loop + [`Self::finish_frame`]. Lets the
1481 /// owned `compress()` and the borrowed one-shot path share the exact
1482 /// same reset / dict-prime / entropy-seed setup and frame tail.
1483 pub fn compress(&mut self) {
1484 let prep = self.prepare_frame();
1485 // Take the reader out so `run_owned_block_loop` can borrow it
1486 // mutably alongside `&mut self` (the rest of the loop touches
1487 // `self.state` / `self.hasher`, disjoint from the reader). Restored
1488 // before the frame tail so a reused compressor keeps its source.
1489 //
1490 // Deliberately NOT restored on unwind: if the block loop panics the
1491 // source has been partially consumed, so handing it back would let a
1492 // `catch_unwind` caller "successfully" compress the remaining tail
1493 // from an arbitrary midpoint — silent data corruption. Leaving the
1494 // slot empty makes any post-panic reuse fail loudly at the `expect`
1495 // below (matcher/entropy state is equally unre-usable after an
1496 // unwind; the reference implementation likewise requires a context
1497 // reset after an error).
1498 let mut source = self
1499 .uncompressed_data
1500 .take()
1501 .expect("source must be set via set_source before compress()");
1502 // Streaming drain: the content size is only known at EOF, so the
1503 // frame header can't precede the blocks — accumulate them in a local
1504 // buffer and let `finish_frame` write header + blocks to the drain.
1505 let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap(
1506 prep.initial_size_hint,
1507 self.block_capacity(),
1508 ));
1509 let mut block_source = ReaderBlockSource(&mut source);
1510 let total_uncompressed = self.run_owned_block_loop(
1511 &mut block_source,
1512 prep.initial_size_hint,
1513 false,
1514 &mut all_blocks,
1515 );
1516 self.uncompressed_data = Some(source);
1517 self.finish_frame(all_blocks, total_uncompressed, &prep);
1518 }
1519
/// Reset all per-frame state and resolve the values the rest of the frame
/// pipeline needs, returned as a `FramePrep`.
///
/// In order, this:
/// 1. clears the previous frame's introspection data (`lsm` / `hash`
///    features);
/// 2. consumes `source_size_hint` (it is `take`n, so it applies to one
///    frame only) and forwards it — plus the dictionary content size when a
///    dictionary will be primed — to the matcher ahead of `reset`;
/// 3. resets the matcher, the repeat-offset history, the strategy tag and
///    the Huffman search flag;
/// 4. primes the matcher with the dictionary (resident re-apply, snapshot
///    restore, or a full re-prime);
/// 5. seeds the Huffman / FSE "previous" tables from the dictionary entropy
///    cache, or clears them when no dictionary state is used;
/// 6. re-seeds the content hasher (`hash` feature).
///
/// # Panics
///
/// Panics if the matcher reports a window size of 0 after the reset.
fn prepare_frame(&mut self) -> FramePrep {
    // Reset per-frame introspection state so a re-used compressor
    // doesn't carry over the previous frame's layout/checksums.
    #[cfg(feature = "lsm")]
    {
        self.frame_emit_info = None;
        // Always captured under lsm (drives `decompressed_byte_range`);
        // clear, keep the allocation for a reused compressor.
        self.block_decompressed_sizes.clear();
    }
    #[cfg(all(feature = "lsm", feature = "hash"))]
    {
        // The checksum sidecar only exists while per-block checksums are
        // enabled; it starts out as a fresh, empty vector each frame.
        if self.per_block_checksums_enabled {
            self.block_checksums = Some(alloc::vec::Vec::new());
        } else {
            self.block_checksums = None;
        }
    }
    // Snapshot the hint before it is consumed below; the snapshot is what
    // the rest of this function and the returned `FramePrep` use.
    let initial_size_hint = self.source_size_hint;
    let source_size_hint_known = initial_size_hint.is_some();
    // Dictionary state is used only for a compressing level, with a matcher
    // that supports priming, and with a dictionary actually attached.
    let use_dictionary_state =
        !matches!(self.compression_level, CompressionLevel::Uncompressed)
            && self.state.matcher.supports_dictionary_priming()
            && self.dictionary.is_some();
    if let Some(size_hint) = self.source_size_hint.take() {
        // Keep source-size hint scoped to payload bytes; dictionary priming
        // is applied separately and should not force larger matcher sizing.
        self.state.matcher.set_source_size_hint(size_hint);
    }
    // Hand the matcher the dictionary's content size so its binary-tree /
    // hash-chain tables shrink to the dictionary's cParams tier (upstream zstd CDict
    // economics: the dictionary supplies long matches, so a source-sized live
    // table is wasted peak memory). The eviction window stays source-sized so
    // the dictionary bytes remain referenceable. Set before `reset` (which
    // consumes it) and only when a dictionary will actually be primed.
    if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
        self.state
            .matcher
            .set_dictionary_size_hint(dict.inner.dict_content.len());
    }
    // Clearing buffers to allow re-using of the compressor
    self.state.matcher.reset(self.compression_level);
    // Repeat-offset history a frame starts with when no dictionary
    // overrides it (overwritten below when dictionary state is used).
    self.state.offset_hist = [1, 4, 8];
    // Sync `state.strategy_tag` to the level resolved at this reset so
    // the literal-compression gates (`min_literals_to_compress` /
    // `min_gain` in `encoding::blocks::compressed`) see the correct
    // strategy for the next frame. Frame-by-frame level changes go
    // through this same `compress()` entry point, so re-syncing here
    // covers level switches without touching the matcher dispatch.
    // A public-parameter strategy override (#27) wins over the level's
    // derived tag so the literal-compression gates and dict-attach
    // cutoff below see the strategy the matcher actually runs.
    self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
        crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
    });
    // `initial_size_hint` (captured before the `.take()` above) — by here
    // `self.source_size_hint` is None.
    self.state.huf_optimal_search =
        fast_huf_search_enabled(self.state.strategy_tag, initial_size_hint);
    // The cached dictionary entropy tables are only consulted when
    // dictionary state is in use for this frame.
    let cached_entropy = if use_dictionary_state {
        self.dictionary_entropy_cache.as_ref()
    } else {
        None
    };
    if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
        // This state drives sequence encoding, while matcher priming below updates
        // the match generator's internal repeat-offset history for match finding.
        self.state.offset_hist = dict.inner.offset_hist;
        // Upstream zstd `ZSTD_shouldAttachDict` (`zstd_compress.c`): a
        // precomputed-dictionary table is COPIED into the working context
        // only when the source is larger than a per-strategy cutoff; at or
        // below it (and for unknown size) the upstream zstd ATTACHES the dictionary
        // tables by reference (no per-frame table touch at all). We don't
        // have an attach-by-reference path yet, so:
        //   - large source (> cutoff): reuse the captured prime snapshot
        //     (a table copy) instead of re-hashing the dictionary — the
        //     upstream zstd COPY regime, where the copy is cheaper than re-priming;
        //   - small / unknown source: re-prime (the snapshot copy of the
        //     whole table would cost MORE than the sparse re-prime here,
        //     which is exactly why the upstream zstd attaches by reference instead).
        // `attachDictSizeCutoffs` per strategy: fast 8K, dfast 16K,
        // greedy/lazy/btopt 32K, btultra/btultra2 8K. Expressed as the
        // ceil-log bucket (8K = 2^13, 16K = 2^14, 32K = 2^15) so the
        // decision uses the SAME bucketed representation as the driver's
        // attach/copy gate (`reset_size_log`) — comparing
        // `source_size_ceil_log(hint)` on the full u64 avoids the `as usize`
        // truncation that could diverge from the driver on 32-bit targets.
        // For a power-of-two cutoff `2^k`, `ceil_log2(hint) > k` is exactly
        // `hint > 2^k`, so this is identical to the raw `hint > cutoff` on
        // 64-bit.
        let cutoff_log = match self.state.strategy_tag {
            // Fast always attaches now (the copy-mode owned path memmoved the
            // whole input into history every frame); keep the copy-snapshot
            // gate in sync with the matcher's attach cutoff so Fast never
            // captures/restores a copy snapshot it can no longer use.
            crate::encoding::strategy::StrategyTag::Fast => {
                crate::encoding::match_generator::FAST_ATTACH_DICT_CUTOFF_LOG
            }
            crate::encoding::strategy::StrategyTag::BtUltra
            | crate::encoding::strategy::StrategyTag::BtUltra2 => 13,
            crate::encoding::strategy::StrategyTag::Dfast => 14,
            crate::encoding::strategy::StrategyTag::Greedy
            | crate::encoding::strategy::StrategyTag::Lazy
            | crate::encoding::strategy::StrategyTag::Btlazy2
            | crate::encoding::strategy::StrategyTag::BtOpt => 15,
        };
        if self.state.matcher.dictionary_is_resident() {
            // Re-borrow fast path: the previous frame's reset kept this
            // dict's bytes + cached index resident, so skip the re-commit /
            // re-index and only reapply the offset history.
            self.state
                .matcher
                .reapply_resident_dictionary(dict.inner.offset_hist);
        } else {
            // An unknown source size never prefers the copy snapshot.
            let prefer_copy_snapshot = initial_size_hint.is_some_and(|s| {
                crate::encoding::match_generator::source_size_ceil_log(s) > cutoff_log
            });
            // `restore_primed_dictionary` is only attempted when the
            // snapshot is preferred (short-circuit `&&`).
            let restored = prefer_copy_snapshot
                && self
                    .state
                    .matcher
                    .restore_primed_dictionary(self.compression_level);
            if !restored {
                // No usable snapshot: prime from the dictionary content,
                // then capture a snapshot for later frames when one is
                // preferred for this source size.
                self.state.matcher.prime_with_dictionary(
                    dict.inner.dict_content.as_slice(),
                    dict.inner.offset_hist,
                );
                if prefer_copy_snapshot {
                    self.state
                        .matcher
                        .capture_primed_dictionary(self.compression_level);
                }
            }
        }
    }
    if let Some(cache) = cached_entropy {
        // Refill an empty slot from the recycled spare before
        // `clone_from`: `Option::clone_from(None ← Some)` falls back to
        // a fresh clone (two Vec allocations), while `Some ← Some`
        // delegates to the table's buffer-reusing `clone_from`. Frames
        // whose last block cleared the table would otherwise re-clone
        // the dict seed every frame.
        match &cache.huff {
            Some(src) => {
                if self.state.last_huff_table.is_none() {
                    self.state.last_huff_table = self.state.huff_table_spare.take();
                }
                match &mut self.state.last_huff_table {
                    Some(dst) => dst.clone_from(src),
                    // Neither an active table nor a spare was available:
                    // fall back to a fresh clone of the cached table.
                    slot => *slot = Some(src.clone()),
                }
            }
            None => self.state.clear_huff_table(),
        }
    } else {
        self.state.clear_huff_table();
    }
    // `clone_from` keeps frame-to-frame seeding cheap for reused compressors by
    // reusing existing allocations where possible instead of reallocating every frame.
    if let Some(cache) = cached_entropy {
        self.state
            .fse_tables
            .ll_previous
            .clone_from(&cache.ll_previous);
        self.state
            .fse_tables
            .ml_previous
            .clone_from(&cache.ml_previous);
        self.state
            .fse_tables
            .of_previous
            .clone_from(&cache.of_previous);
    } else {
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
    }
    // Only `Custom` cached FSE tables are forwarded to the matcher; any
    // other cached variant (or no cache at all) is passed as `None`.
    let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
        Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
        _ => None,
    });
    self.state.matcher.seed_dictionary_entropy(
        self.state.last_huff_table.as_ref(),
        ll_entropy,
        ml_entropy,
        of_entropy,
    );
    #[cfg(feature = "hash")]
    {
        // Fresh content-checksum state for this frame.
        self.hasher = XxHash64::with_seed(0);
    }
    let window_size = self.state.matcher.window_size();
    assert!(
        window_size != 0,
        "matcher reported window_size == 0, which is invalid"
    );
    FramePrep {
        window_size,
        use_dictionary_state,
        source_size_hint_known,
        initial_size_hint,
    }
}
1731
/// Owned streaming block loop: reads blocks from the caller-provided
/// `source`, optionally pre-splits them, hashes the data for the content
/// checksum, and emits each block (raw for the `Uncompressed` level,
/// otherwise via `compress_block_encoded`), appending the block bytes to
/// `out`.
///
/// Returns the total number of uncompressed bytes obtained from `source`.
/// The emitted blocks are not returned; they are appended to `out`.
///
/// The source is passed in (rather than read from
/// `self.uncompressed_data`) so the streaming `compress` path can
/// feed the configured reader while the slice paths
/// (`compress_oneshot_borrowed`, `compress_independent_frame`) feed an
/// in-place `&[u8]` cursor without baking its lifetime into the
/// compressor type.
///
/// The loop ends once a block flagged as last has been emitted with no
/// carried-over input left, or right after the empty last block written for
/// an input with no remaining bytes.
fn run_owned_block_loop<S: OwnedBlockSource>(
    &mut self,
    source: &mut S,
    initial_size_hint: Option<u64>,
    // Whether `initial_size_hint` is the input's exact length (the
    // one-shot slice paths) or a caller-provided estimate (the streaming
    // `Read` path, where `set_source_size_hint` is advisory). An exact
    // hint drives the one-shot ratio reservation; an estimate is only
    // trusted up to a small lookahead past the bytes actually read.
    hint_is_exact: bool,
    out: &mut Vec<u8>,
) -> u64 {
    // Compressed blocks are appended to `out` from its current end. The
    // streaming drain path passes a fresh buffer (the frame header is
    // written to the drain afterward, since Frame_Content_Size is only
    // known once the reader hits EOF); the one-shot compress-into-Vec
    // path passes `out` already holding the header. The upstream zstd split
    // `savings` gate below accumulates block-relative (`before_len`)
    // output deltas, so a header prefix never skews it.
    let blocks_start = out.len();
    let mut total_uncompressed: u64 = 0;
    // Suffix kept back by the pre-block splitter, carried into the next
    // iteration.
    let mut pending_input: Vec<u8> = Vec::new();
    let mut reached_eof = false;
    // Running (input bytes - output bytes) over the blocks emitted so far;
    // fed to `optimal_block_size`.
    let mut savings = 0i64;
    // Compress block by block
    loop {
        // Read up to one upstream zstd block. When the pre-block splitter keeps a
        // suffix, top it back up before compressing the next block, matching
        // ZSTD_compress_frameChunk() over a contiguous input buffer.
        let block_capacity = self.block_capacity();
        // Always draw the block buffer from the matcher's recycled pool
        // (its capacity already covers the block size, so the resize below
        // stays in-place). Any carried pre-split suffix is copied in, and
        // `pending_input` is retained as a reusable carry buffer. The prior
        // approach `split_off`'d a fresh suffix Vec per pre-split and
        // `reserve_exact`-grew it to `block_capacity` every block; on a
        // heavily pre-split frame that churned one block-sized allocation
        // per split (~12 MB over ~90 splits on a 1 MiB corpus input).
        let mut uncompressed_data = self.state.matcher.get_next_space();
        uncompressed_data.clear();
        uncompressed_data.extend_from_slice(&pending_input);
        pending_input.clear();
        if !reached_eof {
            // Remaining-bytes expectation for the reader source's sizing
            // (`None` = unknown, or an inexact hint already met by prior
            // blocks). The slice source appends directly and ignores it.
            let size_hint_remaining = match initial_size_hint {
                Some(hint) if hint > total_uncompressed => Some(hint - total_uncompressed),
                _ => None,
            };
            let (appended, eof) =
                source.fill_block(&mut uncompressed_data, block_capacity, size_hint_remaining);
            total_uncompressed += appended as u64;
            reached_eof = eof;
        }
        // Provisional: cleared again below when the splitter holds back a
        // suffix for the next iteration.
        let mut last_block = reached_eof;
        let remaining_for_split = if reached_eof {
            uncompressed_data.len()
        } else {
            block_capacity
        };
        // Pre-block splitting is only considered for a completely full
        // block at a compressing level.
        if !matches!(self.compression_level, CompressionLevel::Uncompressed)
            && uncompressed_data.len() == block_capacity
        {
            let block_len = optimal_block_size(
                self.compression_level,
                &uncompressed_data,
                remaining_for_split,
                block_capacity,
                savings,
            );
            if block_len < uncompressed_data.len() {
                // Carry the kept suffix into the reusable `pending_input`
                // buffer (cleared, capacity retained) instead of allocating
                // a fresh Vec via `split_off`. Next iteration copies it back
                // into a pooled block buffer. The block currently being
                // compressed is truncated to the chosen split length.
                pending_input.clear();
                pending_input.extend_from_slice(&uncompressed_data[block_len..]);
                uncompressed_data.truncate(block_len);
                last_block = false;
            }
        }
        // As we read, hash that data too (skipped when the content
        // checksum is disabled).
        #[cfg(feature = "hash")]
        if self.content_checksum {
            self.hasher.write(&uncompressed_data);
        }
        // Note on the optional per-block checksum sidecar: the
        // per-physical-block XXH64 (low 32 bits) is NOT computed here.
        // Hashing happens INSIDE the block emitters (RLE / Raw fast-path /
        // Compressed / post-split partitions), so the digests vector has
        // exactly one entry per physical Block_Header written to the
        // output — 1:1 with `FrameEmitInfo.blocks`. See
        // `enable_per_block_checksums` rustdoc.
        //
        // Size the output ahead of this block's emission from the ratio
        // observed so far (see `reserve_for_next_block`); with no usable
        // size hint, ensure one block's worst case and let the doubling
        // growth policy amortize across blocks.
        //
        // `emitted`: input bytes already turned into blocks before this
        // iteration (everything read, minus the current block and the
        // carried suffix).
        let emitted =
            total_uncompressed - uncompressed_data.len() as u64 - pending_input.len() as u64;
        match initial_size_hint {
            Some(hint) if hint >= total_uncompressed => {
                // An advisory hint (streaming path) is only trusted up to
                // a small lookahead past the bytes actually read: a hint
                // far above the real input would otherwise reserve the
                // whole phantom remainder up front.
                let hint_remaining = hint - emitted;
                let remaining = if hint_is_exact {
                    hint_remaining
                } else {
                    let buffered = total_uncompressed - emitted;
                    const HINT_LOOKAHEAD: u64 = 64 * 1024;
                    hint_remaining.min(buffered + HINT_LOOKAHEAD)
                };
                reserve_for_next_block(
                    out,
                    blocks_start,
                    emitted,
                    remaining as usize,
                    self.block_capacity(),
                );
            }
            _ => {
                // No hint, or the input already outgrew it: reserve this
                // block's length plus 3 + 16 bytes of slack.
                out.reserve(uncompressed_data.len() + 3 + 16);
            }
        }
        // Special handling is needed for compression of a totally empty file
        if uncompressed_data.is_empty() {
            let header = BlockHeader {
                last_block: true,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(out);
            #[cfg(feature = "lsm")]
            self.block_decompressed_sizes.push(0);
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(checksums) = self.block_checksums.as_mut() {
                checksums.push(xxh64_block_low32(&[]));
            }
            break;
        }

        match self.compression_level {
            CompressionLevel::Uncompressed => {
                // Stored verbatim: a Raw block header followed by the bytes.
                let header = BlockHeader {
                    last_block,
                    block_type: crate::blocks::block::BlockType::Raw,
                    block_size: uncompressed_data.len().try_into().unwrap(),
                };
                header.serialize(out);
                #[cfg(feature = "lsm")]
                self.block_decompressed_sizes
                    .push(uncompressed_data.len() as u32);
                #[cfg(all(feature = "lsm", feature = "hash"))]
                if let Some(checksums) = self.block_checksums.as_mut() {
                    checksums.push(xxh64_block_low32(&uncompressed_data));
                }
                out.extend_from_slice(&uncompressed_data);
                // A raw block costs its 3-byte header on top of the
                // payload, so this always nets -3.
                savings +=
                    uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
            }
            CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best
            | CompressionLevel::Level(_) => {
                let before_len = out.len();
                let block_len = uncompressed_data.len();
                // A primed dictionary makes "incompressible-looking"
                // blocks matchable against the dict, so the raw-fast-
                // path inside must be bypassed (it skips matching).
                // Mirror prepare_frame's `use_dictionary_state`: a dict
                // is only PRIMED (and thus matchable) when the matcher
                // supports priming — a non-priming matcher ignores an
                // attached dictionary, so the raw-fast-path must stay
                // enabled for it. (This arm is already non-Uncompressed.)
                let dict_active = self.dictionary.is_some()
                    && self.state.matcher.supports_dictionary_priming();
                compress_block_encoded(
                    &mut self.state,
                    self.compression_level,
                    last_block,
                    uncompressed_data,
                    out,
                    dict_active,
                    #[cfg(feature = "lsm")]
                    Some(&mut self.block_decompressed_sizes),
                    #[cfg(all(feature = "lsm", feature = "hash"))]
                    self.block_checksums.as_mut(),
                );
                // Block-relative output delta, so bytes already in `out`
                // before this block never skew the gate.
                savings += block_len as i64 - (out.len() - before_len) as i64;
            }
        }
        // Done only when the last block went out and nothing is carried over.
        if last_block && pending_input.is_empty() {
            break;
        }
    }
    total_uncompressed
}
1944
1945 /// Append the frame header bytes onto `out` once the total payload size
1946 /// is known (so `Frame_Content_Size` / `single_segment` can be set).
1947 /// Appends rather than returns so the one-shot path serializes straight
1948 /// into the reused output buffer with no per-frame header `Vec`.
1949 fn append_frame_header(&self, total_uncompressed: u64, prep: &FramePrep, out: &mut Vec<u8>) {
1950 // Match the upstream zstd framing policy (`ZSTD_writeFrameHeader`):
1951 // single-segment whenever the content size is known and the whole
1952 // source fits the active window (`contentSizeFlag && windowSize >=
1953 // srcSize`). A single-segment frame REQUIRES an FCS field, so
1954 // suppressing the content size (`content_size_flag` off) forces the
1955 // windowed layout. There is no lower size bound: small payloads
1956 // benefit most, since a windowed frame cannot encode a content size
1957 // below 256 in fewer than 4 FCS bytes (the 1-byte FCS class is
1958 // single-segment-only, see `find_fcs_field_size`), whereas a
1959 // single-segment frame stores it in one byte and omits the window
1960 // descriptor. The single-segment window equals the FCS, so a block
1961 // must never reference past the content: the post-hoc raw fallback in
1962 // the block emitters guarantees any non-shrinking block is stored raw,
1963 // and genuine matches stay within the already-emitted output.
1964 // Dictionary frames qualify too (the dictionary is decoder setup
1965 // state, not part of the regenerated segment), keeping the decoder's
1966 // single-allocation path (our decoder caps reservation to
1967 // min(window, FCS) either way).
1968 let single_segment = self.content_size_flag
1969 && prep.source_size_hint_known
1970 && total_uncompressed <= prep.window_size;
1971 let header = FrameHeader {
1972 frame_content_size: self.content_size_flag.then_some(total_uncompressed),
1973 single_segment,
1974 content_checksum: cfg!(feature = "hash") && self.content_checksum,
1975 dictionary_id: if prep.use_dictionary_state && self.dict_id_flag {
1976 self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
1977 } else {
1978 None
1979 },
1980 window_size: if single_segment {
1981 None
1982 } else {
1983 Some(prep.window_size)
1984 },
1985 magicless: self.magicless,
1986 };
1987 header.serialize(out);
1988 }
1989
1990 /// Write the frame header, accumulated block bytes, and optional
1991 /// trailing content checksum to the configured drain; populate
1992 /// `frame_emit_info` (lsm). Header and blocks are written separately to
1993 /// avoid shifting `all_blocks` to prepend the header. Used by
1994 /// `compress` and `compress_oneshot_borrowed`.
1995 fn finish_frame(&mut self, all_blocks: Vec<u8>, total_uncompressed: u64, prep: &FramePrep) {
1996 let mut header_buf: Vec<u8> = Vec::with_capacity(18);
1997 self.append_frame_header(total_uncompressed, prep, &mut header_buf);
1998 // Snapshot the checksum before borrowing the drain field so the
1999 // `self.hasher` read and the `self.compressed_data` write don't
2000 // both need `&mut self` simultaneously.
2001 #[cfg(feature = "hash")]
2002 let checksum_bytes = self
2003 .content_checksum
2004 .then(|| (self.hasher.finish() as u32).to_le_bytes());
2005 let drain = self.compressed_data.as_mut().unwrap();
2006 drain.write_all(&header_buf).unwrap();
2007 drain.write_all(&all_blocks).unwrap();
2008 // With the `hash` feature AND the content checksum enabled, the header
2009 // set `Content_Checksum_flag` and the 32-bit digest is written at the
2010 // end of the frame. Disabled => no trailing bytes, flag stays 0.
2011 #[cfg(feature = "hash")]
2012 if let Some(checksum_bytes) = checksum_bytes {
2013 drain.write_all(&checksum_bytes).unwrap();
2014 }
2015 #[cfg(feature = "lsm")]
2016 {
2017 let emit_checksum = cfg!(feature = "hash") && self.content_checksum;
2018 self.populate_frame_emit_info(header_buf.len(), &all_blocks, emit_checksum);
2019 }
2020 }
2021
/// Reconstruct the per-block layout of the frame just emitted and store it
/// in `frame_emit_info`.
///
/// `all_blocks` is scanned one Block_Header at a time. Each header is
/// 3 bytes, little-endian, packing
/// `(block_size << 3) | (block_type << 1) | last_block`. The physical body
/// size depends on the type: an RLE body is always 1 byte (the repeated
/// byte), while Raw/Compressed bodies span `block_size` bytes.
///
/// `header_len` is the serialized frame-header length, i.e. the frame
/// offset of the first block. `emit_checksum` states whether a 4-byte
/// content checksum follows the blocks.
///
/// Fails closed: when an offset does not fit `u32`, a Compressed block has
/// no decompressed-size sidecar entry, or the scan is structurally
/// incomplete, the function returns early without writing
/// `frame_emit_info`.
#[cfg(feature = "lsm")]
fn populate_frame_emit_info(
    &mut self,
    header_len: usize,
    all_blocks: &[u8],
    emit_checksum: bool,
) {
    use crate::blocks::block::BlockType as BT;
    use crate::encoding::frame_emit_info::{FrameBlock, FrameEmitInfo};

    // Every frame offset is published as `u32`. A frame too large for that
    // is reported as "no layout" rather than as a silently truncated one;
    // this is not expected for any realistic frame.
    let Ok(frame_header_len) = u32::try_from(header_len) else {
        return;
    };
    let Ok(all_blocks_len_u32) = u32::try_from(all_blocks.len()) else {
        return;
    };

    let mut blocks: Vec<FrameBlock> = Vec::new();
    let mut cursor: usize = 0;
    // Stops as soon as fewer than 3 header bytes remain at `cursor`.
    while let Some(header_bytes) = all_blocks.get(cursor..cursor + 3) {
        let raw = u32::from_le_bytes([header_bytes[0], header_bytes[1], header_bytes[2], 0]);
        let last_block = raw & 1 == 1;
        let block_type = match (raw >> 1) & 0b11 {
            0 => BT::Raw,
            1 => BT::RLE,
            2 => BT::Compressed,
            _ => BT::Reserved,
        };
        let block_size_field = raw >> 3;
        // For RLE the Block_Size field is the logical repeat count while
        // the body on the wire is the single repeated byte; Raw and
        // Compressed bodies physically span `block_size_field` bytes.
        // `body_size` stores the physical length so that
        // 'offset + header + body_size' always lands on the next block
        // boundary; the raw spec field is surfaced separately.
        let physical_body: u32 = if matches!(block_type, BT::RLE) {
            1
        } else {
            block_size_field
        };
        let Ok(cursor_u32) = u32::try_from(cursor) else {
            return;
        };
        let Some(offset_in_frame) = frame_header_len.checked_add(cursor_u32) else {
            return;
        };
        // Decompressed (regenerated) size, recorded per physical block at
        // emit time (1:1 with the wire blocks scanned here). For Raw/RLE
        // it can be derived from the wire (`block_size_field`), so a short
        // sidecar still yields the right value. A Compressed block's size
        // is NOT on the wire: inventing 0 would publish a silently wrong
        // `decompressed_byte_range`, so bail out instead. The 1:1 push
        // invariant makes that branch unreachable in practice; the
        // debug_assert catches a regression.
        let decompressed_size = match self.block_decompressed_sizes.get(blocks.len()) {
            Some(&recorded) => recorded,
            None if matches!(block_type, BT::Raw | BT::RLE) => block_size_field,
            None => {
                debug_assert!(
                    false,
                    "missing decompressed-size sidecar entry for compressed block {}",
                    blocks.len()
                );
                return;
            }
        };
        blocks.push(FrameBlock {
            offset_in_frame,
            header_size: 3,
            body_size: physical_body,
            block_size_field,
            block_type,
            last_block,
            decompressed_size,
        });
        cursor += 3 + physical_body as usize;
        if last_block {
            break;
        }
    }

    // The scan must have consumed the whole block section AND finished on
    // a parsed last block. A premature `last_block` (bytes left over) or a
    // run-off without any last block would otherwise publish an invalid
    // public `FrameEmitInfo`. Unreachable for a well-formed self-produced
    // frame (the debug_assert catches a regression); in release builds the
    // layout is simply not published.
    let ended_on_last_block = blocks.last().is_some_and(|b| b.last_block);
    if cursor != all_blocks.len() || !ended_on_last_block {
        debug_assert!(
            false,
            "incomplete block scan in populate_frame_emit_info: cursor={} len={} last_block={:?}",
            cursor,
            all_blocks.len(),
            blocks.last().map(|b| b.last_block)
        );
        return;
    }

    // End of the block section in frame offsets; the checksum, when
    // present, occupies the 4 bytes right after it.
    let Some(body_total) = frame_header_len.checked_add(all_blocks_len_u32) else {
        return;
    };
    let (checksum_range, total_size) = if emit_checksum {
        let Some(checksum_end) = body_total.checked_add(4) else {
            return;
        };
        (Some(body_total..checksum_end), checksum_end)
    } else {
        (None, body_total)
    };
    self.frame_emit_info = Some(FrameEmitInfo {
        frame_header_range: 0..frame_header_len,
        blocks,
        checksum_range,
        total_size,
    });
}
2180
2181 /// Layout of the most recently emitted frame.
2182 ///
2183 /// Returns `None` if [`compress`](Self::compress) has not been
2184 /// called yet on this compressor. After a successful `compress()`
2185 /// the returned `FrameEmitInfo` describes the frame header range,
2186 /// every emitted block's offset / size / type, and the optional
2187 /// trailing content-checksum range — all in frame-absolute byte
2188 /// offsets matching the bytes written to the drain.
2189 ///
2190 /// Behind the `lsm` Cargo feature.
2191 #[cfg(feature = "lsm")]
2192 pub fn last_frame_emit_info(&self) -> Option<&crate::encoding::frame_emit_info::FrameEmitInfo> {
2193 self.frame_emit_info.as_ref()
2194 }
2195
2196 /// Opt in to per-block XXH64 checksum computation during
2197 /// [`compress`](Self::compress). Default off; zero cost when
2198 /// disabled. The captured digests are accessible via
2199 /// [`last_frame_block_checksums`](Self::last_frame_block_checksums).
2200 ///
2201 /// One checksum is emitted per physical FrameBlock written to
2202 /// the drain: 1:1 cardinality with
2203 /// [`last_frame_emit_info`](Self::last_frame_emit_info)'s
2204 /// `blocks` vector. On the post-split optimization path
2205 /// (Level 16-22 with large window) the per-partition decompressed
2206 /// range is hashed inside the partition loop so the digest count
2207 /// still matches the emitted block count. The decoder collects
2208 /// per-physical-block digests on the same granularity, so
2209 /// element-wise equality holds round-trip.
2210 ///
2211 /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
2212 /// primitive lives behind the `hash` feature, so this method only
2213 /// compiles when both are enabled.
2214 #[cfg(all(feature = "lsm", feature = "hash"))]
2215 pub fn enable_per_block_checksums(&mut self) {
2216 self.per_block_checksums_enabled = true;
2217 }
2218
2219 /// Per-block XXH64 (low 32 bits) digests captured during the most
2220 /// recent `compress()` call. `None` unless
2221 /// [`enable_per_block_checksums`](Self::enable_per_block_checksums)
2222 /// was called before `compress()`.
2223 ///
2224 /// Behind `all(feature = "lsm", feature = "hash")`.
2225 #[cfg(all(feature = "lsm", feature = "hash"))]
2226 pub fn last_frame_block_checksums(&self) -> Option<&[u32]> {
2227 self.block_checksums.as_deref()
2228 }
2229
2230 /// Get a mutable reference to the source
2231 pub fn source_mut(&mut self) -> Option<&mut R> {
2232 self.uncompressed_data.as_mut()
2233 }
2234
2235 /// Get a mutable reference to the drain
2236 pub fn drain_mut(&mut self) -> Option<&mut W> {
2237 self.compressed_data.as_mut()
2238 }
2239
2240 /// Get a reference to the source
2241 pub fn source(&self) -> Option<&R> {
2242 self.uncompressed_data.as_ref()
2243 }
2244
2245 /// Get a reference to the drain
2246 pub fn drain(&self) -> Option<&W> {
2247 self.compressed_data.as_ref()
2248 }
2249
2250 /// Retrieve the source
2251 pub fn take_source(&mut self) -> Option<R> {
2252 self.uncompressed_data.take()
2253 }
2254
2255 /// Retrieve the drain
2256 pub fn take_drain(&mut self) -> Option<W> {
2257 self.compressed_data.take()
2258 }
2259
2260 /// Before calling [FrameCompressor::compress] you can replace the matcher
2261 pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
2262 core::mem::swap(&mut match_generator, &mut self.state.matcher);
2263 match_generator
2264 }
2265
2266 /// Before calling [FrameCompressor::compress] you can replace the compression level.
2267 ///
2268 /// This also clears any fine-grained parameter overrides installed via
2269 /// [`set_parameters`](Self::set_parameters): reverting to a bare level
2270 /// means plain level-based tuning, not the previous frame's customized
2271 /// strategy / LDM / log overrides. To keep overriding, call
2272 /// [`set_parameters`](Self::set_parameters) again with the new base level.
2273 pub fn set_compression_level(
2274 &mut self,
2275 compression_level: CompressionLevel,
2276 ) -> CompressionLevel {
2277 let old = self.compression_level;
2278 self.compression_level = compression_level;
2279 // Drop sticky overrides so the level switch yields plain geometry.
2280 self.strategy_override = None;
2281 self.state.matcher.clear_param_overrides();
2282 old
2283 }
2284
2285 /// Get the current compression level
2286 pub fn compression_level(&self) -> CompressionLevel {
2287 self.compression_level
2288 }
2289
2290 /// Attach a pre-parsed dictionary to be used for subsequent compressions.
2291 ///
2292 /// In compressed modes, the dictionary id is written only when the active
2293 /// matcher supports dictionary priming.
2294 /// Uncompressed mode and non-priming matchers ignore the attached dictionary
2295 /// at encode time.
2296 pub fn set_dictionary(
2297 &mut self,
2298 dictionary: crate::decoding::Dictionary,
2299 ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2300 self.attach_dictionary(EncoderDictionary::from_dictionary(dictionary))
2301 }
2302
2303 /// Parse and attach a serialized dictionary blob.
2304 ///
2305 /// Parses with the encoder-only path (skips the FSE/HUF decode lookup-table
2306 /// build the encoder never reads); the entropy ENCODER tables — and thus
2307 /// the emitted frame — are identical to a full parse.
2308 pub fn set_dictionary_from_bytes(
2309 &mut self,
2310 raw_dictionary: &[u8],
2311 ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2312 self.attach_dictionary(EncoderDictionary::from_bytes(raw_dictionary)?)
2313 }
2314
2315 /// Attach an already-parsed [`EncoderDictionary`] without reparsing a raw
2316 /// blob.
2317 ///
2318 /// Accepts an `EncoderDictionary` produced once via
2319 /// [`EncoderDictionary::from_bytes`] / [`EncoderDictionary::from_dictionary`]
2320 /// or handed back by [`Self::clear_dictionary`] / the `set_dictionary*`
2321 /// return value, so callers can reattach or reuse a prepared dictionary
2322 /// across compressions without re-running the dictionary parse each time.
2323 /// Returns the previously-attached dictionary, if any.
2324 pub fn set_encoder_dictionary(
2325 &mut self,
2326 dictionary: EncoderDictionary,
2327 ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2328 self.attach_dictionary(dictionary)
2329 }
2330
2331 /// Remove the attached dictionary, returning it as an [`EncoderDictionary`].
2332 pub fn clear_dictionary(&mut self) -> Option<EncoderDictionary> {
2333 self.dictionary_entropy_cache = None;
2334 // Drop the CDict prime snapshot — it is keyed to the dictionary
2335 // being removed and must not be restored against a different (or no)
2336 // dictionary on the next frame.
2337 self.state.matcher.invalidate_primed_dictionary();
2338 self.dictionary.take()
2339 }
2340
2341 /// Validate `enc`, build the encoder entropy cache from it, store it, and
2342 /// return the previously-attached dictionary. Shared by every public
2343 /// attach entry point: `set_dictionary`, `set_dictionary_from_bytes`, and
2344 /// `set_encoder_dictionary`.
2345 fn attach_dictionary(
2346 &mut self,
2347 enc: EncoderDictionary,
2348 ) -> Result<Option<EncoderDictionary>, crate::decoding::errors::DictionaryDecodeError> {
2349 let dictionary = &enc.inner;
2350 if dictionary.id == 0 {
2351 return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
2352 }
2353 if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
2354 return Err(
2355 crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
2356 index: index as u8,
2357 },
2358 );
2359 }
2360 self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(dictionary));
2361 // A previously-captured CDict prime snapshot belongs to the OLD
2362 // dictionary; drop it so the first frame with the new dictionary
2363 // re-primes (and re-captures) instead of restoring stale tables.
2364 self.state.matcher.invalidate_primed_dictionary();
2365 Ok(self.dictionary.replace(enc))
2366 }
2367}
2368
2369#[cfg(test)]
2370mod tests {
2371 // `format!` is used by ungated tests (e.g. the btlazy2 dict-reuse
2372 // byte-identity test), so the import must not be feature-gated — under
2373 // default features (no `dict_builder`) the gated form left `format!`
2374 // unresolved when the test module is compiled.
2375 use alloc::format;
2376 use alloc::vec;
2377
2378 use super::FrameCompressor;
2379 use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
2380 use crate::decoding::FrameDecoder;
2381 use crate::encoding::{Matcher, Sequence};
2382 use alloc::vec::Vec;
2383
2384 fn generate_data(seed: u64, len: usize) -> Vec<u8> {
2385 let mut state = seed;
2386 let mut data = Vec::with_capacity(len);
2387 for _ in 0..len {
2388 state = state
2389 .wrapping_mul(6364136223846793005)
2390 .wrapping_add(1442695040888963407);
2391 data.push((state >> 33) as u8);
2392 }
2393 data
2394 }
2395
2396 // Cross-implementation parity tests (compress here, decode through the C
2397 // bindings) moved to `ffi-bench/tests/frame_compressor_ffi.rs` so the
2398 // library crate never links libzstd.
2399
2400 struct NoDictionaryMatcher {
2401 last_space: Vec<u8>,
2402 window_size: u64,
2403 }
2404
2405 impl NoDictionaryMatcher {
2406 fn new(window_size: u64) -> Self {
2407 Self {
2408 last_space: Vec::new(),
2409 window_size,
2410 }
2411 }
2412 }
2413
2414 impl Matcher for NoDictionaryMatcher {
2415 fn get_next_space(&mut self) -> Vec<u8> {
2416 vec![0; self.window_size as usize]
2417 }
2418
2419 fn get_last_space(&mut self) -> &[u8] {
2420 self.last_space.as_slice()
2421 }
2422
2423 fn commit_space(&mut self, space: Vec<u8>) {
2424 self.last_space = space;
2425 }
2426
2427 fn skip_matching(&mut self) {}
2428
2429 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
2430 handle_sequence(Sequence::Literals {
2431 literals: self.last_space.as_slice(),
2432 });
2433 }
2434
2435 fn reset(&mut self, _level: super::CompressionLevel) {
2436 self.last_space.clear();
2437 }
2438
2439 fn window_size(&self) -> u64 {
2440 self.window_size
2441 }
2442 }
2443
2444 #[test]
2445 fn frame_starts_with_magic_num() {
2446 let mock_data = [1_u8, 2, 3].as_slice();
2447 let mut output: Vec<u8> = Vec::new();
2448 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2449 compressor.set_source(mock_data);
2450 compressor.set_drain(&mut output);
2451
2452 compressor.compress();
2453 assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
2454 }
2455
2456 #[test]
2457 fn very_simple_raw_compress() {
2458 let mock_data = [1_u8, 2, 3].as_slice();
2459 let mut output: Vec<u8> = Vec::new();
2460 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2461 compressor.set_source(mock_data);
2462 compressor.set_drain(&mut output);
2463
2464 compressor.compress();
2465 }
2466
2467 #[test]
2468 fn very_simple_compress() {
2469 let mut mock_data = vec![0; 1 << 17];
2470 mock_data.extend(vec![1; (1 << 17) - 1]);
2471 mock_data.extend(vec![2; (1 << 18) - 1]);
2472 mock_data.extend(vec![2; 1 << 17]);
2473 mock_data.extend(vec![3; (1 << 17) - 1]);
2474 let mut output: Vec<u8> = Vec::new();
2475 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2476 compressor.set_source(mock_data.as_slice());
2477 compressor.set_drain(&mut output);
2478
2479 compressor.compress();
2480
2481 let mut decoder = FrameDecoder::new();
2482 let mut decoded = Vec::with_capacity(mock_data.len());
2483 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2484 assert_eq!(mock_data, decoded);
2485 }
2486
2487 #[test]
2488 fn rle_compress() {
2489 let mock_data = vec![0; 1 << 19];
2490 let mut output: Vec<u8> = Vec::new();
2491 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2492 compressor.set_source(mock_data.as_slice());
2493 compressor.set_drain(&mut output);
2494
2495 compressor.compress();
2496
2497 let mut decoder = FrameDecoder::new();
2498 let mut decoded = Vec::with_capacity(mock_data.len());
2499 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2500 assert_eq!(mock_data, decoded);
2501 }
2502
2503 #[test]
2504 fn aaa_compress() {
2505 let mock_data = vec![0, 1, 3, 4, 5];
2506 let mut output: Vec<u8> = Vec::new();
2507 let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
2508 compressor.set_source(mock_data.as_slice());
2509 compressor.set_drain(&mut output);
2510
2511 compressor.compress();
2512
2513 let mut decoder = FrameDecoder::new();
2514 let mut decoded = Vec::with_capacity(mock_data.len());
2515 decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
2516 assert_eq!(mock_data, decoded);
2517 }
2518
2519 #[test]
2520 fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
2521 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2522 let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2523 let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2524
2525 let mut data = Vec::new();
2526 for _ in 0..8 {
2527 data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
2528 }
2529
2530 let mut with_dict = Vec::new();
2531 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2532 let previous = compressor
2533 .set_dictionary_from_bytes(dict_raw)
2534 .expect("dictionary bytes should parse");
2535 assert!(
2536 previous.is_none(),
2537 "first dictionary insert should return None"
2538 );
2539 assert_eq!(
2540 compressor
2541 .set_dictionary(dict_for_encoder)
2542 .expect("valid dictionary should attach")
2543 .expect("set_dictionary_from_bytes inserted previous dictionary")
2544 .id(),
2545 dict_for_decoder.id
2546 );
2547 compressor.set_source(data.as_slice());
2548 compressor.set_drain(&mut with_dict);
2549 compressor.compress();
2550
2551 let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
2552 .expect("encoded stream should have a frame header");
2553 assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));
2554
2555 let mut decoder = FrameDecoder::new();
2556 let mut missing_dict_target = Vec::with_capacity(data.len());
2557 let err = decoder
2558 .decode_all_to_vec(&with_dict, &mut missing_dict_target)
2559 .unwrap_err();
2560 assert!(
2561 matches!(
2562 &err,
2563 crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
2564 ),
2565 "dict-compressed stream should require dictionary id, got: {err:?}"
2566 );
2567
2568 let mut decoder = FrameDecoder::new();
2569 decoder.add_dict(dict_for_decoder).unwrap();
2570 let mut decoded = Vec::with_capacity(data.len());
2571 decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
2572 assert_eq!(decoded, data);
2573 }
2574
2575 #[cfg(all(feature = "dict_builder", feature = "std"))]
2576 #[test]
2577 fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
2578 use std::io::Cursor;
2579
2580 let mut training = Vec::new();
2581 for idx in 0..256u32 {
2582 training.extend_from_slice(
2583 format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
2584 );
2585 }
2586 let mut raw_dict = Vec::new();
2587 crate::dictionary::create_raw_dict_from_source(
2588 Cursor::new(training.as_slice()),
2589 training.len(),
2590 &mut raw_dict,
2591 4096,
2592 )
2593 .expect("dict_builder training should succeed");
2594 assert!(
2595 !raw_dict.is_empty(),
2596 "dict_builder produced an empty dictionary"
2597 );
2598
2599 let dict_id = 0xD1C7_0008;
2600 let encoder_dict =
2601 crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
2602 let decoder_dict =
2603 crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
2604
2605 let mut payload = Vec::new();
2606 for idx in 0..96u32 {
2607 payload.extend_from_slice(
2608 format!(
2609 "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
2610 )
2611 .as_bytes(),
2612 );
2613 }
2614
2615 let mut without_dict = Vec::new();
2616 let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
2617 baseline.set_source(payload.as_slice());
2618 baseline.set_drain(&mut without_dict);
2619 baseline.compress();
2620
2621 let mut with_dict = Vec::new();
2622 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2623 compressor
2624 .set_dictionary(encoder_dict)
2625 .expect("valid dict_builder dictionary should attach");
2626 compressor.set_source(payload.as_slice());
2627 compressor.set_drain(&mut with_dict);
2628 compressor.compress();
2629
2630 let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
2631 .expect("encoded stream should have a frame header");
2632 assert_eq!(frame_header.dictionary_id(), Some(dict_id));
2633 let mut decoder = FrameDecoder::new();
2634 decoder.add_dict(decoder_dict).unwrap();
2635 let mut decoded = Vec::with_capacity(payload.len());
2636 decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
2637 assert_eq!(decoded, payload);
2638 assert!(
2639 with_dict.len() < without_dict.len(),
2640 "trained dictionary should improve compression for this small payload"
2641 );
2642 }
2643
2644 #[test]
2645 fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
2646 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2647 let mut output = Vec::new();
2648 let input = b"";
2649
2650 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2651 let previous = compressor
2652 .set_dictionary_from_bytes(dict_raw)
2653 .expect("dictionary bytes should parse");
2654 assert!(previous.is_none());
2655
2656 compressor.set_source(input.as_slice());
2657 compressor.set_drain(&mut output);
2658 compressor.compress();
2659
2660 assert!(
2661 compressor.state.last_huff_table.is_some(),
2662 "dictionary entropy should seed previous huffman table before first block"
2663 );
2664 assert!(
2665 compressor.state.fse_tables.ll_previous.is_some(),
2666 "dictionary entropy should seed previous ll table before first block"
2667 );
2668 assert!(
2669 compressor.state.fse_tables.ml_previous.is_some(),
2670 "dictionary entropy should seed previous ml table before first block"
2671 );
2672 assert!(
2673 compressor.state.fse_tables.of_previous.is_some(),
2674 "dictionary entropy should seed previous of table before first block"
2675 );
2676 }
2677
2678 // `set_content_size_flag(false)`: the header must omit the FCS field
2679 // (and the single-segment layout that requires it) while the frame
2680 // still round-trips through our decoder.
2681 #[test]
2682 fn content_size_flag_off_omits_fcs_and_roundtrips() {
2683 let payload = alloc::vec![0x42u8; 4096];
2684
2685 let mut compressor: FrameCompressor =
2686 FrameCompressor::new(super::CompressionLevel::Fastest);
2687 let mut with_fcs = Vec::new();
2688 compressor.compress_independent_frame_into(&payload, &mut with_fcs);
2689
2690 compressor.set_content_size_flag(false);
2691 let mut without_fcs = Vec::new();
2692 compressor.compress_independent_frame_into(&payload, &mut without_fcs);
2693
2694 let parsed_with = crate::decoding::frame::read_frame_header(with_fcs.as_slice())
2695 .expect("flag-on frame header must parse")
2696 .0;
2697 assert_eq!(parsed_with.frame_content_size(), 4096);
2698
2699 let parsed_without = crate::decoding::frame::read_frame_header(without_fcs.as_slice())
2700 .expect("flag-off frame header must parse")
2701 .0;
2702 // 0 is the decoder's "unknown content size" sentinel...
2703 assert_eq!(
2704 parsed_without.frame_content_size(),
2705 0,
2706 "FCS must be omitted with the content-size flag off"
2707 );
2708 // ...and the descriptor must confirm the field is ABSENT (0 bytes),
2709 // not present with an explicit zero value.
2710 assert_eq!(
2711 parsed_without
2712 .descriptor
2713 .frame_content_size_bytes()
2714 .expect("descriptor must parse"),
2715 0,
2716 "the FCS field itself must be omitted, not written as zero"
2717 );
2718
2719 let mut decoder = crate::decoding::FrameDecoder::new();
2720 // `decode_all_to_vec` fills existing capacity (no FCS to pre-size
2721 // from with the flag off), so reserve the expected payload upfront.
2722 let mut decoded = Vec::with_capacity(payload.len() + 64);
2723 decoder
2724 .decode_all_to_vec(&without_fcs, &mut decoded)
2725 .expect("flag-off frame must decode");
2726 assert_eq!(decoded, payload);
2727 }
2728
2729 // `set_dictionary_id_flag(false)`: a dict-compressed frame must omit
2730 // the dictionary ID and still decode when the dictionary is handed to
2731 // the decoder explicitly.
2732 #[test]
2733 fn dict_id_flag_off_omits_dictionary_id_and_roundtrips() {
2734 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2735 let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(8);
2736
2737 let mut compressor: FrameCompressor =
2738 FrameCompressor::new(super::CompressionLevel::Fastest);
2739 compressor
2740 .set_dictionary_from_bytes(dict_raw)
2741 .expect("dictionary bytes should parse");
2742 compressor.set_dictionary_id_flag(false);
2743 let mut frame = Vec::new();
2744 compressor.compress_independent_frame_into(&payload, &mut frame);
2745
2746 let parsed = crate::decoding::frame::read_frame_header(frame.as_slice())
2747 .expect("frame header must parse")
2748 .0;
2749 assert_eq!(
2750 parsed.dictionary_id(),
2751 None,
2752 "dictionary id must be omitted with the dict-id flag off"
2753 );
2754
2755 // With the ID omitted the decoder cannot look the dictionary up by
2756 // header; hand it explicitly (the `reset_with_dict_handle` path).
2757 let mut sd = crate::decoding::StreamingDecoder::new_with_dictionary_bytes(
2758 frame.as_slice(),
2759 dict_raw,
2760 )
2761 .expect("decoder must accept the dictionary");
2762 let mut dec = Vec::new();
2763 std::io::Read::read_to_end(&mut sd, &mut dec)
2764 .expect("frame must decode with the dictionary handed explicitly");
2765 assert_eq!(dec, payload);
2766 }
2767
2768 // The output reservation must track the observed compression ratio, not
2769 // the whole-input `compress_bound`: a multi-MiB compressible stream's
2770 // output buffer stays at output scale (the old up-front bound held an
2771 // input-sized allocation for the whole frame). Incompressible input may
2772 // still re-estimate to ~the full bound — that is the genuine worst case.
2773 #[test]
2774 fn compressible_stream_output_capacity_stays_at_output_scale() {
2775 // 4 MiB of highly repetitive log-like lines.
2776 let line = b"ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo\n";
2777 let mut input = Vec::with_capacity(4 << 20);
2778 while input.len() < (4 << 20) {
2779 let take = line.len().min((4 << 20) - input.len());
2780 input.extend_from_slice(&line[..take]);
2781 }
2782
2783 let mut compressor: FrameCompressor =
2784 FrameCompressor::new(super::CompressionLevel::Fastest);
2785 let mut out = Vec::new();
2786 compressor.compress_independent_frame_into(&input, &mut out);
2787
2788 assert!(!out.is_empty());
2789 assert!(
2790 out.capacity() < input.len() / 4,
2791 "capacity {} must stay at output scale (input {}, output {})",
2792 out.capacity(),
2793 input.len(),
2794 out.len()
2795 );
2796
2797 // Round-trip: the adaptive reservation must not affect the bytes.
2798 let mut decoder = crate::decoding::FrameDecoder::new();
2799 let mut decoded = Vec::with_capacity(input.len() + 64);
2800 decoder
2801 .decode_all_to_vec(&out, &mut decoded)
2802 .expect("frame must decode");
2803 assert_eq!(decoded, input);
2804 }
2805
2806 // A dictionary frame with a known content size that fits the window
2807 // must take the single-segment layout (reference parity): the
2808 // dictionary is decoder setup state, not part of the regenerated
2809 // segment, so it must not force the windowed multi-segment layout.
2810 #[test]
2811 fn dict_frame_with_known_size_is_single_segment() {
2812 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2813 let payload = b"dictionary-keyed payload dictionary-keyed payload".repeat(64);
2814
2815 let mut compressor: FrameCompressor =
2816 FrameCompressor::new(super::CompressionLevel::Fastest);
2817 compressor
2818 .set_dictionary_from_bytes(dict_raw)
2819 .expect("dictionary bytes should parse");
2820 let mut frame = Vec::new();
2821 compressor.compress_independent_frame_into(&payload, &mut frame);
2822
2823 let parsed = crate::decoding::frame::read_frame_header(frame.as_slice())
2824 .expect("frame header must parse")
2825 .0;
2826 assert!(
2827 parsed.descriptor.single_segment_flag(),
2828 "dict frame with known size <= window must be single-segment"
2829 );
2830 assert!(parsed.dictionary_id().is_some());
2831 assert_eq!(parsed.frame_content_size(), payload.len() as u64);
2832
2833 // Round-trip through our own decoder with the dictionary.
2834 let mut decoder = crate::decoding::FrameDecoder::new();
2835 decoder
2836 .add_dict_from_bytes(dict_raw)
2837 .expect("decoder must accept the dictionary");
2838 let mut decoded = Vec::with_capacity(payload.len() + 64);
2839 decoder
2840 .decode_all_to_vec(&frame, &mut decoded)
2841 .expect("single-segment dict frame must decode");
2842 assert_eq!(decoded, payload);
2843 }
2844
2845 // Regression: after `clear_dictionary()` a reused compressor must fully
2846 // deactivate the dictionary state. The dict-active matcher reset rewinds the
2847 // hash/chain tables to the origin (`position_base = 0`) and DEFERS the table
2848 // clear to the next prime/restore. If `clear_dictionary` leaves the matcher
2849 // marked dictionary-active, a subsequent NO-dictionary frame hits that
2850 // deferred-clear branch but never runs prime/restore, so stale dict-region
2851 // entries (old absolute positions) survive at the rewound base and can
2852 // surface as bogus matches. The no-dict frame must still round-trip without
2853 // the dictionary. Uses Level(16) (btopt → HashChain backend, whose storage
2854 // owns the deferred-clear path).
2855 #[test]
2856 fn clear_dictionary_then_nodict_frame_roundtrips() {
2857 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2858 // Payload B embeds dictionary bytes up front so a surviving dict-region
2859 // chain entry would be a tempting (wrong) match candidate.
2860 let mut payload_b = Vec::new();
2861 payload_b.extend_from_slice(&dict_raw[..dict_raw.len().min(2048)]);
2862 payload_b.extend_from_slice(
2863 b"no-dictionary tail no-dictionary tail"
2864 .repeat(16)
2865 .as_slice(),
2866 );
2867
2868 let mut compressor: FrameCompressor =
2869 FrameCompressor::new(super::CompressionLevel::Level(16));
2870 compressor
2871 .set_dictionary_from_bytes(dict_raw)
2872 .expect("dictionary bytes should parse");
2873 // Frame A primes the dictionary (sets the matcher dictionary-active).
2874 let mut frame_a = Vec::new();
2875 compressor.compress_independent_frame_into(
2876 b"dictionary-keyed payload dictionary-keyed payload"
2877 .repeat(8)
2878 .as_slice(),
2879 &mut frame_a,
2880 );
2881 // Remove the dictionary, then compress a no-dictionary frame on the
2882 // SAME reused compressor.
2883 compressor.clear_dictionary();
2884 let mut frame_b = Vec::new();
2885 compressor.compress_independent_frame_into(&payload_b, &mut frame_b);
2886
2887 // Frame B must decode WITHOUT any dictionary.
2888 let mut decoder = crate::decoding::FrameDecoder::new();
2889 let mut decoded = Vec::with_capacity(payload_b.len() + 64);
2890 decoder
2891 .decode_all_to_vec(&frame_b, &mut decoded)
2892 .expect("no-dict frame after clear_dictionary must decode");
2893 assert_eq!(
2894 decoded, payload_b,
2895 "no-dict frame after clear_dictionary must round-trip exactly"
2896 );
2897 }
2898
2899 // Regression test: `heap_size()` must count the retained Huffman tables
2900 // (the active `last_huff_table` and the recycled `huff_table_spare`).
2901 // A reused context that parks a table would otherwise under-report its
2902 // footprint through the public size API.
2903 #[test]
2904 fn heap_size_counts_active_and_spare_huffman_tables() {
2905 let mut compressor: FrameCompressor =
2906 FrameCompressor::new(super::CompressionLevel::Fastest);
2907 let base = compressor.heap_size();
2908
2909 let active = crate::huff0::huff0_encoder::HuffmanTable::build_from_data(
2910 b"abacabadabacabaeabacabadabacaba",
2911 );
2912 let active_bytes = active.heap_size();
2913 assert!(active_bytes > 0, "built table must own heap buffers");
2914 compressor.state.last_huff_table = Some(active);
2915 assert_eq!(
2916 compressor.heap_size(),
2917 base + active_bytes,
2918 "heap_size must include the active last_huff_table"
2919 );
2920
2921 let spare = crate::huff0::huff0_encoder::HuffmanTable::build_from_data(
2922 b"the quick brown fox jumps over the lazy dog",
2923 );
2924 let spare_bytes = spare.heap_size();
2925 assert!(spare_bytes > 0, "built table must own heap buffers");
2926 compressor.state.huff_table_spare = Some(spare);
2927 assert_eq!(
2928 compressor.heap_size(),
2929 base + active_bytes + spare_bytes,
2930 "heap_size must include the parked huff_table_spare"
2931 );
2932 }
2933
2934 #[test]
2935 fn set_encoder_dictionary_reattaches_prepared_dict_without_reparse() {
2936 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2937 let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
2938 tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";
2939
2940 // Prepare the EncoderDictionary once, then attach it via the prepared-
2941 // dictionary API (no raw-blob reparse at attach time).
2942 let prepared =
2943 super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
2944 let dict_id = prepared.id();
2945
2946 let mut with_dict = Vec::new();
2947 let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
2948 let previous = compressor
2949 .set_encoder_dictionary(prepared)
2950 .expect("prepared dictionary should attach");
2951 assert!(previous.is_none());
2952 compressor.set_source(payload.as_slice());
2953 compressor.set_drain(&mut with_dict);
2954 compressor.compress();
2955 // clear_dictionary hands the prepared dictionary back (last use of
2956 // `compressor`, so its `&mut with_dict` drain borrow ends here).
2957 let returned = compressor
2958 .clear_dictionary()
2959 .expect("dictionary was attached");
2960 assert_eq!(returned.id(), dict_id);
2961
2962 // The reattached dictionary drives the frame: its id is advertised and
2963 // the stream round-trips through a decoder primed with the same dict.
2964 let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
2965 .expect("encoded stream should have a frame header");
2966 assert_eq!(frame_header.dictionary_id(), Some(dict_id));
2967 let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
2968 let mut decoder = FrameDecoder::new();
2969 decoder.add_dict(decoder_dict).unwrap();
2970 let mut decoded = Vec::with_capacity(payload.len());
2971 decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
2972 assert_eq!(decoded.as_slice(), payload.as_slice());
2973
2974 // The dictionary handed back by clear_dictionary reattaches to another
2975 // compressor without touching the raw bytes again, producing an
2976 // identical frame.
2977 let mut with_dict2 = Vec::new();
2978 let mut compressor2 = FrameCompressor::new(super::CompressionLevel::Fastest);
2979 compressor2
2980 .set_encoder_dictionary(returned)
2981 .expect("returned dictionary should reattach");
2982 compressor2.set_source(payload.as_slice());
2983 compressor2.set_drain(&mut with_dict2);
2984 compressor2.compress();
2985 assert_eq!(
2986 with_dict2, with_dict,
2987 "reattached prepared dict must produce an identical frame"
2988 );
2989 }
2990
#[test]
fn dict_primed_matcher_snapshot_reused_across_frames_is_byte_identical() {
    // CDict-equivalent reuse: when one compressor emits several frames with
    // the same dictionary, frames 2..N restore the primed matcher snapshot (a
    // table copy) rather than re-hashing the dictionary. The restored state
    // has to yield exactly the bytes of the freshly-primed first frame, and
    // each frame has to decode through a dict-primed decoder.
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let level = super::CompressionLevel::Fastest;

    // 16 KiB of input, i.e. above the Fast strategy's 8 KiB attach cutoff, so
    // frame 2 takes the copy-snapshot (restore) path. At or below the cutoff
    // the encoder falls back to re-priming, which would not exercise restore.
    let line: &[u8] = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n";
    let mut payload = Vec::new();
    while payload.len() < 16 * 1024 {
        payload.extend_from_slice(line);
    }

    let encoder_dict =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    let dict_id = encoder_dict.id();
    let mut compressor: FrameCompressor = FrameCompressor::new(level);
    compressor
        .set_encoder_dictionary(encoder_dict)
        .expect("prepared dictionary should attach");

    // First frame: prime + capture the snapshot. Second frame: restore it.
    let frame1 = compressor.compress_independent_frame(payload.as_slice());
    let frame2 = compressor.compress_independent_frame(payload.as_slice());
    assert_eq!(
        frame1, frame2,
        "restored prime snapshot must reproduce the freshly-primed frame byte-for-byte"
    );

    // Each frame must advertise the dictionary id and decode back to the
    // payload with a decoder that was handed the same dictionary.
    let assert_roundtrip = |frame: &[u8]| {
        let (header, _) =
            crate::decoding::frame::read_frame_header(frame).expect("frame header");
        assert_eq!(header.dictionary_id(), Some(dict_id));
        let mut decoder = FrameDecoder::new();
        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
        decoder.add_dict(decoder_dict).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
        assert_eq!(decoded.as_slice(), payload.as_slice());
    };
    assert_roundtrip(frame1.as_slice());
    assert_roundtrip(frame2.as_slice());
}
3043
#[test]
fn dict_primed_matcher_cache_reused_across_small_attach_frames_is_byte_identical() {
    // CDict-equivalent ATTACH path: with a small source (at or below the Fast
    // 8 KiB attach cutoff) frames 2..N re-prime, re-committing the dict bytes
    // to history, while reusing the dict table that was already built instead
    // of re-hashing it. Three outputs must agree byte-for-byte: the
    // freshly-primed first frame, the cached-table second frame, and the
    // frame from a brand-new compressor with no dict cache. That shows the
    // cache affects timing only, never content.
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let level = super::CompressionLevel::Fastest;

    // Roughly 2 KiB: well under the cutoff, so every frame takes the attach
    // (re-prime) path and never the copy-snapshot restore.
    let line: &[u8] = b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n";
    let mut payload = Vec::new();
    while payload.len() < 2 * 1024 {
        payload.extend_from_slice(line);
    }

    let warm_dict =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    let dict_id = warm_dict.id();
    let mut warm: FrameCompressor = FrameCompressor::new(level);
    warm.set_encoder_dictionary(warm_dict)
        .expect("prepared dictionary should attach");

    // The first frame builds and marks the dict table; the second reuses it.
    let frame1 = warm.compress_independent_frame(payload.as_slice());
    let frame2 = warm.compress_independent_frame(payload.as_slice());
    assert_eq!(
        frame1, frame2,
        "reused dict table (attach path) must reproduce the freshly-built frame byte-for-byte"
    );

    // Cold-cache reference: a compressor that has never seen a frame.
    let cold_dict =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    let mut cold: FrameCompressor = FrameCompressor::new(level);
    cold.set_encoder_dictionary(cold_dict)
        .expect("prepared dictionary should attach");
    let fresh_frame = cold.compress_independent_frame(payload.as_slice());
    assert_eq!(
        fresh_frame, frame1,
        "cold-cache compressor must match the warm-cache frame byte-for-byte"
    );

    // Header advertises the dict id and the frame decodes back to the payload.
    let assert_roundtrip = |frame: &[u8]| {
        let (header, _) =
            crate::decoding::frame::read_frame_header(frame).expect("frame header");
        assert_eq!(header.dictionary_id(), Some(dict_id));
        let mut decoder = FrameDecoder::new();
        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
        decoder.add_dict(decoder_dict).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
        assert_eq!(decoded.as_slice(), payload.as_slice());
    };
    assert_roundtrip(frame1.as_slice());
    assert_roundtrip(frame2.as_slice());
}
3105
#[test]
fn dict_reused_across_many_lazy_frames_stays_applied() {
    // Regression test for HashChain-backed (lazy level) dictionary frames on a
    // reused compressor. Such a frame re-primes the dict at the live
    // `history_abs_start` every time; the floor-advance reset used to let that
    // base climb from frame to frame until the freshly-primed dict region fell
    // below `window_low`. From then on every dict match was silently dropped
    // and the output grew to the no-dict size (seen around frame 3-4). The
    // test drives many frames and demands each one equal the first.
    //
    // Several distinct lines are used so the dictionary actually matters:
    // without it the first occurrence of each line must be emitted as
    // literals, with it those lines match the primed dict right away. One
    // repeated line would match in-frame regardless and mask the regression.
    let lines: &[&[u8]] = &[
        b"ts=2026 level=INFO msg=\"flush memtable\" tenant=demo table=orders\n",
        b"ts=2026 level=INFO msg=\"rotate segment\" tenant=demo table=orders\n",
        b"ts=2026 level=INFO msg=\"compact level\" tenant=demo table=orders\n",
        b"ts=2026 level=INFO msg=\"write block\" tenant=demo table=orders\n",
    ];
    // Produces exactly `n` bytes by cycling through `lines`, cutting the last
    // line short when it would overshoot.
    let fill = |n: usize| -> Vec<u8> {
        let mut buf = Vec::with_capacity(n);
        for l in lines.iter().cycle() {
            if buf.len() >= n {
                break;
            }
            let take = l.len().min(n - buf.len());
            buf.extend_from_slice(&l[..take]);
        }
        buf
    };
    let dict_bytes = fill(8 * 1024);
    let payload = fill(16 * 1024);
    let raw_dict = crate::decoding::Dictionary::from_raw_content(1, dict_bytes)
        .expect("raw dict should build");

    let level = super::CompressionLevel::Level(6);
    let mut reused: FrameCompressor = FrameCompressor::new(level);
    reused.set_dictionary_id_flag(false);
    reused.set_dictionary(raw_dict).expect("dict should attach");

    let first = reused.compress_independent_frame(payload.as_slice());

    // Anchor against a no-dict compressor at the same level. Without it the
    // equality loop below would still pass if EVERY frame (the first one
    // included) had decayed to the no-dict size in lockstep.
    let mut baseline: FrameCompressor = FrameCompressor::new(level);
    let no_dict_frame = baseline.compress_independent_frame(payload.as_slice());
    assert!(
        first.len() < no_dict_frame.len(),
        "dict must be load-bearing: dict frame {} should beat the no-dict baseline {}",
        first.len(),
        no_dict_frame.len(),
    );

    for i in 1..16 {
        let next = reused.compress_independent_frame(payload.as_slice());
        // Compare bytes, not lengths: a divergence that happens to keep the
        // same size (say, a different match choice after the resident dict
        // bookkeeping drifts) would slip past a length-only comparison.
        assert_eq!(
            next, first,
            "frame {i} of a reused dict compressor must stay byte-identical to \
             the first (dict still applied, no decay or bookkeeping drift)"
        );
    }
}
3178
#[test]
fn dict_fast_epoch_reset_many_frames_and_attach_copy_alternation_byte_identical() {
    // Between frames the Fast attach path invalidates the main hash table by
    // advancing an epoch bias rather than doing a memset. Against a
    // fresh-compressor reference this test establishes two properties:
    //   1. over MANY reused frames the accumulated bias never lets a stale
    //      entry through (each frame is byte-identical to the reference);
    //   2. crossing the 8 KiB attach/copy cutoff in either direction stays
    //      byte-identical (attach → copy clears the bias for the raw-slice
    //      kernel, copy → attach re-enters epoch mode).
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let level = super::CompressionLevel::Fastest;

    // About 2 KiB: under the cutoff, so this is an attach-mode frame.
    let small_line: &[u8] = b"tenant=demo op=put key=1 value=aaaaabbbbbcccccddddd\n";
    let mut small = Vec::new();
    while small.len() < 2 * 1024 {
        small.extend_from_slice(small_line);
    }
    // About 64 KiB: over the Fast 8 KiB attach cutoff, so a copy-mode frame.
    let large_line: &[u8] = b"tenant=demo op=scan range=[k0,k9) limit=500 order=asc\n";
    let mut large = Vec::new();
    while large.len() < 64 * 1024 {
        large.extend_from_slice(large_line);
    }

    let mut reused: FrameCompressor = FrameCompressor::new(level);
    let reused_dict =
        super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
    reused
        .set_encoder_dictionary(reused_dict)
        .expect("prepared dictionary should attach");

    // Ground truth: one frame from a compressor that has never been used.
    let reference = |payload: &[u8]| -> Vec<u8> {
        let fresh_dict =
            super::EncoderDictionary::from_bytes(dict_raw).expect("dict bytes should parse");
        let mut fresh: FrameCompressor = FrameCompressor::new(level);
        fresh
            .set_encoder_dictionary(fresh_dict)
            .expect("prepared dictionary should attach");
        fresh.compress_independent_frame(payload)
    };
    let small_expected = reference(&small);
    let large_expected = reference(&large);

    // 1. A long attach-only run; the epoch bias advances on every frame.
    for i in 0..32 {
        let attach_frame = reused.compress_independent_frame(small.as_slice());
        assert_eq!(
            attach_frame, small_expected,
            "attach frame {i} diverged from the fresh-compressor reference"
        );
    }

    // 2. Alternate across the cutoff: attach → copy → attach → copy.
    for i in 0..4 {
        let copy_frame = reused.compress_independent_frame(large.as_slice());
        assert_eq!(
            copy_frame, large_expected,
            "copy frame {i} diverged from the fresh-compressor reference"
        );
        let attach_frame = reused.compress_independent_frame(small.as_slice());
        assert_eq!(
            attach_frame, small_expected,
            "attach frame after copy {i} diverged from the fresh-compressor reference"
        );
    }
}
3243
#[test]
fn dict_primed_btlazy2_reused_across_attach_and_copy_boundary_is_byte_identical() {
    // Btlazy2 (Level 15) switches between dict attach and copy at a 32 KiB
    // cutoff in prepare_frame. A reused compressor is exercised on BOTH sides:
    // a sub-cutoff payload (re-prime/attach path) and an over-cutoff payload
    // (copy-snapshot restore path). On each side the warm-cache second frame
    // must equal the cold-cache first frame byte-for-byte, because the dict
    // cache is a timing optimization and never a content change. Every frame
    // must also round-trip.
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let level = super::CompressionLevel::Level(15);
    let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
        .expect("dict bytes should parse")
        .id();

    // Every line is distinct, so the payload does not trivially
    // self-compress and both the BT finder and the dict dual-probe get used.
    let make_payload = |target: usize| {
        let mut bytes = Vec::with_capacity(target);
        for i in 0u64.. {
            if bytes.len() >= target {
                break;
            }
            let line = format!(
                "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
                i % 97
            );
            bytes.extend_from_slice(line.as_bytes());
        }
        bytes
    };

    // Builds a Level 15 compressor with a newly parsed dictionary attached.
    let primed_compressor = || -> FrameCompressor {
        let mut compressor: FrameCompressor = FrameCompressor::new(level);
        compressor
            .set_encoder_dictionary(
                super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
            )
            .expect("dict attach");
        compressor
    };

    // 16 KiB sits below the 32 KiB cutoff (attach/re-prime); 64 KiB sits above
    // it (copy-snapshot).
    for target in [16 * 1024usize, 64 * 1024usize] {
        let payload = make_payload(target);

        // Frame 1 builds and marks the dict tables; frame 2 reuses them.
        let mut warm = primed_compressor();
        let frame1 = warm.compress_independent_frame(payload.as_slice());
        let frame2 = warm.compress_independent_frame(payload.as_slice());
        assert_eq!(
            frame1, frame2,
            "reused dict cache must reproduce the freshly-primed frame byte-for-byte \
             (Level 15, target={target})"
        );

        // A cold-cache compressor has to land on the same bytes.
        let mut cold = primed_compressor();
        let cold_frame = cold.compress_independent_frame(payload.as_slice());
        assert_eq!(
            cold_frame, frame1,
            "cold-cache compressor must match warm-cache frame (Level 15, target={target})"
        );

        // Decode both warm frames with a decoder holding the same dictionary.
        for frame in [frame1.as_slice(), frame2.as_slice()] {
            let (header, _) =
                crate::decoding::frame::read_frame_header(frame).expect("frame header");
            assert_eq!(header.dictionary_id(), Some(dict_id));
            let mut decoder = FrameDecoder::new();
            let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
            decoder.add_dict(decoder_dict).unwrap();
            let mut decoded = Vec::with_capacity(payload.len());
            decoder.decode_all_to_vec(frame, &mut decoded).unwrap();
            assert_eq!(decoded.as_slice(), payload.as_slice());
        }
    }
}
3318
#[test]
fn dict_primed_btultra2_restore_is_floor_safe_and_byte_identical() {
    // Guards the dictionary primed-snapshot RESTORE path on the binary-tree
    // (btultra2 / Level 22) backend, which is the path a minimal / decoupled
    // prepared-dict refactor would rewrite.
    //
    // Scenario: a reused compressor first compresses frame A, which fills the
    // live hash/chain tables with frame-A positions and advances the window
    // floor. It then compresses frame B, which has the SAME resolved shape
    // (same size → same PrimedKey → snapshot RESTORE path) but DIFFERENT
    // content. Restore must bring back the clean post-prime dict state with no
    // live frame-A entries left above the restored floor. Leaked frame-A
    // positions would surface FALSE matches and give a different (or
    // undecodable) frame B.
    //
    // Invariant checked: restore is purely a timing optimization, so frame B
    // must be byte-identical to what a cold compressor emits, and must decode.
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let level = super::CompressionLevel::Level(22);
    let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
        .expect("dict bytes should parse")
        .id();

    // Generates exactly `target` bytes of distinct lines; `seed` picks the
    // starting key, so different seeds give different content of equal size.
    let make_payload = |seed: u64, target: usize| {
        let mut bytes = Vec::with_capacity(target);
        let mut i = seed;
        while bytes.len() < target {
            let line = format!(
                "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
                i % 89
            );
            bytes.extend_from_slice(line.as_bytes());
            i = i.wrapping_add(1);
        }
        bytes.truncate(target);
        bytes
    };
    // 48 KiB exceeds the btultra2 8 KiB attach cutoff, which selects the
    // copy-snapshot capture/restore path. Both payloads share one size so
    // frame B resolves to frame A's snapshot key.
    let size = 48 * 1024usize;
    let frame_a = make_payload(0, size);
    let frame_b = make_payload(1_000_000, size);

    // Builds a Level 22 compressor with a newly parsed dictionary attached.
    let primed_compressor = || -> FrameCompressor {
        let mut compressor: FrameCompressor = FrameCompressor::new(level);
        compressor
            .set_encoder_dictionary(
                super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
            )
            .expect("dict attach");
        compressor
    };

    let mut warm = primed_compressor();
    // Frame A runs on a cold cache: it primes the dict, captures the snapshot
    // and populates the live tables with frame-A positions.
    let _wa = warm.compress_independent_frame(frame_a.as_slice());
    // Frame B runs on the warm cache and takes the snapshot RESTORE path.
    let warm_b = warm.compress_independent_frame(frame_b.as_slice());

    // Ground truth: frame B from a compressor that never saw frame A.
    let mut cold = primed_compressor();
    let cold_b = cold.compress_independent_frame(frame_b.as_slice());

    assert_eq!(
        warm_b, cold_b,
        "frame B via snapshot restore must be byte-identical to a cold compress \
         (a restore that leaks frame-A live-table entries would diverge here)"
    );

    // Frame B must also decode with a dict-primed decoder.
    let (header, _) =
        crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
    assert_eq!(header.dictionary_id(), Some(dict_id));
    let mut decoder = FrameDecoder::new();
    let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(frame_b.len());
    decoder
        .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
        .unwrap();
    assert_eq!(decoded.as_slice(), frame_b.as_slice());
}
3401
#[test]
fn dict_primed_btultra2_ldm_restore_is_byte_identical() {
    // Restore-path byte-identity guard in the spirit of
    // `dict_primed_btultra2_restore_is_floor_safe_and_byte_identical`, this
    // time with long-distance matching ENABLED. The BtMatcher's LDM producer
    // belongs to the snapshot, so a refactor that decouples it (leaving the
    // empty LDM table out of the snapshot) has to reinstate an equivalent
    // empty producer on restore. This test pins that the warm-cache (restore)
    // frame equals a cold compress when LDM is on.
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let level = super::CompressionLevel::Level(22);
    let dict_id = super::EncoderDictionary::from_bytes(dict_raw)
        .expect("dict bytes should parse")
        .id();

    // Generates exactly `target` bytes of distinct lines starting at key
    // `seed`.
    let make_payload = |seed: u64, target: usize| {
        let mut bytes = Vec::with_capacity(target);
        let mut i = seed;
        while bytes.len() < target {
            let line = format!(
                "tenant=demo op=put key={i} value=aaaaabbbbbcccccddddd-{}\n",
                i % 89
            );
            bytes.extend_from_slice(line.as_bytes());
            i = i.wrapping_add(1);
        }
        bytes.truncate(target);
        bytes
    };

    // Level 22 parameters with long-distance matching switched on.
    let ldm_params = crate::encoding::CompressionParameters::builder(level)
        .enable_long_distance_matching(true)
        .build()
        .expect("LDM-only params build");

    // Two equal-sized payloads with different content.
    let size = 48 * 1024usize;
    let frame_a = make_payload(0, size);
    let frame_b = make_payload(1_000_000, size);

    // Builds a Level 22 compressor with the LDM parameters applied first and
    // a newly parsed dictionary attached afterwards.
    let primed_compressor = || -> FrameCompressor {
        let mut compressor: FrameCompressor = FrameCompressor::new(level);
        compressor.set_parameters(&ldm_params);
        compressor
            .set_encoder_dictionary(
                super::EncoderDictionary::from_bytes(dict_raw).expect("dict parse"),
            )
            .expect("dict attach");
        compressor
    };

    // Warm compressor: frame A first, then frame B on the reused state.
    let mut warm = primed_compressor();
    let _wa = warm.compress_independent_frame(frame_a.as_slice());
    let warm_b = warm.compress_independent_frame(frame_b.as_slice());

    // Cold compressor: frame B only.
    let mut cold = primed_compressor();
    let cold_b = cold.compress_independent_frame(frame_b.as_slice());

    assert_eq!(
        warm_b, cold_b,
        "LDM-on frame B via snapshot restore must be byte-identical to a cold compress"
    );

    // The warm frame advertises the dict id and decodes back to frame B.
    let (header, _) =
        crate::decoding::frame::read_frame_header(warm_b.as_slice()).expect("frame header");
    assert_eq!(header.dictionary_id(), Some(dict_id));
    let mut decoder = FrameDecoder::new();
    let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(frame_b.len());
    decoder
        .decode_all_to_vec(warm_b.as_slice(), &mut decoded)
        .unwrap();
    assert_eq!(decoded.as_slice(), frame_b.as_slice());
}
3475
#[test]
fn set_dictionary_from_bytes_matches_full_decode_byte_for_byte() {
    // `set_dictionary_from_bytes` uses the encoder-only dict parse
    // (`decode_dict_for_encoding`), which skips the FSE/HUF decoder-table build
    // and the enrich passes. Encoder entropy tables come purely from the symbol
    // probabilities / Huffman weights, so the compressed output MUST equal the
    // full-decode path byte-for-byte. Pinning that equivalence makes a future
    // FSE/HUF parsing refactor fail loudly here if it still round-trips but
    // quietly diverges on the probabilities/weights, instead of silently
    // producing a different (yet valid) frame.
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let level = super::CompressionLevel::Fastest;
    let payload = b"tenant=demo table=orders op=put key=1 value=aaaaabbbbbcccccdddddeeeee\n\
                    tenant=demo table=orders op=put key=2 value=aaaaabbbbbcccccdddddeeeee\n";

    // Path A: attach by parsing the raw blob with the encoder-only parser.
    let mut from_bytes_out = Vec::new();
    {
        let mut encoder = FrameCompressor::new(level);
        encoder
            .set_dictionary_from_bytes(dict_raw)
            .expect("dictionary bytes should parse");
        encoder.set_source(payload.as_slice());
        encoder.set_drain(&mut from_bytes_out);
        encoder.compress();
    }

    // Path B: run the full decode (which builds the decoder tables as well)
    // and attach the result through the `Dictionary` setter.
    let fully_decoded = crate::decoding::Dictionary::decode_dict(dict_raw)
        .expect("dictionary bytes should fully decode");
    let mut full_decode_out = Vec::new();
    {
        let mut encoder = FrameCompressor::new(level);
        encoder
            .set_dictionary(fully_decoded)
            .expect("full-decode dictionary should attach");
        encoder.set_source(payload.as_slice());
        encoder.set_drain(&mut full_decode_out);
        encoder.compress();
    }

    assert_eq!(
        from_bytes_out, full_decode_out,
        "encoder-only dict parse must produce byte-identical output to the full decode"
    );
}
3522
#[test]
fn set_dictionary_rejects_zero_dictionary_id() {
    // A hand-built dictionary whose id is 0; the offset history is all
    // non-zero, so the id is the only invalid field.
    let zero_id_dict = crate::decoding::Dictionary {
        id: 0,
        fse: crate::decoding::scratch::FSEScratch::new(),
        huf: crate::decoding::scratch::HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [1, 4, 8],
    };

    // The generic parameters are spelled out because nothing else in this
    // test (no source, no drain) would let the compiler infer them.
    let mut compressor = FrameCompressor::<
        &[u8],
        Vec<u8>,
        crate::encoding::match_generator::MatchGeneratorDriver,
    >::new(super::CompressionLevel::Fastest);

    let outcome = compressor.set_dictionary(zero_id_dict);
    assert!(matches!(
        outcome,
        Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
    ));
}
3544
#[test]
fn set_dictionary_rejects_zero_repeat_offsets() {
    // A hand-built dictionary with a valid (non-zero) id whose first repeat
    // offset is 0; the error is expected to name index 0.
    let zero_offset_dict = crate::decoding::Dictionary {
        id: 1,
        fse: crate::decoding::scratch::FSEScratch::new(),
        huf: crate::decoding::scratch::HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [0, 4, 8],
    };

    // The generic parameters are spelled out because nothing else in this
    // test (no source, no drain) would let the compiler infer them.
    let mut compressor = FrameCompressor::<
        &[u8],
        Vec<u8>,
        crate::encoding::match_generator::MatchGeneratorDriver,
    >::new(super::CompressionLevel::Fastest);

    let outcome = compressor.set_dictionary(zero_offset_dict);
    assert!(matches!(
        outcome,
        Err(
            crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
                index: 0
            }
        )
    ));
}
3570
#[test]
fn uncompressed_mode_does_not_require_dictionary() {
    // Attach a raw-content dictionary to a compressor running at the
    // Uncompressed level, then check the frame neither advertises the
    // dictionary nor needs it to decode.
    let dict_id = 0xABCD_0001;
    let raw_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
            .expect("raw dictionary should be valid");
    let payload = b"plain-bytes-that-should-stay-raw";

    let mut frame = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder
        .set_dictionary(raw_dict)
        .expect("dictionary should attach in uncompressed mode");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("encoded frame should have a header");
    assert_eq!(
        header.dictionary_id(),
        None,
        "raw/uncompressed frames must not advertise dictionary dependency"
    );

    // No dictionary is registered with this decoder; decoding must still
    // reproduce the payload.
    let mut decoder = FrameDecoder::new();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&frame, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
3601
#[test]
fn default_level_tiny_raw_dict_compresses_cleanly() {
    // Covers the dfast dict-attach fast path when the raw-content dictionary
    // is shorter than the minimum match. The dict-table probe in
    // `start_matching_fast_loop` is gated on the table really existing
    // (`table().is_some()`) and not merely on `is_attached()`. A dictionary
    // whose hashable region is shorter than the short-hash lookahead makes
    // `prime_dict_tables_for_range` return before allocating the tables, and
    // that case must never dereference a null dict pointer. Compressing at the
    // default (dfast) level with such a dictionary has to succeed.
    let dict_id = 0xABCD_0009;
    let encode_dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
        .expect("raw dictionary should be valid");
    let payload = b"the quick brown fox jumps over the lazy dog, repeatedly and at length";

    let mut frame = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Default);
    encoder
        .set_dictionary(encode_dict)
        .expect("tiny raw dictionary should attach");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();
    assert!(!frame.is_empty(), "compression should produce a frame");

    // The header has to carry the attached dictionary id. The payload would
    // round-trip either way, so without this check a silent no-dict frame
    // would pass the test too.
    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("encoded frame should have a readable header");
    assert_eq!(
        header.dictionary_id(),
        Some(dict_id),
        "tiny raw dict frame should still advertise its dictionary id",
    );

    // Full round-trip with the SAME dictionary registered on the decoder:
    // byte-exact recovery shows the tiny-dict fast path emits a correct
    // frame, not merely a non-empty one.
    let decode_dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abc".to_vec())
        .expect("raw dictionary should be valid");
    let mut decoder = FrameDecoder::new();
    decoder
        .add_dict(decode_dict)
        .expect("decoder dict should attach");
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&frame, &mut decoded)
        .expect("dict roundtrip should decode");
    assert_eq!(decoded, payload, "tiny-dict roundtrip mismatch");
}
3654
/// Drives the dictionary dual-probe (live table plus immutable dict table)
/// of the Fast / dfast / Row match finders using a dictionary whose content
/// the payload really reuses. That way each backend's dict long/short probe
/// (and the dfast `ip+1` dict-long retry) is reached, and the resulting
/// dict-compressed frame is decoded again by a decoder primed with the same
/// dictionary. The 3-byte-dict test only establishes the null-table guard;
/// this one establishes that the full attach path emits correct frames.
#[test]
fn dict_attach_roundtrips_across_backends_with_matching_payload() {
    let dict_id = 0xD1C7_0001;
    // Every line is distinct and occurs exactly once in the payload, so the
    // payload does NOT self-compress: without a dictionary there are no
    // in-frame back-references to exploit. The dictionary contains the SAME
    // lines, hence the output can only shrink if the dict probe actually
    // fires. The no-dict baseline in the loop pins that; a self-compressible
    // payload would round-trip and stay small through in-frame matches alone,
    // which would prove nothing.
    let line = |i: u32| {
        alloc::format!(
            "ts=2026-03-26T21:{:02}:{:02}Z level=INFO msg=\"event {i:05}\" tenant=t{i} region=eu\n",
            i / 60 % 60,
            i % 60,
        )
        .into_bytes()
    };
    // Dictionary: lines 0..256 in ascending order.
    let dict_content: Vec<u8> = (0..256u32).flat_map(&line).collect();
    // Payload: the same 256 lines, each once, visited with stride 97. Step k
    // lands on line (97 * k) mod 256; 97 is coprime to 256, so this is a
    // permutation with no self-repeats and every line is a dictionary match.
    let payload: Vec<u8> = (0..256u32)
        .map(|step| step * 97 % 256)
        .flat_map(&line)
        .collect();

    // Compresses `payload` at `level`, attaching a raw-content dictionary
    // built from `dict` when one is supplied.
    let compress_at = |level, dict: Option<Vec<u8>>| -> Vec<u8> {
        let mut encoder = FrameCompressor::new(level);
        if let Some(content) = dict {
            let raw_dict = crate::decoding::Dictionary::from_raw_content(dict_id, content)
                .expect("raw dictionary should be valid");
            encoder
                .set_dictionary(raw_dict)
                .expect("dictionary should attach");
        }
        let mut frame = Vec::new();
        encoder.set_source(payload.as_slice());
        encoder.set_drain(&mut frame);
        encoder.compress();
        frame
    };

    let levels = [
        super::CompressionLevel::Level(-5), // Fast (negative)
        super::CompressionLevel::Level(1),  // Fast
        super::CompressionLevel::Default,   // dfast (L3)
        super::CompressionLevel::Level(8),  // Row-backed lazy2
    ];
    for level in levels {
        let out = compress_at(level, Some(dict_content.clone()));
        let no_dict = compress_at(level, None);
        // On this non-self-compressible payload the dict path MUST come out
        // measurably smaller than no-dict. Otherwise the dict probe never
        // fired and the round-trip below would prove nothing.
        assert!(
            out.len() < no_dict.len(),
            "level {level:?}: dict-primed output ({}) must beat no-dict ({}) — dict probe did not fire",
            out.len(),
            no_dict.len(),
        );

        // Decode with a decoder that holds the same raw-content dictionary.
        let decoder_dict =
            crate::decoding::Dictionary::from_raw_content(dict_id, dict_content.clone())
                .expect("raw dictionary should be valid");
        let mut decoder = FrameDecoder::new();
        decoder
            .add_dict(decoder_dict)
            .expect("decoder dict should attach");
        let mut decoded = Vec::with_capacity(payload.len());
        decoder
            .decode_all_to_vec(&out, &mut decoded)
            .unwrap_or_else(|e| panic!("level {level:?}: dict roundtrip decode failed: {e:?}"));
        assert_eq!(decoded, payload, "level {level:?}: dict roundtrip mismatch");
    }
}
3739
/// One compressor, several independent frames, a DIFFERENT dictionary per
/// frame: every `set_dictionary` swap has to discard the per-backend dict
/// cache (Simple/Dfast/Row keep the attach index across frames). If the
/// invalidation were missing, a later frame would reuse rows built from the
/// previous dictionary. Each frame is decoded with a decoder primed with
/// that frame's own dictionary.
#[test]
fn dict_swap_across_reused_compressor_roundtrips() {
    // Builds `(dictionary, payload)` for one tag. Every line is distinct, so
    // the payload does NOT self-compress: a frame only shrinks when the dict
    // probe fires. That matters for the invalidation check too — stale rows
    // from dict A used while encoding frame B would yield offsets into A's
    // content, which a decoder primed with dict B turns into WRONG bytes
    // (caught by the roundtrip). A single repeated line would mask such
    // pollution behind in-frame matches.
    fn fixture(tag: &str) -> (Vec<u8>, Vec<u8>) {
        let line =
            |i: u32| alloc::format!("{tag} record {i:05} field=value{i} end\n").into_bytes();
        // Dictionary: lines 0..256 in natural order.
        let dict: Vec<u8> = (0..256u32).flat_map(|i| line(i)).collect();
        // Payload: the same lines visited with stride 97 (mod 256), each once.
        let payload: Vec<u8> = (0..256u32)
            .map(|k| (k * 97) % 256)
            .flat_map(|i| line(i))
            .collect();
        (dict, payload)
    }
    let (dict_a, payload_a) = fixture("alpha");
    let (dict_b, payload_b) = fixture("bravo");

    for level in [
        super::CompressionLevel::Default,
        super::CompressionLevel::Level(8),
    ] {
        // Baseline: size of the same payload compressed without a dictionary
        // by a fresh compressor at this level.
        let no_dict_len_of = |payload: &[u8]| -> usize {
            let mut plain: FrameCompressor = FrameCompressor::new(level);
            plain.compress_independent_frame(payload).len()
        };
        let frames = [
            (
                dict_a.as_slice(),
                payload_a.as_slice(),
                no_dict_len_of(&payload_a),
            ),
            (
                dict_b.as_slice(),
                payload_b.as_slice(),
                no_dict_len_of(&payload_b),
            ),
        ];

        // The compressor under test is shared by both frames.
        let mut compressor: FrameCompressor = FrameCompressor::new(level);
        for (dict_bytes, payload, no_dict_len) in frames {
            let make_dict = || {
                crate::decoding::Dictionary::from_raw_content(0xD1C7_0002, dict_bytes.to_vec())
                    .expect("raw dictionary should be valid")
            };

            compressor
                .set_dictionary(make_dict())
                .expect("dictionary should attach");
            let out = compressor.compress_independent_frame(payload);
            assert!(
                out.len() < no_dict_len,
                "level {level:?}: dict frame ({}) must beat no-dict ({}) — dict probe did not fire",
                out.len(),
                no_dict_len,
            );

            let mut decoder = FrameDecoder::new();
            decoder
                .add_dict(make_dict())
                .expect("decoder dict should attach");
            let mut decoded = Vec::with_capacity(payload.len());
            decoder
                .decode_all_to_vec(&out, &mut decoded)
                .unwrap_or_else(|e| panic!("level {level:?}: dict-swap decode failed: {e:?}"));
            assert_eq!(
                decoded, payload,
                "level {level:?}: dict-swap roundtrip mismatch (stale dict rows?)"
            );
        }
    }
}
3818
/// A dictionary-primed frame whose payload is larger than the advertised
/// window must advertise the same window as the no-dictionary baseline and
/// must still decode back to the original bytes.
#[test]
fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let dict_id = 0xABCD_0002;
    // Encoder and decoder each get their own copy of the same raw dictionary.
    let raw_dict = || {
        crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
            .expect("raw dictionary should be valid")
    };

    // Payload must exceed the encoder's advertised window (512 KiB
    // for Fastest after `window_log = 19` alignment with upstream zstd's
    // L1 fast row in `clevels.h`) so the test actually exercises
    // cross-window-boundary behavior.
    let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);

    // Compress `payload` at Fastest with a fresh custom-built matcher,
    // attaching a dictionary only when one is supplied.
    let encode = |dict: Option<crate::decoding::Dictionary>| -> Vec<u8> {
        let mut frame = Vec::new();
        let mut compressor = FrameCompressor::new_with_matcher(
            MatchGeneratorDriver::new(1024, 1),
            super::CompressionLevel::Fastest,
        );
        if let Some(dict) = dict {
            compressor
                .set_dictionary(dict)
                .expect("dictionary should attach");
        }
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };

    // Baseline frame without any dictionary.
    let no_dict_output = encode(None);
    let no_dict_window = crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
        .expect("baseline frame should have a header")
        .0
        .window_size()
        .expect("window size should be present");

    // Frame under test, primed with the dictionary.
    let output = encode(Some(raw_dict()));
    let advertised_window = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header")
        .0
        .window_size()
        .expect("window size should be present");

    assert_eq!(
        advertised_window, no_dict_window,
        "dictionary priming must not inflate advertised window size"
    );
    assert!(
        payload.len() > advertised_window as usize,
        "test must cross the advertised window boundary"
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(raw_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
3881
/// With a dictionary attached, a (deliberately tiny) source-size hint must
/// not make the advertised window larger than the un-hinted frame's, and the
/// hinted frame must still round-trip.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
    let dict_id = 0xABCD_0004;
    // 4 KiB dictionary history; rebuilt for every consumer.
    let make_dict =
        || crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    let payload = b"abcdabcdabcdabcd".repeat(128);

    // Compress `payload` at Fastest with the dictionary attached, applying
    // the source-size hint only when one is given.
    let encode = |size_hint: Option<u64>| -> Vec<u8> {
        let mut frame = Vec::new();
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        compressor.set_dictionary(make_dict()).unwrap();
        if let Some(hint) = size_hint {
            compressor.set_source_size_hint(hint);
        }
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };
    // Window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let hinted_output = encode(Some(1));
    let no_hint_output = encode(None);

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(make_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
3934
/// When otherwise-incompressible input contains a dictionary segment
/// exactly ONCE, that segment must be matched against the dictionary.
/// Before the fix, the raw-fast-path (which skips matching) fired on the
/// incompressible-looking block, the dictionary was never searched, and
/// `with_dict` ended up the same size as `no_dict` (the embedded match was
/// lost). Now the block compresses against the dict.
#[test]
fn dictionary_segment_in_incompressible_input_is_matched() {
    // Deterministic LCG byte stream: high-entropy, so the embedded
    // dictionary segment is the only compressible content.
    fn lcg(seed: u64, n: usize) -> alloc::vec::Vec<u8> {
        let mut state = seed;
        let mut bytes = alloc::vec::Vec::with_capacity(n);
        for _ in 0..n {
            state = state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            // Emit the top byte of the updated state.
            bytes.push((state >> 56) as u8);
        }
        bytes
    }

    let dict_id = 0x00DC_7777;
    // `r` is the dictionary content; the payload is filler, then `r` once,
    // then more filler.
    let r = lcg(1, 512);
    let payload: alloc::vec::Vec<u8> = [lcg(2, 2000), r.clone(), lcg(3, 1500)].concat();
    // A same-sized dictionary whose content does not occur in the payload.
    let unrelated = lcg(99, 512);

    // Precondition: the payload has to look incompressible, otherwise the
    // raw-fast-path would not have fired (and skipped matching) even without
    // the fix. Should the heuristic change so this stops holding, the checks
    // below would pass vacuously — so assert it up front.
    assert!(
        crate::encoding::incompressible::block_looks_incompressible(&payload),
        "test payload must look incompressible to exercise the raw-fast-path",
    );

    // Compress the payload at `level`, optionally priming a raw dictionary.
    let compress =
        |level: super::CompressionLevel, dict: Option<&[u8]>| -> alloc::vec::Vec<u8> {
            let mut frame = alloc::vec::Vec::new();
            let mut compressor = FrameCompressor::new(level);
            if let Some(content) = dict {
                let parsed =
                    crate::decoding::Dictionary::from_raw_content(dict_id, content.to_vec())
                        .unwrap();
                compressor.set_dictionary(parsed).unwrap();
            }
            compressor.set_source(payload.as_slice());
            compressor.set_drain(&mut frame);
            compressor.compress();
            frame
        };

    for lvl in [
        super::CompressionLevel::Level(2),
        super::CompressionLevel::Level(6),
        super::CompressionLevel::Level(19),
    ] {
        let no_dict = compress(lvl, None);
        let with_dict = compress(lvl, Some(&r));
        // Matching the 512-byte dict segment should save most of its
        // length (generous slack for sequence/header coding).
        assert!(
            with_dict.len() + 300 < no_dict.len(),
            "{lvl:?}: dict segment not matched (with_dict={}, no_dict={})",
            with_dict.len(),
            no_dict.len(),
        );

        // The dict-compressed frame has to round-trip through the decoder.
        let mut decoder = FrameDecoder::new();
        let decoder_dict =
            crate::decoding::Dictionary::from_raw_content(dict_id, r.clone()).unwrap();
        decoder.add_dict(decoder_dict).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
        assert_eq!(decoded, payload, "{lvl:?}: dict round-trip mismatch");

        // A dictionary that does NOT occur in the input must not push the
        // output above the no-dict (raw) encoding: the post-compress raw
        // fallback covers incompressible-with-dict.
        let with_bad_dict = compress(lvl, Some(&unrelated));
        assert!(
            with_bad_dict.len() <= no_dict.len() + 16,
            "{lvl:?}: unhelpful dict expanded output (with={}, no_dict={})",
            with_bad_dict.len(),
            no_dict.len(),
        );
    }
}
4026
/// Same window/roundtrip contract as the small-payload variant, but with a
/// 4 KiB payload and a source-size hint equal to the real payload length.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
    let dict_id = 0xABCD_0005;
    // 4 KiB dictionary history; rebuilt for every consumer.
    let make_dict =
        || crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    // 4 KiB payload
    let payload = b"abcd".repeat(1024);
    let payload_len = payload.len() as u64;

    // Compress `payload` at Fastest with the dictionary attached, applying
    // the source-size hint only when one is given.
    let encode = |size_hint: Option<u64>| -> Vec<u8> {
        let mut frame = Vec::new();
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        compressor.set_dictionary(make_dict()).unwrap();
        if let Some(hint) = size_hint {
            compressor.set_source_size_hint(hint);
        }
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    };
    // Window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let hinted_output = encode(Some(payload_len));
    let no_hint_output = encode(None);

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoder = FrameDecoder::new();
    decoder.add_dict(make_dict()).unwrap();
    let mut decoded = Vec::with_capacity(payload.len());
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
4080
/// When the compressor is driven by `NoDictionaryMatcher`, attaching a
/// dictionary must not put a dictionary id in the frame header, and the
/// frame must decode with a decoder that has no dictionary registered.
#[test]
fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
    let payload = b"abcdefghabcdefgh";
    let dict_id = 0xABCD_0003;
    let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");

    let mut output = Vec::new();
    let mut compressor = FrameCompressor::new_with_matcher(
        NoDictionaryMatcher::new(64),
        super::CompressionLevel::Fastest,
    );
    compressor
        .set_dictionary(dict)
        .expect("dictionary should attach");
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut output);
    compressor.compress();

    // The header must carry no dictionary id.
    let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
        .expect("encoded frame should have a header");
    assert_eq!(
        frame_header.dictionary_id(),
        None,
        "matchers that do not support dictionary priming must not advertise dictionary dependency"
    );

    // Decode without registering any dictionary on the decoder.
    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
4112
#[cfg(feature = "hash")]
#[test]
fn checksum_two_frames_reused_compressor() {
    // The same data is compressed twice by ONE compressor. We then check that:
    // 1. Each frame's stored checksum equals the checksum the decoder computes.
    // 2. The hasher is reset between frames (no cross-contamination).
    // Were the hasher NOT reset, the checksum calculated for the second frame
    // would disagree with the one stored in the frame, failing an assert_eq.

    // Fully decodes one frame and returns
    // `(decoded bytes, checksum stored in the frame, checksum calculated)`.
    fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
        let mut source = compressed;
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut source).unwrap();
        loop {
            if decoder.is_finished() {
                break;
            }
            decoder
                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut decoded = Vec::new();
        decoder.collect_to_writer(&mut decoded).unwrap();
        let from_data = decoder.get_checksum_from_data();
        let calculated = decoder.get_calculated_checksum();
        (decoded, from_data, calculated)
    }

    let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();

    // Frame 1 and frame 2 come from the same, reused compressor.
    let mut frames: [Vec<u8>; 2] = [Vec::new(), Vec::new()];
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    for frame in frames.iter_mut() {
        compressor.set_source(data.as_slice());
        compressor.set_drain(frame);
        compressor.compress();
    }
    let (compressed1, compressed2) = (&frames[0], &frames[1]);

    let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(compressed1);
    assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
    assert_eq!(
        chksum_from_data1, chksum_calculated1,
        "frame 1: checksum mismatch"
    );

    let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(compressed2);
    assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
    assert_eq!(
        chksum_from_data2, chksum_calculated2,
        "frame 2: checksum mismatch"
    );

    // Identical input compressed twice has to yield identical checksums;
    // state leaking across frames would make the second one differ.
    assert_eq!(
        chksum_from_data1, chksum_from_data2,
        "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
    );
}
4176
#[cfg(feature = "lsm")]
#[test]
fn frame_emit_info_decompressed_ranges_match_decoded_output() {
    // Part A correctness: the `decompressed_size` recorded per block while
    // encoding (and the `decompressed_byte_range` prefix sum built on it)
    // has to describe the real decoded output exactly — one entry per
    // physical block, contiguous, adding up to the full decompressed length.
    // A compressible multi-block payload drives the Compressed-block path,
    // whose regenerated size is NOT on the wire and therefore depends on the
    // encode-side capture guarded here.
    let data = emit_info_fixture_data();

    // Two levels are covered: Default (single block per chunk) and
    // Level(22), standing in for the Level(16..=22) post-split path with
    // several physical partitions per input chunk. lsm-tree compresses at
    // zstd:22, and post-split is the riskiest capture site (per-partition
    // `src_size`).
    let levels = [
        super::CompressionLevel::Default,
        super::CompressionLevel::Level(22),
    ];
    for level in levels {
        let mut compressed = Vec::new();
        let mut compressor = FrameCompressor::new(level);
        // Pledging the source size lets the high-level (22) window shrink to
        // fit the payload, so the frame stays compact (no oversized window
        // descriptor for a small input). The payload is still >= 128 KiB, so
        // post-split eligibility is preserved.
        compressor.set_source_size_hint(data.len() as u64);
        compressor.set_source(data.as_slice());
        compressor.set_drain(&mut compressed);
        compressor.compress();

        let info = compressor
            .last_frame_emit_info()
            .expect("emit info populated after compress")
            .clone();

        // Reference output: decode the whole frame in one go.
        let mut decoded = Vec::new();
        {
            let mut source = compressed.as_slice();
            let mut decoder = FrameDecoder::new();
            decoder.reset(&mut source).unwrap();
            while !decoder.is_finished() {
                decoder
                    .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                    .unwrap();
            }
            decoder.collect_to_writer(&mut decoded).unwrap();
        }
        assert_eq!(decoded, data, "sanity: frame must round-trip ({level:?})");

        assert!(
            info.blocks.len() >= 2,
            "fixture must span multiple blocks to exercise the mapping ({level:?}, got {})",
            info.blocks.len()
        );
        assert!(
            info.blocks.last().unwrap().last_block,
            "final block must carry last_block ({level:?})"
        );

        // Pin the Level(22) post-split path. The owned loop hands the
        // encoder MAX_BLOCK_SIZE input chunks, so without post-split there
        // can be no more blocks than chunks; seeing more blocks than chunks
        // proves some chunk was split into multiple physical partitions
        // (the per-partition `src_size` capture under test).
        if let super::CompressionLevel::Level(22) = level {
            let max_block = crate::common::MAX_BLOCK_SIZE as usize;
            let n_chunks = data.len().div_ceil(max_block);
            assert!(
                info.blocks.len() > n_chunks,
                "Level(22) must exercise post-split: {} blocks for {} input chunks",
                info.blocks.len(),
                n_chunks
            );
        }

        // Per-block ranges must be contiguous, start at zero and add up to
        // the full output.
        let mut expected_start = 0u64;
        for (i, block) in info.blocks.iter().enumerate() {
            let range = info
                .decompressed_byte_range(i)
                .expect("in-bounds block has a range");
            assert_eq!(
                range.start, expected_start,
                "block {i} range must start where the previous ended ({level:?})"
            );
            assert_eq!(
                u64::from(block.decompressed_size),
                range.end - range.start,
                "block {i} decompressed_size must equal its range width ({level:?})"
            );

            // Check the mapping against REAL per-block bytes rather than
            // prefix-sum consistency alone: block `i` is decoded on its own
            // and must equal the matching slice of the full decode. A
            // sidecar with sizes swapped between neighbouring blocks (same
            // sum, same contiguity) would be caught here.
            let mut psrc = compressed.as_slice();
            let mut pdec = FrameDecoder::new();
            pdec.reset(&mut psrc).unwrap();
            let pd = pdec
                .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
                .unwrap();
            assert!(
                pd.stopped_at.is_none(),
                "block {i} must decode cleanly ({level:?})"
            );
            assert_eq!(
                pd.data.as_slice(),
                &decoded[range.start as usize..range.end as usize],
                "block {i} partial-decode bytes must equal the full-decode slice ({level:?})"
            );

            expected_start = range.end;
        }
        assert_eq!(
            expected_start,
            decoded.len() as u64,
            "block decompressed sizes must sum to the full decoded length ({level:?})"
        );
        assert_eq!(
            info.decompressed_byte_range(info.blocks.len()),
            None,
            "out-of-range index yields None ({level:?})"
        );
    }
}
4301
4302 /// ~400 KiB semi-repetitive payload (long runs interleaved with a stride
4303 /// phrase) that compresses into several multi-block frames across levels.
4304 #[cfg(feature = "lsm")]
fn emit_info_fixture_data() -> Vec<u8> {
    // Generation stops once at least this many bytes have been produced.
    const TARGET_LEN: usize = 400 * 1024;
    // Fixed phrase appended after every run of repeated bytes.
    const PHRASE: &[u8] = b"the quick brown fox jumps over the lazy dog\n";

    let mut data: Vec<u8> = Vec::with_capacity(TARGET_LEN);
    // Deterministic 32-bit xorshift state (shifts 13 / 17 / 5).
    let mut state = 0x9E37_79B9u32;
    while data.len() < TARGET_LEN {
        state ^= state << 13;
        state ^= state >> 17;
        state ^= state << 5;

        // A run of 16..=63 copies of the state's top byte, then the phrase.
        let run_len = 16 + (state as usize % 48);
        let fill = (state >> 24) as u8;
        data.resize(data.len() + run_len, fill);
        data.extend_from_slice(PHRASE);
    }
    data
}
4321
#[cfg(feature = "lsm")]
#[test]
fn frame_emit_info_decompressed_ranges_match_on_borrowed_oneshot_path() {
    // On the borrowed one-shot path (`compress_independent_frame` ->
    // `run_borrowed_block_loop` -> `compress_block_encoded_borrowed`) the
    // decompressed-size sidecar flows through a DIFFERENT emit site than in
    // the owned/streaming loop, hence this separate per-block mapping check.
    // A Fast level keeps the encoder on the borrowed-eligible (Simple
    // matcher) path.
    let data = emit_info_fixture_data();

    let mut compressor: FrameCompressor =
        FrameCompressor::new(super::CompressionLevel::Fastest);
    let compressed = compressor.compress_independent_frame(data.as_slice());
    let info = compressor
        .last_frame_emit_info()
        .expect("emit info populated after compress_independent_frame")
        .clone();

    // Pin the compressed-block path. Without this the fixture could regress
    // into the raw-fast fallback and still pass through the Raw wire-size
    // fallback in populate_frame_emit_info, never touching the borrowed
    // compressed-block sidecar capture this test targets.
    let has_compressed_block = info
        .blocks
        .iter()
        .any(|b| matches!(b.block_type, crate::blocks::block::BlockType::Compressed));
    assert!(
        has_compressed_block,
        "borrowed-path fixture must emit at least one compressed block"
    );
    assert!(
        info.blocks.len() >= 2,
        "borrowed fixture must span multiple blocks (got {})",
        info.blocks.len()
    );
    assert!(info.blocks.last().unwrap().last_block);

    // Reference output: decode the whole frame in one go.
    let mut decoded = Vec::new();
    {
        let mut source = compressed.as_slice();
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut source).unwrap();
        while !decoder.is_finished() {
            decoder
                .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        decoder.collect_to_writer(&mut decoded).unwrap();
    }
    assert_eq!(decoded, data, "borrowed one-shot frame must round-trip");

    // Every block's range must line up with the bytes that block really
    // decodes to.
    let mut expected_start = 0u64;
    for (i, _) in info.blocks.iter().enumerate() {
        let range = info.decompressed_byte_range(i).unwrap();
        assert_eq!(range.start, expected_start, "block {i} range contiguity");

        // Decode block `i` alone with a fresh decoder.
        let mut psrc = compressed.as_slice();
        let mut pdec = FrameDecoder::new();
        pdec.reset(&mut psrc).unwrap();
        let pd = pdec
            .decode_blocks_partial(&mut psrc, i as u32, i as u32 + 1, None, false)
            .unwrap();
        assert!(pd.stopped_at.is_none(), "block {i} must decode cleanly");
        assert_eq!(
            pd.data.as_slice(),
            &decoded[range.start as usize..range.end as usize],
            "borrowed block {i} partial-decode bytes must equal the full-decode slice"
        );

        expected_start = range.end;
    }
    assert_eq!(
        expected_start,
        decoded.len() as u64,
        "ranges sum to full length"
    );
}
4395
4396 // The fuzz-artifact interop replay (C-compress -> our-decode and
4397 // our-compress -> C-decode) moved to `ffi-bench/tests/fuzz_interop.rs` so
4398 // the library crate never links libzstd.
4399
/// A block made of one repeated byte must stay whole. Its two border
/// histograms are identical (all 512 hits land in a single slot), so
/// `presplit_fingerprints_differ` reports `false` and the function takes
/// the early-return path of `zstd_preSplit.c:214`, returning `blockSize`.
#[test]
fn split_block_from_borders_keeps_homogeneous_block() {
    let block_len = MAX_BLOCK_SIZE as usize;
    let block = vec![0xAAu8; block_len];
    // "No split" is reported as the full block length.
    assert_eq!(super::split_block_from_borders(&block), block_len);
}
4411
/// A block whose first half is all zeros and whose second half is a
/// counter sequence has clearly different border histograms, so the
/// borders heuristic chooses to split.
///
/// Because the transition is exactly at the block midpoint, the middle
/// 512-byte sample (`block[mid-256..mid+256]`) is half zeros and half
/// counter values, i.e. roughly equidistant from both border
/// fingerprints. The
/// `abs_diff(dist_from_begin, dist_from_end) < min_distance` branch
/// therefore fires and the heuristic returns the midpoint (64 KiB), per
/// `zstd_preSplit.c:222`. The exact value is asserted — not merely "one
/// of {32K, 64K, 96K}" — so a regression into another quantised arm
/// cannot go unnoticed.
#[test]
fn split_block_from_borders_returns_midpoint_for_centred_transition() {
    let half = MAX_BLOCK_SIZE as usize / 2;
    let mut block = vec![0u8; MAX_BLOCK_SIZE as usize];
    // Second half: counter values in 1..=251 derived from the absolute
    // byte index; the first half stays zero.
    for (offset, byte) in block[half..].iter_mut().enumerate() {
        let i = half + offset;
        *byte = (i % 251 + 1) as u8;
    }

    let split = super::split_block_from_borders(&block);
    assert_eq!(
        split,
        64 * 1024,
        "centred-transition fixture must take the symmetric \
         midpoint arm (`abs_diff < min_distance`), got {split}"
    );
}
4443
/// `level_pre_split` looks the per-level split knob up in the
/// `LevelParams` table and yields the EFFECTIVE upstream `ZSTD_splitBlock`
/// level (`splitLevels[strategy] - 2`, clamped at 0) that
/// `ZSTD_optimalBlockSize` dispatches on:
/// fast/dfast/greedy/lazy(d1) → 0 (from-borders),
/// lazy2/btlazy2 → 1 (byChunks rate 43),
/// btopt/btultra/btultra2 → 2 (byChunks rate 11).
/// `Uncompressed` has no numeric level, so it stays `None`.
#[test]
fn pre_split_level_dispatches_by_compression_level() {
    use crate::encoding::CompressionLevel;
    use crate::encoding::match_generator::level_pre_split;

    assert_eq!(level_pre_split(CompressionLevel::Uncompressed), None);
    // Fastest = level 1 (fast) → 0 (from-borders).
    assert_eq!(level_pre_split(CompressionLevel::Fastest), Some(0));
    // Default = level 3 (dfast) → 0 (splitLevels 1 - 2, clamped).
    assert_eq!(level_pre_split(CompressionLevel::Default), Some(0));

    // Named aliases are pinned to their numeric table rows so the named
    // path can't drift from the pre-split table:
    // Better is a pure alias for level 7 (lazy); Best resolves to the
    // level-13 row (btlazy2).
    for (named, numeric) in [
        (CompressionLevel::Better, 7),
        (CompressionLevel::Best, 13),
    ] {
        assert_eq!(
            level_pre_split(named),
            level_pre_split(CompressionLevel::Level(numeric)),
        );
    }

    // (numeric level, expected effective split level). The level is part of
    // both compared tuples so a failure names the offending row.
    let expected_by_level = [
        (2, 0), // fast
        (4, 0), // dfast
        (5, 0), // greedy
        (7, 0), // lazy (depth 1)
        // lazy2 / btlazy2: splitLevels 3 - 2 = 1 (byChunks rate 43, hashLog 8).
        // The coarse byte-histogram tier is robust to the periodic-input
        // phantom-split the rate-5/hashLog-10 tier suffered, so it matches
        // upstream AND stays whole on periodic input (`periodic_stream_not_oversplit`).
        (8, 1),  // lazy2 lower bound
        (11, 1), // lazy2 (depth 2)
        (12, 1), // lazy2 upper bound
        (13, 1), // btlazy2 lower bound
        (15, 1), // btlazy2 (depth 2)
        (16, 2), // btopt
        (22, 2), // btultra2
    ];
    for (n, tier) in expected_by_level {
        assert_eq!(
            (n, level_pre_split(CompressionLevel::Level(n))),
            (n, Some(tier)),
        );
    }
}
4488
/// Regression guard: a homogeneous but periodic multi-block stream must
/// not be pre-split into tiny blocks at the lazy2 / btlazy2 levels. The
/// rate-5 chunk sampler used to phantom-split such input at every 8 KB
/// chunk, turning a large stream into hundreds of tiny blocks whose
/// per-block headers ballooned the output (~5x vs the neighbouring lazy
/// level). The lazy2 (L8) and btlazy2 (L15) outputs are required to stay
/// within 2x of the lazy (L7) output for the same input, and every output
/// must round-trip.
///
/// NOTE(review): an earlier wording of this comment credited a "rate-1
/// full-scan splitter" for the fix, while
/// `pre_split_level_dispatches_by_compression_level` documents these levels
/// as byChunks rate 43 — confirm which description is current.
#[test]
fn periodic_stream_not_oversplit() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};
    const LINES: &[&str] = &[
        "ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo table=orders region=eu-west\n",
        "ts=2026-03-26T21:39:29Z level=INFO msg=\"rotate segment\" tenant=demo table=orders region=eu-west\n",
        "ts=2026-03-26T21:39:30Z level=INFO msg=\"compact level\" tenant=demo table=orders region=eu-west\n",
        "ts=2026-03-26T21:39:31Z level=INFO msg=\"write block\" tenant=demo table=orders region=eu-west\n",
    ];
    // 512 KB = 4 upstream zstd blocks, enough for the cascade to manifest.
    let target = 512 * 1024usize;

    // Cycle through the lines until exactly `target` bytes are collected;
    // the final line is truncated to fit.
    let mut data = Vec::with_capacity(target);
    for line in LINES.iter().cycle() {
        if data.len() >= target {
            break;
        }
        let bytes = line.as_bytes();
        let take = bytes.len().min(target - data.len());
        data.extend_from_slice(&bytes[..take]);
    }

    let l7 = compress_slice_to_vec(&data, CompressionLevel::Level(7)); // lazy depth1
    let l8 = compress_slice_to_vec(&data, CompressionLevel::Level(8)); // lazy2
    let l15 = compress_slice_to_vec(&data, CompressionLevel::Level(15)); // btlazy2

    let budget = l7.len() * 2;
    assert!(
        l8.len() < budget,
        "lazy2 over-split periodic stream: l7={} l8={}",
        l7.len(),
        l8.len()
    );
    assert!(
        l15.len() < budget,
        "btlazy2 over-split periodic stream: l7={} l15={}",
        l7.len(),
        l15.len()
    );

    // All three frames must decode back to the input.
    for out in [&l7, &l8, &l15] {
        let mut round = Vec::with_capacity(data.len());
        let mut decoder = FrameDecoder::new();
        decoder
            .decode_all_to_vec(out, &mut round)
            .expect("decode periodic stream");
        assert_eq!(round, data, "periodic stream roundtrip mismatch");
    }
}
4541
/// End-to-end check of the greedy pre-split path: a 256 KB payload is
/// compressed at Level(5) (greedy, routed through the cheap chunk splitter)
/// and must round-trip through the crate's own decoder. The payload's
/// SECOND 128 KB upstream zstd block contains an intra-block fingerprint
/// transition.
///
/// The transition is placed in the second block deliberately. The upstream
/// zstd `savings < 3` gate skips splitting the first block (savings start
/// at 0), so the first block is a homogeneous compressible run that banks
/// savings; the second block is the one whose transition
/// `split_block_by_chunks()` turns into a sub-block boundary (the
/// `pending_input.split_off(...)` path). The split decision is asserted
/// directly, so the test cannot silently stop covering that path when the
/// fixture or parameters drift; afterwards the emitted split frame is
/// decoded and compared with the input.
///
/// NOTE(review): this comment used to justify Level 5 with "Level 13 (lazy)
/// no longer pre-splits", but the pre-split table test in this module lists
/// Level(13) as a btlazy2 level with `Some(1)` — that rationale looks
/// stale; confirm.
#[test]
fn greedy_chunk_split_roundtrips_through_own_decoder() {
    use crate::encoding::CompressionLevel;

    const TOTAL_LEN: usize = 256 * 1024;
    const TRANSITION_AT: usize = 192 * 1024;

    // Bytes below the 192 KB mark form a low-entropy 0..=7 cycle: the whole
    // first 128 KB block (compressible, banks the savings the upstream zstd
    // gate needs) plus the first half of the second block. From 192 KB on
    // the bytes follow a counter sequence, giving the chunk splitter a clear
    // intra-block fingerprint transition at the second block's midpoint.
    let data: Vec<u8> = (0..TOTAL_LEN)
        .map(|i| {
            if i < TRANSITION_AT {
                (i & 0x07) as u8
            } else {
                (i % 251 + 1) as u8
            }
        })
        .collect();

    // Pin the splitter decision itself: with savings already accrued (the
    // compressible first block banks well over the gate), the second block
    // must resolve to a boundary below the maximum block size.
    let block_cap = MAX_BLOCK_SIZE as usize;
    let second_block = &data[128 * 1024..];
    let split = super::optimal_block_size(
        CompressionLevel::Level(5),
        second_block,
        second_block.len(),
        block_cap,
        100,
    );
    assert!(
        split < block_cap,
        "second upstream zstd block must chunk-split at its intra-block transition, got {split}",
    );

    // Compress the whole payload through the streaming source/drain API.
    let mut frame = Vec::new();
    let mut encoder = FrameCompressor::new(CompressionLevel::Level(5));
    encoder.set_source(data.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    // Decode block by block until the decoder reports the frame finished.
    let mut decoder = FrameDecoder::new();
    let mut remaining = frame.as_slice();
    decoder
        .reset(&mut remaining)
        .expect("frame header should parse");
    loop {
        if decoder.is_finished() {
            break;
        }
        decoder
            .decode_blocks(&mut remaining, crate::decoding::BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }

    let mut restored = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut restored).unwrap();
    assert_eq!(restored, data, "roundtrip must reproduce the input verbatim");
}
4609
/// Coverage for the FAST one-shot path, which the greedy test in this
/// module does not reach. On a Fast level, `compress_slice_to_vec` /
/// `compress_independent_frame` go through `run_borrowed_block_loop`
/// rather than the owned loop; that loop has to honour
/// `optimal_block_size` and emit a boundary below `MAX_BLOCK_SIZE` instead
/// of fixed 128 KiB blocks. Unsplit, a 256 KiB input is two 128 KiB blocks,
/// so a chunk boundary inside the second block shows up as >= 3 decoded
/// blocks, which is asserted after the round-trip.
#[test]
fn fast_oneshot_borrowed_split_emits_subblock() {
    use crate::encoding::CompressionLevel;

    const TOTAL_LEN: usize = 256 * 1024;
    const TRANSITION_AT: usize = 192 * 1024;

    // The first 192 KiB are a homogeneous zero run (banks the savings the
    // split gate needs). At the 192 KiB mark — the 64 KiB midpoint of the
    // second 128 KiB block — the bytes switch to a counter sequence: a
    // fingerprint transition the Fast from-borders splitter (split level 0)
    // resolves into a sub-block boundary.
    let data: Vec<u8> = (0..TOTAL_LEN)
        .map(|i| {
            if i >= TRANSITION_AT {
                (i % 251 + 1) as u8
            } else {
                0u8
            }
        })
        .collect();

    // Pin the Fast-path splitter decision directly (same idea as the greedy
    // test): the second upstream zstd block must resolve to a sub-block
    // boundary, otherwise the >= 3 block count below could pass vacuously.
    let block_cap = MAX_BLOCK_SIZE as usize;
    let second_block = &data[128 * 1024..];
    let split = super::optimal_block_size(
        CompressionLevel::Fastest,
        second_block,
        second_block.len(),
        block_cap,
        100,
    );
    assert!(
        split < block_cap,
        "fixture must resolve to a sub-block split in the second upstream zstd block",
    );

    // Take the borrowed one-shot route explicitly (Fast level ->
    // run_borrowed_block_loop via compress_independent_frame).
    let mut encoder: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
    let frame = encoder.compress_independent_frame(&data);

    // Decode block by block until the decoder reports the frame finished.
    let mut decoder = FrameDecoder::new();
    let mut remaining = frame.as_slice();
    decoder
        .reset(&mut remaining)
        .expect("frame header should parse");
    loop {
        if decoder.is_finished() {
            break;
        }
        decoder
            .decode_blocks(&mut remaining, crate::decoding::BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }

    let mut restored = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut restored).unwrap();
    assert_eq!(restored, data, "roundtrip must reproduce the input verbatim");

    let block_count = decoder.blocks_decoded();
    assert!(
        block_count >= 3,
        "fast one-shot borrowed path must split the second upstream zstd block \
         (256 KiB unsplit = 2 blocks), got {} blocks",
        block_count,
    );
}
4672
/// Regression: calling `set_compression_level` and then `compress()` has to
/// update `state.strategy_tag` via the reset-time sync, so that the
/// literal-compression gates (`min_literals_to_compress`, `min_gain`) run
/// with the NEW level's strategy. The two levels are chosen to really cross
/// strategy bands — `Fastest` resolves to `Fast` and `Level(20)` resolves
/// to `BtUltra2` — which means a missed sync leaves the construction-time
/// tag in place and fails the assertion. `CompressionLevel::Best` would
/// type-check as well, but it resolves to `Lazy` today; that keeps
/// `min_literals_to_compress` in the same `shift=3 → 64-byte` band as
/// `Fast` and gives a weaker signal that the gate floor actually moved.
#[cfg(feature = "std")]
#[test]
fn set_compression_level_then_compress_refreshes_strategy_tag() {
    use super::CompressionLevel;
    use crate::encoding::strategy::StrategyTag;

    let start_level = CompressionLevel::Fastest;
    let new_level = CompressionLevel::Level(20);
    let payload = [0xABu8; 256];
    let mut sink = Vec::new();

    let mut compressor = FrameCompressor::new(start_level);
    let initial_tag = compressor.state.strategy_tag;
    assert_eq!(
        initial_tag,
        StrategyTag::for_compression_level(CompressionLevel::Fastest),
        "construction-time strategy_tag must reflect initial level",
    );

    // Move to a level whose resolved strategy sits in another band and run
    // one complete compress cycle: the matcher.reset() inside `compress` is
    // the only place where the tag can be refreshed.
    compressor.set_compression_level(new_level);
    compressor.set_source(&payload[..]);
    compressor.set_drain(&mut sink);
    compressor.compress();

    let expected = StrategyTag::for_compression_level(new_level);
    let new_tag = compressor.state.strategy_tag;
    assert_eq!(
        new_tag, expected,
        "strategy_tag must follow set_compression_level → compress, \
         got {new_tag:?} expected {expected:?}",
    );
    assert_eq!(
        expected,
        StrategyTag::BtUltra2,
        "test fixture invariant: Level(20) must resolve to BtUltra2 \
         so the post-switch tag visibly crosses the band boundary",
    );
    assert_ne!(
        new_tag, initial_tag,
        "test fixture invariant: chosen levels must resolve to \
         different StrategyTag variants",
    );
}
4728
/// Magicless mode (`ZSTD_f_zstd1_magicless`): the encoded frame MUST NOT
/// begin with the 4-byte magic prefix, it must round-trip through a decoder
/// that has magicless mode enabled, and a decoder left in standard mode
/// must refuse it while reading the frame header.
#[test]
fn magicless_frame_omits_magic_and_roundtrips() {
    use crate::common::MAGIC_NUM;
    use crate::decoding::errors::{FrameDecoderError, ReadFrameHeaderError};

    let input: Vec<u8> = (0u32..512).map(|i| (i ^ 0xA5) as u8).collect();

    // Encode with magicless = true.
    let mut output: Vec<u8> = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Default);
    encoder.set_magicless(true);
    encoder.set_source(input.as_slice());
    encoder.set_drain(&mut output);
    encoder.compress();

    // 1. The encoded bytes must NOT start with the zstd magic number.
    let magic_prefix = MAGIC_NUM.to_le_bytes();
    assert!(
        !output.starts_with(&magic_prefix),
        "magicless frame must omit the 4-byte magic prefix",
    );

    // 2. A magicless-aware decoder must reproduce the payload.
    let mut magicless_decoder = crate::decoding::FrameDecoder::new();
    magicless_decoder.set_magicless(true);
    let mut remaining: &[u8] = output.as_slice();
    magicless_decoder
        .init(&mut remaining)
        .expect("magicless init");
    magicless_decoder
        .decode_blocks(&mut remaining, crate::decoding::BlockDecodingStrategy::All)
        .expect("decode_blocks");
    let mut restored: Vec<u8> = Vec::new();
    magicless_decoder
        .collect_to_writer(&mut restored)
        .expect("collect_to_writer");
    assert_eq!(restored, input, "magicless roundtrip must preserve bytes");

    // 3. A standard (magicful) decoder MUST reject the magicless frame at
    //    the header-read step: the first 4 bytes are the frame-header
    //    descriptor + window / dictionary / FCS metadata, not the magic.
    //    Two outcomes are accepted. `BadMagicNumber` is the typical one
    //    (the first 4 bytes neither equal `MAGIC_NUM` nor fall inside the
    //    skippable-frame magic range). `SkipFrame` is the rare one (the
    //    first 4 bytes happen to land in `0x184D2A50..=0x184D2A5F`).
    //    Either way the standard decoder did not take the bytes for a
    //    real magicful frame.
    let mut std_decoder = crate::decoding::FrameDecoder::new();
    let std_init = std_decoder.init(output.as_slice());
    assert!(
        matches!(
            std_init,
            Err(FrameDecoderError::ReadFrameHeaderError(
                ReadFrameHeaderError::BadMagicNumber(_) | ReadFrameHeaderError::SkipFrame { .. },
            ))
        ),
        "standard decoder must reject a magicless frame with \
         ReadFrameHeaderError::BadMagicNumber or SkipFrame, got {other:?}",
        other = std_init,
    );
}
4788
/// Reusing one `FrameCompressor` for several inputs must produce frames
/// that are byte-identical to what a fresh compressor emits for each input,
/// on both the borrowed (Fast) and the owned
/// (Dfast/Lazy/Greedy/Uncompressed) backends. That demonstrates that
/// `prepare_frame` fully resets the per-frame state (matcher window,
/// content hasher, FSE/Huffman seeds) between independent frames — a
/// missed reset would corrupt the header checksum or the matches of frame
/// N>=2. In addition, every emitted frame must round-trip.
#[test]
fn compress_independent_frame_reuse_matches_fresh_and_roundtrips() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};

    let levels = [
        CompressionLevel::Uncompressed,
        CompressionLevel::Fastest,
        CompressionLevel::Default,
        CompressionLevel::Better,
        CompressionLevel::Best,
        CompressionLevel::Level(5),
    ];
    let inputs: [Vec<u8>; 6] = [
        Vec::new(),
        vec![0x00],
        b"the quick brown fox jumps over the lazy dog\n".to_vec(),
        vec![0x7Eu8; 50_000],           // highly compressible
        generate_data(0xABCD, 70_000),  // pseudo-random
        generate_data(0x1234, 200_000),
    ];

    for level in levels.iter().copied() {
        // One compressor per level, shared by every input of that level.
        let mut shared: FrameCompressor = FrameCompressor::new(level);
        for data in inputs.iter() {
            let reused = shared.compress_independent_frame(data);
            let fresh = compress_slice_to_vec(data, level);
            assert_eq!(
                reused,
                fresh,
                "reused frame != fresh frame for len={} level={:?}",
                data.len(),
                level,
            );

            let mut restored = Vec::with_capacity(data.len());
            let mut decoder = FrameDecoder::new();
            decoder.decode_all_to_vec(&reused, &mut restored).unwrap();
            assert_eq!(
                &restored,
                data,
                "roundtrip failed for len={} level={:?}",
                data.len(),
                level,
            );
        }
    }
}
4840
/// Each call to `compress_independent_frame_into` must replace the caller's
/// buffer rather than append to it: compressing a smaller input after a
/// larger one has to leave exactly the smaller frame in the buffer, and
/// that content must equal a fresh compression of the same input.
#[test]
fn compress_independent_frame_into_replaces_buffer_contents() {
    use crate::encoding::{CompressionLevel, compress_slice_to_vec};

    let level = CompressionLevel::Default;
    let big_input = vec![0x11u8; 40_000];
    let small_input = b"short payload".to_vec();

    let mut shared: FrameCompressor = FrameCompressor::new(level);
    let mut buffer = Vec::new();

    // First frame: keep a copy before the buffer gets reused.
    shared.compress_independent_frame_into(&big_input, &mut buffer);
    let big_frame = buffer.clone();

    // Second, smaller frame into the same buffer — it must be cleared first.
    shared.compress_independent_frame_into(&small_input, &mut buffer);
    let expected_small = compress_slice_to_vec(&small_input, CompressionLevel::Default);
    assert_eq!(
        buffer,
        expected_small,
        "reused buffer must hold exactly the second frame",
    );

    // The copy of the first frame taken before reuse still round-trips.
    let mut restored = Vec::with_capacity(big_input.len());
    let mut decoder = FrameDecoder::new();
    decoder
        .decode_all_to_vec(&big_frame, &mut restored)
        .unwrap();
    assert_eq!(restored, big_input);
}
4869
/// A dictionary set once on a reused compressor is sticky and must be
/// primed into every independent frame (mirroring
/// `ZSTD_CCtx_loadDictionary`). Each frame has to decode with the
/// dictionary and be byte-identical to the frame from a fresh compressor
/// that carries the same dictionary. This shows `prepare_frame` re-primes
/// the dictionary (matcher content + offset history + entropy seed) on
/// every call and not just on the first one.
#[test]
fn compress_independent_frame_reuses_sticky_dictionary() {
    use crate::encoding::CompressionLevel;

    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let parsed_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();

    // First payload: the leading 2048 bytes of the dictionary content,
    // eight times in a row. Second payload: a short unrelated string.
    let payload_a = parsed_dict.dict_content[..2048].repeat(8);
    let payload_b = b"a different second frame payload, still dict-attached".to_vec();
    let inputs = [payload_a, payload_b];

    let mut shared: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
    shared
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");

    for data in inputs.iter() {
        let reused = shared.compress_independent_frame(data);

        // Reference frame: a brand-new compressor with the same sticky dictionary.
        let mut reference: FrameCompressor = FrameCompressor::new(CompressionLevel::Fastest);
        reference
            .set_dictionary_from_bytes(dict_raw)
            .expect("dictionary bytes should parse");
        let fresh = reference.compress_independent_frame(data);
        assert_eq!(
            reused,
            fresh,
            "reused dict frame != fresh dict frame, len={}",
            data.len(),
        );

        // Round-trip, with the dictionary registered on the decoder.
        let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(decoder_dict).unwrap();
        let mut restored = Vec::with_capacity(data.len());
        decoder.decode_all_to_vec(&reused, &mut restored).unwrap();
        assert_eq!(&restored, data, "dict roundtrip failed, len={}", data.len());
    }
}
4915}