structured_zstd/decoding/frame_decoder.rs
//! FrameDecoder is the main low-level struct users interact with to decode zstd frames
2//!
3//! Zstandard compressed data is made of one or more frames. Each frame is independent and can be
4//! decompressed independently of other frames. This module contains structures
5//! and utilities that can be used to decode a frame.
6
7use super::frame;
8use crate::decoding;
9use crate::decoding::block_decoder::BlockDecoder;
10use crate::decoding::buffer_backend::BufferBackend;
11use crate::decoding::dictionary::{Dictionary, DictionaryHandle};
12use crate::decoding::errors::{DecodeBlockContentError, FrameDecoderError};
13use crate::decoding::flat_buf::FlatBuf;
14use crate::decoding::ringbuffer::RingBuffer;
15use crate::decoding::scratch::DecoderScratch;
16use crate::io::{Error, Read, Write};
17use alloc::collections::BTreeMap;
18use alloc::vec::Vec;
19use core::convert::TryInto;
20
21use crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE;
22
/// Wraps a block-header read failure into a [`FrameDecoderError`].
///
/// This is the `lsm` build of the helper: the returned error keeps the
/// position of the failing block (`block_index` and `frame_offset`) next to
/// the underlying `source`, which is what makes block-precise recovery
/// possible. The definition compiled without `lsm` drops the position and
/// returns the legacy positionless variant instead, so the default build's
/// error surface stays byte-identical to the upstream zstd.
#[cfg(feature = "lsm")]
fn block_header_decode_error(
    source: crate::decoding::errors::BlockHeaderReadError,
    block_index: u32,
    frame_offset: u32,
) -> FrameDecoderError {
    FrameDecoderError::FailedToReadBlockHeaderAt {
        frame_offset,
        block_index,
        source,
    }
}
39#[cfg(not(feature = "lsm"))]
40fn block_header_decode_error(
41 source: crate::decoding::errors::BlockHeaderReadError,
42 _block_index: u32,
43 _frame_offset: u32,
44) -> FrameDecoderError {
45 FrameDecoderError::FailedToReadBlockHeader(source)
46}
47
/// Wraps a block-body decode failure into a [`FrameDecoderError`].
///
/// This is the `lsm` build of the helper. Besides the underlying `source`,
/// the error records where the block sits (`block_index`, `frame_offset`) and
/// a `FrameBlock` description of the failing block rebuilt from its already
/// parsed `header` plus the `header_size` supplied by the caller. The
/// definition compiled without `lsm` returns the legacy variant instead.
#[cfg(feature = "lsm")]
fn block_body_decode_error(
    source: DecodeBlockContentError,
    block_index: u32,
    frame_offset: u32,
    header: &crate::blocks::block::BlockHeader,
    header_size: u8,
) -> FrameDecoderError {
    use crate::blocks::block::BlockType;

    // The physical body on the wire and the raw `Block_Size` field only
    // differ for RLE blocks: an RLE body is a single byte while `Block_Size`
    // carries the repeat count. For Raw/Compressed blocks the body length is
    // the field value.
    let is_rle = matches!(header.block_type, BlockType::RLE);
    let body_size = if is_rle { 1u32 } else { header.content_size };
    let block_size_field = if is_rle {
        header.decompressed_size
    } else {
        header.content_size
    };

    let block = crate::encoding::frame_emit_info::FrameBlock {
        offset_in_frame: frame_offset,
        header_size,
        body_size,
        block_size_field,
        block_type: header.block_type,
        last_block: header.last_block,
        // Raw/RLE headers carry the regenerated size. For a Compressed block
        // it is unknown until the block is decoded, so `read_block_header`
        // leaves `decompressed_size` at 0 in that case.
        decompressed_size: header.decompressed_size,
    };

    FrameDecoderError::FailedToReadBlockBodyAt {
        source,
        block_index,
        frame_offset,
        block,
    }
}
85#[cfg(not(feature = "lsm"))]
86fn block_body_decode_error(
87 source: DecodeBlockContentError,
88 _block_index: u32,
89 _frame_offset: u32,
90 _header: &crate::blocks::block::BlockHeader,
91 _header_size: u8,
92) -> FrameDecoderError {
93 FrameDecoderError::FailedToReadBlockBody(source)
94}
95
96/// Low level Zstandard decoder that can be used to decompress frames with fine control over when and how many bytes are decoded.
97///
98/// This decoder is able to decode frames only partially and gives control
99/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once).
100/// It reads bytes as needed from a provided source and can be read from to collect partial results.
101///
102/// If you want to just read the whole frame with an `io::Read` without having to deal with manually calling [FrameDecoder::decode_blocks]
103/// you can use the provided [crate::decoding::StreamingDecoder] wich wraps this FrameDecoder.
104///
105/// Workflow is as follows:
106/// ```
107/// use structured_zstd::decoding::BlockDecodingStrategy;
108///
109/// # #[cfg(feature = "std")]
110/// use std::io::{Read, Write};
111///
112/// // no_std environments can use the crate's own Read traits
113/// # #[cfg(not(feature = "std"))]
114/// use structured_zstd::io::{Read, Write};
115///
116/// fn decode_this(mut file: impl Read) {
117/// //Create a new decoder
118/// let mut frame_dec = structured_zstd::decoding::FrameDecoder::new();
119/// let mut result = Vec::new();
120///
121/// // Use reset or init to make the decoder ready to decode the frame from the io::Read
122/// frame_dec.reset(&mut file).unwrap();
123///
124/// // Loop until the frame has been decoded completely
125/// while !frame_dec.is_finished() {
126/// // decode (roughly) batch_size many bytes
127/// frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap();
128///
129/// // read from the decoder to collect bytes from the internal buffer
130/// let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap();
131///
132/// // then do something with it
133/// do_something(&result[0..bytes_read]);
134/// }
135///
136/// // handle the last chunk of data
137/// while frame_dec.can_collect() > 0 {
138/// let x = frame_dec.read(result.as_mut_slice()).unwrap();
139///
140/// do_something(&result[0..x]);
141/// }
142/// }
143///
144/// fn do_something(data: &[u8]) {
145/// # #[cfg(feature = "std")]
146/// std::io::stdout().write_all(data).unwrap();
147/// }
148/// ```
149pub struct FrameDecoder {
150 state: Option<FrameDecoderState>,
151 /// Test-only observability: frames decoded via `run_direct_decode`.
152 /// The direct and buffered paths are byte-identical, so dispatch
153 /// regressions (e.g. re-excluding dictionary frames from the direct
154 /// gate) are invisible to output assertions; tests pin the path here.
155 #[cfg(test)]
156 direct_frames: u64,
157 // Registered dictionaries are stored by shared handle (Arc/Rc) so a
158 // single content copy is referenced by every frame the decoder decodes
159 // (upstream zstd `ZSTD_refDDict`), rather than re-copied into the decode buffer
160 // per frame. `add_dict` wraps an owned `Dictionary` into a handle.
161 owned_dicts: BTreeMap<u32, DictionaryHandle>,
162 #[cfg(target_has_atomic = "ptr")]
163 shared_dicts: BTreeMap<u32, DictionaryHandle>,
164 #[cfg(not(target_has_atomic = "ptr"))]
165 shared_dicts: (),
166 /// `ZSTD_f_zstd1_magicless` — when true, [`init`] / [`reset`]
167 /// expect frames without the 4-byte magic number prefix.
168 /// Default false (standard zstd format).
169 magicless: bool,
170 /// How the optional content checksum is handled. Default
171 /// [`ContentChecksum::EmitOnly`] (compute + expose, no error on
172 /// mismatch). Set via [`Self::set_content_checksum`].
173 content_checksum: ContentChecksum,
174 /// Pinned `Dictionary_ID` expectation set via
175 /// [`Self::expect_dict_id`]. `None` (default) disables the
176 /// check; `Some(0)` matches frames whose header omits the
177 /// optional dict_id (treated as "no dictionary"). Validated in
178 /// [`Self::reset`] AFTER the frame header parses successfully
179 /// and BEFORE any block decode work.
180 #[cfg(feature = "lsm")]
181 expect_dict_id: Option<u32>,
182 /// Pinned `Window_Descriptor` byte expectation set via
183 /// [`Self::expect_window_descriptor`]. `None` (default)
184 /// disables the check. Validated in [`Self::reset`] AFTER the
185 /// frame header parses successfully and BEFORE any block
186 /// decode work. Single-segment frames (which omit the
187 /// `Window_Descriptor` byte from the wire) surface as
188 /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`]
189 /// with `found: None`.
190 #[cfg(feature = "lsm")]
191 expect_window_descriptor: Option<u8>,
192 /// When `true`, the per-block decode loop XXH64-hashes each
193 /// block's decompressed bytes and stores the low-32-bit digest in
194 /// [`Self::computed_block_checksums`]. Default `false` (zero
195 /// cost). Set via [`Self::enable_per_block_checksums`]. Gated on
196 /// `all(lsm, hash)` because XXH64 lives behind the `hash`
197 /// feature.
198 #[cfg(all(feature = "lsm", feature = "hash"))]
199 per_block_checksums_enabled: bool,
200 /// Per-block XXH64 (low 32 bits) digests captured during the
201 /// current frame's decode when `per_block_checksums_enabled` is
202 /// set. Reset at the start of every new frame. Gated on
203 /// `all(lsm, hash)` (see `per_block_checksums_enabled`).
204 #[cfg(all(feature = "lsm", feature = "hash"))]
205 computed_block_checksums: alloc::vec::Vec<u32>,
206}
207
/// Policy for a frame's optional XXH64 content checksum (RFC 8878
/// `Content_Checksum_flag`).
///
/// Hashing the decompressed output takes a measurable share of decode time,
/// which is why the pass can be switched off.
///
/// ```
/// use structured_zstd::decoding::{ContentChecksum, FrameDecoder};
/// let mut decoder = FrameDecoder::new();
/// decoder.set_content_checksum(ContentChecksum::Verify);
/// ```
#[derive(Copy, Clone, PartialEq, Eq, Debug, Default)]
pub enum ContentChecksum {
    /// Do not run the XXH64 pass at all: nothing is computed and nothing is
    /// verified. `get_calculated_checksum()` returns `None`.
    None,
    /// Compute the checksum and make it available through the accessors, but
    /// never fail on a mismatch. This is the default and matches the
    /// historical behaviour (callers verify manually if they wish).
    #[default]
    EmitOnly,
    /// Compute the checksum and compare it with the value stored in the
    /// frame; a disagreement fails the decode with
    /// [`FrameDecoderError::ChecksumMismatch`](crate::decoding::errors::FrameDecoderError::ChecksumMismatch).
    /// Without the `hash` feature no digest can be computed, so `Verify`
    /// cannot detect a mismatch and behaves like `None`.
    Verify,
}
234
/// Shape identity of a frame, used to refuse a [`ResumeState`] that was
/// captured from a differently shaped frame.
///
/// The key gathers every header field that changes how blocks decode: buffer
/// sizing, backend kind, entropy/dictionary context, trailing-checksum
/// handling, declared content size and magicless framing.
///
/// It is a SHAPE guard and not a content fingerprint. Two different frames
/// whose header fields all agree yield the same key, because no cheap header
/// field identifies frame content uniquely. What it does catch is the
/// realistic accidental misuse — a snapshot applied to a frame with another
/// window/dictionary/size — and it turns that into a typed error rather than
/// byte-wrong output. Supplying the right frame's compressed source and
/// `window_prime` together with a `ResumeState` stays the caller's contract.
#[cfg(feature = "lsm")]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct FrameKey {
    /// Window size taken from the frame header.
    window_size: u64,
    /// Content size declared by the frame header.
    frame_content_size: u64,
    /// `Dictionary_ID` from the frame header; `None` when the header omits it.
    dictionary_id: Option<u32>,
    /// Dictionary that was actually applied to the decoder
    /// (`state.using_dict`). Kept separately from `dictionary_id` because a
    /// frame whose header names no dictionary can still be decoded with an
    /// explicit one via `reset_with_dict_handle` / `force_dict`; two such
    /// decodes under different dictionaries must NOT compare equal, and a key
    /// built from the header field alone would not notice.
    active_dictionary_id: Option<u32>,
    /// The header's single-segment flag.
    single_segment: bool,
    /// The header's content-checksum flag.
    content_checksum: bool,
    /// Whether the decoder was reading magicless frames.
    magicless: bool,
}
265
#[cfg(feature = "lsm")]
impl FrameKey {
    /// Builds the key for the frame currently held in `state`.
    ///
    /// All header-derived fields come from `state.frame_header`, the active
    /// dictionary comes from `state.using_dict`, and `magicless` is passed in
    /// by the caller. A header whose `window_size()` fails is recorded with a
    /// window size of 0.
    fn from_state(state: &FrameDecoderState, magicless: bool) -> FrameKey {
        let header = &state.frame_header;
        let descriptor = &header.descriptor;

        let window_size = header.window_size().unwrap_or(0);
        let frame_content_size = header.frame_content_size();
        let dictionary_id = header.dictionary_id();
        let single_segment = descriptor.single_segment_flag();
        let content_checksum = descriptor.content_checksum_flag();

        FrameKey {
            window_size,
            frame_content_size,
            dictionary_id,
            active_dictionary_id: state.using_dict,
            single_segment,
            content_checksum,
            magicless,
        }
    }
}
281
/// Seed-0 XXH64 digest of one contiguous byte slice.
///
/// This is the resume-side counterpart of
/// [`DecoderScratchKind::window_tail_hash`]. Streaming XXH64 does not depend
/// on how the input is chunked, so hashing the bytes as a single slice here
/// gives the same value as the emit side's two-slice hash over the same bytes.
#[cfg(all(feature = "lsm", feature = "hash"))]
fn xxh64_of(bytes: &[u8]) -> u64 {
    use core::hash::Hasher;

    let mut hasher = twox_hash::XxHash64::with_seed(0);
    hasher.write(bytes);
    hasher.finish()
}
293
/// Snapshot of the cross-block decode state needed to resume a cold partial
/// decode at an inner block boundary.
///
/// [`FrameDecoder::decode_blocks_partial`] emits it when called with
/// `emit_resume == true` (it is returned in [`PartialDecode::resume_state`]),
/// and the same method accepts it back through its
/// [`resume`](FrameDecoder::decode_blocks_partial) argument ([`ResumeInput`]).
///
/// A zstd block cannot always be decoded on its own. Apart from the shared
/// match window (the decompressed output history), a Compressed block may
/// reuse the previous block's entropy tables through `Repeat_Mode` (literals
/// Huffman plus the LL/OF/ML FSE distributions), and it always continues the
/// running repeat-offset history. This snapshot holds exactly that carry-over
/// state together with the resume coordinates, so a resumed decode is
/// byte-identical to a contiguous one even if the original decoder was
/// dropped.
///
/// Two things are deliberately NOT stored:
///
/// * The window. The caller hands it back through
///   [`ResumeInput::window_prime`] from the decompressed output it already
///   persists.
/// * The dictionary. For a dictionary frame the caller re-attaches it to the
///   resuming decoder via [`FrameDecoder::reset`] /
///   [`FrameDecoder::reset_with_dict_handle`] (it already holds the
///   dictionary from encode time); the snapshot records only the dictionary's
///   identity so that a resume under a different dictionary is rejected.
///
/// Behind the `lsm` Cargo feature.
#[cfg(feature = "lsm")]
#[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
pub struct ResumeState {
    /// Identity of the frame the snapshot was taken from. It is compared with
    /// the frame currently reset into the decoder before any state is
    /// restored, so a snapshot from another frame shape fails with
    /// [`FrameDecoderError::ResumeFrameMismatch`] instead of silently
    /// producing byte-wrong output.
    frame_key: FrameKey,
    /// Index of the block to resume AT, i.e. the first block NOT yet decoded.
    block_index: u32,
    /// Total decompressed bytes produced before `block_index`.
    output_offset: u64,
    /// LL/OF/ML FSE tables as of the last decoded block; a `Repeat_Mode`
    /// resume block resolves against these.
    fse: crate::decoding::scratch::FSEScratch,
    /// Huffman literals table as of the last decoded block; a treeless
    /// (repeat) literals resume block resolves against it.
    huf: crate::decoding::scratch::HuffmanScratch,
    /// Running repeat-offset history (`offset_hist`) as of the last decoded
    /// block.
    offset_hist: [u32; 3],
    /// XXH64 of the exact window-prime bytes (the last `min(window_size,
    /// output_offset)` decompressed bytes) taken at emit time. At resume it
    /// is checked against the caller-supplied [`ResumeInput::window_prime`];
    /// a content mismatch (wrong frame, wrong or corrupted prime) is a
    /// near-unique (≈2⁻⁶⁴) signal and is rejected with
    /// [`FrameDecoderError::ResumeFrameMismatch`]. This is the content-exact
    /// guard, whereas [`FrameKey`] is the cheap shape pre-check that also
    /// works without the `hash` feature. Behind `all(lsm, hash)`.
    #[cfg(feature = "hash")]
    window_hash: u64,
}
350
#[cfg(feature = "lsm")]
impl ResumeState {
    /// Returns the inner block index this snapshot resumes at, which is the
    /// first block that has not been decoded yet.
    ///
    /// Use it as the `end_block` lower bound (and as `start_block`) of the
    /// resuming
    /// [`decode_blocks_partial`](FrameDecoder::decode_blocks_partial) call.
    pub fn block_index(&self) -> u32 {
        self.block_index
    }

    /// Returns how many decompressed bytes were produced before
    /// [`block_index`](Self::block_index), i.e. the decompressed offset at
    /// which the resumed output starts.
    ///
    /// The value equals
    /// `FrameEmitInfo::decompressed_byte_range(block_index).start` and is what
    /// you need to slice the `window_prime` tail for the resumed call.
    pub fn output_offset(&self) -> u64 {
        self.output_offset
    }
}
370
// Hand-written Debug: the entropy tables are large internal scratch without
// a useful Debug surface, so only the resume coordinates are printed. Having
// this impl also lets `PartialDecode` keep its derived Debug.
#[cfg(feature = "lsm")]
impl core::fmt::Debug for ResumeState {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("ResumeState");
        out.field("block_index", &self.block_index);
        out.field("output_offset", &self.output_offset);
        // The remaining fields are intentionally elided.
        out.finish_non_exhaustive()
    }
}
383
/// Everything [`FrameDecoder::decode_blocks_partial`] needs in its `resume`
/// argument to continue a cold partial decode without decompressing the
/// preceding blocks again.
///
/// Behind the `lsm` Cargo feature.
#[cfg(feature = "lsm")]
#[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
pub struct ResumeInput<'a> {
    /// Already-decompressed output of the caller, ending right before
    /// [`ResumeState::block_index`].
    ///
    /// It has to contain at least the last `min(window_size, output_offset)`
    /// bytes: a full match window, or the whole prefix when the prefix is
    /// shorter than one window. Bytes older than the last `window_size` are
    /// ignored, so handing over the entire prefix is valid too; it is capped
    /// internally, which bounds resume memory to one window.
    pub window_prime: &'a [u8],
    /// Cross-block entropy/repcode state produced by the earlier
    /// [`decode_blocks_partial`](FrameDecoder::decode_blocks_partial) call.
    pub state: &'a ResumeState,
}
403
404/// Backend-tagged decode scratch — chosen at frame-reset time based
405/// on the parsed `FrameHeader.descriptor.single_segment_flag()` and
406/// kept stable through the lifetime of the frame. The match in each
407/// helper below dispatches **once per call** (e.g. once per block in
408/// `decode_block_content`, once per drain in `drain_to_writer`) —
409/// never inside the hot push/repeat loop, which is fully
410/// monomorphised through the `DecoderScratch<B>` generic.
411enum DecoderScratchKind {
412 Ring(DecoderScratch<RingBuffer>),
413 Flat(DecoderScratch<FlatBuf>),
414}
415
416impl DecoderScratchKind {
417 fn new_ring(window_size: usize) -> Self {
418 // Lazy ring-buffer allocation: do NOT `reserve(window_size)` here.
419 // The direct-decode path (`run_direct_decode`) writes through
420 // `UserSliceBackend` and never touches the ring; allocating it
421 // eagerly wastes one full window of peak memory on the common
422 // direct-eligible frame. On the non-direct path the window is
423 // pre-reserved once at frame entry (`decode_all_impl` and
424 // `decode_blocks` both call `DecoderScratchKind::reserve_buffer`
425 // before any block writes), so multi-block frames pay one
426 // amortised grow instead of repeated `reserve_amortized` steps
427 // per block. Issue #279 round 2.
428 let s = DecoderScratch::<RingBuffer>::new(window_size);
429 Self::Ring(s)
430 }
431
432 /// Construct a flat-backed scratch for a single-segment frame.
433 /// `frame_content_size` is the upcoming output size in bytes
434 /// (== `window_size` when the flag is set).
435 ///
436 /// Lazy buffer allocation (mirrors [`Self::new_ring`]): do NOT
437 /// pre-size the `FlatBuf`. The direct-decode path
438 /// (`run_direct_decode`) writes through `UserSliceBackend` and never
439 /// touches this buffer, so eagerly allocating a full FCS wastes one
440 /// whole content-size of peak memory on the common direct-eligible
441 /// single-segment frame. The non-direct fallback reserves it once via
442 /// `reserve_buffer(window_size)` at frame entry before any block
443 /// write (`FlatBuf::reserve` adds the `WILDCOPY_OVERLENGTH` slack),
444 /// and every inline-exec site (trait method and per-kernel macros)
445 /// now carries a tight-tail bounded copy, so a tight buffer can never
446 /// overshoot regardless of construction-time slack.
447 fn new_flat(frame_content_size: usize) -> Self {
448 let s = DecoderScratch::<FlatBuf>::new(frame_content_size);
449 Self::Flat(s)
450 }
451
452 /// Reset (or transition between) backends for a new frame.
453 /// Reuses the existing `DecoderScratch` allocations (FSE / HUF
454 /// tables, sequence vec, etc.) when the backend kind is unchanged
455 /// — only the underlying buffer is re-sized for the new frame.
456 /// Building a fresh `DecoderScratch` on every frame would
457 /// re-allocate everything and was measured at +255 % vs ring on
458 /// small frames; reusing it keeps the small-frame cost flat.
459 fn reset(&mut self, frame: &frame::FrameHeader, window_size: usize) {
460 if frame.descriptor.single_segment_flag() {
461 match self {
462 Self::Flat(s) => {
463 s.reset(window_size);
464 // `DecoderScratch::reset` clears the backing buffer and
465 // updates `window_size` WITHOUT reserving it (it may still
466 // resize the per-block scratch Vecs up to
467 // `min(window_size, MAX_BLOCK_SIZE)`). Backing-buffer
468 // capacity is decided one layer up: direct-eligible frames
469 // never touch it, and the non-direct path pre-reserves once
470 // via `reserve_buffer(window_size)` at frame entry.
471 }
472 Self::Ring(_) => *self = Self::new_flat(window_size),
473 }
474 } else {
475 match self {
476 Self::Ring(s) => s.reset(window_size),
477 Self::Flat(_) => *self = Self::new_ring(window_size),
478 }
479 }
480 }
481
482 fn init_from_dict(&mut self, dict: &DictionaryHandle) {
483 match self {
484 Self::Ring(s) => s.init_from_dict(dict),
485 Self::Flat(s) => s.init_from_dict(dict),
486 }
487 }
488
489 #[inline]
490 fn buffer_len(&self) -> usize {
491 match self {
492 Self::Ring(s) => s.buffer.len(),
493 Self::Flat(s) => s.buffer.len(),
494 }
495 }
496
497 fn workspace_bytes(&self) -> usize {
498 match self {
499 Self::Ring(s) => s.workspace_bytes(),
500 Self::Flat(s) => s.workspace_bytes(),
501 }
502 }
503
504 /// Pre-reserve the backing buffer to `window_size` in a single
505 /// allocation. Called once on the non-direct (`decode_blocks`) path
506 /// after direct-eligibility is ruled out, so multi-segment fallback
507 /// decodes don't pay repeated `reserve_amortized` grow steps
508 /// (128 KiB → 256 KiB → ... → window) as blocks accumulate.
509 ///
510 /// Direct-eligible frames never call this and pay zero backing-buffer
511 /// allocation for the window, on BOTH backends: `new_ring` and
512 /// `new_flat` are each lazy (no pre-reserve), so a direct-eligible
513 /// frame writes only through `UserSliceBackend` and leaves this
514 /// buffer empty.
515 ///
516 /// `window_size` is the TARGET visible-window capacity: callers pass
517 /// the full window, and the method itself computes the shortfall past
518 /// the bytes already buffered before calling the backend's
519 /// ADDITIONAL-semantics `reserve_exact`. That keeps re-entries (the
520 /// decode_all fallback loop runs `decode_blocks` once per strategy
521 /// chunk, and streaming callers invoke it per call) from growing a
522 /// window-full buffer toward 2x window, while per-block growth keeps
523 /// the amortized `reserve`.
524 #[inline]
525 fn reserve_buffer(&mut self, window_size: usize) {
526 // Exact growth: this is the one-shot pre-reservation, and a request
527 // landing one slack past the retained capacity (e.g. a dictionary
528 // prefix already loaded into the buffer) must not DOUBLE a
529 // window-sized allocation through the amortized policy. Per-block
530 // growth keeps the amortized `reserve`.
531 //
532 // `reserve_exact` takes ADDITIONAL capacity, so request only the
533 // shortfall past the bytes already buffered: the decode_all
534 // fallback loop re-enters `decode_blocks` once per strategy chunk,
535 // and re-requesting the full window each iteration would grow a
536 // window-sized buffer toward 2x window.
537 match self {
538 Self::Ring(s) => {
539 let additional = window_size.saturating_sub(s.buffer.len());
540 s.buffer.reserve_exact(additional);
541 }
542 Self::Flat(s) => {
543 let additional = window_size.saturating_sub(s.buffer.len());
544 s.buffer.reserve_exact(additional);
545 }
546 }
547 }
548
549 /// Last `n` bytes of the visible buffer as `(s1, s2)` (wrap-aware).
550 /// Routes through whichever backend the current scratch holds.
551 #[cfg(all(feature = "lsm", feature = "hash"))]
552 fn last_n_as_slices(&self, n: usize) -> (&[u8], &[u8]) {
553 match self {
554 Self::Ring(s) => s.buffer.last_n_as_slices(n),
555 Self::Flat(s) => s.buffer.last_n_as_slices(n),
556 }
557 }
558
559 fn buffer_drain(&mut self) -> Vec<u8> {
560 match self {
561 Self::Ring(s) => s.buffer.drain(),
562 Self::Flat(s) => s.buffer.drain(),
563 }
564 }
565
566 fn buffer_drain_to_window_size(&mut self) -> Option<Vec<u8>> {
567 match self {
568 Self::Ring(s) => s.buffer.drain_to_window_size(),
569 Self::Flat(s) => s.buffer.drain_to_window_size(),
570 }
571 }
572
573 fn buffer_drain_to_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
574 match self {
575 Self::Ring(s) => s.buffer.drain_to_writer(sink),
576 Self::Flat(s) => s.buffer.drain_to_writer(sink),
577 }
578 }
579
580 fn buffer_drain_to_window_size_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
581 match self {
582 Self::Ring(s) => s.buffer.drain_to_window_size_writer(sink),
583 Self::Flat(s) => s.buffer.drain_to_window_size_writer(sink),
584 }
585 }
586
587 fn buffer_can_drain(&self) -> usize {
588 match self {
589 Self::Ring(s) => s.buffer.can_drain(),
590 Self::Flat(s) => s.buffer.can_drain(),
591 }
592 }
593
594 fn buffer_can_drain_to_window_size(&self) -> Option<usize> {
595 match self {
596 Self::Ring(s) => s.buffer.can_drain_to_window_size(),
597 Self::Flat(s) => s.buffer.can_drain_to_window_size(),
598 }
599 }
600
601 fn buffer_read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
602 match self {
603 Self::Ring(s) => s.buffer.read(target),
604 Self::Flat(s) => s.buffer.read(target),
605 }
606 }
607
608 fn buffer_read_all(&mut self, target: &mut [u8]) -> Result<usize, Error> {
609 match self {
610 Self::Ring(s) => s.buffer.read_all(target),
611 Self::Flat(s) => s.buffer.read_all(target),
612 }
613 }
614
615 /// Drop visible output beyond `window_size` without producing it,
616 /// keeping the most recent `window_size` bytes available to back
617 /// future match copies. Used by `decode_blocks_partial` to bound
618 /// memory while decoding the leading (skipped) blocks into the window.
619 #[cfg(feature = "lsm")]
620 fn buffer_drop_to_window_size(&mut self) -> usize {
621 match self {
622 Self::Ring(s) => s.buffer.drop_to_window_size(),
623 Self::Flat(s) => s.buffer.drop_to_window_size(),
624 }
625 }
626
627 /// Drop exactly `n` bytes from the front of the visible output without
628 /// producing them. Used by `decode_blocks_partial` to discard the
629 /// leading blocks' window-context bytes once the in-range blocks are
630 /// decoded (match resolution complete), leaving only the in-range output.
631 #[cfg(feature = "lsm")]
632 fn buffer_discard_front(&mut self, n: usize) {
633 match self {
634 Self::Ring(s) => s.buffer.discard_front(n),
635 Self::Flat(s) => s.buffer.discard_front(n),
636 }
637 }
638
639 /// Prime the match window with the caller's already-decompressed tail for
640 /// a resumed partial decode. Routes through whichever backend the current
641 /// scratch holds. See [`DecodeBuffer::prime_window`].
642 #[cfg(feature = "lsm")]
643 fn prime_window(&mut self, prefix: &[u8], total_output: u64) {
644 match self {
645 Self::Ring(s) => s.buffer.prime_window(prefix, total_output),
646 Self::Flat(s) => s.buffer.prime_window(prefix, total_output),
647 }
648 }
649
650 /// Total decompressed bytes produced so far (the buffer's running output
651 /// counter, unaffected by window drops / drains). Used to stamp a captured
652 /// [`ResumeState`]'s `output_offset`.
653 #[cfg(feature = "lsm")]
654 fn total_output(&self) -> u64 {
655 match self {
656 Self::Ring(s) => s.buffer.total_output(),
657 Self::Flat(s) => s.buffer.total_output(),
658 }
659 }
660
661 /// Clone the cross-block entropy/repcode state (FSE + Huffman tables +
662 /// `offset_hist`) out of the live scratch for a [`ResumeState`] snapshot.
663 #[cfg(feature = "lsm")]
664 fn export_entropy(
665 &self,
666 ) -> (
667 crate::decoding::scratch::FSEScratch,
668 crate::decoding::scratch::HuffmanScratch,
669 [u32; 3],
670 ) {
671 let (fse_src, huf_src, offset_hist) = match self {
672 Self::Ring(s) => (&s.fse, &s.huf, s.offset_hist),
673 Self::Flat(s) => (&s.fse, &s.huf, s.offset_hist),
674 };
675 let mut fse = crate::decoding::scratch::FSEScratch::new();
676 fse.reinit_from(fse_src);
677 let mut huf = crate::decoding::scratch::HuffmanScratch::new();
678 huf.reinit_resolved_from(huf_src);
679 (fse, huf, offset_hist)
680 }
681
682 /// Install entropy/repcode state from a [`ResumeState`] into the live
683 /// scratch so a `Repeat_Mode` / treeless resume block resolves against the
684 /// same tables a contiguous decode would have carried over.
685 #[cfg(feature = "lsm")]
686 fn restore_entropy(&mut self, state: &ResumeState) {
687 match self {
688 Self::Ring(s) => {
689 s.fse.reinit_from(&state.fse);
690 s.huf.reinit_resolved_from(&state.huf);
691 s.offset_hist = state.offset_hist;
692 }
693 Self::Flat(s) => {
694 s.fse.reinit_from(&state.fse);
695 s.huf.reinit_resolved_from(&state.huf);
696 s.offset_hist = state.offset_hist;
697 }
698 }
699 }
700
701 /// XXH64 of the window-prime bytes for a [`ResumeState`]: the last
702 /// `min(window_size, buffer_len)` bytes of the current buffer, which at emit
703 /// time are exactly the match-window context the resume block will see.
704 /// Wrap-aware via `last_n_as_slices` — streaming XXH64 over the two slices
705 /// equals a single hash over the contiguous `window_prime` at resume.
706 #[cfg(all(feature = "lsm", feature = "hash"))]
707 fn window_tail_hash(&self, window_size: usize) -> u64 {
708 use core::hash::Hasher;
709 let n = core::cmp::min(window_size, self.buffer_len());
710 let (s1, s2) = self.last_n_as_slices(n);
711 let mut h = twox_hash::XxHash64::with_seed(0);
712 h.write(s1);
713 h.write(s2);
714 h.finish()
715 }
716
717 fn decode_block_content<R: Read>(
718 &mut self,
719 decoder: &mut BlockDecoder,
720 header: &crate::blocks::block::BlockHeader,
721 source: R,
722 ) -> Result<u64, DecodeBlockContentError> {
723 match self {
724 Self::Ring(s) => decoder.decode_block_content(header, s, source),
725 Self::Flat(s) => decoder.decode_block_content(header, s, source),
726 }
727 }
728
729 #[cfg(feature = "hash")]
730 fn hash_finish(&self) -> u64 {
731 use core::hash::Hasher;
732 match self {
733 Self::Ring(s) => s.buffer.hash.finish(),
734 Self::Flat(s) => s.buffer.hash.finish(),
735 }
736 }
737
738 /// Forward the drain-time hash toggle to the inner `DecodeBuffer`
739 /// (streaming path). Called by the frame layer from the decoder's
740 /// `ContentChecksum` mode before each decode.
741 #[cfg(feature = "hash")]
742 fn set_compute_hash(&mut self, compute: bool) {
743 match self {
744 Self::Ring(s) => s.buffer.set_compute_hash(compute),
745 Self::Flat(s) => s.buffer.set_compute_hash(compute),
746 }
747 }
748}
749
/// Per-frame decoding state owned by [`FrameDecoder`]: the parsed frame header
/// together with the scratch buffers and progress counters of the frame that
/// is currently being decoded. Re-initialised for every new frame.
struct FrameDecoderState {
    /// Header parsed from the start of the current frame.
    pub frame_header: frame::FrameHeader,
    /// Decode scratch. Created flat-backed for single-segment frames and
    /// ring-backed otherwise; reset (not recreated) when the state is reused.
    decoder_scratch: DecoderScratchKind,
    /// Reported by `FrameDecoder::is_finished`; cleared when a new frame
    /// starts. NOTE(review): presumably set once the frame's last block has
    /// been decoded — the set site is not visible from here.
    frame_finished: bool,
    /// Number of blocks decoded so far; also the 0-based index of the next
    /// block to decode.
    block_counter: usize,
    /// Bytes consumed from the source for this frame: starts at the header
    /// size and grows by each block header and block body that is read.
    bytes_read_counter: u64,
    /// Content checksum read from the frame data, `None` until it has been
    /// read.
    check_sum: Option<u32>,
    /// Id of the dictionary applied to this frame's scratch, `None` when no
    /// dictionary is in use.
    using_dict: Option<u32>,
}
759
/// Strategy passed to [`FrameDecoder::decode_blocks`]; it influences how many
/// blocks are decoded before that call returns, which lets callers bound the
/// amount of decoded data held in the buffer.
pub enum BlockDecodingStrategy {
    /// Decode all blocks of the frame into the buffer.
    All,
    /// NOTE(review): presumably stops after about this many blocks have been
    /// decoded in one call — confirm against `decode_blocks`.
    UptoBlocks(usize),
    /// NOTE(review): presumably stops once about this many bytes have been
    /// decoded in one call — confirm against `decode_blocks`.
    UptoBytes(usize),
}
765
/// Result of [`FrameDecoder::decode_blocks_partial`]: the decompressed bytes
/// of the requested inner-block range, plus where decoding stopped early if it
/// did.
///
/// Behind the `lsm` Cargo feature.
#[cfg(feature = "lsm")]
#[derive(Debug)]
pub struct PartialDecode {
    /// One contiguous buffer holding, in frame order, the decompressed bytes
    /// of the in-range blocks that were actually decoded. Its length equals
    /// the summed decompressed sizes of blocks
    /// `start_block .. start_block + blocks_decoded`.
    pub data: Vec<u8>,
    /// Index of the first block whose output is in [`data`](Self::data). On a
    /// fresh decode this is the requested `start_block`; when resuming it is
    /// [`ResumeState::block_index`] (the caller-supplied `start_block` is
    /// ignored in resume mode).
    pub start_block: u32,
    /// How many in-range blocks were successfully decoded into
    /// [`data`](Self::data).
    pub blocks_decoded: u32,
    /// `Some((block_index, error))` when decoding stopped on a failing block
    /// before `end_block` was reached — either a corrupt block inside the
    /// range or a leading block needed for window context. `None` when the
    /// requested range decoded cleanly or the frame's last block came first.
    ///
    /// If the failing block is a leading context block
    /// (`block_index < start_block`), the in-range window could not be built:
    /// [`data`](Self::data) is empty and `blocks_decoded` is 0.
    pub stopped_at: Option<(u32, FrameDecoderError)>,
    /// `true` when this decode reached the frame's last block.
    pub frame_finished: bool,
    /// Cross-block carry-over state for resuming the next extent. Pass it
    /// back (together with the matching `window_prime`) through the `resume`
    /// argument of a later [`FrameDecoder::decode_blocks_partial`] call to
    /// continue from [`ResumeState::block_index`] without re-decompressing
    /// the prefix.
    ///
    /// `None` in two situations: emission was not requested
    /// (`emit_resume = false`), OR this decode reached the frame's last block
    /// ([`frame_finished`] is `true`). In the second case there is no
    /// following block to resume from, so no snapshot is produced even with
    /// `emit_resume = true`. Callers walking a frame incrementally should
    /// stop on `frame_finished` instead of reading a `None` here as
    /// "emission disabled".
    ///
    /// [`frame_finished`]: Self::frame_finished
    pub resume_state: Option<ResumeState>,
}
812
813impl FrameDecoderState {
814 /// Window size to actually reserve for this frame's decode buffer.
815 /// A declared content size caps the useful window: matches can never
816 /// reference further back than the bytes that will ever exist, so an
817 /// encoder-declared window above the FCS (e.g. a level-preset window
818 /// on a smaller input) must not inflate the reservation. Every
819 /// `reserve_buffer` site routes through this so the cap is uniform
820 /// across `decode_all_impl`, `decode_blocks`, and the partial path.
821 fn useful_window_size(&self) -> usize {
822 let window_size = self.frame_header.window_size().unwrap_or(0);
823 if self.frame_header.fcs_declared() {
824 window_size.min(self.frame_header.frame_content_size()) as usize
825 } else {
826 window_size as usize
827 }
828 }
829
830 /// Construct a new frame decoder state, reading the frame header
831 /// from `source`. When `magicless` is `true`, the 4-byte magic
832 /// number prefix is NOT consumed (upstream zstd `ZSTD_f_zstd1_magicless`).
833 /// Crate-internal — reached only via `FrameDecoder::init` /
834 /// `FrameDecoder::init_with_dict_handle`. The decode buffer is
835 /// allocated lazily on BOTH backends (`new_ring` and `new_flat`):
836 /// direct-eligible frames pay zero buffer allocation, and the
837 /// non-direct fallback reserves `window_size` once in
838 /// `decode_all_impl` / `decode_blocks` via `reserve_buffer` before
839 /// any block write.
840 pub(crate) fn new_with_format(
841 source: impl Read,
842 magicless: bool,
843 ) -> Result<FrameDecoderState, FrameDecoderError> {
844 let (frame, header_size) = frame::read_frame_header_with_format(source, magicless)?;
845 let window_size = frame.window_size()?;
846
847 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
848 return Err(FrameDecoderError::WindowSizeTooBig {
849 requested: window_size,
850 });
851 }
852
853 let decoder_scratch = if frame.descriptor.single_segment_flag() {
854 DecoderScratchKind::new_flat(window_size as usize)
855 } else {
856 DecoderScratchKind::new_ring(window_size as usize)
857 };
858 Ok(FrameDecoderState {
859 frame_header: frame,
860 frame_finished: false,
861 block_counter: 0,
862 decoder_scratch,
863 bytes_read_counter: u64::from(header_size),
864 check_sum: None,
865 using_dict: None,
866 })
867 }
868
869 /// Reset this state for a new frame read from `source`, reusing
870 /// existing allocations. When `magicless` is `true`, the frame
871 /// header is read WITHOUT expecting a magic-number prefix
872 /// (upstream zstd `ZSTD_f_zstd1_magicless`). Crate-internal — reached
873 /// only via `FrameDecoder::reset`.
874 ///
875 /// `DecodeBuffer::reset` no longer reserves window_size for either
876 /// backend — capacity decisions live one layer up. Both backends are
877 /// lazy: direct-eligible frames pay zero backing-buffer allocation
878 /// here (they write through `UserSliceBackend`), and the non-direct
879 /// path is pre-reserved by `decode_all_impl` / `decode_blocks` via
880 /// `DecoderScratchKind::reserve_buffer(window_size)` before any block
881 /// write. A reused scratch whose new frame fits within prior capacity
882 /// reuses it; a larger one grows on that same `reserve_buffer` call.
883 pub(crate) fn reset_with_format(
884 &mut self,
885 source: impl Read,
886 magicless: bool,
887 ) -> Result<(), FrameDecoderError> {
888 let (frame_header, header_size) = frame::read_frame_header_with_format(source, magicless)?;
889 let window_size = frame_header.window_size()?;
890
891 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
892 return Err(FrameDecoderError::WindowSizeTooBig {
893 requested: window_size,
894 });
895 }
896
897 self.decoder_scratch
898 .reset(&frame_header, window_size as usize);
899 self.frame_header = frame_header;
900 self.frame_finished = false;
901 self.block_counter = 0;
902 self.bytes_read_counter = u64::from(header_size);
903 self.check_sum = None;
904 self.using_dict = None;
905 Ok(())
906 }
907}
908
909impl Default for FrameDecoder {
910 fn default() -> Self {
911 Self::new()
912 }
913}
914
915impl FrameDecoder {
916 /// This will create a new decoder without allocating anything yet.
917 /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used
918 /// else they just reset these buffers with not further allocations
919 pub fn new() -> FrameDecoder {
920 FrameDecoder {
921 state: None,
922 #[cfg(test)]
923 direct_frames: 0,
924 owned_dicts: BTreeMap::new(),
925 #[cfg(target_has_atomic = "ptr")]
926 shared_dicts: BTreeMap::new(),
927 #[cfg(not(target_has_atomic = "ptr"))]
928 shared_dicts: (),
929 magicless: false,
930 content_checksum: ContentChecksum::EmitOnly,
931 #[cfg(feature = "lsm")]
932 expect_dict_id: None,
933 #[cfg(feature = "lsm")]
934 expect_window_descriptor: None,
935 #[cfg(all(feature = "lsm", feature = "hash"))]
936 per_block_checksums_enabled: false,
937 #[cfg(all(feature = "lsm", feature = "hash"))]
938 computed_block_checksums: alloc::vec::Vec::new(),
939 }
940 }
941
942 /// Heap bytes currently held by the decoder's lazily-grown workspace:
943 /// the decode-window buffer plus the per-block literal/content buffers
944 /// and the entropy tables. Returns 0 before the first frame is initialised
945 /// (no workspace allocated yet). The window allocation dominates and grows
946 /// with the frame's window size; this is the value to track for decode-time
947 /// memory pressure, mirroring the workspace term of upstream
948 /// `ZSTD_sizeof_DCtx`. Shared dictionaries (ref-counted handles) are not
949 /// counted, matching upstream excluding `refDDict` memory.
950 pub fn workspace_size(&self) -> usize {
951 self.state
952 .as_ref()
953 .map_or(0, |s| s.decoder_scratch.workspace_bytes())
954 }
955
956 /// Select how the frame's optional content checksum is handled
957 /// (compute, expose, verify, or skip). See [`ContentChecksum`].
958 /// Default [`ContentChecksum::EmitOnly`]. Takes effect on the next
959 /// decode; safe to call between frames on a reused decoder.
960 pub fn set_content_checksum(&mut self, mode: ContentChecksum) {
961 self.content_checksum = mode;
962 }
963
964 /// Opt in to per-block XXH64 verification during decode.
965 /// Default off; zero cost when disabled. Each block's decompressed
966 /// bytes are XXH64-hashed (low 32 bits) and appended to
967 /// [`Self::computed_block_checksums`] as the decode progresses.
968 /// Callers compare the captured digests against externally-stored
969 /// expected values (e.g. from a per-block sidecar in the
970 /// containing application protocol).
971 ///
972 /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
973 /// primitive lives behind the `hash` feature, so this method
974 /// only compiles when both are enabled.
975 #[cfg(all(feature = "lsm", feature = "hash"))]
976 pub fn enable_per_block_checksums(&mut self) {
977 self.per_block_checksums_enabled = true;
978 }
979
980 /// Per-block XXH64 (low 32 bits) digests captured during the
981 /// current frame's decode. Empty unless
982 /// [`Self::enable_per_block_checksums`] was called before
983 /// [`Self::decode_all`] / [`Self::reset`].
984 ///
985 /// Reset at the start of every new frame.
986 ///
987 /// Behind `all(feature = "lsm", feature = "hash")`.
988 #[cfg(all(feature = "lsm", feature = "hash"))]
989 pub fn computed_block_checksums(&self) -> &[u32] {
990 &self.computed_block_checksums
991 }
992
993 /// Pin the expected `Dictionary_ID` for the next frame.
994 ///
995 /// When `expected` is set, [`Self::init`] / [`Self::reset`]
996 /// validate it against the parsed frame header BEFORE any
997 /// block decode work runs. A mismatch returns
998 /// [`crate::decoding::errors::FrameDecoderError::UnexpectedDictId`]
999 /// before any block decode and before any output is produced.
1000 /// Scratch buffer allocation / reservation for the decode
1001 /// pipeline happens during frame-header parsing, which is
1002 /// already complete when this validation fires — the cost of
1003 /// scratch sizing is paid even on a mismatched header. The
1004 /// guarantee is "no block decode, no XXH64 init, no partial
1005 /// output", not "zero allocation".
1006 ///
1007 /// `Some(0)` is treated as "no dictionary expected": a frame
1008 /// whose header omits the optional `Dictionary_ID` field
1009 /// (flag value 0) passes the check; a frame that carries an
1010 /// explicit non-zero id fails.
1011 ///
1012 /// `None` (default) disables the check.
1013 ///
1014 /// Primary use case: post-AEAD-decrypt sanity check in
1015 /// wire-format consumers (e.g. lsm-tree's encrypted block
1016 /// format pins the `dict_id` baked into the AAD against the
1017 /// inner zstd frame's `dict_id` to defeat dict-substitution
1018 /// attacks).
1019 ///
1020 /// NOT a replacement for AEAD authentication. NOT the same
1021 /// semantic as upstream zstd `ZSTD_d_windowLogMax` (which is a
1022 /// ceiling-style limit, separate concern).
1023 #[cfg(feature = "lsm")]
1024 #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
1025 pub fn expect_dict_id(&mut self, expected: Option<u32>) {
1026 self.expect_dict_id = expected;
1027 }
1028
1029 /// Pin the expected raw `Window_Descriptor` byte (RFC 8878
1030 /// §3.1.1.1.2 layout: `(exp << 3) | mantissa`) for the next
1031 /// frame.
1032 ///
1033 /// When `expected` is set, [`Self::init`] / [`Self::reset`]
1034 /// validate it against the parsed frame header BEFORE any
1035 /// block decode work runs. A mismatch returns
1036 /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`].
1037 ///
1038 /// Single-segment frames omit the `Window_Descriptor` byte
1039 /// from the wire entirely. Setting an expectation while
1040 /// receiving a single-segment frame fails the check with
1041 /// `found: None` — there is no on-wire byte to match against,
1042 /// which is reported explicitly rather than silently passing.
1043 ///
1044 /// `None` (default) disables the check.
1045 ///
1046 /// Byte-exact equality, NOT a ceiling. Upstream zstd
1047 /// `ZSTD_d_windowLogMax` is a separate ceiling-style limit
1048 /// available through the C FFI surface; this method is for
1049 /// strict equality validation against a pinned expectation
1050 /// (e.g. lsm-tree's wire format pins the window descriptor
1051 /// from the AAD to defeat decompression-bomb-swap attacks).
1052 #[cfg(feature = "lsm")]
1053 #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
1054 pub fn expect_window_descriptor(&mut self, expected: Option<u8>) {
1055 self.expect_window_descriptor = expected;
1056 }
1057
1058 /// Validate the just-parsed frame header against any pinned
1059 /// expectations set via [`Self::expect_dict_id`] /
1060 /// [`Self::expect_window_descriptor`].
1061 ///
1062 /// Returns the typed error variant on mismatch and leaves
1063 /// `self.state` in a re-resettable shape — a subsequent
1064 /// `reset()` will overwrite `frame_header` from the new source
1065 /// without needing intermediate cleanup.
1066 #[cfg(feature = "lsm")]
1067 fn validate_expectations(
1068 &self,
1069 frame_header: &frame::FrameHeader,
1070 ) -> Result<(), FrameDecoderError> {
1071 if let Some(expected) = self.expect_dict_id {
1072 let found = frame_header.dictionary_id();
1073 // `Some(0)` is the "no dictionary expected" sentinel —
1074 // matches a frame whose header omits the optional
1075 // dict_id field (which is reported as `None` by the
1076 // parser). All other values must match exactly.
1077 let matches = match (expected, found) {
1078 (0, None) => true,
1079 (e, Some(f)) => e == f,
1080 _ => false,
1081 };
1082 if !matches {
1083 return Err(FrameDecoderError::UnexpectedDictId {
1084 expected: Some(expected),
1085 found,
1086 });
1087 }
1088 }
1089 if let Some(expected) = self.expect_window_descriptor {
1090 let found = frame_header.window_descriptor();
1091 if found != Some(expected) {
1092 return Err(FrameDecoderError::UnexpectedWindowDescriptor { expected, found });
1093 }
1094 }
1095 Ok(())
1096 }
1097
1098 /// Enable or disable magicless frame format
1099 /// (`ZSTD_f_zstd1_magicless`). When set to `true`, subsequent
1100 /// [`init`] / [`reset`] calls expect the frame header to begin
1101 /// directly with the frame-header descriptor — no 4-byte magic
1102 /// number prefix. Default false. Must match the encoder's
1103 /// magicless setting; the format is unambiguous only when the
1104 /// caller knows it out-of-band.
1105 ///
1106 /// Note: magicless mode also disables skippable-frame detection.
1107 /// The `0x184D2A50..=0x184D2A5F` skippable-frame magic range is
1108 /// only recognised when the 4-byte magic prefix is consumed, so
1109 /// `decode_all` / `init` / `reset` will treat a skippable frame
1110 /// at the head of a magicless stream as a malformed frame header
1111 /// (bad descriptor / window-size error) instead of skipping it.
1112 /// Mixed-format streams that interleave skippable frames must be
1113 /// pre-split by the caller; `set_magicless(true)` is only safe
1114 /// when the entire stream is known to be magicless zstd frames.
1115 pub fn set_magicless(&mut self, magicless: bool) {
1116 self.magicless = magicless;
1117 }
1118
1119 #[cfg(target_has_atomic = "ptr")]
1120 fn shared_dict_exists(&self, dict_id: u32) -> bool {
1121 self.shared_dicts.contains_key(&dict_id)
1122 }
1123
1124 #[cfg(not(target_has_atomic = "ptr"))]
1125 fn shared_dict_exists(&self, _dict_id: u32) -> bool {
1126 false
1127 }
1128
1129 fn validate_registered_dictionary(dict: &Dictionary) -> Result<(), FrameDecoderError> {
1130 use crate::decoding::errors::DictionaryDecodeError as dict_err;
1131
1132 if dict.id == 0 {
1133 return Err(FrameDecoderError::from(dict_err::ZeroDictionaryId));
1134 }
1135 if let Some(index) = dict.offset_hist.iter().position(|&rep| rep == 0) {
1136 return Err(FrameDecoderError::from(
1137 dict_err::ZeroRepeatOffsetInDictionary { index: index as u8 },
1138 ));
1139 }
1140 Ok(())
1141 }
1142
1143 /// init() will allocate all needed buffers if it is the first time this decoder is used
1144 /// else they just reset these buffers with not further allocations
1145 ///
1146 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
1147 ///
1148 /// equivalent to reset()
1149 pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
1150 self.reset(source)
1151 }
1152
1153 /// Initialize the decoder for a new frame using a pre-parsed dictionary handle.
1154 ///
1155 /// If the frame header has a dictionary ID, this validates it against
1156 /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
1157 ///
1158 /// If the header omits the optional dictionary ID, this still applies the
1159 /// provided dictionary handle.
1160 ///
1161 /// # Warning
1162 ///
1163 /// This method always applies `dict` unless the frame header contains a
1164 /// non-matching dictionary ID. Callers must only use this API when they
1165 /// already know the frame was encoded with the provided dictionary, even if
1166 /// the frame header omits the dictionary ID or encodes an explicit
1167 /// dictionary ID of `0`.
1168 ///
1169 /// Passing a dictionary for a frame that was not encoded with it can
1170 /// silently corrupt the decoded output.
1171 pub fn init_with_dict_handle(
1172 &mut self,
1173 source: impl Read,
1174 dict: &DictionaryHandle,
1175 ) -> Result<(), FrameDecoderError> {
1176 self.reset_with_dict_handle(source, dict)
1177 }
1178
1179 /// reset() will allocate all needed buffers if it is the first time this decoder is used
1180 /// else they just reset these buffers with not further allocations
1181 ///
1182 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
1183 ///
1184 /// equivalent to init()
1185 pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
1186 use FrameDecoderError as err;
1187 // Fresh frame → start with an empty per-block checksum vec so
1188 // the values for the next frame don't carry over from the
1189 // previous one.
1190 #[cfg(all(feature = "lsm", feature = "hash"))]
1191 self.computed_block_checksums.clear();
1192 let magicless = self.magicless;
1193 let dict_id = match &mut self.state {
1194 Some(s) => {
1195 s.reset_with_format(source, magicless)?;
1196 s.frame_header.dictionary_id()
1197 }
1198 None => {
1199 self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
1200 self.state
1201 .as_ref()
1202 .and_then(|state| state.frame_header.dictionary_id())
1203 }
1204 };
1205 // Validate any pinned expectations BEFORE block decode work
1206 // runs. Catches dict_id substitution / window-descriptor
1207 // tampering on inputs already authenticated by an outer
1208 // layer (e.g. AEAD). Returning here leaves `self.state` in
1209 // a re-resettable shape — next `reset()` re-parses the
1210 // frame header without intermediate cleanup.
1211 #[cfg(feature = "lsm")]
1212 if let Some(state) = self.state.as_ref() {
1213 self.validate_expectations(&state.frame_header)?;
1214 }
1215 if let Some(dict_id) = dict_id {
1216 let state = self.state.as_mut().expect("state initialized");
1217 let owned_dicts = &self.owned_dicts;
1218 #[cfg(target_has_atomic = "ptr")]
1219 let shared_dicts = &self.shared_dicts;
1220 let dict = owned_dicts
1221 .get(&dict_id)
1222 .or_else(|| {
1223 #[cfg(target_has_atomic = "ptr")]
1224 {
1225 shared_dicts.get(&dict_id)
1226 }
1227 #[cfg(not(target_has_atomic = "ptr"))]
1228 {
1229 None
1230 }
1231 })
1232 .ok_or(err::DictNotProvided { dict_id })?;
1233 state.decoder_scratch.init_from_dict(dict);
1234 state.using_dict = Some(dict_id);
1235 }
1236 Ok(())
1237 }
1238
1239 /// Reset this decoder for a new frame using a pre-parsed dictionary handle.
1240 ///
1241 /// If the frame header has a dictionary ID, this validates it against
1242 /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
1243 ///
1244 /// If the header omits the optional dictionary ID, this still applies the
1245 /// provided dictionary handle.
1246 ///
1247 /// # Warning
1248 ///
1249 /// This method always applies `dict` unless the frame header contains a
1250 /// non-matching dictionary ID. Callers must only use this API when they
1251 /// already know the frame was encoded with the provided dictionary, even if
1252 /// the frame header omits the dictionary ID or encodes an explicit
1253 /// dictionary ID of `0`.
1254 ///
1255 /// Passing a dictionary for a frame that was not encoded with it can
1256 /// silently corrupt the decoded output.
1257 pub fn reset_with_dict_handle(
1258 &mut self,
1259 source: impl Read,
1260 dict: &DictionaryHandle,
1261 ) -> Result<(), FrameDecoderError> {
1262 use FrameDecoderError as err;
1263 // Fresh frame → drop the previous frame's per-block checksum
1264 // digests so the next decode starts with an empty vec.
1265 // Mirrors the same clear in `reset()`; reset_with_dict_handle
1266 // is a parallel entry point so it needs its own call.
1267 #[cfg(all(feature = "lsm", feature = "hash"))]
1268 self.computed_block_checksums.clear();
1269 Self::validate_registered_dictionary(dict.as_dict())?;
1270 let magicless = self.magicless;
1271 // Scope the &mut borrow of `self.state` to the header parse
1272 // alone, so the subsequent `validate_expectations(&self, ...)`
1273 // call below can take a fresh shared borrow of self without
1274 // tripping the borrow checker.
1275 match &mut self.state {
1276 Some(s) => s.reset_with_format(source, magicless)?,
1277 None => {
1278 self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
1279 }
1280 }
1281 // Single source of truth: route through the same
1282 // `validate_expectations` used by `reset()`. Routing through
1283 // the helper keeps the two code paths from drifting (e.g.,
1284 // if expect-semantics or error wiring changes later).
1285 #[cfg(feature = "lsm")]
1286 {
1287 let header = &self
1288 .state
1289 .as_ref()
1290 .expect("state populated by reset_with_format/new_with_format")
1291 .frame_header;
1292 self.validate_expectations(header)?;
1293 }
1294 let state = self
1295 .state
1296 .as_mut()
1297 .expect("state populated by reset_with_format/new_with_format");
1298 if let Some(dict_id) = state.frame_header.dictionary_id()
1299 && dict_id != dict.id()
1300 {
1301 return Err(err::DictIdMismatch {
1302 expected: dict_id,
1303 provided: dict.id(),
1304 });
1305 }
1306 state.decoder_scratch.init_from_dict(dict);
1307 state.using_dict = Some(dict.id());
1308 Ok(())
1309 }
1310
1311 /// Add a dictionary that can be selected dynamically by frame dictionary ID.
1312 ///
1313 /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
1314 /// registered (either as owned or shared).
1315 pub fn add_dict(&mut self, dict: Dictionary) -> Result<(), FrameDecoderError> {
1316 Self::validate_registered_dictionary(&dict)?;
1317 let dict_id = dict.id;
1318 if self.owned_dicts.contains_key(&dict_id) || self.shared_dict_exists(dict_id) {
1319 return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
1320 }
1321 self.owned_dicts
1322 .insert(dict_id, DictionaryHandle::from_dictionary(dict));
1323 Ok(())
1324 }
1325
1326 /// Parse and add a serialized dictionary blob.
1327 pub fn add_dict_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), FrameDecoderError> {
1328 let dict = Dictionary::decode_dict(raw_dictionary)?;
1329 self.add_dict(dict)
1330 }
1331
1332 /// Add a pre-parsed dictionary handle for reuse across decoders.
1333 ///
1334 /// This API is available on targets with pointer-width atomics
1335 /// (`target_has_atomic = "ptr"`).
1336 ///
1337 /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
1338 /// registered (either as owned or shared).
1339 #[cfg(target_has_atomic = "ptr")]
1340 pub fn add_dict_handle(&mut self, dict: DictionaryHandle) -> Result<(), FrameDecoderError> {
1341 Self::validate_registered_dictionary(dict.as_dict())?;
1342 let dict_id = dict.id();
1343 if self.owned_dicts.contains_key(&dict_id) || self.shared_dicts.contains_key(&dict_id) {
1344 return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
1345 }
1346 self.shared_dicts.insert(dict_id, dict);
1347 Ok(())
1348 }
1349
1350 pub fn force_dict(&mut self, dict_id: u32) -> Result<(), FrameDecoderError> {
1351 use FrameDecoderError as err;
1352 let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
1353 let owned_dicts = &self.owned_dicts;
1354 #[cfg(target_has_atomic = "ptr")]
1355 let shared_dicts = &self.shared_dicts;
1356
1357 let dict = owned_dicts
1358 .get(&dict_id)
1359 .or_else(|| {
1360 #[cfg(target_has_atomic = "ptr")]
1361 {
1362 shared_dicts.get(&dict_id)
1363 }
1364 #[cfg(not(target_has_atomic = "ptr"))]
1365 {
1366 None
1367 }
1368 })
1369 .ok_or(err::DictNotProvided { dict_id })?;
1370 state.decoder_scratch.init_from_dict(dict);
1371 state.using_dict = Some(dict_id);
1372
1373 Ok(())
1374 }
1375
1376 /// Returns how many bytes the frame contains after decompression
1377 pub fn content_size(&self) -> u64 {
1378 match &self.state {
1379 None => 0,
1380 Some(s) => s.frame_header.frame_content_size(),
1381 }
1382 }
1383
1384 /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame
1385 pub fn get_checksum_from_data(&self) -> Option<u32> {
1386 let state = self.state.as_ref()?;
1387
1388 state.check_sum
1389 }
1390
1391 /// Returns the checksum that was calculated while decoding.
1392 /// Only a sensible value after all decoded bytes have been collected/read from the FrameDecoder.
1393 /// Returns `None` when the frame header has `content_checksum_flag = 0`:
1394 /// no hash is computed for such frames (the post-decode XXH64 pass was a
1395 /// 63 % decode-wall hotspot on flag-off frames; skipping it when the
1396 /// frame format declares no trailing digest avoids that wasted work).
1397 #[cfg(feature = "hash")]
1398 pub fn get_calculated_checksum(&self) -> Option<u32> {
1399 let state = self.state.as_ref()?;
1400 // `ContentChecksum::None` skips the XXH64 pass entirely, so there is
1401 // no calculated digest to report.
1402 if self.content_checksum == ContentChecksum::None {
1403 return None;
1404 }
1405 if !state.frame_header.descriptor.content_checksum_flag() {
1406 return None;
1407 }
1408 let cksum_64bit = state.decoder_scratch.hash_finish();
1409 //truncate to lower 32bit because reasons...
1410 Some(cksum_64bit as u32)
1411 }
1412
1413 /// Compare the frame's stored content checksum against the digest the
1414 /// decoder computed, returning [`FrameDecoderError::ChecksumMismatch`] on
1415 /// disagreement. No-op unless the mode is [`ContentChecksum::Verify`] and
1416 /// the frame carries a trailing checksum.
1417 ///
1418 /// [`decode_all`](Self::decode_all) and the streaming reader call this
1419 /// automatically. Callers driving [`decode_blocks`](Self::decode_blocks)
1420 /// directly invoke it themselves once per frame, after the frame is fully
1421 /// decoded AND fully drained (e.g. via [`collect`](Self::collect)), so both
1422 /// the stored value and the running digest are final.
1423 #[cfg(feature = "hash")]
1424 pub fn verify_content_checksum(&self) -> Result<(), FrameDecoderError> {
1425 if self.content_checksum != ContentChecksum::Verify {
1426 return Ok(());
1427 }
1428 let Some(state) = self.state.as_ref() else {
1429 return Ok(());
1430 };
1431 if !state.frame_header.descriptor.content_checksum_flag() {
1432 return Ok(());
1433 }
1434 let Some(expected) = state.check_sum else {
1435 return Ok(());
1436 };
1437 let calculated = state.decoder_scratch.hash_finish() as u32;
1438 if expected != calculated {
1439 return Err(FrameDecoderError::ChecksumMismatch {
1440 expected,
1441 calculated,
1442 });
1443 }
1444 Ok(())
1445 }
1446
1447 /// Counter for how many bytes have been consumed while decoding the frame
1448 pub fn bytes_read_from_source(&self) -> u64 {
1449 let state = match &self.state {
1450 None => return 0,
1451 Some(s) => s,
1452 };
1453 state.bytes_read_counter
1454 }
1455
1456 /// Test-only: number of frames decoded through the single-copy direct
1457 /// path (`run_direct_decode`). Lets cross-module tests assert that a
1458 /// given decode took the decode-in-place path rather than the ring drain.
1459 #[cfg(test)]
1460 pub(crate) fn direct_frames(&self) -> u64 {
1461 self.direct_frames
1462 }
1463
1464 /// Whether the current frames last block has been decoded yet
1465 /// If this returns true you can call the drain* functions to get all content
1466 /// (the read() function will drain automatically if this returns true)
1467 pub fn is_finished(&self) -> bool {
1468 let state = match &self.state {
1469 None => return true,
1470 Some(s) => s,
1471 };
1472 if state.frame_header.descriptor.content_checksum_flag() {
1473 state.frame_finished && state.check_sum.is_some()
1474 } else {
1475 state.frame_finished
1476 }
1477 }
1478
1479 /// Counter for how many blocks have already been decoded
1480 pub fn blocks_decoded(&self) -> usize {
1481 let state = match &self.state {
1482 None => return 0,
1483 Some(s) => s,
1484 };
1485 state.block_counter
1486 }
1487
    /// Decodes blocks from a reader. It requires that the frame decoder has been initialized first.
    /// The strategy influences how many blocks will be decoded before the function returns.
    /// This is important if you want to manage memory consumption carefully. If you don't care
    /// about that you can just choose the strategy "All" and have all blocks of the frame decoded into the buffer.
    ///
    /// The strategy is evaluated only AFTER a block has been decoded, so every
    /// successful call decodes at least one block:
    ///
    /// - `All`: keep decoding until the block flagged as last has been decoded.
    /// - `UptoBlocks(n)`: stop once at least `n` blocks were decoded by this call.
    /// - `UptoBytes(n)`: stop once the decode buffer grew by at least `n` bytes
    ///   during this call.
    ///
    /// When the last block is decoded and the frame header's content-checksum
    /// flag is set, the 4-byte little-endian checksum trailer is read from
    /// `source` and stored in the decoder state.
    ///
    /// Returns `Ok(true)` when the frame's last block has been decoded (the
    /// state's `frame_finished` flag), `Ok(false)` when the strategy stopped
    /// the loop earlier.
    ///
    /// # Errors
    ///
    /// - [`FrameDecoderError::NotYetInitialized`] if the decoder holds no state.
    /// - The error built by `block_header_decode_error` if a block header
    ///   cannot be read.
    /// - The error built by `block_body_decode_error` if a block body fails
    ///   to decode.
    /// - [`FrameDecoderError::FailedToReadChecksum`] if the checksum trailer
    ///   cannot be read.
    ///
    /// NOTE(review): unlike `decode_blocks_partial` and `decode_from_to`, this
    /// loop has no `frame_finished` guard before reading a header, so calling
    /// it again on an already finished frame attempts to read another block
    /// header from `source` — confirm callers check the return value.
    pub fn decode_blocks(
        &mut self,
        mut source: impl Read,
        strat: BlockDecodingStrategy,
    ) -> Result<bool, FrameDecoderError> {
        use FrameDecoderError as err;
        // Apply the content-checksum mode to the streaming drain hash before
        // any block decodes into the ring. Hash only when a digest is both
        // wanted (mode != None) AND present in the frame (content_checksum_flag
        // set) — a flag-off frame has nothing to verify or expose, so hashing
        // it is wasted work. Mirrors the direct path and get_calculated_checksum.
        //
        // The mode is copied out of `self` before `state` mutably borrows
        // `self.state`.
        #[cfg(feature = "hash")]
        let checksum_mode = self.content_checksum;
        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
        #[cfg(feature = "hash")]
        {
            let compute_hash = checksum_mode != ContentChecksum::None
                && state.frame_header.descriptor.content_checksum_flag();
            state.decoder_scratch.set_compute_hash(compute_hash);
        }

        // Streaming entry point: pre-reserve the backing buffer to
        // the FCS-capped window so multi-block frames don't pay repeated
        // `reserve_amortized` grow steps (128 KiB → 256 KiB → ... →
        // window) as blocks accumulate. `decode_all` does the same up
        // front in `decode_all_impl`; this mirrors it for callers
        // driving `decode_blocks` directly. Idempotent — the
        // backend's `reserve` early-returns when capacity is already
        // sufficient.
        let useful_window = state.useful_window_size();
        state.decoder_scratch.reserve_buffer(useful_window);

        let mut block_dec = decoding::block_decoder::new();

        // Baselines for the `UptoBytes` / `UptoBlocks` strategies: both are
        // measured relative to where this call started, not frame-absolute.
        let buffer_size_before = state.decoder_scratch.buffer_len();
        let block_counter_before = state.block_counter;
        loop {
            vprintln!("################");
            vprintln!("Next Block: {}", state.block_counter);
            vprintln!("################");
            // Capture the failing-block coordinates BEFORE the header read so
            // the error carries where it happened: `bytes_read_counter` is the
            // frame-absolute offset of this block's header (not yet advanced),
            // `block_counter` its 0-based index. Used by both the header- and
            // body-error builders below (block-precise recovery under `lsm`).
            //
            // NOTE(review): both `as u32` casts narrow wider counters and wrap
            // silently past `u32::MAX` — confirm that is acceptable for these
            // diagnostic coordinates.
            let block_index = state.block_counter as u32;
            let block_frame_offset = state.bytes_read_counter as u32;
            let (block_header, block_header_size) =
                block_dec.read_block_header(&mut source).map_err(|source| {
                    block_header_decode_error(source, block_index, block_frame_offset)
                })?;
            state.bytes_read_counter += u64::from(block_header_size);

            vprintln!();
            vprintln!(
                "Found {} block with size: {}, which will be of size: {}",
                block_header.block_type,
                block_header.content_size,
                block_header.decompressed_size
            );

            // Remember the buffer length before the block so the per-block
            // digest below can cover exactly the bytes this block appended.
            // `None` when per-block checksums are disabled.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            let len_before_block: Option<usize> = if self.per_block_checksums_enabled {
                Some(state.decoder_scratch.buffer_len())
            } else {
                None
            };
            let bytes_read_in_block_body = state
                .decoder_scratch
                .decode_block_content(&mut block_dec, &block_header, &mut source)
                .map_err(|source| {
                    block_body_decode_error(
                        source,
                        block_index,
                        block_frame_offset,
                        &block_header,
                        block_header_size,
                    )
                })?;
            state.bytes_read_counter += bytes_read_in_block_body;

            // Per-block XXH64 (low 32 bits) of the just-decompressed
            // bytes. Hashed from `last_n_as_slices` so RingBuffer wrap
            // is handled in-place, no extra copy.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(len_before_block) = len_before_block {
                let added = state.decoder_scratch.buffer_len() - len_before_block;
                let (s1, s2) = state.decoder_scratch.last_n_as_slices(added);
                let mut h = twox_hash::XxHash64::with_seed(0);
                use core::hash::Hasher;
                h.write(s1);
                h.write(s2);
                self.computed_block_checksums.push(h.finish() as u32);
            }

            // The block only counts as decoded once its body succeeded; an
            // error above returns before this increment.
            state.block_counter += 1;

            vprintln!("Output: {}", state.decoder_scratch.buffer_len());

            if block_header.last_block {
                // `frame_finished` is set BEFORE the trailer read, so a failed
                // trailer read returns an error with the frame already marked
                // finished and `check_sum` still unset.
                state.frame_finished = true;
                if state.frame_header.descriptor.content_checksum_flag() {
                    let mut chksum = [0u8; 4];
                    source
                        .read_exact(&mut chksum)
                        .map_err(err::FailedToReadChecksum)?;
                    state.bytes_read_counter += 4;
                    let chksum = u32::from_le_bytes(chksum);
                    state.check_sum = Some(chksum);
                }
                break;
            }

            // Strategy check, evaluated after the block so at least one block
            // is decoded per call.
            match strat {
                BlockDecodingStrategy::All => { /* keep going */ }
                BlockDecodingStrategy::UptoBlocks(n) => {
                    if state.block_counter - block_counter_before >= n {
                        break;
                    }
                }
                BlockDecodingStrategy::UptoBytes(n) => {
                    if state.decoder_scratch.buffer_len() - buffer_size_before >= n {
                        break;
                    }
                }
            }
        }

        Ok(state.frame_finished)
    }
1622
    /// Decode the inner blocks `[start_block, end_block)` of the current
    /// frame and return their decompressed bytes as one contiguous buffer.
    ///
    /// Serves two consumer needs with one call:
    ///
    /// - **Range-query performance:** decode only the inner zstd blocks that
    ///   cover a key range instead of the whole frame. Blocks before
    ///   `start_block` are decoded into the window (zstd blocks share one
    ///   window, so a leading block's bytes may be the match source for an
    ///   in-range block and cannot simply be skipped) but their output is not
    ///   returned; blocks at or after `end_block` are not decoded at all,
    ///   which is the trailing-block work saving. Map a decompressed byte
    ///   offset to a block index with
    ///   [`FrameEmitInfo::decompressed_byte_range`].
    /// - **Best-effort recovery:** if a block decode fails, decoding stops,
    ///   the clean prefix of in-range output is preserved in
    ///   [`PartialDecode::data`], and the failure is reported via
    ///   [`PartialDecode::stopped_at`]. Passing `(0, u32::MAX)` decodes the
    ///   whole frame, stopping at the first corrupt block (pure recovery).
    ///
    /// `end_block` is exclusive; pass `u32::MAX` to decode to the end of the
    /// frame. Call on a freshly [`reset`](Self::reset) decoder (it decodes
    /// from the frame's first block).
    ///
    /// # Resume (cold incremental / top-up)
    ///
    /// A plain call drains its in-range output from the match window on return,
    /// so two consecutive calls cannot resume one another and growing a decoded
    /// extent would mean re-decoding the covering prefix from block 0
    /// (`O(extent)` per growth, `O(N²)` for a forward walk). The `resume` /
    /// `emit_resume` arguments make a symmetric one-call grow-loop possible:
    ///
    /// - `emit_resume = true` captures the cross-block carry-over state (entropy
    ///   tables + repcode history + the next block index / output offset) into
    ///   [`PartialDecode::resume_state`]. The entropy-table snapshot clone is
    ///   only paid when this is set. The snapshot is `None` when the decode
    ///   reaches the frame's last block ([`PartialDecode::frame_finished`]):
    ///   there is no following block to resume from, so an incremental walk
    ///   stops on `frame_finished` rather than on a `None` snapshot.
    /// - `resume = Some(`[`ResumeInput`]`)` continues from a previously emitted
    ///   [`ResumeState`] WITHOUT re-decompressing the preceding blocks: the
    ///   match window is primed from [`ResumeInput::window_prime`] and the
    ///   entropy/repcode tables are restored from the state, so a `Repeat_Mode`
    ///   resume block resolves byte-identically to a contiguous decode — even
    ///   across a dropped (cold) decoder.
    ///
    /// When `resume` is `Some`, decoding resumes at
    /// [`ResumeState::block_index`] and the `start_block` argument is ignored
    /// (pass `resume.state.block_index()`); position `source` at that block's
    /// compressed frame offset
    /// ([`FrameEmitInfo::blocks`]`[block_index].offset_in_frame`). After a
    /// resumed call, [`bytes_read_from_source`](Self::bytes_read_from_source)
    /// and any `stopped_at` offsets are relative to the repositioned `source`.
    ///
    /// **Dictionaries:** [`ResumeState`] does NOT carry the dictionary content.
    /// For a dictionary frame, attach the dictionary to the resuming decoder the
    /// same way as for a fresh decode — [`reset`](Self::reset) with the
    /// dictionary registered (or
    /// [`reset_with_dict_handle`](Self::reset_with_dict_handle)) BEFORE this
    /// call — so dict-sourced matches near the frame start resolve. The caller
    /// already holds the dictionary (it supplied it at encode time), so
    /// re-supplying it on resume is free; storing it in the snapshot would only
    /// duplicate it. The resume guard records the applied dictionary's identity
    /// and rejects ([`FrameDecoderError::ResumeFrameMismatch`]) a resume whose
    /// active dictionary differs from the one the snapshot was captured under.
    ///
    /// # Returns
    ///
    /// A [`PartialDecode`] whose fields are filled as follows:
    ///
    /// - `data`: the decompressed bytes of the in-range blocks that decoded
    ///   cleanly, in order.
    /// - `start_block`: the effective start (the resume state's block index
    ///   in resume mode, otherwise the `start_block` argument).
    /// - `blocks_decoded`: the number of in-range blocks that decoded cleanly.
    /// - `stopped_at`: `Some((block_index, error))` when a block header or
    ///   body failed, or when the checksum trailer after the last block could
    ///   not be read (in that case the last block itself decoded and
    ///   `frame_finished` is `true`).
    /// - `frame_finished`: whether the frame's last block was decoded.
    /// - `resume_state`: see the resume section above.
    ///
    /// On return the decoder's visible output buffer is empty: the prefix
    /// window, the returned bytes and any residue are all discarded from it.
    ///
    /// # Errors
    ///
    /// Returns [`FrameDecoderError::NotYetInitialized`] if the decoder has not
    /// been reset, [`FrameDecoderError::InvalidBlockRange`] if the effective
    /// start exceeds `end_block`, [`FrameDecoderError::ResumeWindowTooShort`]
    /// if `resume`'s `window_prime` is shorter than the match window the resume
    /// block can reach back into (`min(window_size, output_offset)`), and
    /// [`FrameDecoderError::ResumeFrameMismatch`] if the snapshot was captured
    /// from a frame with a different decode shape / dictionary, or (with the
    /// `hash` feature) a `window_prime` whose content does not match what was
    /// captured — all rejected up front rather than silently mis-resolving
    /// matches. [`FrameDecoderError::FailedToDrainDecodebuffer`] is returned if
    /// copying the in-range bytes out of the decode buffer fails. A corrupt
    /// block is NOT an `Err` here: it is reported via
    /// [`PartialDecode::stopped_at`] so the clean prefix survives.
    ///
    /// [`FrameEmitInfo::decompressed_byte_range`]: crate::encoding::frame_emit_info::FrameEmitInfo::decompressed_byte_range
    /// [`FrameEmitInfo::blocks`]: crate::encoding::frame_emit_info::FrameEmitInfo::blocks
    #[cfg(feature = "lsm")]
    #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
    pub fn decode_blocks_partial(
        &mut self,
        mut source: impl Read,
        start_block: u32,
        end_block: u32,
        resume: Option<ResumeInput<'_>>,
        emit_resume: bool,
    ) -> Result<PartialDecode, FrameDecoderError> {
        use FrameDecoderError as err;
        // Copy the settings needed below out of `self` before `state` takes a
        // mutable borrow of `self.state`.
        #[cfg(feature = "hash")]
        let checksum_mode = self.content_checksum;
        let magicless = self.magicless;
        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;

        // Honor the checksum mode before any drain/read can hash: `None` must
        // compute no XXH64. `decode_blocks` sets this; the partial path must too,
        // or a reused scratch keeps hashing with the default-enabled state.
        #[cfg(feature = "hash")]
        {
            let compute_hash = checksum_mode != ContentChecksum::None
                && state.frame_header.descriptor.content_checksum_flag();
            state.decoder_scratch.set_compute_hash(compute_hash);
        }

        // Mirror `decode_blocks`: pre-reserve the backing buffer to the
        // FCS-capped window so multi-block frames don't pay repeated grow
        // steps. The RAW frame window stays separately bound — the resume
        // logic below bounds match reach by the frame's window semantics,
        // not by the (possibly smaller) reservation cap.
        //
        // A frame header whose `window_size()` is an error is treated as a
        // zero-sized window here.
        let window_size = state.frame_header.window_size().unwrap_or(0) as usize;
        let useful_window = state.useful_window_size();
        state.decoder_scratch.reserve_buffer(useful_window);

        // Cold resume: prime the match window + restore entropy/repcode state +
        // advance the block cursor BEFORE the loop, so the first in-range block
        // resolves its matches and `Repeat_Mode` tables against the caller's
        // persisted state instead of re-decoded prefix blocks. The effective
        // start is the resume state's block index (the passed `start_block` is
        // ignored in resume mode, per the doc).
        let effective_start = if let Some(r) = resume {
            // Reject a snapshot captured from a different frame shape BEFORE
            // touching any decoder state: restoring entropy/repcode tables that
            // belong to another frame would silently produce byte-wrong output.
            let current_key = FrameKey::from_state(state, magicless);
            if current_key != r.state.frame_key {
                return Err(err::ResumeFrameMismatch);
            }
            let output_offset = r.state.output_offset;
            // The window the resume block can reach back into is bounded by the
            // smaller of the frame's window_size and the bytes produced so far.
            let required = core::cmp::min(window_size as u64, output_offset) as usize;
            if r.window_prime.len() < required {
                return Err(err::ResumeWindowTooShort {
                    got: r.window_prime.len(),
                    need: required,
                });
            }
            // Only the most recent `window_size` bytes can ever back a match
            // (offset <= window_size by the frame invariant); load just those
            // even if the caller handed us a longer prefix, bounding resume
            // memory to one window regardless of the skipped prefix's size.
            let prime = if r.window_prime.len() > window_size {
                &r.window_prime[r.window_prime.len() - window_size..]
            } else {
                r.window_prime
            };
            // Content-exact identity: the primed window must hash to what was
            // captured at emit. Catches a same-shape-but-different-frame
            // snapshot and a wrong/corrupted window_prime (which FrameKey alone
            // cannot), before any state is restored. O(window) one-time per
            // resume — negligible next to the decode it guards.
            #[cfg(feature = "hash")]
            if xxh64_of(prime) != r.state.window_hash {
                return Err(err::ResumeFrameMismatch);
            }
            // Validate the effective range (resume mode begins at the resume
            // block, ignoring the caller's `start_block`) BEFORE mutating the
            // decoder: an inverted `end_block` must fail without priming the
            // window / entropy or advancing the cursor, leaving the decoder
            // re-resettable rather than in a half-resumed state.
            let effective_start = r.state.block_index;
            if effective_start > end_block {
                return Err(err::InvalidBlockRange {
                    start_block: effective_start,
                    end_block,
                });
            }
            // All checks passed: from here on the decoder state is mutated.
            state.decoder_scratch.restore_entropy(r.state);
            state.decoder_scratch.prime_window(prime, output_offset);
            state.block_counter = effective_start as usize;
            // The caller repositions `source` to the resume block; report
            // consumed bytes relative to that point (reset left this at the
            // frame-header size).
            state.bytes_read_counter = 0;
            effective_start
        } else {
            // Fresh decode: validate the caller's range (no state to mutate).
            if start_block > end_block {
                return Err(err::InvalidBlockRange {
                    start_block,
                    end_block,
                });
            }
            start_block
        };

        let mut block_dec = decoding::block_decoder::new();

        // Bytes of prefix-window output that physically precede the first
        // in-range block in the buffer. Captured at the prefix → in-range
        // transition (after leading blocks were dropped to the window) so we
        // can discard exactly those bytes once decoding is done. `None` until
        // the first in-range block is reached.
        let mut prefix_window_len: Option<usize> = None;
        // Exact count of clean in-range decompressed bytes (sum of per-block
        // length deltas of the in-range blocks that succeeded). Any partial
        // bytes of a failing in-range block are excluded — the fused executor
        // rolls the buffer back to the pre-block checkpoint on a sequence
        // error, and anything left over is never counted here, so it is not
        // drained into `data`.
        let mut subset_bytes: u64 = 0;
        let mut blocks_decoded: u32 = 0;
        // First failure encountered, paired with the index of the block it
        // belongs to. Failures end the loop instead of returning `Err`.
        let mut stopped_at: Option<(u32, FrameDecoderError)> = None;

        loop {
            let block_index = state.block_counter as u32;
            // Stop before decoding `end_block`: the trailing blocks are never
            // touched (the perf win), and the frame's tail is left unread.
            if block_index >= end_block || state.frame_finished {
                break;
            }
            let in_range = block_index >= effective_start;
            // Snapshot the window length at the prefix → in-range boundary.
            if in_range && prefix_window_len.is_none() {
                prefix_window_len = Some(state.decoder_scratch.buffer_len());
            }

            // Offset of this block's header, captured before the read advances
            // the counter, for the error builders below.
            let block_frame_offset = state.bytes_read_counter as u32;
            let (block_header, block_header_size) = match block_dec.read_block_header(&mut source) {
                Ok(v) => v,
                Err(e) => {
                    stopped_at = Some((
                        block_index,
                        block_header_decode_error(e, block_index, block_frame_offset),
                    ));
                    break;
                }
            };
            state.bytes_read_counter += u64::from(block_header_size);

            let len_before = state.decoder_scratch.buffer_len();
            match state.decoder_scratch.decode_block_content(
                &mut block_dec,
                &block_header,
                &mut source,
            ) {
                Ok(body_read) => state.bytes_read_counter += body_read,
                Err(e) => {
                    stopped_at = Some((
                        block_index,
                        block_body_decode_error(
                            e,
                            block_index,
                            block_frame_offset,
                            &block_header,
                            block_header_size,
                        ),
                    ));
                    break;
                }
            }
            // Bytes this block appended to the decode buffer.
            let produced = state.decoder_scratch.buffer_len() - len_before;
            // Per-block XXH64 capture, mirroring `decode_blocks`: hash this
            // block's just-decoded bytes BEFORE any window drop so the digest
            // count stays 1:1 with the blocks decoded on this path too. Covers
            // context (out-of-range) blocks as well, matching `decode_blocks`
            // which hashes every block it decodes.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if self.per_block_checksums_enabled {
                use core::hash::Hasher;
                let (s1, s2) = state.decoder_scratch.last_n_as_slices(produced);
                let mut h = twox_hash::XxHash64::with_seed(0);
                h.write(s1);
                h.write(s2);
                self.computed_block_checksums.push(h.finish() as u32);
            }
            state.block_counter += 1;
            // Only in-range blocks contribute to the returned data and count.
            if in_range {
                subset_bytes += produced as u64;
                blocks_decoded += 1;
            }

            if block_header.last_block {
                state.frame_finished = true;
                if state.frame_header.descriptor.content_checksum_flag() {
                    let mut chksum = [0u8; 4];
                    match source.read_exact(&mut chksum) {
                        Ok(()) => {
                            state.bytes_read_counter += 4;
                            state.check_sum = Some(u32::from_le_bytes(chksum));
                        }
                        // A trailing-checksum read failure does not invalidate
                        // the decoded bytes; surface it so the caller knows the
                        // frame tail was truncated, but keep `data`.
                        Err(e) => {
                            stopped_at = Some((block_index, err::FailedToReadChecksum(e)));
                        }
                    }
                }
                break;
            }

            // Leading (out-of-range) block: bound memory to the window. We
            // must NOT drop once in-range, or the in-range output we are about
            // to return would be discarded.
            if !in_range {
                state.decoder_scratch.buffer_drop_to_window_size();
            }
        }

        // Emit cross-block carry-over state for a later resume, if requested.
        // Captured AFTER the loop (entropy tables / repcode history are final)
        // but BEFORE the drain — the drain only touches the visible output, not
        // the entropy state or `total_output_counter`. `block_counter` /
        // `total_output()` give the resume coordinates: the next block to decode
        // and the cumulative decompressed offset before it (clean even after an
        // early stop, since a failed block rolls both back to its checkpoint).
        // Suppress the snapshot on the terminal block: `block_counter` is then
        // one past the last block (EOF), for which there is no next-block source
        // position to resume from. A resume needs a real following block.
        let resume_state = if emit_resume && !state.frame_finished {
            let (fse, huf, offset_hist) = state.decoder_scratch.export_entropy();
            Some(ResumeState {
                frame_key: FrameKey::from_state(state, magicless),
                block_index: state.block_counter as u32,
                output_offset: state.decoder_scratch.total_output(),
                fse,
                huf,
                offset_hist,
                #[cfg(feature = "hash")]
                window_hash: state.decoder_scratch.window_tail_hash(window_size),
            })
        } else {
            None
        };

        // The visible buffer is now `[prefix window][in-range clean][maybe
        // trailing garbage from a failed in-range block]`. Drop the prefix
        // window from the front (match resolution is complete, so it is no
        // longer needed), then drain exactly the clean in-range byte count.
        let w = prefix_window_len.unwrap_or(0);
        state.decoder_scratch.buffer_discard_front(w);
        let mut data = alloc::vec![0u8; subset_bytes as usize];
        state
            .decoder_scratch
            .buffer_read_all(&mut data)
            .map_err(err::FailedToDrainDecodebuffer)?;

        // Clear anything still buffered so a later `read()`/`collect()` on this
        // decoder cannot surface out-of-range bytes: the leading-block window
        // when no in-range block was reached (`prefix_window_len` stayed
        // `None`, so `w` was 0), or trailing garbage from a failed in-range
        // block. Only the returned `data` is the partial decode's output.
        let residual = state.decoder_scratch.buffer_len();
        state.decoder_scratch.buffer_discard_front(residual);

        Ok(PartialDecode {
            data,
            start_block: effective_start,
            blocks_decoded,
            stopped_at,
            frame_finished: state.frame_finished,
            resume_state,
        })
    }
1982
1983 /// Collect bytes and retain window_size bytes while decoding is still going on.
1984 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
1985 pub fn collect(&mut self) -> Option<Vec<u8>> {
1986 let finished = self.is_finished();
1987 let state = self.state.as_mut()?;
1988 if finished {
1989 Some(state.decoder_scratch.buffer_drain())
1990 } else {
1991 state.decoder_scratch.buffer_drain_to_window_size()
1992 }
1993 }
1994
1995 /// Collect bytes and retain window_size bytes while decoding is still going on.
1996 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
1997 pub fn collect_to_writer(&mut self, w: impl Write) -> Result<usize, Error> {
1998 let finished = self.is_finished();
1999 let state = match &mut self.state {
2000 None => return Ok(0),
2001 Some(s) => s,
2002 };
2003 if finished {
2004 state.decoder_scratch.buffer_drain_to_writer(w)
2005 } else {
2006 state.decoder_scratch.buffer_drain_to_window_size_writer(w)
2007 }
2008 }
2009
2010 /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size
2011 /// because window_size bytes need to be retained for decoding.
2012 /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes
2013 pub fn can_collect(&self) -> usize {
2014 let finished = self.is_finished();
2015 let state = match &self.state {
2016 None => return 0,
2017 Some(s) => s,
2018 };
2019 if finished {
2020 state.decoder_scratch.buffer_can_drain()
2021 } else {
2022 state
2023 .decoder_scratch
2024 .buffer_can_drain_to_window_size()
2025 .unwrap_or(0)
2026 }
2027 }
2028
    /// Decodes as many blocks as possible from the source slice and reads from the decode buffer into the target slice.
    /// The source slice may contain only parts of a frame but must contain at least one full block to make progress.
    ///
    /// By all means use `decode_blocks` if you have a reader available. This is just for compatibility with other decompressors
    /// which try to serve an old-style C API.
    ///
    /// Returns `(read, written)`: `read` is how far the decoder's consumed-byte counter advanced during this call and
    /// `written` is the number of bytes copied into `target`. If `read == 0` then the source did not contain a full block
    /// and further calls with the same input will not make any progress!
    ///
    /// Note that no kind of block can be bigger than 128kb.
    /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer.
    ///
    /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just calling `decoder.read(&mut target)`.
    ///
    /// If the decoder holds no state yet, the frame header is first read from `source` via `init`.
    ///
    /// # Errors
    ///
    /// - Any error returned by `init` when the decoder had to be initialised from `source`.
    /// - The error built by `block_header_decode_error` / `block_body_decode_error` when a block fails to decode.
    /// - [`FrameDecoderError::FailedToDrainDecodebuffer`] if reading into `target` fails.
    /// - With the `hash` feature: any error from `verify_content_checksum`, which is called once the frame is
    ///   finished and nothing is left to collect.
    ///
    /// # Panics
    ///
    /// Panics with "Bug in library" if the decoder state is missing after initialisation; this is expected to be
    /// unreachable because a successful `init` is presumed to populate the state.
    pub fn decode_from_to(
        &mut self,
        source: &[u8],
        target: &mut [u8],
    ) -> Result<(usize, usize), FrameDecoderError> {
        use FrameDecoderError as err;
        // Baseline for the returned `read` count. With no state yet the
        // baseline is 0, so whatever `init` records in `bytes_read_counter`
        // is included in `read` (presumably the frame-header size — confirm
        // against `init`).
        let bytes_read_at_start = match &self.state {
            Some(s) => s.bytes_read_counter,
            None => 0,
        };

        if !self.is_finished() || self.state.is_none() {
            // Local cursor over `source`; the reads below advance it.
            let mut mt_source = source;

            if self.state.is_none() {
                self.init(&mut mt_source)?;
            }

            //pseudo block to scope "state" so we can borrow self again after the block
            {
                let state = match &mut self.state {
                    Some(s) => s,
                    None => panic!("Bug in library"),
                };
                let mut block_dec = decoding::block_decoder::new();

                // Honour the content-checksum mode on this hand-rolled decode
                // loop (it does not go through `decode_blocks`): hash only when
                // a digest is wanted and the frame carries one. `None` skips the
                // XXH64 pass; verification happens after the final drain below.
                #[cfg(feature = "hash")]
                {
                    let compute_hash = self.content_checksum != ContentChecksum::None
                        && state.frame_header.descriptor.content_checksum_flag();
                    state.decoder_scratch.set_compute_hash(compute_hash);
                }

                if state.frame_header.descriptor.content_checksum_flag()
                    && state.frame_finished
                    && state.check_sum.is_none()
                {
                    // The trailing checksum arrived on a separate call (the last
                    // block finished earlier). Consume it and fall through to the
                    // shared `self.read` + post-drain verify below — NOT an early
                    // return — so any output still buffered from a prior
                    // small-`target` call is flushed on this call too, and the
                    // checksum is verified through the one shared path.
                    if mt_source.len() >= 4 {
                        let chksum = mt_source[..4].try_into().expect("optimized away");
                        state.bytes_read_counter += 4;
                        let chksum = u32::from_le_bytes(chksum);
                        state.check_sum = Some(chksum);
                        mt_source = &mt_source[4..];
                    }
                }

                // NOTE(review): unlike `decode_blocks`, this loop records no
                // per-block checksums — confirm that is intentional.
                loop {
                    // The frame is fully decoded (last block seen, trailer
                    // consumed above); no more blocks to read. Any leftover
                    // bytes are not a block header — stop before misreading them.
                    if state.frame_finished {
                        break;
                    }
                    //check if there are enough bytes for the next header
                    if mt_source.len() < 3 {
                        break;
                    }
                    // Coordinates of this block for the error builders,
                    // captured before any counter is advanced.
                    let block_index = state.block_counter as u32;
                    let block_frame_offset = state.bytes_read_counter as u32;
                    let (block_header, block_header_size) = block_dec
                        .read_block_header(&mut mt_source)
                        .map_err(|source| {
                            block_header_decode_error(source, block_index, block_frame_offset)
                        })?;

                    // check the needed size for the block before updating counters.
                    // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place
                    if mt_source.len() < block_header.content_size as usize {
                        break;
                    }
                    state.bytes_read_counter += u64::from(block_header_size);

                    let bytes_read_in_block_body = state
                        .decoder_scratch
                        .decode_block_content(&mut block_dec, &block_header, &mut mt_source)
                        .map_err(|source| {
                            block_body_decode_error(
                                source,
                                block_index,
                                block_frame_offset,
                                &block_header,
                                block_header_size,
                            )
                        })?;
                    state.bytes_read_counter += bytes_read_in_block_body;
                    state.block_counter += 1;

                    if block_header.last_block {
                        state.frame_finished = true;
                        if state.frame_header.descriptor.content_checksum_flag() {
                            //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call
                            if mt_source.len() >= 4 {
                                let chksum = mt_source[..4].try_into().expect("optimized away");
                                state.bytes_read_counter += 4;
                                let chksum = u32::from_le_bytes(chksum);
                                state.check_sum = Some(chksum);
                            }
                        }
                        break;
                    }
                }
            }
        }

        // Copy whatever is available into `target`; this also runs when no
        // block was decoded above, so an empty `source` still flushes output.
        let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?;
        // Once the frame is fully decoded and drained, the running digest is
        // final: validate it in `Verify` mode (no-op otherwise). Same finish
        // point as the streaming reader.
        #[cfg(feature = "hash")]
        if self.is_finished() && self.can_collect() == 0 {
            self.verify_content_checksum()?;
        }
        let bytes_read_at_end = match &mut self.state {
            Some(s) => s.bytes_read_counter,
            None => panic!("Bug in library"),
        };
        // Consumed input is reported as the counter delta over this call.
        let read_len = bytes_read_at_end - bytes_read_at_start;
        Ok((read_len as usize, result_len))
    }
2171
2172 /// Decode multiple frames into the output slice.
2173 ///
2174 /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
2175 /// skipped during decode.
2176 ///
2177 /// `output` must be large enough to hold the decompressed data. If you don't know
2178 /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
2179 ///
2180 /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
2181 ///
2182 /// Returns the number of bytes written to `output`.
2183 pub fn decode_all(
2184 &mut self,
2185 input: &[u8],
2186 output: &mut [u8],
2187 ) -> Result<usize, FrameDecoderError> {
2188 #[cfg(not(feature = "lsm"))]
2189 {
2190 self.decode_all_impl(input, output, |this, src| this.init(src))
2191 }
2192 #[cfg(feature = "lsm")]
2193 {
2194 self.decode_all_impl(input, output, |this, src| this.init(src), None)
2195 }
2196 }
2197
2198 /// Decode multiple frames into the output slice, invoking `visitor`
2199 /// for every skippable frame encountered before advancing past it.
2200 ///
2201 /// `input` must contain an exact number of frames. Skippable frames
2202 /// (RFC 8878 §3.1.2 magic numbers `0x184D2A50..=0x184D2A5F`) are
2203 /// allowed and will be both visited AND skipped: the visitor gets
2204 /// `(magic_variant, payload)` where `magic_variant` is the low
2205 /// nibble of the magic (`magic - 0x184D2A50`, range `0..=15`) and
2206 /// `payload` is a borrowed slice of the on-wire payload bytes (the
2207 /// skippable frame's `Frame_Size` field worth of data) into
2208 /// `input` — no allocation.
2209 ///
2210 /// The visitor sees skippable frames in stream order; interleaved
2211 /// regular zstd frames continue to decompress into `output` exactly
2212 /// as `decode_all` does.
2213 ///
2214 /// `output` must be large enough to hold the decompressed data.
2215 /// Returns the number of bytes written to `output`.
2216 ///
2217 /// # Example
2218 ///
2219 /// ```ignore
2220 /// use structured_zstd::decoding::FrameDecoder;
2221 ///
2222 /// let mut decoder = FrameDecoder::new();
2223 /// let mut output = vec![0u8; 1024];
2224 /// let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
2225 /// let n = decoder.decode_all_with_skippable_visitor(
2226 /// input,
2227 /// &mut output,
2228 /// |variant, payload| collected.push((variant, payload.to_vec())),
2229 /// )?;
2230 /// ```
2231 #[cfg(feature = "lsm")]
2232 #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
2233 pub fn decode_all_with_skippable_visitor<F>(
2234 &mut self,
2235 input: &[u8],
2236 output: &mut [u8],
2237 mut visitor: F,
2238 ) -> Result<usize, FrameDecoderError>
2239 where
2240 F: FnMut(u8, &[u8]),
2241 {
2242 self.decode_all_impl(
2243 input,
2244 output,
2245 |this, src| this.init(src),
2246 Some(&mut visitor),
2247 )
2248 }
2249
2250 /// Decode multiple frames into the output slice using a pre-parsed dictionary handle.
2251 ///
2252 /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
2253 /// skipped during decode.
2254 ///
2255 /// `output` must be large enough to hold the decompressed data. If you don't know
2256 /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
2257 ///
2258 /// This calls [`FrameDecoder::init_with_dict_handle`], and all bytes currently in the
2259 /// decoder will be lost.
2260 ///
2261 /// # Warning
2262 ///
2263 /// Each decoded frame is initialized with `dict`, even when a frame header
2264 /// omits the optional dictionary ID. Callers must only use this API when
2265 /// they already know the input frames were encoded with the provided
2266 /// dictionary; otherwise decoded output can be silently corrupted.
2267 pub fn decode_all_with_dict_handle(
2268 &mut self,
2269 input: &[u8],
2270 output: &mut [u8],
2271 dict: &DictionaryHandle,
2272 ) -> Result<usize, FrameDecoderError> {
2273 #[cfg(not(feature = "lsm"))]
2274 {
2275 self.decode_all_impl(input, output, |this, src| {
2276 this.init_with_dict_handle(src, dict)
2277 })
2278 }
2279 #[cfg(feature = "lsm")]
2280 {
2281 self.decode_all_impl(
2282 input,
2283 output,
2284 |this, src| this.init_with_dict_handle(src, dict),
2285 None,
2286 )
2287 }
2288 }
2289
2290 /// Whether the decoder sits at the very start of an initialised frame:
2291 /// the header has been read (state populated) but no block has been
2292 /// decoded and the frame is not finished. In this state the wrapped
2293 /// source is positioned exactly after the frame header, so
2294 /// [`Self::decode_current_frame_to_vec`] can decode the rest of the frame
2295 /// straight from the remaining source bytes.
2296 pub(crate) fn is_at_frame_start(&self) -> bool {
2297 self.state
2298 .as_ref()
2299 .is_some_and(|s| s.block_counter == 0 && !s.frame_finished)
2300 }
2301
2302 /// Decode the CURRENT (already-initialised) frame, APPENDING the
2303 /// decompressed bytes to `output`, and return the number appended.
2304 ///
2305 /// `input` must be the frame's post-header bytes (the wrapped source after
2306 /// `init` consumed the header). Unlike [`Self::decode_all_to_vec`] this
2307 /// neither re-reads a header nor requires the caller to pre-reserve
2308 /// capacity: a frame that declares its content size decodes DIRECTLY into
2309 /// freshly-grown `output` capacity via the single-copy direct path
2310 /// ([`Self::run_direct_decode`]) — bypassing the `Ring`/`FlatBuf` →
2311 /// `read()` drain copy the streaming loop pays — while an unsized frame
2312 /// falls back to the window-bounded ring drain (still one copy, into
2313 /// `output`). Backs [`StreamingDecoder`](crate::decoding::StreamingDecoder)'s
2314 /// `read_to_end` fast path; the caller must ensure
2315 /// [`Self::is_at_frame_start`].
2316 ///
2317 /// # Errors
2318 ///
2319 /// Propagates any [`FrameDecoderError`] from block decode, content-size
2320 /// mismatch, or (in `Verify` mode) checksum validation.
2321 pub(crate) fn decode_current_frame_to_vec(
2322 &mut self,
2323 mut input: &[u8],
2324 output: &mut Vec<u8>,
2325 dict: Option<&DictionaryHandle>,
2326 ) -> Result<usize, FrameDecoderError> {
2327 let start_len = output.len();
2328 // The current frame is already initialised (its header consumed by the
2329 // caller, WITH `dict` applied if the decoder was constructed with one).
2330 // Decode it, then decode any FOLLOWING concatenated / skippable frames
2331 // in `input` so the whole source is consumed to EOF and nothing is
2332 // dropped (matching `read_to_end` semantics).
2333 self.decode_one_frame_to_vec(&mut input, output)?;
2334 self.decode_concatenated_frames_to_vec(&mut input, output, dict)?;
2335 Ok(output.len() - start_len)
2336 }
2337
2338 /// Initialise and decode every frame remaining in `input` (concatenated /
2339 /// skippable), APPENDING to `output`. `input` is advanced as frames are
2340 /// consumed; on return it is empty. Re-initialisation honours `dict`: when
2341 /// `Some`, each following frame is initialised via
2342 /// [`Self::init_with_dict_handle`] so a forced dictionary is preserved even
2343 /// for frames that omit the dictionary id (plain [`Self::init`] would
2344 /// resolve dictionaries by id only). Backs the `read_to_end` fast path (the
2345 /// frames after the current one) and its mid-frame fallback (the frames
2346 /// after the partially-read one).
2347 pub(crate) fn decode_concatenated_frames_to_vec(
2348 &mut self,
2349 input: &mut &[u8],
2350 output: &mut Vec<u8>,
2351 dict: Option<&DictionaryHandle>,
2352 ) -> Result<usize, FrameDecoderError> {
2353 let start_len = output.len();
2354 while !input.is_empty() {
2355 let init_result = match dict {
2356 Some(d) => self.init_with_dict_handle(&mut *input, d),
2357 None => self.init(&mut *input),
2358 };
2359 match init_result {
2360 Ok(_) => {}
2361 Err(FrameDecoderError::ReadFrameHeaderError(
2362 crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
2363 )) => {
2364 *input = input
2365 .get(length as usize..)
2366 .ok_or(FrameDecoderError::FailedToSkipFrame)?;
2367 continue;
2368 }
2369 Err(e) => return Err(e),
2370 }
2371 self.decode_one_frame_to_vec(&mut *input, output)?;
2372 }
2373 Ok(output.len() - start_len)
2374 }
2375
2376 /// Decode the single CURRENT (already-initialised) frame, APPENDING to
2377 /// `output`. Helper for [`Self::decode_current_frame_to_vec`].
2378 fn decode_one_frame_to_vec(
2379 &mut self,
2380 input: &mut &[u8],
2381 output: &mut Vec<u8>,
2382 ) -> Result<usize, FrameDecoderError> {
2383 let frame_start = output.len();
2384 let (content_size, fcs_declared) = {
2385 let s = self.state.as_ref().expect("frame is initialised");
2386 (
2387 s.frame_header.frame_content_size(),
2388 s.frame_header.fcs_declared(),
2389 )
2390 };
2391 // Direct path: a declared, non-empty content size that FITS in `usize`
2392 // (and whose end offset does not overflow). `usize::try_from` guards the
2393 // 32-bit / oversized-FCS truncation; an unrepresentable size falls
2394 // through to the window-bounded ring drain rather than allocating a
2395 // truncated buffer that would violate `run_direct_decode`'s precondition.
2396 //
2397 // Plausibility gate: the direct path `resize`s `output` to the declared
2398 // size up front, so a tiny/truncated frame declaring a huge (but
2399 // representable) FCS would allocate + zero that whole size before the
2400 // body is validated. zstd's per-block ceiling is MAX_BLOCK_SIZE from as
2401 // little as ~4 input bytes, so the declared size cannot legitimately
2402 // exceed `input.len() * (MAX_BLOCK_SIZE / 4)`. Anything larger falls
2403 // through to the ring drain, which grows only as real bytes are produced
2404 // and errors out cheaply on truncated input. `input` spans the remaining
2405 // source (this frame plus any following ones), so the bound only ever
2406 // over-permits — a legitimate frame is never forced off the direct path.
2407 // saturating_mul is intentional: an overflow means the available input
2408 // is so large that any representable FCS is plausible (cap = "no limit").
2409 const MAX_DECOMPRESSION_RATIO: usize = (crate::common::MAX_BLOCK_SIZE / 4) as usize;
2410 if content_size > 0
2411 && let Ok(cs) = usize::try_from(content_size)
2412 && cs <= input.len().saturating_mul(MAX_DECOMPRESSION_RATIO)
2413 && let Some(frame_end) = frame_start.checked_add(cs)
2414 {
2415 // Reserve exactly the frame's content and decode straight into it
2416 // (single copy, no ring). The direct path writes precisely
2417 // `content_size` bytes (erroring otherwise), so the grown region is
2418 // fully written.
2419 output.resize(frame_end, 0);
2420 // On error, drop the just-grown (zeroed) tail before propagating so
2421 // callers never observe bytes that were never decoded.
2422 let written =
2423 match self.run_direct_decode(&mut *input, &mut output[frame_start..], content_size)
2424 {
2425 Ok(n) => n,
2426 Err(e) => {
2427 output.truncate(frame_start);
2428 return Err(e);
2429 }
2430 };
2431 output.truncate(frame_start + written);
2432 #[cfg(feature = "hash")]
2433 self.verify_content_checksum()?;
2434 return Ok(written);
2435 }
2436 // The ring-drain fallback below pre-reserves `useful_window_size()`
2437 // (= `window.min(FCS)`), which for a single-segment frame is the
2438 // declared FCS itself — so a truncated single-segment frame lying about
2439 // its size would still allocate the pledged window before the body
2440 // errors, sidestepping the direct-path gate above. Reject such a frame
2441 // up front when its declared (FCS-bearing) window exceeds what the
2442 // available input could plausibly produce. Frames without a declared
2443 // size keep their window-descriptor reservation (already capped at
2444 // `MAXIMUM_ALLOWED_WINDOW_SIZE` at init); a small-window multi-segment
2445 // frame still falls through to the ring drain, which errors cheaply on
2446 // the truncated body.
2447 if fcs_declared
2448 && let Some(state) = self.state.as_ref()
2449 && state.useful_window_size() > input.len().saturating_mul(MAX_DECOMPRESSION_RATIO)
2450 {
2451 return Err(FrameDecoderError::FrameContentSizeMismatch {
2452 declared: content_size,
2453 produced: 0,
2454 });
2455 }
2456 // No declared size, explicit FCS=0, or an unrepresentable FCS: window-
2457 // bounded ring drain, appended directly to `output` via
2458 // `collect_to_writer` (no staging buffer).
2459 loop {
2460 self.decode_blocks(&mut *input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
2461 self.collect_to_writer(&mut *output)
2462 .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
2463 if self.is_finished() {
2464 // Final flush of the retained window tail.
2465 self.collect_to_writer(&mut *output)
2466 .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
2467 break;
2468 }
2469 }
2470 let produced = (output.len() - frame_start) as u64;
2471 // A declared content size MUST match what the body produced — otherwise
2472 // accept the same corrupt frames `decode_all_impl` rejects (e.g. an
2473 // explicit FCS=0 whose body emits bytes). Use `fcs_declared()` so an
2474 // on-wire FCS=0 is validated, while an unknown size is not.
2475 if fcs_declared && produced != content_size {
2476 return Err(FrameDecoderError::FrameContentSizeMismatch {
2477 declared: content_size,
2478 produced,
2479 });
2480 }
2481 #[cfg(feature = "hash")]
2482 self.verify_content_checksum()?;
2483 Ok(produced as usize)
2484 }
2485
2486 /// Default-feature decode_all_impl: no visitor parameter so the
2487 /// no-lsm build's call surface and codegen are byte-identical to
2488 /// the pre-#172 implementation. Compiles only when `lsm` is OFF.
2489 #[cfg(not(feature = "lsm"))]
2490 fn decode_all_impl(
2491 &mut self,
2492 mut input: &[u8],
2493 mut output: &mut [u8],
2494 mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
2495 ) -> Result<usize, FrameDecoderError> {
2496 let mut total_bytes_written = 0;
2497 while !input.is_empty() {
2498 match init_frame(self, &mut input) {
2499 Ok(_) => {}
2500 Err(FrameDecoderError::ReadFrameHeaderError(
2501 crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
2502 )) => {
2503 input = input
2504 .get(length as usize..)
2505 .ok_or(FrameDecoderError::FailedToSkipFrame)?;
2506 continue;
2507 }
2508 Err(e) => return Err(e),
2509 };
2510 // Per-frame direct-path dispatch. Now safe to route the
2511 // public `decode_all` here because
2512 // `UserSliceBackend::exec_sequence_inline` returns
2513 // `Result<(), ExecuteSequencesError>` instead of
2514 // panicking on capacity overflow; the error propagates
2515 // up as `FrameDecoderError`. Eligibility (FCS > 0,
2516 // remaining `output` slice holds the declared content)
2517 // puts the frame on the fast path that bypasses the
2518 // FlatBuf/Ring -> `read()` drain copy. Ineligible frames
2519 // (no FCS, output too small) fall through to the legacy
2520 // `decode_blocks` + `read` drain loop below. Dictionary
2521 // frames are eligible: `run_direct_decode` hands the
2522 // shared dict handle to its buffer, and beyond-prefix
2523 // offsets resolve through `repeat_from_dict`.
2524 let (content_size, fcs_declared) = {
2525 let state_ref = self.state.as_ref().expect("init populated state");
2526 (
2527 state_ref.frame_header.frame_content_size(),
2528 state_ref.frame_header.fcs_declared(),
2529 )
2530 };
2531 // Direct decode requires only that the caller slice holds the
2532 // declared content; the inline sequence-exec path no longer
2533 // needs `WILDCOPY_OVERLENGTH` trailing slack because the
2534 // trailing sequence(s) take the bounded (non-overshooting)
2535 // copy in `UserSliceBackend::exec_sequence_bounded`. This is
2536 // the universal "decode into an FCS-sized buffer" case (a
2537 // caller sizing `output` to exactly `frame_content_size`),
2538 // so dropping the slack requirement halves its peak alloc.
2539 //
2540 // Per-block checksums collected inside `run_direct_decode`
2541 // post-loop (over recorded (start, end) ranges of `output`)
2542 // so the direct path stays eligible AND keeps the
2543 // window-size cap (`drop_to_window_size`) between blocks
2544 // that the spec relies on for `offset <= window_size`
2545 // validation. Path choice no longer alters checksum
2546 // semantics.
2547 let direct_eligible = content_size > 0 && (output.len() as u64) >= content_size;
2548 if direct_eligible {
2549 let written = self.run_direct_decode(&mut input, output, content_size)?;
2550 output = &mut output[written..];
2551 total_bytes_written += written;
2552 // Per-frame content-checksum verification (no-op unless the
2553 // mode is `Verify` and the frame carries a checksum).
2554 #[cfg(feature = "hash")]
2555 self.verify_content_checksum()?;
2556 continue;
2557 }
2558 // Non-direct fallback: pre-reserve the backing buffer to
2559 // `window_size` in a single allocation before block decode
2560 // starts, so multi-segment frames don't pay repeated
2561 // `reserve_amortized` grow steps as blocks accumulate (each
2562 // block only reserves MAX_BLOCK_SIZE = 128 KiB, so a window
2563 // > 128 KiB otherwise grows through several intermediate
2564 // sizes with `alloc_zeroed + memcpy` each time).
2565 if let Some(state) = self.state.as_mut() {
2566 // FCS-capped via `useful_window_size` — the same cap
2567 // `decode_blocks` applies, so its per-iteration reserve in
2568 // the loop below cannot grow the buffer back to the raw
2569 // frame window.
2570 let useful_window = state.useful_window_size();
2571 state.decoder_scratch.reserve_buffer(useful_window);
2572 }
2573 let frame_start_total = total_bytes_written;
2574 loop {
2575 self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
2576 let bytes_written = self
2577 .read(output)
2578 .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
2579 output = &mut output[bytes_written..];
2580 total_bytes_written += bytes_written;
2581 if self.can_collect() != 0 {
2582 return Err(FrameDecoderError::TargetTooSmall);
2583 }
2584 if self.is_finished() {
2585 break;
2586 }
2587 }
2588 // Per-frame FCS validation on the legacy fallback path.
2589 // Use `fcs_declared()` (NOT `content_size > 0`) so an
2590 // empty frame with explicit FCS=0 on the wire still gets
2591 // validated.
2592 if fcs_declared {
2593 let produced = (total_bytes_written - frame_start_total) as u64;
2594 if produced != content_size {
2595 return Err(FrameDecoderError::FrameContentSizeMismatch {
2596 declared: content_size,
2597 produced,
2598 });
2599 }
2600 }
2601 // Per-frame content-checksum verification on the drain path: the
2602 // frame is fully decoded and drained here (is_finished + nothing
2603 // left to collect), so the running digest and stored value are
2604 // final. No-op unless the mode is `Verify`.
2605 #[cfg(feature = "hash")]
2606 self.verify_content_checksum()?;
2607 }
2608
2609 Ok(total_bytes_written)
2610 }
2611
/// Multi-frame decode loop for builds WITH the `lsm` feature.
///
/// Same job as the non-`lsm` variant — initialise each frame through
/// `init_frame`, decode it into `output`, return the total bytes written —
/// plus an optional `skippable_visitor`. When present, the visitor is
/// invoked for every skippable frame with the frame's variant (its magic
/// number minus `0x184D2A50`) and a borrowed view of its payload, before the
/// payload is stepped over. Used by
/// [`Self::decode_all_with_skippable_visitor`].
///
/// # Errors
///
/// * [`FrameDecoderError::FailedToSkipFrame`] — a skippable frame claims
///   more payload than `input` still holds.
/// * [`FrameDecoderError::TargetTooSmall`] — `output` filled up while
///   decoded bytes were still waiting to be collected.
/// * [`FrameDecoderError::FrameContentSizeMismatch`] — a frame declared a
///   content size that its body did not produce.
/// * Any other error from header parsing, block decoding, draining, or
///   (with the `hash` feature) checksum verification is forwarded.
///
/// # Panics
///
/// Panics if `init_frame` returns `Ok` without populating the frame state.
#[cfg(feature = "lsm")]
#[allow(clippy::type_complexity)]
fn decode_all_impl(
    &mut self,
    mut input: &[u8],
    mut output: &mut [u8],
    mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
    mut skippable_visitor: Option<&mut dyn FnMut(u8, &[u8])>,
) -> Result<usize, FrameDecoderError> {
    let mut total_bytes_written = 0;
    while !input.is_empty() {
        if let Err(init_err) = init_frame(self, &mut input) {
            match init_err {
                FrameDecoderError::ReadFrameHeaderError(
                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame {
                        magic_number,
                        length,
                    },
                ) => {
                    // Split the payload off with a single checked split; a
                    // payload longer than the remaining input is an error.
                    let Some((payload, rest)) = input.split_at_checked(length as usize) else {
                        return Err(FrameDecoderError::FailedToSkipFrame);
                    };
                    // The visitor sees the borrowed payload (no allocation)
                    // before the cursor moves past it. The variant is the
                    // low nibble of the magic number (RFC 8878 §3.1.2); the
                    // source notes the header reader only reports SkipFrame
                    // for 0x184D2A50..=0x184D2A5F, so the difference fits
                    // in 0..=15.
                    if let Some(visit) = skippable_visitor.as_mut() {
                        let variant = (magic_number - 0x184D2A50) as u8;
                        visit(variant, payload);
                    }
                    input = rest;
                    continue;
                }
                other => return Err(other),
            }
        }

        let header = &self
            .state
            .as_ref()
            .expect("init populated state")
            .frame_header;
        let content_size = header.frame_content_size();
        let fcs_declared = header.fcs_declared();

        // Direct path: the frame declares a non-zero content size and the
        // remaining `output` can hold all of it. The frame is decoded
        // straight into the caller's slice, skipping the internal buffer and
        // the `read()` drain copy. An exact fit is enough; no trailing slack
        // is required. Dictionary frames are eligible as well.
        if content_size > 0 && (output.len() as u64) >= content_size {
            let written = self.run_direct_decode(&mut input, output, content_size)?;
            output = &mut output[written..];
            total_bytes_written += written;
            // Per-frame checksum verification; per the source this is a
            // no-op unless verification is enabled and the frame carries a
            // checksum.
            #[cfg(feature = "hash")]
            self.verify_content_checksum()?;
            continue;
        }

        // Fallback path. Reserve the backing buffer once, up front, so the
        // per-block growth cycle is skipped. The size is
        // `useful_window_size`, the FCS-capped window that `decode_blocks`
        // also uses.
        if let Some(state) = self.state.as_mut() {
            let useful_window = state.useful_window_size();
            state.decoder_scratch.reserve_buffer(useful_window);
        }
        let frame_start_total = total_bytes_written;
        let mut frame_done = false;
        while !frame_done {
            self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
            let bytes_written = self
                .read(output)
                .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
            output = &mut output[bytes_written..];
            total_bytes_written += bytes_written;
            // Anything still collectable after a read means `output` ran out
            // of room.
            if self.can_collect() != 0 {
                return Err(FrameDecoderError::TargetTooSmall);
            }
            frame_done = self.is_finished();
        }

        // Validate the declared size on the fallback path. Keyed on
        // `fcs_declared` rather than `content_size > 0` so that an empty
        // frame with an explicit size of 0 is still checked.
        let produced = (total_bytes_written - frame_start_total) as u64;
        if fcs_declared && produced != content_size {
            return Err(FrameDecoderError::FrameContentSizeMismatch {
                declared: content_size,
                produced,
            });
        }
        // The frame is fully decoded and drained at this point, so the
        // checksum can be verified.
        #[cfg(feature = "hash")]
        self.verify_content_checksum()?;
    }

    Ok(total_bytes_written)
}
2745
2746 /// Decode multiple frames into the output slice using a serialized dictionary.
2747 ///
2748 /// # Warning
2749 ///
2750 /// Each decoded frame is initialized with the parsed dictionary, even when a
2751 /// frame header omits the optional dictionary ID. Callers must only use this
2752 /// API when they already know the input frames were encoded with that
2753 /// dictionary; otherwise decoded output can be silently corrupted.
2754 pub fn decode_all_with_dict_bytes(
2755 &mut self,
2756 input: &[u8],
2757 output: &mut [u8],
2758 raw_dictionary: &[u8],
2759 ) -> Result<usize, FrameDecoderError> {
2760 let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
2761 self.decode_all_with_dict_handle(input, output, &dict)
2762 }
2763
2764 /// Decode multiple frames into the extra capacity of the output vector.
2765 ///
2766 /// `input` must contain an exact number of frames.
2767 ///
2768 /// `output` must have enough spare capacity to hold the decompressed
2769 /// data. This adds no extra slack: exact-fit output is now eligible
2770 /// for the direct decode path, so a `Vec::with_capacity(fcs)` is
2771 /// decoded straight into without a growth/reallocation. It will NOT
2772 /// grow the vector to fit the decompressed payload itself; the
2773 /// caller's pre-allocated capacity must already cover the data. If
2774 /// you don't know how large the output will be, use
2775 /// [`FrameDecoder::decode_blocks`] instead.
2776 ///
2777 /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
2778 ///
2779 /// The length of the output vector is updated to include the
2780 /// decompressed data. The length is not changed if an error occurs.
2781 pub fn decode_all_to_vec(
2782 &mut self,
2783 input: &[u8],
2784 output: &mut Vec<u8>,
2785 ) -> Result<(), FrameDecoderError> {
2786 let len = output.len();
2787 let cap = output.capacity();
2788 output.resize(cap, 0);
2789 match self.decode_all(input, &mut output[len..]) {
2790 Ok(bytes_written) => {
2791 let new_len = core::cmp::min(len + bytes_written, cap); // Sanitizes `bytes_written`.
2792 output.resize(new_len, 0);
2793 Ok(())
2794 }
2795 Err(e) => {
2796 output.resize(len, 0);
2797 Err(e)
2798 }
2799 }
2800 }
2801
2802 /// Single-frame direct-decode path. Decodes one zstd frame into
2803 /// `output[..content_size]` via a stack-local
2804 /// `DecodeBuffer<UserSliceBackend>`, bypassing the per-block
2805 /// FlatBuf/Ring -> `read()` drain copy.
2806 ///
2807 /// # Preconditions (caller-enforced)
2808 ///
2809 /// - `self.init` (or `init_with_dict_handle`) was called for
2810 /// this frame so `self.state` is populated.
2811 /// - `content_size` matches `self.state.frame_header
2812 /// .frame_content_size()` and is `> 0` (caller already passed
2813 /// the eligibility gate).
2814 /// - `output.len() >= content_size`. No `WILDCOPY_OVERLENGTH`
2815 /// trailing slack is required: the trailing sequence(s) take the
2816 /// bounded (non-overshooting) copy in
2817 /// [`UserSliceBackend::exec_sequence_bounded`].
2818 ///
2819 /// Dictionary frames are supported: the scratch buffer's shared
2820 /// dict handle is forwarded to the stack-local `DecodeBuffer`, so
2821 /// offsets reaching past the frame's own output resolve through
2822 /// `repeat_from_dict` (the ext-dict slow path).
2823 ///
2824 /// On return, `input` points at the byte immediately after the
2825 /// frame's checksum (or after the last block, when the frame
2826 /// has `content_checksum_flag = 0`). `self.state.frame_finished`
2827 /// is set so [`Self::is_finished`] reports `true`.
2828 fn run_direct_decode(
2829 &mut self,
2830 input: &mut &[u8],
2831 output: &mut [u8],
2832 content_size: u64,
2833 ) -> Result<usize, FrameDecoderError> {
2834 #[cfg(test)]
2835 {
2836 self.direct_frames += 1;
2837 }
2838 use super::block_decoder;
2839 use super::decode_buffer::DecodeBuffer;
2840 use super::scratch::DirectScratch;
2841 use super::user_slice_buf::UserSliceBackend;
2842 use crate::io::Read;
2843 use FrameDecoderError as err;
2844
2845 let state = self
2846 .state
2847 .as_mut()
2848 .expect("caller ensures init populated state");
2849
2850 // Borrow persistent fields out of whichever scratch variant
2851 // `init` produced (Flat for single_segment, Ring for
2852 // multi-segment) — both expose the same HUF/FSE/Vec
2853 // fields; only `buffer` differs and we don't use that here.
2854 // Macro-style binding avoids the closure / generic
2855 // gymnastics of returning multiple `&mut` from a match arm.
2856 let (huf, fse, offset_hist, literals_buffer, block_content_buffer, window_size, dict) =
2857 match &mut state.decoder_scratch {
2858 DecoderScratchKind::Flat(s) => (
2859 &mut s.huf,
2860 &mut s.fse,
2861 &mut s.offset_hist,
2862 &mut s.literals_buffer,
2863 &mut s.block_content_buffer,
2864 s.buffer.window_size,
2865 s.buffer.dict.clone(),
2866 ),
2867 DecoderScratchKind::Ring(s) => (
2868 &mut s.huf,
2869 &mut s.fse,
2870 &mut s.offset_hist,
2871 &mut s.literals_buffer,
2872 &mut s.block_content_buffer,
2873 s.buffer.window_size,
2874 s.buffer.dict.clone(),
2875 ),
2876 };
2877 let backend = UserSliceBackend::from_slice(output);
2878 let mut buffer = DecodeBuffer::from_backend(backend, window_size);
2879 // Dictionary matches on the direct path: hand the shared handle
2880 // (refcount bump, no copy) to the stack-local buffer so offsets
2881 // reaching past the frame's own output resolve through
2882 // `repeat_from_dict` — the same ext-dict slow path the
2883 // FlatBuf/Ring backends use. The per-sequence hot path is
2884 // untouched: the inline-exec dispatch already routes
2885 // beyond-prefix offsets to the cold `repeat()` fallback.
2886 if let Some(handle) = dict {
2887 buffer.set_dict(handle);
2888 }
2889 let mut direct = DirectScratch {
2890 huf,
2891 fse,
2892 offset_hist,
2893 literals_buffer,
2894 block_content_buffer,
2895 buffer,
2896 };
2897
2898 // Block loop. Mirrors `decode_blocks` (without the
2899 // strategy-bounded early exit — we always decode the whole
2900 // frame in one shot for the direct path). Keeps
2901 // `state.bytes_read_counter` / `state.block_counter` in
2902 // sync with `decode_blocks` so post-call accessors
2903 // (`bytes_read_from_source`, `blocks_decoded`) return
2904 // accurate values.
2905 let mut block_dec = block_decoder::new();
2906 // Track total output bytes against the declared
2907 // `frame_content_size` via the buffer's actual write
2908 // counter — `BlockHeader.decompressed_size` is 0 for
2909 // Compressed blocks (the header parser can't know the
2910 // expanded size before decoding the body), so per-header
2911 // tracking would always count 0 for those blocks and
2912 // miscount frames that aren't pure Raw/RLE.
2913 let mut produced: u64 = 0;
2914 // Per-block output ranges captured during the direct-path
2915 // loop. After the loop we re-borrow `output` (post-drop of
2916 // `direct`) and XXH64 each range into
2917 // `self.computed_block_checksums`, so the digests vector
2918 // stays consistent with the legacy `decode_blocks` path
2919 // regardless of which dispatch the frame took.
2920 // `Vec::new()` does not allocate, so this stays free when
2921 // `per_block_checksums_enabled` is false: the `push` and the
2922 // post-loop hashing loop are both gated by the same flag.
2923 #[cfg(all(feature = "lsm", feature = "hash"))]
2924 let mut block_ranges: alloc::vec::Vec<(usize, usize)> = alloc::vec::Vec::new();
2925 // Frame-level XXH64, accumulated PER BLOCK right after each block
2926 // decodes — the bytes are still cache-resident then. The previous
2927 // shape hashed the whole output once after the loop, which re-read
2928 // the entire frame cold: a full extra memory pass that the
2929 // reference implementation does not make (it hashes incrementally
2930 // per block). Invisible on outputs that fit L3, ~1.14x wall on a
2931 // 100 MiB all-raw decode and the dominant CI gap on
2932 // bandwidth-limited hosts.
2933 #[cfg(feature = "hash")]
2934 let mut running_hash: Option<twox_hash::XxHash64> =
2935 if state.frame_header.descriptor.content_checksum_flag()
2936 && self.content_checksum != ContentChecksum::None
2937 {
2938 Some(twox_hash::XxHash64::with_seed(0))
2939 } else {
2940 None
2941 };
2942 loop {
2943 #[cfg(all(feature = "lsm", feature = "hash"))]
2944 let produced_before: Option<usize> = if self.per_block_checksums_enabled {
2945 Some(produced as usize)
2946 } else {
2947 None
2948 };
2949 // Failing-block coordinates captured before the header read (see
2950 // the `decode_blocks` loop for the rationale).
2951 let block_index = state.block_counter as u32;
2952 let block_frame_offset = state.bytes_read_counter as u32;
2953 let (block_header, hsize) =
2954 block_dec.read_block_header(&mut *input).map_err(|source| {
2955 block_header_decode_error(source, block_index, block_frame_offset)
2956 })?;
2957 state.bytes_read_counter += u64::from(hsize);
2958 // Pre-flight FCS check ONLY for Raw / RLE blocks where
2959 // `decompressed_size` is the actual block output size.
2960 // For Compressed blocks the header field is 0; the
2961 // post-decode check below catches overflow via the
2962 // backend's actual write counter delta.
2963 let block_upper = u64::from(block_header.decompressed_size);
2964 if block_upper > 0 && produced + block_upper > content_size {
2965 // Frame is corrupt — Raw/RLE block headers claim
2966 // more output than the FCS allows.
2967 return Err(err::FrameContentSizeMismatch {
2968 declared: content_size,
2969 produced: produced + block_upper,
2970 });
2971 }
2972 // Slice-source fast path: consume the block body
2973 // straight from `input` without copying into the
2974 // persistent `block_content_buffer`.
2975 let body_consumed = match block_dec.decode_block_content_from_slice(
2976 &block_header,
2977 &mut direct,
2978 &mut *input,
2979 ) {
2980 Ok(n) => n,
2981 // Defense-in-depth: RLE / Raw block whose declared
2982 // `decompressed_size` slipped past the per-block
2983 // pre-flight above and tripped the backend's
2984 // fallible write surface.
2985 Err(crate::decoding::errors::DecodeBlockContentError::BackendOverflow {
2986 ..
2987 }) => {
2988 // Use saturating_add on the
2989 // `produced + decompressed_size` sum. Each block
2990 // is bounded by 128 KiB (MAX_BLOCK_SIZE), but
2991 // accumulated `produced` can grow toward
2992 // u64::MAX across adversarial frames. Saturating
2993 // avoids a panic on the error path itself.
2994 return Err(err::FrameContentSizeMismatch {
2995 declared: content_size,
2996 produced: produced
2997 .saturating_add(u64::from(block_header.decompressed_size)),
2998 });
2999 }
3000 // Compressed-block in-block overshoot: the sequence
3001 // executor (upstream zstd-inline path) or the match-repeat
3002 // fallback tripped the fixed-capacity backend's per-write
3003 // check. Unlike Raw/RLE, a Compressed block carries no
3004 // header-declared output size, so `produced` is computed
3005 // from the partial fill: `tail` bytes were written before
3006 // the failing op, and `requested` is what overflowed —
3007 // their sum is a strict lower bound on the frame's true
3008 // expanded size and is always > `content_size` (the
3009 // direct path is only entered when the slice is sized to
3010 // `content_size + WILDCOPY_OVERLENGTH`, so any overflow
3011 // means the frame exceeded the declared FCS, never a
3012 // caller-undersized buffer). Folds into the same
3013 // `FrameContentSizeMismatch` contract as Raw/RLE.
3014 Err(crate::decoding::errors::DecodeBlockContentError::DecompressBlockError(
3015 crate::decoding::errors::DecompressBlockError::ExecuteSequencesError(ref e),
3016 )) if e.output_overflow_requested().is_some() => {
3017 let requested = e
3018 .output_overflow_requested()
3019 .expect("guard guarantees Some") as u64;
3020 let tail = direct.buffer.buffer_ref().tail() as u64;
3021 return Err(err::FrameContentSizeMismatch {
3022 declared: content_size,
3023 produced: tail.saturating_add(requested),
3024 });
3025 }
3026 Err(e) => {
3027 return Err(block_body_decode_error(
3028 e,
3029 block_index,
3030 block_frame_offset,
3031 &block_header,
3032 hsize,
3033 ));
3034 }
3035 };
3036 // Hash this block's freshly-written bytes while they are hot
3037 // (see `running_hash` above). `tail()` is the physical write
3038 // cursor: `drop_to_window_size` below only advances the head,
3039 // so `[prev_tail, tail)` is exactly this block's output.
3040 #[cfg(feature = "hash")]
3041 if let Some(hasher) = running_hash.as_mut() {
3042 use core::hash::Hasher;
3043 hasher.write(direct.buffer.buffer_ref().written_since(produced as usize));
3044 }
3045 produced = direct.buffer.buffer_ref().tail() as u64;
3046 // Post-decode FCS overflow check.
3047 if produced > content_size {
3048 return Err(err::FrameContentSizeMismatch {
3049 declared: content_size,
3050 produced,
3051 });
3052 }
3053 state.bytes_read_counter += body_consumed;
3054 state.block_counter += 1;
3055 #[cfg(all(feature = "lsm", feature = "hash"))]
3056 if let Some(produced_before) = produced_before {
3057 block_ranges.push((produced_before, produced as usize));
3058 }
3059 // Cap the visible buffer at window_size between blocks
3060 // so the next block's match-offset validation matches
3061 // the spec's `offset <= window_size` rule.
3062 direct.buffer.drop_to_window_size();
3063 if block_header.last_block {
3064 if state.frame_header.descriptor.content_checksum_flag() {
3065 let mut chksum = [0u8; 4];
3066 input
3067 .read_exact(&mut chksum)
3068 .map_err(err::FailedToReadChecksum)?;
3069 state.bytes_read_counter += 4;
3070 state.check_sum = Some(u32::from_le_bytes(chksum));
3071 }
3072 break;
3073 }
3074 }
3075 // Final sanity: blocks summed to exactly `content_size`.
3076 if produced != content_size {
3077 return Err(err::FrameContentSizeMismatch {
3078 declared: content_size,
3079 produced,
3080 });
3081 }
3082
3083 let written = content_size as usize;
3084 state.frame_finished = true;
3085 // Drop the stack-local DirectScratch (and its DecodeBuffer
3086 // borrow on `output`) so we can re-borrow `output` for the
3087 // hash pass below.
3088 drop(direct);
3089 // Per-block XXH64 (low 32 bits) over the captured ranges.
3090 // Mirrors `decode_blocks`' per-block hashing so the digests
3091 // vector stays identical regardless of which dispatch path
3092 // the frame took. Ranges were recorded inside the loop while
3093 // `direct` held a mutable borrow on `output`; now that the
3094 // borrow is dropped we can read the slices directly.
3095 #[cfg(all(feature = "lsm", feature = "hash"))]
3096 if self.per_block_checksums_enabled {
3097 use core::hash::Hasher;
3098 for (start, end) in &block_ranges {
3099 let mut h = twox_hash::XxHash64::with_seed(0);
3100 h.write(&output[*start..*end]);
3101 self.computed_block_checksums.push(h.finish() as u32);
3102 }
3103 }
3104 #[cfg(feature = "hash")]
3105 if let Some(hasher) = running_hash {
3106 // Propagate the per-block-accumulated hasher state (see the
3107 // `running_hash` rationale above the loop) so the frame-tail
3108 // XXH64 check and `get_calculated_checksum()` read the digest.
3109 // `running_hash` is `None` for flag-off frames or
3110 // `ContentChecksum::None` — nothing to verify there, and
3111 // `get_calculated_checksum()` returns `None`, matching the skip.
3112 match &mut state.decoder_scratch {
3113 DecoderScratchKind::Flat(s) => s.buffer.hash = hasher,
3114 DecoderScratchKind::Ring(s) => s.buffer.hash = hasher,
3115 }
3116 }
3117 Ok(written)
3118 }
3119}
3120
3121/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished
3122/// this will retain window_size bytes, else it will drain it completely
3123impl Read for FrameDecoder {
3124 fn read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
3125 let state = match &mut self.state {
3126 None => return Ok(0),
3127 Some(s) => s,
3128 };
3129 if state.frame_finished {
3130 state.decoder_scratch.buffer_read_all(target)
3131 } else {
3132 state.decoder_scratch.buffer_read(target)
3133 }
3134 }
3135}
3136
3137#[cfg(test)]
3138mod tests {
3139 extern crate std;
3140
3141 use super::{DictionaryHandle, FrameDecoder};
3142 use crate::encoding::{CompressionLevel, FrameCompressor};
3143 use alloc::vec::Vec;
3144
#[test]
fn decode_all_tight_and_slack_outputs_match_on_single_segment_frame() {
    // Roundtrip a small payload through the encoder, then decode it via
    // `decode_all` into two output shapes that select different internal
    // sequence-exec paths within the direct decode:
    //   1. Tight output (exactly `frame_content_size`, no
    //      WILDCOPY_OVERLENGTH slack): the trailing sequence(s) take the
    //      bounded (non-overshooting) copy in
    //      `UserSliceBackend::exec_sequence_bounded`.
    //   2. Output with WILDCOPY slack: every sequence takes the SIMD
    //      wildcopy fast path.
    // Both must produce identical output bytes. This is the regression gate
    // for the relaxed direct-decode gate (`cap >= content_size`).
    let payload: Vec<u8> = (0..4096u32).map(|i| (i & 0xFF) as u8).collect();
    let mut compressor = FrameCompressor::new(CompressionLevel::Default);
    compressor.set_source(payload.as_slice());
    let mut compressed = Vec::new();
    compressor.set_drain(&mut compressed);
    compressor.compress();

    // Shape 1: tight output, sized exactly to the payload length.
    let mut dec_a = FrameDecoder::new();
    let mut out_a = alloc::vec![0u8; payload.len()];
    let n_a = dec_a
        .decode_all(compressed.as_slice(), &mut out_a)
        .expect("decode_all (tight output) should succeed");
    assert_eq!(n_a, payload.len());
    assert_eq!(&out_a[..n_a], payload.as_slice());

    // Shape 2: the same frame decoded into a slack-padded output.
    let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut dec_b = FrameDecoder::new();
    let mut out_b = alloc::vec![0u8; payload.len() + slack];
    let n_b = dec_b
        .decode_all(compressed.as_slice(), &mut out_b)
        .expect("decode_all (slack-padded output) should succeed");
    assert_eq!(
        n_b,
        payload.len(),
        "direct decode produced wrong byte count"
    );
    assert_eq!(&out_b[..n_b], payload.as_slice());

    // Pin the property in the test name directly, not only transitively
    // through the payload comparison above.
    assert_eq!(
        &out_a[..n_a],
        &out_b[..n_b],
        "tight and slack-padded decodes produced different bytes"
    );
}
3191
#[test]
fn decode_all_tight_output_overlapping_tail_match_roundtrips() {
    // Regression gate for the overlap branch of `exec_sequence_bounded`.
    // The payload ends in a long single-byte run, which encodes as an
    // offset-1 match whose length far exceeds its offset. With the output
    // sized to exactly `frame_content_size`, that overlapping match is the
    // trailing sequence, so the bounded forward byte-by-byte copy runs at the
    // buffer tail, where a SIMD overshoot would otherwise run past `cap`.
    let payload: Vec<u8> = (0..256u32)
        .map(|i| (i & 0xFF) as u8)
        .chain(core::iter::repeat_n(0xABu8, 8192))
        .collect();

    let compressed = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };

    // Anti-vacuous precondition: a Raw block would be roughly
    // `payload.len()` bytes, whereas an offset-1 run match is tens of bytes.
    // Requiring a large compression ratio keeps the test from passing
    // without ever reaching the overlapping forward-copy branch.
    let (compressed_len, payload_len) = (compressed.len(), payload.len());
    assert!(
        compressed_len < payload_len / 8,
        "expected an overlapping-tail match to dominate the frame \
         (compressed={} payload={}); the bounded overlap branch would \
         not be exercised otherwise",
        compressed_len,
        payload_len,
    );

    // Tight output: exactly content_size bytes, no WILDCOPY slack.
    let mut decoder = FrameDecoder::new();
    let mut decoded = alloc::vec![0u8; payload_len];
    let written = decoder
        .decode_all(compressed.as_slice(), &mut decoded)
        .expect("tight-output decode with overlapping tail match should succeed");
    assert_eq!(written, payload_len);
    assert_eq!(
        decoded, payload,
        "bounded overlap tail copy corrupted output"
    );
}
3239
#[test]
fn decode_all_multi_segment_frame_decodes_correctly() {
    // Multi-segment frame: the payload is large enough that the encoder's
    // default frame layout is expected to clear `single_segment_flag` (checked
    // below). The direct path must cap the visible buffer at window_size after
    // each block (`drop_to_window_size`) so match-offset validation matches
    // the spec rule `offset <= window_size`, and still reproduce the payload
    // byte-for-byte.
    //
    // The payload length is a fixed constant rather than `Vec::capacity()`:
    // `Vec::with_capacity` only guarantees *at least* the requested capacity,
    // so looping to `capacity()` would leave the test input size unspecified.
    const PAYLOAD_LEN: usize = 2 * 1024 * 1024;
    // Low byte of a multiplicative hash of the index: a deterministic,
    // structured 2 MiB byte pattern.
    let payload: Vec<u8> = (0..PAYLOAD_LEN)
        .map(|i| (i.wrapping_mul(2_654_435_761) & 0xFF) as u8)
        .collect();
    let mut compressor = FrameCompressor::new(CompressionLevel::Default);
    compressor.set_source(payload.as_slice());
    let mut compressed = Vec::new();
    compressor.set_drain(&mut compressed);
    compressor.compress();

    // Precondition first: confirm the encoded frame really IS multi-segment.
    // If a future encoder default changes, failing here is better than
    // silently testing a single-segment frame under this name.
    let mut sanity = FrameDecoder::new();
    sanity.init(&mut compressed.as_slice()).unwrap();
    assert!(
        !sanity
            .state
            .as_ref()
            .unwrap()
            .frame_header
            .descriptor
            .single_segment_flag(),
        "test precondition violated: frame is single-segment, rename or resize"
    );

    // Baseline: output sized exactly to the payload, no WILDCOPY slack.
    // NOTE(review): the original comment called this the FlatBuf+drain path;
    // which internal path an exact-size buffer takes is not visible from this
    // test, so only the buffer shape is stated here.
    let mut dec_a = FrameDecoder::new();
    let mut out_a = alloc::vec![0u8; payload.len()];
    let n_a = dec_a
        .decode_all(compressed.as_slice(), &mut out_a)
        .expect("decode_all should succeed");
    assert_eq!(n_a, payload.len());
    assert_eq!(&out_a[..n_a], payload.as_slice());

    // Slack-padded output: must give identical bytes via the direct path
    // with per-block `drop_to_window_size`.
    let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut dec_b = FrameDecoder::new();
    let mut out_b = alloc::vec![0u8; payload.len() + slack];
    let n_b = dec_b
        .decode_all(compressed.as_slice(), &mut out_b)
        .expect("decode_all should succeed on multi-segment frame");
    assert_eq!(n_b, payload.len(), "wrong byte count on direct path");
    assert_eq!(&out_b[..n_b], payload.as_slice());
}
3301
#[cfg(feature = "hash")]
#[test]
fn decode_all_propagates_checksum_into_persistent_scratch() {
    // Direct path on a checksum-flagged frame. The test explicitly enables
    // the content checksum on the encoder, so the frame ends with a 4-byte
    // stored digest. After `decode_all`, the public
    // `get_calculated_checksum()` accessor must report the digest the decoder
    // computed itself. On the direct path that digest is accumulated block by
    // block as output is produced, and the resulting hasher state is then
    // handed to the persistent scratch buffer. Without that propagation the
    // accessor would not see the digest.
    let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
    let mut compressor = FrameCompressor::new(CompressionLevel::Default);
    compressor.set_content_checksum(true);
    compressor.set_source(payload.as_slice());
    let mut compressed = Vec::new();
    compressor.set_drain(&mut compressed);
    compressor.compress();

    let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut dec = FrameDecoder::new();
    let mut out = alloc::vec![0u8; payload.len() + slack];
    let n = dec
        .decode_all(compressed.as_slice(), &mut out)
        .expect("decode_all with checksum must succeed");
    assert_eq!(n, payload.len());
    assert_eq!(&out[..n], payload.as_slice());

    // Both sides must report the same checksum: `get_checksum_from_data`
    // yields the u32 stored in the frame's trailing checksum field, and
    // `get_calculated_checksum` reads the running digest the direct path
    // just propagated.
    let stored = dec.get_checksum_from_data();
    let calculated = dec.get_calculated_checksum();
    assert!(stored.is_some(), "frame must carry stored checksum");
    assert!(
        calculated.is_some(),
        "direct path must propagate calculated checksum"
    );
    assert_eq!(
        stored, calculated,
        "stored vs calculated checksum mismatch on direct path"
    );
}
3344
#[cfg(feature = "hash")]
#[test]
fn verify_mode_accepts_a_valid_frame() {
    use crate::decoding::ContentChecksum;

    // Happy path for `ContentChecksum::Verify`: an untouched checksum-flagged
    // frame must decode and reproduce the payload.
    let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
    let compressed = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_content_checksum(true);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };

    let mut decoder = FrameDecoder::new();
    decoder.set_content_checksum(ContentChecksum::Verify);
    // Output padded with WILDCOPY_OVERLENGTH bytes of slack.
    let padded_len = payload.len() + super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut decoded = alloc::vec![0u8; padded_len];
    let written = decoder
        .decode_all(compressed.as_slice(), &mut decoded)
        .expect("Verify mode must accept a frame with a correct checksum");
    assert_eq!(&decoded[..written], payload.as_slice());
}
3366
#[cfg(feature = "hash")]
#[test]
fn verify_mode_rejects_a_corrupted_checksum() {
    use crate::decoding::ContentChecksum;
    use crate::decoding::errors::FrameDecoderError;

    let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
    let mut compressed = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_content_checksum(true);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };

    // Invert the final byte of the trailing 4-byte content checksum. The
    // frame body is untouched and still decodes to the right bytes; only the
    // stored digest stops matching what the decoder computes.
    let trailer_last = compressed.len() - 1;
    compressed[trailer_last] ^= 0xFF;

    let mut decoder = FrameDecoder::new();
    decoder.set_content_checksum(ContentChecksum::Verify);
    let padded_len = payload.len() + super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut decoded = alloc::vec![0u8; padded_len];
    let err = decoder
        .decode_all(compressed.as_slice(), &mut decoded)
        .expect_err("Verify mode must reject a corrupted checksum");
    assert!(
        matches!(err, FrameDecoderError::ChecksumMismatch { .. }),
        "expected ChecksumMismatch, got {err:?}"
    );
}
3398
#[cfg(feature = "hash")]
#[test]
fn decode_from_to_verify_rejects_corrupted_checksum() {
    // `decode_from_to` runs its own block loop rather than `decode_blocks`,
    // so it needs its own proof that Verify mode rejects a corrupted trailer
    // instead of silently accepting it.
    use crate::decoding::ContentChecksum;
    use crate::decoding::errors::FrameDecoderError;

    let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
    let mut compressed = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_content_checksum(true);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };
    // Corrupt the last byte of the trailing content checksum.
    let trailer_last = compressed.len() - 1;
    compressed[trailer_last] ^= 0xFF;

    let mut decoder = FrameDecoder::new();
    decoder.set_content_checksum(ContentChecksum::Verify);
    let padded_len = payload.len() + super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut decoded = alloc::vec![0u8; padded_len];

    // Feed the 4-byte checksum in a SEPARATE call so verification has to
    // happen on the checksum-only path rather than the post-drain path. This
    // is the incremental case flagged in review.
    let (blocks_part, trailer_part) = compressed.split_at(compressed.len() - 4);
    let (_consumed, written_first) = decoder
        .decode_from_to(blocks_part, &mut decoded)
        .expect("blocks decode without the trailer");
    let err = decoder
        .decode_from_to(trailer_part, &mut decoded[written_first..])
        .expect_err("decode_from_to in Verify mode must reject a corrupted checksum");
    assert!(
        matches!(err, FrameDecoderError::ChecksumMismatch { .. }),
        "expected ChecksumMismatch, got {err:?}"
    );
}
3437
#[cfg(feature = "hash")]
#[test]
fn decode_from_to_small_target_split_trailer_flushes_tail() {
    // Regression: if an earlier call decoded the last block but a small
    // `target` left output buffered, the trailer-only call must still flush
    // that buffered tail. It previously returned Ok((4, 0)) early and the
    // tail was lost.
    let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
    let compressed = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_content_checksum(true);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };

    // Everything except the 4-byte checksum trailer goes into the first call.
    let (blocks_part, trailer_part) = compressed.split_at(compressed.len() - 4);
    let mut decoder = FrameDecoder::new();
    let mut decoded = alloc::vec![0u8; payload.len()];

    // Call 1: all blocks, but only a 64-byte target, so the rest of the
    // output stays buffered inside the decoder.
    let (_consumed_first, written_first) = decoder
        .decode_from_to(blocks_part, &mut decoded[..64])
        .expect("blocks decode with a small target");
    assert!(written_first <= 64);

    // Call 2: the trailer alone must flush the buffered tail through the
    // shared read path rather than returning early and dropping it.
    let (_consumed_second, written_second) = decoder
        .decode_from_to(trailer_part, &mut decoded[written_first..])
        .expect("trailer call must flush the buffered tail");
    let total = written_first + written_second;
    assert_eq!(total, payload.len(), "buffered tail was dropped");
    assert_eq!(&decoded[..total], payload.as_slice());
}
3469
#[cfg(feature = "hash")]
#[test]
fn none_mode_skips_the_checksum_pass() {
    use crate::decoding::ContentChecksum;

    // A checksum-flagged frame decoded with `ContentChecksum::None` must
    // still yield the right bytes, while the decoder computes no digest.
    let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
    let compressed = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_content_checksum(true);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };

    let mut decoder = FrameDecoder::new();
    decoder.set_content_checksum(ContentChecksum::None);
    let padded_len = payload.len() + super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut decoded = alloc::vec![0u8; padded_len];
    let written = decoder
        .decode_all(compressed.as_slice(), &mut decoded)
        .expect("None mode must still decode correctly");
    assert_eq!(&decoded[..written], payload.as_slice());

    // The stored checksum is still reported from the frame data, but no
    // calculated digest exists in None mode.
    assert!(decoder.get_checksum_from_data().is_some());
    assert!(decoder.get_calculated_checksum().is_none());
}
3494
#[cfg(feature = "hash")]
#[test]
fn encoder_without_checksum_emits_no_trailing_digest() {
    let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();

    // Encode the same payload with the content checksum switched on or off.
    let encode = |emit_checksum: bool| -> Vec<u8> {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_content_checksum(emit_checksum);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };
    let flagged = encode(true);
    let plain = encode(false);

    // Dropping the checksum shortens the frame by exactly the 4-byte
    // trailing digest.
    assert_eq!(flagged.len(), plain.len() + 4);

    // The checksum-free frame must decode and report no stored checksum.
    let mut decoder = FrameDecoder::new();
    let padded_len = payload.len() + super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut decoded = alloc::vec![0u8; padded_len];
    let written = decoder
        .decode_all(plain.as_slice(), &mut decoded)
        .expect("a frame without a content checksum must decode");
    assert_eq!(&decoded[..written], payload.as_slice());
    assert!(
        decoder.get_checksum_from_data().is_none(),
        "no trailing checksum should be reported"
    );
}
3529
#[test]
fn decode_all_fcs_overflow_via_corrupt_frame_returns_structured_error() {
    // Hand-built corrupt frame: the header declares frame_content_size = 4,
    // yet its only (last) block is a Raw block carrying 10 bytes. The decoder
    // must answer with the structured `FrameContentSizeMismatch` variant: no
    // panic, and no generic TargetTooSmall.
    //
    // Wire layout (single_segment, FCS = 4):
    //   magic          4 bytes   0xFD2FB528, little-endian
    //   FHD            1 byte    single_segment=1, no checksum, no dict,
    //                            FCS field size flag = 0 (-> 1-byte FCS)
    //   FCS            1 byte    0x04
    //   block header   3 bytes   last=1, type=Raw, block_size=10
    //   block payload  10 bytes  0xAA repeated
    const MAGIC: u32 = 0xFD2FB528;
    // FHD bits: FCS flag (7-6) = 0, single_segment (5) = 1, everything else 0.
    const FRAME_HEADER_DESCRIPTOR: u8 = 0b0010_0000;
    const DECLARED_FCS: u8 = 4;
    const RAW_BLOCK_SIZE: u32 = 10;

    // 3-byte little-endian block header: bit 0 = last, bits 1-2 = type
    // (Raw = 0), bits 3-23 = size.
    let block_header: u32 = 1 | (RAW_BLOCK_SIZE << 3);

    let mut frame = alloc::vec::Vec::new();
    frame.extend_from_slice(&MAGIC.to_le_bytes());
    frame.push(FRAME_HEADER_DESCRIPTOR);
    frame.push(DECLARED_FCS);
    frame.extend_from_slice(&block_header.to_le_bytes()[..3]);
    // 10 payload bytes that would overshoot FCS = 4 if they were decoded.
    frame.extend(core::iter::repeat_n(0xAAu8, 10));

    let mut decoder = FrameDecoder::new();
    // Output sized to the declared FCS plus WILDCOPY slack.
    let mut decoded =
        alloc::vec![0u8; 4 + super::super::buffer_backend::WILDCOPY_OVERLENGTH];
    let err = decoder
        .decode_all(&frame, &mut decoded)
        .expect_err("FCS-overflow frame must fail decode");
    let is_size_mismatch = matches!(
        err,
        super::FrameDecoderError::FrameContentSizeMismatch { .. }
    );
    assert!(
        is_size_mismatch,
        "expected FrameContentSizeMismatch, got {:?}",
        err
    );
}
3580
#[test]
fn decode_all_compressed_block_fcs_overflow_returns_structured_error() {
    // Acceptance test for #246. A malformed frame whose *Compressed* block
    // expands beyond the declared `frame_content_size` has to surface
    // `FrameContentSizeMismatch`. It must not panic and must not produce a
    // generic FailedToReadBlockBody. The Raw-block sibling test covers the
    // `BackendOverflow` arm; this one covers the sequence-executor overflow
    // arm that `run_direct_decode` folds into the same error.
    //
    // Recipe: compress a highly compressible payload to obtain a genuine
    // Compressed block with a header-declared FCS, then patch the FCS field
    // down to a tiny value. The block body is independent of the FCS, so it
    // still decodes and overflows the small output slice.
    let phrase = b"The quick brown fox jumps over the lazy dog. ";
    let mut payload = Vec::with_capacity(4 * 1024);
    // Whole copies of the phrase until at least 4 KiB has accumulated.
    while payload.len() < 4 * 1024 {
        payload.extend_from_slice(phrase);
    }

    let mut frame = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };
    // Sanity: the encoder really compressed, so the frame holds a Compressed
    // block (not a raw-stored fallback) and the sequence path is exercised.
    assert!(frame.len() < payload.len());

    // The FCS field is the last `fcs_len` bytes of the frame header, whose
    // total size (`header_size`) includes the magic. A roughly-4-KiB
    // single-segment frame falls in the 2-byte FCS range [256, 65791]
    // (RFC 8878 §3.1.1.1.4). Asserting that keeps the patch below a single
    // deterministic branch.
    let (header, header_size) =
        super::super::frame::read_frame_header(frame.as_slice()).expect("valid header");
    let fcs_len = header
        .descriptor
        .frame_content_size_bytes()
        .expect("fcs present") as usize;
    assert_eq!(
        fcs_len, 2,
        "4 KiB single-segment frame must use a 2-byte FCS"
    );
    let fcs_start = header_size as usize - fcs_len;

    // Zero both stored bytes: with the 2-byte field's `+256` bias that
    // decodes to 256, far below what the block actually produces.
    let patched_declared: u64 = 256;
    frame[fcs_start] = 0;
    frame[fcs_start + 1] = 0;

    // Output = patched FCS + WILDCOPY slack, so the overflow originates from
    // the frame contents rather than from an undersized caller buffer.
    let padded_len =
        patched_declared as usize + super::super::buffer_backend::WILDCOPY_OVERLENGTH;
    let mut decoded = alloc::vec![0u8; padded_len];
    let mut decoder = FrameDecoder::new();
    let err = decoder
        .decode_all(frame.as_slice(), &mut decoded)
        .expect_err("Compressed block exceeding FCS must fail decode");
    match err {
        super::FrameDecoderError::FrameContentSizeMismatch { declared, produced } => {
            assert_eq!(declared, patched_declared, "declared echoes patched FCS");
            assert!(produced > declared, "produced must exceed declared");
        }
        other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
    }
}
3655
/// Block-precise error positions (#174): when a block header or block body
/// fails to decode, the error carries the block's 0-based index and its
/// frame-absolute offset, and both agree with what the encoder recorded in
/// `FrameEmitInfo.blocks[index].offset_in_frame`.
#[cfg(feature = "lsm")]
#[test]
fn block_precise_errors_carry_index_and_offset() {
    use crate::encoding::{CompressionLevel, FrameCompressor};

    // About 1.3 MiB of incompressible bytes from a xorshift generator, meant
    // to yield many 128 KiB raw blocks, so that blocks 3 and 7 both exist and
    // neither is the last block.
    const DATA_LEN: usize = 1_300_000;
    let mut rng: u64 = 0x2545_F491_4F6C_DD1D;
    let data: Vec<u8> = (0..DATA_LEN)
        .map(|_| {
            rng ^= rng << 13;
            rng ^= rng >> 7;
            rng ^= rng << 17;
            (rng >> 33) as u8
        })
        .collect();

    // Compress, keeping the per-block layout the encoder reports.
    let mut frame = alloc::vec::Vec::new();
    let blocks = {
        let mut encoder = FrameCompressor::new(CompressionLevel::Level(1));
        encoder.set_source(data.as_slice());
        encoder.set_drain(&mut frame);
        encoder.compress();
        encoder
            .last_frame_emit_info()
            .expect("emit info present under lsm")
            .blocks
            .clone()
    };
    assert!(blocks.len() > 7, "need >7 blocks, got {}", blocks.len());

    let mut scratch_out = alloc::vec![0u8; data.len() + 4096];

    // (1) Set both Block_Type bits in block 7's header, turning the type into
    //     Reserved (3), so the header read itself fails at block 7.
    let header7_at = blocks[7].offset_in_frame as usize;
    let mut bad_header_frame = frame.clone();
    bad_header_frame[header7_at] |= 0b0000_0110;
    let header_err = FrameDecoder::new()
        .decode_all(&bad_header_frame, &mut scratch_out)
        .expect_err("reserved block-7 header must fail");
    match header_err {
        super::FrameDecoderError::FailedToReadBlockHeaderAt {
            block_index,
            frame_offset,
            ..
        } => {
            assert_eq!(block_index, 7);
            assert_eq!(frame_offset, blocks[7].offset_in_frame);
        }
        other => panic!("expected FailedToReadBlockHeaderAt, got {other:?}"),
    }

    // (2) Cut the frame right where block 3's body would start: its header
    //     is intact but the body is missing, so the body decode fails at
    //     block 3 and reports that block's metadata.
    let body3_at = blocks[3].offset_in_frame as usize + blocks[3].header_size as usize;
    let body_err = FrameDecoder::new()
        .decode_all(&frame[..body3_at], &mut scratch_out)
        .expect_err("truncated block-3 body must fail");
    match body_err {
        super::FrameDecoderError::FailedToReadBlockBodyAt {
            block_index,
            frame_offset,
            block,
            ..
        } => {
            assert_eq!(block_index, 3);
            assert_eq!(frame_offset, blocks[3].offset_in_frame);
            assert_eq!(block.offset_in_frame, blocks[3].offset_in_frame);
        }
        other => panic!("expected FailedToReadBlockBodyAt, got {other:?}"),
    }
}
3731
#[test]
fn decode_all_exact_fit_output_decodes_correctly() {
    // An output slice sized exactly to frame_content_size, with no
    // WILDCOPY_OVERLENGTH slack, is eligible for the direct path because
    // every output-write site is exact-fit-safe. Sequence execution falls
    // back to the bounded, non-overshooting copy on the trailing
    // sequence(s), and Raw/RLE blocks copy exactly. The decoded bytes must
    // equal the payload, just as with a slack-padded buffer. On x86 this
    // goes through the per-kernel AVX2/SSE2 inline-exec macros, which carry
    // the same tight-tail branch.
    let payload: Vec<u8> = (0..2048u32)
        .map(|index| (index.wrapping_mul(31) & 0xFF) as u8)
        .collect();

    let compressed = {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut encoder = FrameCompressor::new(CompressionLevel::Default);
            encoder.set_source(payload.as_slice());
            encoder.set_drain(&mut sink);
            encoder.compress();
        }
        sink
    };

    // Exactly payload.len() bytes of output, no slack.
    let mut decoded = alloc::vec![0u8; payload.len()];
    let mut decoder = FrameDecoder::new();
    let written = decoder
        .decode_all(compressed.as_slice(), &mut decoded)
        .expect("exact-fit decode_all should succeed");
    assert_eq!(written, payload.len());
    assert_eq!(&decoded[..written], payload.as_slice());
}
3760
#[test]
fn decode_all_fallback_validates_fcs_against_total_output() {
    // Synthetic single-segment frame: the header declares FCS = 20 bytes,
    // but the last-block flag fires after only 4 bytes of raw payload. On
    // the direct path this trips the final `produced != content_size` check.
    // The fallback path used to silently return Ok(4); it must now surface
    // `FrameContentSizeMismatch`, matching the direct path.
    //
    // The fallback is selected by making the output ONE BYTE SMALLER than
    // the declared FCS, so the direct-decode gate
    // (`output.len() >= content_size`) is false. The corrupt frame only
    // produces 4 bytes, so the buffer is still ample.
    //
    // Frame layout: 4 B magic | 1 B FHD (single_segment=1, FCS_flag=3 →
    // 8-byte FCS) | 8 B FCS=20 | 3 B block header (Raw, last, size=4) |
    // 4 raw bytes.
    const DECLARED_FCS: u64 = 20;
    const RAW_PAYLOAD: [u8; 4] = [1, 2, 3, 4];

    let mut wire = Vec::new();
    wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes()); // magic
    // FHD: FCS_flag=3 (8-byte FCS) << 6 | single_segment=1 << 5.
    wire.push(0b1110_0000);
    wire.extend_from_slice(&DECLARED_FCS.to_le_bytes()); // declared FCS
    // Block header: (size << 3) | (block_type << 1) | last_block, stored as
    // 3 little-endian bytes. Raw (0), last=1, size=4 → 0x21 0x00 0x00.
    let block_header: u32 = ((RAW_PAYLOAD.len() as u32) << 3) | 1;
    wire.extend_from_slice(&block_header.to_le_bytes()[..3]);
    wire.extend_from_slice(&RAW_PAYLOAD);

    let mut dec = FrameDecoder::new();
    let mut out = alloc::vec![0u8; DECLARED_FCS as usize - 1];
    // Guard the precondition the test depends on: a strictly smaller output
    // keeps the frame off the direct path. (`!=` would also accept a LARGER
    // buffer, which would not exercise the fallback.)
    assert!(
        (out.len() as u64) < DECLARED_FCS,
        "output must be smaller than FCS to exercise the fallback path",
    );
    let err = dec
        .decode_all(wire.as_slice(), &mut out)
        .expect_err("fallback must reject corrupt FCS underflow");
    match err {
        crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
            declared,
            produced,
        } => {
            assert_eq!(declared, DECLARED_FCS);
            assert_eq!(produced, RAW_PAYLOAD.len() as u64);
        }
        other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
    }
}
3814
#[test]
fn decode_all_fallback_treats_explicit_fcs_zero_as_declared() {
    // Corrupt multi-segment frame whose header carries an explicit
    // 4-byte FCS field (FCS_flag=2) with the value 0, while the body
    // holds a 5-byte raw last-block. Because the field is physically
    // on the wire, `fcs_declared()` must report true and the
    // fallback's post-decode size check must flag the mismatch even
    // though `frame_content_size == 0`. The older `content_size > 0`
    // proxy would have let this frame through silently.
    //
    // Wire image:
    //   magic              28 B5 2F FD
    //   FHD                0b1000_0000  (FCS_flag=2 in bits 7-6; no
    //                      single_segment, no checksum, no dict id)
    //   window_descriptor  0x50         (exp=10, mantissa=0 → 1 MiB)
    //   FCS                00 00 00 00  (little-endian zero)
    //   block header       29 00 00     (raw, last, size=5)
    //   payload            5 arbitrary non-empty bytes
    let mut frame_bytes: Vec<u8> = Vec::new();
    frame_bytes.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
    // FHD followed by the window descriptor.
    frame_bytes.extend_from_slice(&[0b1000_0000, 0x50]);
    // Explicit FCS = 0.
    frame_bytes.extend_from_slice(&0u32.to_le_bytes());
    // 24-bit LE block header: (5 << 3) | (0 << 1) | 1 = 0x29.
    frame_bytes.extend_from_slice(&[0x29, 0x00, 0x00]);
    frame_bytes.extend_from_slice(&[1u8, 2, 3, 4, 5]);

    // With FCS=0 the direct-path eligibility test (`content_size > 0`)
    // is false, so the drain loop runs. The buffer size plays no part
    // in that decision; it only needs room for `read()` to drain the
    // block.
    let mut sink = alloc::vec![0u8; 16];
    let mut decoder = FrameDecoder::new();
    let failure = decoder
        .decode_all(frame_bytes.as_slice(), &mut sink)
        .expect_err("corrupt FCS=0 + 5-byte block must error");

    let (declared, produced) = match failure {
        crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
            declared,
            produced,
        } => (declared, produced),
        other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
    };
    assert_eq!(declared, 0);
    assert_eq!(produced, 5);
}
3868
#[test]
fn decode_all_fallback_accepts_honest_explicit_fcs_zero() {
    // Positive counterpart of the corrupt-FCS=0 case: a truthful empty
    // frame. The header has an explicit 4-byte FCS of 0 (FCS_flag=2)
    // and the only block is a 0-byte raw last-block, so
    // `fcs_declared()` is true and `content_size == 0 ==
    // total_written`. The fallback validation must accept it rather
    // than report a mismatch.
    //
    // A single-segment FCS=0 frame would probe a similar invariant,
    // but it fails earlier: there `window_size = frame_content_size =
    // 0 < MIN_WINDOW_SIZE`, which trips the header-stage window
    // sanity check before any decoding. The multi-segment shape takes
    // `window_size` from `window_descriptor`, independent of FCS.
    //
    // Wire image (13 bytes):
    //   magic              28 B5 2F FD
    //   FHD                0x80         (FCS_flag=2, everything else 0)
    //   window_descriptor  0x50         (exp=10 → 1 MiB window)
    //   FCS                00 00 00 00
    //   block header       01 00 00     ((0 << 3) | (0 << 1) | 1)
    let wire: [u8; 13] = [
        0x28, 0xB5, 0x2F, 0xFD, // magic 0xFD2F_B528, little-endian
        0b1000_0000, // FHD
        0x50, // window descriptor
        0x00, 0x00, 0x00, 0x00, // FCS = 0
        0x01, 0x00, 0x00, // raw, last, size = 0
    ];

    let mut sink = alloc::vec![0u8; 16];
    let mut decoder = FrameDecoder::new();
    let written = decoder
        .decode_all(&wire[..], &mut sink)
        .expect("honest FCS=0 + empty block must succeed");
    assert_eq!(written, 0);
}
3908
#[test]
fn reset_with_dict_handle_applies_dict_when_no_dict_id() {
    // Encode a frame WITHOUT a dictionary, so its header carries no
    // dictionary id.
    let plain_text = b"reset-without-dict-id";
    let mut frame = Vec::new();
    let mut encoder = FrameCompressor::new(CompressionLevel::Default);
    encoder.set_source(plain_text.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    let dict_bytes = include_bytes!("../../dict_tests/dictionary");
    let dict_handle =
        DictionaryHandle::decode_dict(dict_bytes).expect("dictionary should parse");

    // Resetting with an explicit handle must record that dictionary as
    // in use even though the frame header names none.
    let mut frame_decoder = FrameDecoder::new();
    frame_decoder
        .reset_with_dict_handle(frame.as_slice(), &dict_handle)
        .expect("reset should succeed");

    let decoder_state = frame_decoder
        .state
        .as_ref()
        .expect("state should be initialized");
    assert!(decoder_state.frame_header.dictionary_id().is_none());
    assert_eq!(decoder_state.using_dict, Some(dict_handle.id()));
}
3929
#[test]
fn reserve_buffer_reserves_the_shortfall_not_the_full_window_again() {
    // `Vec::reserve_exact` is expressed in ADDITIONAL capacity. The
    // decode_all fallback loop calls back into decode_blocks once per
    // strategy chunk, and every entry pre-reserves the window. If each
    // of those asked for the FULL window on a buffer that already
    // holds ~window bytes of history, the buffer would creep toward
    // 2x window and defeat the peak-memory cap that the exact-growth
    // policy is there to enforce.
    use super::DecoderScratchKind;

    let window_bytes = 1usize << 20;
    let mut scratch_kind = DecoderScratchKind::new_flat(window_bytes);

    // First reservation on an empty buffer, then fill it with one full
    // window of history.
    scratch_kind.reserve_buffer(window_bytes);
    let history = alloc::vec![0u8; window_bytes];
    if let DecoderScratchKind::Flat(flat) = &mut scratch_kind {
        flat.buffer.push(&history);
    } else {
        unreachable!("new_flat builds Flat");
    }

    // Second reservation: must only top up the shortfall.
    scratch_kind.reserve_buffer(window_bytes);
    let workspace = scratch_kind.workspace_bytes();
    let window = window_bytes;
    assert!(
        workspace < window * 3 / 2,
        "second reserve_buffer grew a full window past the buffered \
         history: workspace {workspace} bytes vs window {window}"
    );
}
3955
#[test]
fn dict_frame_decodes_through_direct_path() {
    // Decoding a dictionary frame with `decode_all_with_dict_handle`
    // into a buffer of exactly FCS bytes goes down the direct path
    // (UserSliceBackend). Matches that reach into dictionary content
    // then have to resolve via `repeat_from_dict`. The payload starts
    // with verbatim dictionary content so the encoder emits
    // dict-region matches right from the first bytes of the frame.
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let handle = DictionaryHandle::decode_dict(dict_raw).expect("dictionary should parse");

    // Payload = trailing (at most) 2048 dictionary bytes + a unique
    // suffix. The dictionary bytes appear only ONCE: a second in-frame
    // copy could be encoded as an in-frame match, which would keep
    // this test green even if the direct path stopped forwarding the
    // dictionary handle. One copy forces every dict-region match
    // through `repeat_from_dict`.
    let mut payload: alloc::vec::Vec<u8> = handle
        .as_dict()
        .dict_content
        .iter()
        .rev()
        .take(2048)
        .rev()
        .copied()
        .collect();
    payload.extend_from_slice(b"unique suffix after dictionary material 0123456789");

    // Frame under test: encoded WITH the dictionary.
    let compressed = {
        let mut frame = Vec::new();
        let mut with_dict = FrameCompressor::new(CompressionLevel::Default);
        with_dict
            .set_dictionary_from_bytes(dict_raw)
            .expect("dict load");
        with_dict.set_source(payload.as_slice());
        with_dict.set_drain(&mut frame);
        with_dict.compress();
        frame
    };

    // Fixture sanity check: encode the same payload WITHOUT the
    // dictionary. If the dictionary frame is not smaller, it does not
    // really depend on the dictionary and the decode below would never
    // exercise dict-region match resolution.
    let plain = {
        let mut frame = Vec::new();
        let mut without_dict = FrameCompressor::new(CompressionLevel::Default);
        without_dict.set_source(payload.as_slice());
        without_dict.set_drain(&mut frame);
        without_dict.compress();
        frame
    };
    assert!(
        compressed.len() < plain.len(),
        "fixture must depend on the dictionary: dict {} bytes vs plain {} bytes",
        compressed.len(),
        plain.len()
    );

    let mut out = alloc::vec![0u8; payload.len()];
    let mut decoder = FrameDecoder::new();
    let n = decoder
        .decode_all_with_dict_handle(compressed.as_slice(), &mut out, &handle)
        .expect("dict frame must decode on the direct path");
    assert_eq!(n, payload.len());
    assert_eq!(out, payload, "direct-path dict decode must be byte-exact");

    // The two decode paths produce identical bytes, so the dispatch
    // itself is pinned too: if a dict exclusion crept back into the
    // direct gate, the frame would quietly use the buffered fallback
    // and the assertions above would still pass.
    assert_eq!(
        decoder.direct_frames, 1,
        "dict frame must take the direct path, not the buffered fallback"
    );
}
4023
#[test]
fn implausible_content_size_skips_eager_alloc_direct_path() {
    // Hostile frame: the window is only 1 KiB (small ring), yet the
    // header pledges 4 MiB of content and is followed by a truncated
    // raw block. On the direct path the caller's Vec would be
    // `resize`d to the pledged 4 MiB (allocated and zeroed) BEFORE
    // the truncated body gets validated. The gate has to reject the
    // implausible size — 4 MiB cannot come out of 3 compressed bytes
    // — and fall through to the window-bounded ring drain, which
    // fails without ever allocating the pledged amount.
    //
    // Built by hand so the declared size is completely independent of
    // the (tiny) real input; the encoder always writes a truthful FCS.
    const FRAME: &[u8] = &[
        0x28, 0xB5, 0x2F, 0xFD, // magic
        0x80, // FHD: multi-segment, 4-byte FCS field, no dict
        0x00, // window descriptor -> 1 KiB window
        0x00, 0x00, 0x40, 0x00, // FCS = 4 MiB
        0x21, 0x03, 0x00, // raw block header: last, size 100, no body
    ];

    let mut decoder = FrameDecoder::new();
    let mut remaining: &[u8] = FRAME;
    decoder.init(&mut remaining).expect("header must parse");

    // `remaining` has been advanced past the header and now starts at
    // the truncated 3-byte block.
    let mut decoded = Vec::new();
    let outcome = decoder.decode_current_frame_to_vec(remaining, &mut decoded, None);
    assert!(
        outcome.is_err(),
        "truncated body must fail regardless of decode path"
    );
    assert_eq!(
        decoder.direct_frames, 0,
        "implausible FCS must NOT take the eager-alloc direct path"
    );
}
4059
#[test]
fn implausible_single_segment_fcs_rejected_before_window_reservation() {
    // Hostile single-segment frame. For single-segment frames the
    // window IS the declared content size (4 MiB here), so the
    // fallback ring drain would pre-reserve that entire window through
    // `useful_window_size()` before the truncated body fails — a case
    // the multi-segment gate test does not reach. The implausible size
    // (4 MiB cannot come out of 3 compressed bytes) has to be refused
    // up front with a content-size error, NOT with a block-body error
    // raised after the reservation.
    const FRAME: &[u8] = &[
        0x28, 0xB5, 0x2F, 0xFD, // magic
        0xA0, // FHD: single-segment, 4-byte FCS field
        0x00, 0x00, 0x40, 0x00, // FCS = 4 MiB (== window for single-segment)
        0x21, 0x03, 0x00, // raw block header: last, size 100, no body
    ];

    let mut decoder = FrameDecoder::new();
    let mut remaining: &[u8] = FRAME;
    decoder.init(&mut remaining).expect("header must parse");

    let mut decoded = Vec::new();
    let failure = decoder
        .decode_current_frame_to_vec(remaining, &mut decoded, None)
        .expect_err("implausible single-segment FCS must be rejected");

    let declared = match failure {
        super::FrameDecoderError::FrameContentSizeMismatch { declared, .. } => declared,
        other => panic!(
            "expected early FrameContentSizeMismatch (no window reservation), got {other:?}"
        ),
    };
    assert_eq!(declared, 4 * 1024 * 1024);
    assert_eq!(
        decoder.direct_frames, 0,
        "implausible FCS must not take the eager-alloc direct path"
    );
}
4096
#[cfg(feature = "lsm")]
mod expect_validation {
    use super::*;
    use crate::decoding::errors::FrameDecoderError;

    /// Encode `payload` at the default compression level, without a
    /// dictionary, and return the resulting frame bytes.
    fn compress(payload: &[u8]) -> Vec<u8> {
        let mut frame = Vec::new();
        let mut encoder = FrameCompressor::new(CompressionLevel::Default);
        encoder.set_source(payload);
        encoder.set_drain(&mut frame);
        encoder.compress();
        frame
    }

    /// Encode `payload` at the default compression level with the raw
    /// dictionary `dict_raw` loaded, and return the resulting frame
    /// bytes. Panics if the dictionary fails to load.
    fn compress_with_dict(payload: &[u8], dict_raw: &[u8]) -> Vec<u8> {
        let mut frame = Vec::new();
        let mut encoder = FrameCompressor::new(CompressionLevel::Default);
        encoder
            .set_dictionary_from_bytes(dict_raw)
            .expect("dict load");
        encoder.set_source(payload);
        encoder.set_drain(&mut frame);
        encoder.compress();
        frame
    }

    #[test]
    fn expect_dict_id_none_default_allows_anything() {
        // No expectation configured: reset must simply succeed.
        let frame = compress(b"hello-no-expect");
        let mut dec = FrameDecoder::new();
        dec.reset(frame.as_slice()).expect("default None passes");
    }

    #[test]
    fn expect_dict_id_zero_matches_frame_without_dict_id() {
        // A default-encoded frame carries no dict_id, and pinning
        // Some(0) means "no dictionary expected", so the two agree.
        let frame = compress(b"payload");
        let mut dec = FrameDecoder::new();
        dec.expect_dict_id(Some(0));
        dec.reset(frame.as_slice()).expect("Some(0) ~ None");
    }

    #[test]
    fn expect_dict_id_matching_value_passes() {
        let dict_raw = include_bytes!("../../dict_tests/dictionary");
        let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
        let frame = compress_with_dict(b"payload-with-dict", dict_raw);

        let mut dec = FrameDecoder::new();
        dec.expect_dict_id(Some(handle.id()));
        // The dictionary has to be supplied for the reset to go
        // through; `reset_with_dict_handle` does that.
        dec.reset_with_dict_handle(frame.as_slice(), &handle)
            .expect("matching dict_id passes");
    }

    #[test]
    fn expect_dict_id_mismatching_value_fails_before_decode() {
        let dict_raw = include_bytes!("../../dict_tests/dictionary");
        let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
        let real_id = handle.id();
        let bogus_id = real_id.wrapping_add(1);
        let frame = compress_with_dict(b"payload-with-dict", dict_raw);

        let mut dec = FrameDecoder::new();
        dec.expect_dict_id(Some(bogus_id));
        let failure = dec
            .reset_with_dict_handle(frame.as_slice(), &handle)
            .expect_err("mismatch must fail");

        let (expected, found) = match failure {
            FrameDecoderError::UnexpectedDictId { expected, found } => (expected, found),
            other => panic!("expected UnexpectedDictId, got {other:?}"),
        };
        assert_eq!(expected, Some(bogus_id));
        assert_eq!(found, Some(real_id));
    }

    #[test]
    fn expect_dict_id_nonzero_fails_on_frame_without_dict_id() {
        // The frame has no dict_id, so a non-zero expectation
        // (Some(42)) must be rejected and report `found = None`.
        let frame = compress(b"no-dict-frame");
        let mut dec = FrameDecoder::new();
        dec.expect_dict_id(Some(42));
        let failure = dec
            .reset(frame.as_slice())
            .expect_err("nonzero expectation on dictless frame must fail");

        let (expected, found) = match failure {
            FrameDecoderError::UnexpectedDictId { expected, found } => (expected, found),
            other => panic!("expected UnexpectedDictId, got {other:?}"),
        };
        assert_eq!(expected, Some(42));
        assert_eq!(found, None);
    }

    #[test]
    fn expect_window_descriptor_none_default_allows_anything() {
        // No expectation configured: reset must simply succeed.
        let frame = compress(b"hello-no-wd-expect");
        let mut dec = FrameDecoder::new();
        dec.reset(frame.as_slice()).expect("default None passes");
    }

    #[test]
    fn expect_window_descriptor_mismatch_fails_before_decode() {
        // The payload is sized to force a multi-segment frame, i.e.
        // one that has a window_descriptor byte on the wire. Per the
        // fixture's premise, default compression above 256 KiB yields
        // such frames; the `expect` on the probe below enforces it.
        let big_payload = alloc::vec![0xABu8; 512 * 1024];
        let frame = compress(&big_payload);

        // Learn the real descriptor from an expectation-free reset,
        // then derive a value that is guaranteed to differ.
        let mut probe = FrameDecoder::new();
        probe.reset(frame.as_slice()).unwrap();
        let probed_state = probe.state.as_ref().unwrap();
        let real_wd = probed_state
            .frame_header
            .window_descriptor()
            .expect("multi-segment frame should expose window_descriptor");
        let bogus_wd = real_wd.wrapping_add(0x10); // bump exponent

        let mut dec = FrameDecoder::new();
        dec.expect_window_descriptor(Some(bogus_wd));
        let failure = dec
            .reset(frame.as_slice())
            .expect_err("wrong window_descriptor must fail");

        let (expected, found) = match failure {
            FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
                (expected, found)
            }
            other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
        };
        assert_eq!(expected, bogus_wd);
        assert_eq!(found, Some(real_wd));
    }

    /// Hand-assemble the smallest single-segment zstd frame that wraps
    /// `payload` in one raw last-block (RFC 8878 §3.1.1.1 layout).
    /// Done by hand because the default `FrameCompressor` settings do
    /// not emit `single_segment_flag` for tiny inputs.
    ///
    /// Panics if `payload` is longer than 255 bytes, the most a 1-byte
    /// FCS field can describe.
    ///
    /// Wire bytes (13 total for a 4-byte payload):
    /// ```text
    /// 28 B5 2F FD   magic
    /// 20            FHD: single_segment=1, FCS_flag=0
    /// 04            FCS (single byte, value = payload.len())
    /// 21 00 00      block header: raw, last, size=4
    /// .. .. .. ..   payload bytes
    /// ```
    fn synth_single_segment_frame(payload: &[u8]) -> Vec<u8> {
        assert!(payload.len() <= 255, "1-byte FCS field caps at 255");
        assert!(payload.len() < (1usize << 21), "block size 21-bit max");

        // 24-bit little-endian block header: last_block flag in bit 0,
        // block_type (0 = Raw) in bits 1-2, block size from bit 3 up.
        let block_header: u32 = ((payload.len() as u32) << 3) | 1;

        let mut frame = Vec::new();
        // Magic number 0xFD2FB528, little-endian.
        frame.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
        // FHD with only single_segment_flag (bit 5) set. Combined with
        // FCS_flag=0 this makes the FCS field one byte wide and drops
        // the window_descriptor from the wire.
        frame.push(0b0010_0000);
        // One-byte FCS holding the payload length.
        frame.push(payload.len() as u8);
        // Only the low three bytes of the header go on the wire.
        frame.extend_from_slice(&block_header.to_le_bytes()[..3]);
        // Raw block body.
        frame.extend_from_slice(payload);
        frame
    }

    #[test]
    fn expect_window_descriptor_on_single_segment_frame_fails_with_found_none() {
        // A single-segment frame has no window_descriptor byte on the
        // wire at all. With an expectation set, the decoder must
        // report `found: None` so callers can tell "wrong descriptor"
        // apart from "no descriptor present" — it must never pass
        // silently.
        let frame = synth_single_segment_frame(b"tiny");

        // Sanity check first: with no expectation the synthetic frame
        // resets cleanly and really is single-segment.
        {
            let mut probe = FrameDecoder::new();
            probe.reset(frame.as_slice()).expect("synth frame parses");
            let probed_state = probe.state.as_ref().unwrap();
            assert!(
                probed_state.frame_header.window_descriptor().is_none(),
                "synth frame must be single-segment"
            );
        }

        let mut dec = FrameDecoder::new();
        dec.expect_window_descriptor(Some(0x40));
        let failure = dec
            .reset(frame.as_slice())
            .expect_err("single-segment + expectation must fail");

        let (expected, found) = match failure {
            FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
                (expected, found)
            }
            other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
        };
        assert_eq!(expected, 0x40);
        assert_eq!(found, None);
    }

    #[test]
    fn validation_failure_leaves_decoder_re_resettable() {
        // A reset rejected with UnexpectedDictId must not leave the
        // decoder stuck: once the expectation is cleared, resetting
        // again over the same bytes has to succeed.
        let frame = compress(b"re-resettable");

        let mut dec = FrameDecoder::new();
        dec.expect_dict_id(Some(42));
        let failure = dec.reset(frame.as_slice()).expect_err("first reset fails");
        assert!(matches!(failure, FrameDecoderError::UnexpectedDictId { .. }));

        // Drop the expectation and retry with a fresh reader over the
        // same compressed bytes.
        dec.expect_dict_id(None);
        dec.reset(frame.as_slice())
            .expect("retry after clearing expectation should succeed");
    }
}
4345
/// Serialize a skippable frame: a 4-byte little-endian magic, a 4-byte
/// little-endian payload length, then the payload itself.
///
/// RFC 8878 §3.1.2 limits the magic variant to `0..=15`, so the helper
/// panics on anything larger; a misused call therefore cannot slip a
/// non-skippable magic into a test stream. It also panics if the
/// payload length does not fit in a `u32`.
#[cfg(feature = "lsm")]
fn build_skippable_frame(variant: u8, payload: &[u8]) -> Vec<u8> {
    assert!(
        variant <= 15,
        "skippable-frame variant {variant} outside RFC 8878 0..=15 range",
    );
    // 8 header bytes (magic + length) plus the payload.
    let mut frame = Vec::with_capacity(8 + payload.len());
    let magic_bytes = (0x184D2A50u32 + u32::from(variant)).to_le_bytes();
    frame.extend_from_slice(&magic_bytes);
    let length_bytes = u32::try_from(payload.len()).unwrap().to_le_bytes();
    frame.extend_from_slice(&length_bytes);
    frame.extend_from_slice(payload);
    frame
}
4363
#[cfg(feature = "lsm")]
#[test]
fn decode_all_with_skippable_visitor_sees_payloads_in_order() {
    // Stream under test:
    //   skippable(v0, "alpha") + zstd_frame + skippable(v3, "beta")
    //   + zstd_frame + skippable(v15, "")
    // The visitor must fire exactly three times, with the right
    // (variant, payload) pairs in stream order, while both zstd
    // frames decode as usual.
    let payload_a: Vec<u8> = (0..=u8::MAX).collect();
    let payload_b: Vec<u8> = (0..=u8::MAX).map(|b| b ^ 0xAA).collect();

    let comp_a = {
        let mut frame = Vec::new();
        let mut enc = FrameCompressor::new(CompressionLevel::Default);
        enc.set_source(payload_a.as_slice());
        enc.set_drain(&mut frame);
        enc.compress();
        frame
    };
    let comp_b = {
        let mut frame = Vec::new();
        let mut enc = FrameCompressor::new(CompressionLevel::Default);
        enc.set_source(payload_b.as_slice());
        enc.set_drain(&mut frame);
        enc.compress();
        frame
    };

    let skip0 = build_skippable_frame(0, b"alpha");
    let skip3 = build_skippable_frame(3, b"beta");
    let skip15 = build_skippable_frame(15, &[]);

    // Concatenate the five frames in stream order.
    let mut stream = Vec::new();
    for part in [&skip0, &comp_a, &skip3, &comp_b, &skip15] {
        stream.extend_from_slice(part);
    }

    let mut out = alloc::vec![0u8; payload_a.len() + payload_b.len()];
    let mut seen: Vec<(u8, Vec<u8>)> = Vec::new();
    let mut decoder = FrameDecoder::new();
    let n = decoder
        .decode_all_with_skippable_visitor(stream.as_slice(), &mut out, |variant, payload| {
            seen.push((variant, payload.to_vec()));
        })
        .expect("decode_all_with_skippable_visitor should succeed");

    // Every skippable frame was visited, in stream order.
    let expected: [(u8, Vec<u8>); 3] = [
        (0u8, b"alpha".to_vec()),
        (3u8, b"beta".to_vec()),
        (15u8, Vec::<u8>::new()),
    ];
    assert_eq!(seen.len(), 3);
    for (got, want) in seen.iter().zip(expected.iter()) {
        assert_eq!(got, want);
    }

    // The two zstd frames landed back-to-back in `out`.
    let split = payload_a.len();
    assert_eq!(n, payload_a.len() + payload_b.len());
    assert_eq!(&out[..split], payload_a.as_slice());
    assert_eq!(&out[split..n], payload_b.as_slice());
}
4418
#[cfg(feature = "lsm")]
#[test]
fn decode_all_silently_skips_when_no_visitor() {
    // Regression guard for the visitor refactor: plain `decode_all`
    // must keep skipping skippable frames without any fuss (the
    // behavior RFC 8878 mandates) and decode the zstd frame after it.
    let payload: Vec<u8> = (0..512u16).map(|i| i as u8).collect();

    let comp = {
        let mut frame = Vec::new();
        let mut enc = FrameCompressor::new(CompressionLevel::Default);
        enc.set_source(payload.as_slice());
        enc.set_drain(&mut frame);
        enc.compress();
        frame
    };

    // Stream = one skippable frame followed by the zstd frame.
    let mut stream = build_skippable_frame(7, b"ignored sidecar");
    stream.extend_from_slice(&comp);

    let mut out = alloc::vec![0u8; payload.len()];
    let mut decoder = FrameDecoder::new();
    let n = decoder
        .decode_all(stream.as_slice(), &mut out)
        .expect("decode_all should succeed on skippable + zstd stream");
    assert_eq!(n, payload.len());
    assert_eq!(&out[..n], payload.as_slice());
}
4445
#[cfg(feature = "lsm")]
#[test]
fn frame_emit_info_describes_emitted_block_layout() {
    use crate::encoding::frame_emit_info::BlockType as EmitBlockType;

    // Compress a payload big enough to span more than one block, grab
    // the FrameEmitInfo, and check that every recorded block
    // (offset_in_frame, header_size, body_size) agrees with the bytes
    // that actually reached the drain buffer.
    let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
    let mut compressed = Vec::new();
    let mut compressor = FrameCompressor::new(CompressionLevel::Default);
    // The content checksum is opt-in (the library default mirrors
    // libzstd's checksum-off). Turn it on so the checksum_range check
    // at the end covers the hash-gated trailer accounting.
    compressor.set_content_checksum(true);
    compressor.set_source(payload.as_slice());
    compressor.set_drain(&mut compressed);
    compressor.compress();

    let info = compressor
        .last_frame_emit_info()
        .expect("last_frame_emit_info populated after compress")
        .clone();
    drop(compressor);

    // The frame header range begins at 0 and is non-empty.
    assert_eq!(info.frame_header_range.start, 0);
    assert!(info.frame_header_range.end > 0);
    // The recorded total equals what the drain received.
    assert_eq!(info.total_size as usize, compressed.len());
    // There is at least one block; only the final one is flagged last.
    assert!(!info.blocks.is_empty());
    let (final_block, earlier_blocks) = info.blocks.split_last().unwrap();
    assert!(final_block.last_block);
    for b in earlier_blocks {
        assert!(!b.last_block);
    }

    // Re-decode each block's 3-byte header from the wire and compare
    // it with the recorded type / size. Walking invariant:
    // offset_in_frame + header_size + body_size lands exactly on the
    // next block's offset_in_frame (or, for the last block, on the
    // checksum / end of frame).
    for (i, b) in info.blocks.iter().enumerate() {
        let off = b.offset_in_frame as usize;
        assert_eq!(b.header_size, 3);

        // Widen the 24-bit little-endian header to a u32.
        let raw = u32::from_le_bytes([
            compressed[off],
            compressed[off + 1],
            compressed[off + 2],
            0,
        ]);
        let last = raw & 1 == 1;
        let ty = (raw >> 1) & 0b11;
        let sz = raw >> 3;
        assert_eq!(last, b.last_block);
        assert_eq!(sz, b.block_size_field);

        // body_size is the PHYSICAL wire length: the spec's Block_Size
        // for Raw/Compressed, always 1 for RLE.
        let (expected_ty, expected_physical) = match b.block_type {
            EmitBlockType::Raw => (0, sz),
            EmitBlockType::RLE => (1, 1),
            EmitBlockType::Compressed => (2, sz),
            EmitBlockType::Reserved => (3, sz),
        };
        assert_eq!(b.body_size, expected_physical);
        assert_eq!(ty, expected_ty);

        // Walking-arithmetic invariant.
        let next_off = b.offset_in_frame + b.header_size as u32 + b.body_size;
        match (info.blocks.get(i + 1), info.checksum_range.as_ref()) {
            (Some(next), _) => assert_eq!(
                next_off, next.offset_in_frame,
                "block {i} body_size doesn't reach next block's offset_in_frame",
            ),
            (None, Some(cs)) => assert_eq!(
                next_off, cs.start,
                "last block body_size doesn't reach checksum_range.start",
            ),
            (None, None) => assert_eq!(
                next_off, info.total_size,
                "last block body_size doesn't reach total_size",
            ),
        }
    }

    // A checksum range exists iff `feature = "hash"` is enabled.
    assert_eq!(info.checksum_range.is_some(), cfg!(feature = "hash"));
}
4534
#[cfg(all(feature = "lsm", feature = "hash"))]
#[test]
fn per_block_checksum_round_trip() {
    // Per-block checksums are enabled on the encoder and per-block
    // verification on the decoder. Each side produces exactly one
    // checksum per physical block on the wire: the encoder hashes at
    // every emission site (including each post-split partition), the
    // decoder hashes every block it decodes. After a round trip the
    // two lists must match in length and element by element.
    let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();

    let mut compressed = Vec::new();
    let mut encoder = FrameCompressor::new(CompressionLevel::Default);
    encoder.set_source(payload.as_slice());
    encoder.enable_per_block_checksums();
    encoder.set_drain(&mut compressed);
    encoder.compress();

    let encoder_checksums = encoder
        .last_frame_block_checksums()
        .expect("checksums populated after enable + compress")
        .to_vec();
    drop(encoder);
    assert!(!encoder_checksums.is_empty());

    // Decoder side: switch verification on, decode, then compare.
    let mut output = alloc::vec![0u8; payload.len()];
    let mut decoder = FrameDecoder::new();
    decoder.enable_per_block_checksums();
    let written = decoder
        .decode_all(compressed.as_slice(), &mut output)
        .expect("decode_all should succeed");
    assert_eq!(written, payload.len());
    assert_eq!(&output[..written], payload.as_slice());

    let decoder_checksums = decoder.computed_block_checksums();
    assert_eq!(decoder_checksums, encoder_checksums.as_slice());
}
4573
4574 // ── decode_blocks_partial (block-subset partial decode, lsm) ──
4575
/// Produce a compressible input of at least 400 KiB, compress it at
/// the default level, and return `(compressed, full_decode,
/// emit_info)`.
///
/// `full_decode` is the output of a complete `decode_all` pass and is
/// asserted to equal the generated input. `emit_info` is a clone of
/// the compressor's `last_frame_emit_info()` for the frame.
/// NOTE(review): the original doc states that the emit info's
/// `decompressed_byte_range` maps decompressed offsets to block
/// indices; that field is not touched here — confirm at the use sites.
#[cfg(feature = "lsm")]
fn multi_block_fixture() -> (
    Vec<u8>,
    Vec<u8>,
    crate::encoding::frame_emit_info::FrameEmitInfo,
) {
    const TARGET_LEN: usize = 400 * 1024;

    // Deterministic input: xorshift-driven byte runs of 16..=63
    // repeats, each followed by a fixed sentence.
    let mut data: Vec<u8> = Vec::with_capacity(TARGET_LEN);
    let mut rng = 0x9E37_79B9u32;
    while data.len() < TARGET_LEN {
        rng ^= rng << 13;
        rng ^= rng >> 17;
        rng ^= rng << 5;
        let run_len = 16 + (rng as usize % 48);
        let fill = (rng >> 24) as u8;
        // Append `run_len` copies of `fill` in one step.
        let grown_len = data.len() + run_len;
        data.resize(grown_len, fill);
        data.extend_from_slice(b"the quick brown fox jumps over the lazy dog\n");
    }

    let mut compressed = Vec::new();
    let mut encoder = FrameCompressor::new(CompressionLevel::Default);
    encoder.set_source(data.as_slice());
    encoder.set_drain(&mut compressed);
    encoder.compress();
    let info = encoder
        .last_frame_emit_info()
        .expect("emit info populated")
        .clone();
    drop(encoder);

    // Reference output: decode the whole frame once and confirm that
    // it reproduces the input.
    let mut full = alloc::vec![0u8; data.len()];
    let mut decoder = FrameDecoder::new();
    let decoded_len = decoder
        .decode_all(compressed.as_slice(), &mut full)
        .expect("full decode");
    full.truncate(decoded_len);
    assert_eq!(full, data, "fixture must round-trip");

    (compressed, full, info)
}
4619
4620 #[cfg(feature = "lsm")]
4621 #[test]
4622 fn decode_blocks_partial_subset_matches_full_decode() {
4623 let (compressed, full, info) = multi_block_fixture();
4624 let nblocks = info.blocks.len() as u32;
4625 assert!(
4626 nblocks >= 4,
4627 "fixture must have several blocks, got {nblocks}"
4628 );
4629 let half = nblocks / 2;
4630 // Boundaries: 1 block, 2 blocks, half, all, and a non-zero start.
4631 // `(0, u32::MAX)` exercises the "decode to end of frame" sentinel,
4632 // a distinct public contract from an explicit upper bound.
4633 for &(s, e) in &[
4634 (0u32, u32::MAX),
4635 (0, 1),
4636 (0, 2),
4637 (0, half),
4638 (0, nblocks),
4639 (1, 2),
4640 (half, nblocks),
4641 ] {
4642 // The sentinel decodes through the last block; map it to nblocks
4643 // for the expected-slice / block-count arithmetic below.
4644 let effective_end = if e == u32::MAX { nblocks } else { e };
4645 let mut source = compressed.as_slice();
4646 let mut dec = FrameDecoder::new();
4647 dec.reset(&mut source).unwrap();
4648 let pd = dec
4649 .decode_blocks_partial(&mut source, s, e, None, false)
4650 .unwrap_or_else(|err| panic!("range [{s},{e}) errored: {err:?}"));
4651
4652 let start = info.decompressed_byte_range(s as usize).unwrap().start as usize;
4653 let end = info
4654 .decompressed_byte_range((effective_end - 1) as usize)
4655 .unwrap()
4656 .end as usize;
4657 assert_eq!(
4658 pd.data.as_slice(),
4659 &full[start..end],
4660 "subset bytes must equal the full-decode slice for [{s},{e})"
4661 );
4662 assert_eq!(pd.start_block, s);
4663 assert_eq!(pd.blocks_decoded, effective_end - s);
4664 assert!(pd.stopped_at.is_none(), "clean range [{s},{e})");
4665 }
4666 }
4667
4668 #[cfg(feature = "lsm")]
4669 #[test]
4670 fn decode_blocks_partial_recovers_clean_prefix_on_truncated_block() {
4671 let (compressed, full, info) = multi_block_fixture();
4672 let nblocks = info.blocks.len();
4673 let k = nblocks / 2;
4674 assert!(k >= 1, "need a clean prefix before the failing block");
4675
4676 // Truncate the source right after block k's 3-byte header, so its body
4677 // read fails regardless of block type (0 body bytes available).
4678 let cut = info.blocks[k].offset_in_frame as usize + info.blocks[k].header_size as usize;
4679 let truncated = &compressed[..cut];
4680
4681 let mut source = truncated;
4682 let mut dec = FrameDecoder::new();
4683 dec.reset(&mut source).unwrap();
4684 let pd = dec
4685 .decode_blocks_partial(&mut source, 0, u32::MAX, None, false)
4686 .unwrap();
4687
4688 let (idx, _err) = pd.stopped_at.expect("must stop on the truncated block");
4689 assert_eq!(idx, k as u32, "stopped at the truncated block index");
4690 assert_eq!(pd.blocks_decoded, k as u32, "blocks 0..k decoded cleanly");
4691 assert!(!pd.frame_finished);
4692 let clean_end = info.decompressed_byte_range(k).unwrap().start as usize;
4693 assert_eq!(
4694 pd.data.as_slice(),
4695 &full[..clean_end],
4696 "clean prefix preserved through the failure"
4697 );
4698 }
4699
4700 #[cfg(feature = "lsm")]
4701 #[test]
4702 fn decode_blocks_partial_invalid_range_errors() {
4703 let (compressed, _full, _info) = multi_block_fixture();
4704 let mut source = compressed.as_slice();
4705 let mut dec = FrameDecoder::new();
4706 dec.reset(&mut source).unwrap();
4707 let err = dec
4708 .decode_blocks_partial(&mut source, 5, 2, None, false)
4709 .expect_err("start > end must error");
4710 assert!(matches!(
4711 err,
4712 crate::decoding::errors::FrameDecoderError::InvalidBlockRange {
4713 start_block: 5,
4714 end_block: 2,
4715 }
4716 ));
4717 }
4718
4719 #[cfg(feature = "lsm")]
4720 #[test]
4721 fn decode_blocks_partial_skips_trailing_blocks() {
4722 let (compressed, full, info) = multi_block_fixture();
4723 assert!(info.blocks.len() >= 3);
4724 let mut source = compressed.as_slice();
4725 let mut dec = FrameDecoder::new();
4726 dec.reset(&mut source).unwrap();
4727 let pd = dec
4728 .decode_blocks_partial(&mut source, 0, 1, None, false)
4729 .unwrap();
4730
4731 assert_eq!(pd.blocks_decoded, 1);
4732 assert!(pd.stopped_at.is_none());
4733 assert!(!pd.frame_finished, "block 0 is not the last block");
4734 let end = info.decompressed_byte_range(0).unwrap().end as usize;
4735 assert_eq!(pd.data.as_slice(), &full[..end]);
4736 // The trailing blocks + checksum were never consumed from the source.
4737 assert!(
4738 dec.bytes_read_from_source() < u64::from(info.total_size),
4739 "only block 0's region should be consumed, read {} of {}",
4740 dec.bytes_read_from_source(),
4741 info.total_size
4742 );
4743 }
4744
4745 #[cfg(feature = "lsm")]
4746 #[test]
4747 fn lsm_style_range_query_partial_recovery() {
4748 // Simulates lsm-tree's range-query path: a key range resolves to a
4749 // decompressed byte window, which maps to inner zstd block indices via
4750 // `decompressed_byte_range`; decode only the covering blocks and check
4751 // the wanted window is recovered exactly (no key outside, all inside).
4752 let (compressed, full, info) = multi_block_fixture();
4753 let total = full.len() as u64;
4754 let want_start = total / 3;
4755 let want_end = (total * 2) / 3;
4756
4757 // Map [want_start, want_end) to covering block indices.
4758 let nblocks = info.blocks.len();
4759 let mut start_block = 0u32;
4760 let mut end_block = nblocks as u32;
4761 for i in 0..nblocks {
4762 let r = info.decompressed_byte_range(i).unwrap();
4763 if r.start <= want_start && want_start < r.end {
4764 start_block = i as u32;
4765 }
4766 if r.start < want_end && want_end <= r.end {
4767 end_block = i as u32 + 1;
4768 break;
4769 }
4770 }
4771
4772 let mut source = compressed.as_slice();
4773 let mut dec = FrameDecoder::new();
4774 dec.reset(&mut source).unwrap();
4775 let pd = dec
4776 .decode_blocks_partial(&mut source, start_block, end_block, None, false)
4777 .unwrap();
4778 assert!(pd.stopped_at.is_none());
4779
4780 let covered_start = info
4781 .decompressed_byte_range(start_block as usize)
4782 .unwrap()
4783 .start;
4784 let covered_end = info
4785 .decompressed_byte_range((end_block - 1) as usize)
4786 .unwrap()
4787 .end;
4788 assert!(
4789 covered_start <= want_start && want_end <= covered_end,
4790 "covering blocks must contain the wanted window"
4791 );
4792 assert_eq!(
4793 pd.data.as_slice(),
4794 &full[covered_start as usize..covered_end as usize],
4795 "covered subset must equal the full-decode slice"
4796 );
4797 // Slice the exact key range out of the covered subset.
4798 let off = (want_start - covered_start) as usize;
4799 let len = (want_end - want_start) as usize;
4800 assert_eq!(
4801 &pd.data[off..off + len],
4802 &full[want_start as usize..want_end as usize],
4803 "exact key range recovered from the partial decode"
4804 );
4805 }
4806
4807 #[cfg(feature = "lsm")]
4808 #[test]
4809 fn decode_blocks_partial_leaves_no_residual_when_no_in_range_block() {
4810 // Regression: when the requested range reaches no in-range block (here
4811 // start_block is past EOF, so every block is decoded only as window
4812 // context), `PartialDecode::data` is empty — but the context bytes must
4813 // NOT linger in the decoder buffer, or a later collect()/read() on the
4814 // same decoder returns out-of-range data.
4815 let (compressed, _full, info) = multi_block_fixture();
4816 let nblocks = info.blocks.len() as u32;
4817 let mut source = compressed.as_slice();
4818 let mut dec = FrameDecoder::new();
4819 dec.reset(&mut source).unwrap();
4820 let pd = dec
4821 .decode_blocks_partial(&mut source, nblocks + 5, u32::MAX, None, false)
4822 .unwrap();
4823 assert!(pd.data.is_empty(), "no in-range block → empty data");
4824 assert_eq!(pd.blocks_decoded, 0);
4825 assert!(
4826 pd.frame_finished,
4827 "frame's last block was reached as context"
4828 );
4829 assert_eq!(
4830 dec.can_collect(),
4831 0,
4832 "context bytes must not leak via collect()/read() when data is empty"
4833 );
4834 }
4835
4836 #[cfg(feature = "lsm")]
4837 #[test]
4838 fn decode_blocks_partial_empty_range_leaves_no_residual() {
4839 // Companion to the start-past-EOF case: an in-frame empty range `[k, k)`
4840 // (k < EOF) takes the same `prefix_window_len == None` path but with
4841 // `frame_finished == false` and up to `window_size` context bytes still
4842 // physically present. Assert the buffer is fully cleared directly (a
4843 // `can_collect()` check alone would pass even with <= window_size bytes
4844 // retained, because it holds the window back).
4845 let (compressed, _full, info) = multi_block_fixture();
4846 let k = ((info.blocks.len() as u32) / 2).max(1);
4847 let mut source = compressed.as_slice();
4848 let mut dec = FrameDecoder::new();
4849 dec.reset(&mut source).unwrap();
4850 let pd = dec
4851 .decode_blocks_partial(&mut source, k, k, None, false)
4852 .unwrap();
4853
4854 assert!(pd.data.is_empty(), "empty range must yield empty data");
4855 assert_eq!(pd.blocks_decoded, 0);
4856 assert!(
4857 !pd.frame_finished,
4858 "frame should still have trailing blocks"
4859 );
4860 assert_eq!(
4861 dec.state.as_ref().unwrap().decoder_scratch.buffer_len(),
4862 0,
4863 "empty-range partial decode must not retain context bytes"
4864 );
4865 }
4866
4867 #[cfg(all(feature = "lsm", feature = "hash"))]
4868 #[test]
4869 fn decode_blocks_partial_captures_per_block_checksums() {
4870 // Regression: with per-block checksums enabled, decode_blocks_partial
4871 // must populate computed_block_checksums just like decode_blocks /
4872 // decode_all — otherwise callers verifying per-block digests silently
4873 // lose them on the partial path.
4874 let (compressed, full, _info) = multi_block_fixture();
4875
4876 // Reference digests via decode_blocks (the path that captures them).
4877 let mut ref_dec = FrameDecoder::new();
4878 ref_dec.enable_per_block_checksums();
4879 let mut rsrc = compressed.as_slice();
4880 ref_dec.reset(&mut rsrc).unwrap();
4881 while !ref_dec.is_finished() {
4882 ref_dec
4883 .decode_blocks(&mut rsrc, crate::decoding::BlockDecodingStrategy::All)
4884 .unwrap();
4885 }
4886 let expected = ref_dec.computed_block_checksums().to_vec();
4887 assert!(!expected.is_empty(), "fixture must have multiple blocks");
4888 let _ = full;
4889
4890 // Partial decode of the whole frame must capture the same digests.
4891 let mut source = compressed.as_slice();
4892 let mut dec = FrameDecoder::new();
4893 dec.enable_per_block_checksums();
4894 dec.reset(&mut source).unwrap();
4895 let _ = dec
4896 .decode_blocks_partial(&mut source, 0, u32::MAX, None, false)
4897 .unwrap();
4898 assert_eq!(
4899 dec.computed_block_checksums(),
4900 expected.as_slice(),
4901 "partial decode must capture the same per-block checksums as full decode"
4902 );
4903 }
4904
4905 // ── resume (window-priming + entropy cold resume, lsm) ───────────
4906
4907 /// Window size of `compressed`'s frame, read from a freshly-reset decoder.
4908 #[cfg(feature = "lsm")]
4909 fn frame_window_size(compressed: &[u8]) -> usize {
4910 let mut src = compressed;
4911 let mut dec = FrameDecoder::new();
4912 dec.reset(&mut src).unwrap();
4913 dec.state
4914 .as_ref()
4915 .unwrap()
4916 .frame_header
4917 .window_size()
4918 .unwrap_or(0) as usize
4919 }
4920
4921 /// Build a large compressible MULTI-SEGMENT frame (window_size < content,
4922 /// so mid-frame blocks reach back only into a bounded window) and return
4923 /// `(compressed, full_decode, emit_info)`.
4924 #[cfg(feature = "lsm")]
4925 fn multi_segment_block_fixture() -> (
4926 Vec<u8>,
4927 Vec<u8>,
4928 crate::encoding::frame_emit_info::FrameEmitInfo,
4929 ) {
4930 // ~3 MiB of compressible (runs + repeated phrase) data — large enough
4931 // that the encoder picks window_size < content_size (multi-segment).
4932 let mut data: Vec<u8> = Vec::with_capacity(3 * 1024 * 1024);
4933 let mut x = 0x9E37_79B9u32;
4934 while data.len() < 3 * 1024 * 1024 {
4935 x ^= x << 13;
4936 x ^= x >> 17;
4937 x ^= x << 5;
4938 let run = 16 + (x as usize % 48);
4939 let byte = (x >> 24) as u8;
4940 for _ in 0..run {
4941 data.push(byte);
4942 }
4943 data.extend_from_slice(b"the quick brown fox jumps over the lazy dog\n");
4944 }
4945
4946 let mut compressed = Vec::new();
4947 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
4948 compressor.set_source(data.as_slice());
4949 compressor.set_drain(&mut compressed);
4950 compressor.compress();
4951 let info = compressor
4952 .last_frame_emit_info()
4953 .expect("emit info populated")
4954 .clone();
4955 drop(compressor);
4956
4957 // Confirm the precondition: the frame must be multi-segment.
4958 let mut sanity = FrameDecoder::new();
4959 sanity.init(&mut compressed.as_slice()).unwrap();
4960 assert!(
4961 !sanity
4962 .state
4963 .as_ref()
4964 .unwrap()
4965 .frame_header
4966 .descriptor
4967 .single_segment_flag(),
4968 "fixture precondition: frame must be multi-segment (resize if encoder default changed)"
4969 );
4970
4971 let mut dec = FrameDecoder::new();
4972 let mut full = alloc::vec![0u8; data.len()];
4973 let n = dec
4974 .decode_all(compressed.as_slice(), &mut full)
4975 .expect("full decode");
4976 full.truncate(n);
4977 assert_eq!(full, data, "fixture must round-trip");
4978 (compressed, full, info)
4979 }
4980
4981 /// Emit a [`ResumeState`] for resuming at block `n` by decoding `[0, n)` on
4982 /// a throwaway decoder with `emit_resume = true`.
4983 #[cfg(feature = "lsm")]
4984 fn emit_resume_state_at(compressed: &[u8], n: u32) -> super::ResumeState {
4985 let mut src = compressed;
4986 let mut dec = FrameDecoder::new();
4987 dec.reset(&mut src).unwrap();
4988 let pd = dec
4989 .decode_blocks_partial(&mut src, 0, n, None, true)
4990 .expect("prefix decode for resume-state emission");
4991 pd.resume_state
4992 .expect("emit_resume should populate resume_state")
4993 }
4994
4995 #[cfg(feature = "lsm")]
4996 #[test]
4997 fn resume_matches_full_decode_at_first_mid_last() {
4998 // Acceptance criterion: after resuming at block N (cold decoder, primed
4999 // window + restored entropy), decode_blocks_partial yields bytes
5000 // byte-identical to a full decode's [ends[N-1]..ends[end-1]) slice, for
5001 // N in {1, mid, last}. Repeat_Mode entropy blocks are covered because
5002 // the emitted ResumeState carries the carry-over tables.
5003 let (compressed, full, info) = multi_block_fixture();
5004 let nblocks = info.blocks.len() as u32;
5005 assert!(nblocks >= 4, "need several blocks, got {nblocks}");
5006
5007 for &n in &[1u32, nblocks / 2, nblocks - 1] {
5008 // Producer: emit resume state for block n (separate decoder).
5009 let st = emit_resume_state_at(&compressed, n);
5010 assert_eq!(st.block_index(), n);
5011 let output_offset = info.decompressed_byte_range(n as usize).unwrap().start;
5012 assert_eq!(st.output_offset(), output_offset);
5013
5014 // Consumer: a FRESH (cold) decoder resumes at n. Pass the WHOLE
5015 // decompressed prefix as window_prime; it is capped to one window
5016 // internally, exercising the cap path.
5017 let window_prime = &full[..output_offset as usize];
5018 let mut header_src = compressed.as_slice();
5019 let mut dec = FrameDecoder::new();
5020 dec.reset(&mut header_src).unwrap();
5021 // Caller positions the source at block n's compressed frame offset.
5022 let off = info.blocks[n as usize].offset_in_frame as usize;
5023 let mut block_src = &compressed[off..];
5024 let pd = dec
5025 .decode_blocks_partial(
5026 &mut block_src,
5027 n,
5028 u32::MAX,
5029 Some(super::ResumeInput {
5030 window_prime,
5031 state: &st,
5032 }),
5033 false,
5034 )
5035 .unwrap_or_else(|e| panic!("resume decode at N={n} errored: {e:?}"));
5036
5037 let start = output_offset as usize;
5038 let end = info
5039 .decompressed_byte_range((nblocks - 1) as usize)
5040 .unwrap()
5041 .end as usize;
5042 assert_eq!(
5043 pd.data.as_slice(),
5044 &full[start..end],
5045 "resumed bytes must equal the full-decode slice for N={n}"
5046 );
5047 assert_eq!(pd.start_block, n);
5048 assert_eq!(pd.blocks_decoded, nblocks - n);
5049 assert!(pd.stopped_at.is_none(), "clean resume at N={n}");
5050 assert!(pd.frame_finished, "decoded through the last block");
5051 }
5052 }
5053
5054 #[cfg(feature = "lsm")]
5055 #[test]
5056 fn resume_with_exact_window_tail_matches_full_decode() {
5057 // Realistic cold-resume shape on a MULTI-SEGMENT frame: caller supplies
5058 // only the last `window_size` decompressed bytes (not the whole prefix),
5059 // which is all that can ever back a match.
5060 let (compressed, full, info) = multi_segment_block_fixture();
5061 let nblocks = info.blocks.len() as u32;
5062 let window_size = frame_window_size(&compressed);
5063 // First block whose preceding output exceeds one window, so the tail
5064 // genuinely truncates the prefix.
5065 let n = (1..nblocks)
5066 .find(|&i| {
5067 info.decompressed_byte_range(i as usize).unwrap().start as usize > window_size
5068 })
5069 .expect("multi-segment frame must have a block past one window");
5070 let st = emit_resume_state_at(&compressed, n);
5071 let output_offset = info.decompressed_byte_range(n as usize).unwrap().start;
5072 assert!(output_offset as usize > window_size);
5073 let tail_start = output_offset as usize - window_size;
5074 let window_prime = &full[tail_start..output_offset as usize];
5075
5076 let mut header_src = compressed.as_slice();
5077 let mut dec = FrameDecoder::new();
5078 dec.reset(&mut header_src).unwrap();
5079 let off = info.blocks[n as usize].offset_in_frame as usize;
5080 let mut block_src = &compressed[off..];
5081 let pd = dec
5082 .decode_blocks_partial(
5083 &mut block_src,
5084 n,
5085 u32::MAX,
5086 Some(super::ResumeInput {
5087 window_prime,
5088 state: &st,
5089 }),
5090 false,
5091 )
5092 .unwrap();
5093
5094 let end = info
5095 .decompressed_byte_range((nblocks - 1) as usize)
5096 .unwrap()
5097 .end as usize;
5098 assert_eq!(pd.data.as_slice(), &full[output_offset as usize..end]);
5099 assert_eq!(pd.blocks_decoded, nblocks - n);
5100 }
5101
5102 #[cfg(feature = "lsm")]
5103 #[test]
5104 fn resume_rejects_short_window_prime() {
5105 // Acceptance criterion: a window_prime shorter than the required window
5106 // is rejected with a typed error, not a silent mis-decode.
5107 let (compressed, full, info) = multi_block_fixture();
5108 let nblocks = info.blocks.len() as u32;
5109 let window_size = frame_window_size(&compressed);
5110 let n = nblocks / 2;
5111 let st = emit_resume_state_at(&compressed, n);
5112 let output_offset = info.decompressed_byte_range(n as usize).unwrap().start;
5113 let required = core::cmp::min(window_size as u64, output_offset) as usize;
5114 assert!(required > 0, "mid block must require a non-empty window");
5115
5116 // One byte short of the required window.
5117 let prime = &full[output_offset as usize - (required - 1)..output_offset as usize];
5118
5119 let mut header_src = compressed.as_slice();
5120 let mut dec = FrameDecoder::new();
5121 dec.reset(&mut header_src).unwrap();
5122 let off = info.blocks[n as usize].offset_in_frame as usize;
5123 let mut block_src = &compressed[off..];
5124 let err = dec
5125 .decode_blocks_partial(
5126 &mut block_src,
5127 n,
5128 u32::MAX,
5129 Some(super::ResumeInput {
5130 window_prime: prime,
5131 state: &st,
5132 }),
5133 false,
5134 )
5135 .expect_err("short window_prime must be rejected");
5136 match err {
5137 crate::decoding::errors::FrameDecoderError::ResumeWindowTooShort { got, need } => {
5138 assert_eq!(got, required - 1);
5139 assert_eq!(need, required);
5140 }
5141 other => panic!("expected ResumeWindowTooShort, got {other:?}"),
5142 }
5143 }
5144
5145 #[cfg(feature = "lsm")]
5146 #[test]
5147 fn resume_range_validates_against_effective_start_not_start_block() {
5148 // In resume mode `start_block` is ignored and decoding begins at
5149 // `state.block_index()`. The range guard must therefore validate the
5150 // EFFECTIVE start against `end_block`: `end_block` below the resume
5151 // block is an inverted range and must error, not silently return an
5152 // empty decode. Caller passes the conventional ignored `start_block = 0`.
5153 let (compressed, _full, info) = multi_block_fixture();
5154 let nblocks = info.blocks.len() as u32;
5155 let n = (nblocks / 2).max(2);
5156 let st = emit_resume_state_at(&compressed, n);
5157 let output_offset = info.decompressed_byte_range(n as usize).unwrap().start;
5158
5159 let mut header_src = compressed.as_slice();
5160 let mut dec = FrameDecoder::new();
5161 dec.reset(&mut header_src).unwrap();
5162 let off = info.blocks[n as usize].offset_in_frame as usize;
5163 let mut block_src = &compressed[off..];
5164 // end_block = n - 1 is below the resume block n → inverted range.
5165 let err = dec
5166 .decode_blocks_partial(
5167 &mut block_src,
5168 0,
5169 n - 1,
5170 Some(super::ResumeInput {
5171 window_prime: &_full[..output_offset as usize],
5172 state: &st,
5173 }),
5174 false,
5175 )
5176 .expect_err("end_block below the resume block must be an inverted range");
5177 match err {
5178 crate::decoding::errors::FrameDecoderError::InvalidBlockRange {
5179 start_block,
5180 end_block,
5181 } => {
5182 assert_eq!(start_block, n, "error must report the effective start");
5183 assert_eq!(end_block, n - 1);
5184 }
5185 other => panic!("expected InvalidBlockRange, got {other:?}"),
5186 }
5187 }
5188
5189 #[cfg(feature = "lsm")]
5190 #[test]
5191 fn resume_rejects_state_from_a_different_frame() {
5192 // A ResumeState captured from one frame must not be applied to a frame
5193 // with a different decode shape (window size / single-segment / dict):
5194 // restoring foreign entropy tables would yield byte-wrong output. The
5195 // frame-identity guard must reject it up front with a typed error.
5196 let (frame_a, _full_a, info_a) = multi_block_fixture();
5197 let (frame_b, full_b, _info_b) = multi_segment_block_fixture();
5198 // Sanity: the two fixtures must differ in decode shape for the guard to
5199 // be exercised (single-segment vs multi-segment here).
5200 let st = emit_resume_state_at(&frame_a, (info_a.blocks.len() as u32 / 2).max(1));
5201
5202 let mut header_src = frame_b.as_slice();
5203 let mut dec = FrameDecoder::new();
5204 dec.reset(&mut header_src).unwrap();
5205 // The frame-key check runs before the window-length check, so even a
5206 // valid-length window_prime for frame B is rejected on identity.
5207 let err = dec
5208 .decode_blocks_partial(
5209 &mut frame_b.as_slice(),
5210 st.block_index(),
5211 u32::MAX,
5212 Some(super::ResumeInput {
5213 window_prime: &full_b,
5214 state: &st,
5215 }),
5216 false,
5217 )
5218 .expect_err("resume state from a different frame must be rejected");
5219 assert!(
5220 matches!(
5221 err,
5222 crate::decoding::errors::FrameDecoderError::ResumeFrameMismatch
5223 ),
5224 "expected ResumeFrameMismatch, got {err:?}"
5225 );
5226 }
5227
5228 #[cfg(all(feature = "lsm", feature = "hash"))]
5229 #[test]
5230 fn resume_rejects_wrong_window_prime_content() {
5231 // Same frame (FrameKey matches) but the caller supplies a window_prime
5232 // with one byte flipped. The shape key cannot catch this; the
5233 // content-exact XXH64 of the window must, rejecting before any restore
5234 // rather than mis-resolving matches against corrupted history.
5235 let (compressed, full, info) = multi_block_fixture();
5236 let nblocks = info.blocks.len() as u32;
5237 let n = (nblocks / 2).max(1);
5238 let st = emit_resume_state_at(&compressed, n);
5239 let output_offset = info.decompressed_byte_range(n as usize).unwrap().start as usize;
5240 assert!(output_offset > 0);
5241
5242 // Correct prefix with the last byte corrupted (this byte is inside the
5243 // window the resume block reaches back into).
5244 let mut corrupted = full[..output_offset].to_vec();
5245 let last = corrupted.len() - 1;
5246 corrupted[last] ^= 0xFF;
5247
5248 let mut header_src = compressed.as_slice();
5249 let mut dec = FrameDecoder::new();
5250 dec.reset(&mut header_src).unwrap();
5251 let off = info.blocks[n as usize].offset_in_frame as usize;
5252 let mut block_src = &compressed[off..];
5253 let err = dec
5254 .decode_blocks_partial(
5255 &mut block_src,
5256 n,
5257 u32::MAX,
5258 Some(super::ResumeInput {
5259 window_prime: &corrupted,
5260 state: &st,
5261 }),
5262 false,
5263 )
5264 .expect_err("corrupted window_prime must be rejected by content hash");
5265 assert!(
5266 matches!(
5267 err,
5268 crate::decoding::errors::FrameDecoderError::ResumeFrameMismatch
5269 ),
5270 "expected ResumeFrameMismatch, got {err:?}"
5271 );
5272 }
5273
5274 #[cfg(feature = "lsm")]
5275 #[test]
5276 fn resume_rejects_state_with_different_active_dictionary() {
5277 // A dictless-header frame can be decoded with an explicit dictionary
5278 // applied at runtime (force_dict / reset_with_dict_handle). Two such
5279 // decodes differ in entropy/repcode/dict context even though the header
5280 // dictionary_id is identically absent, so the resume guard must key on
5281 // the ACTIVE dictionary, not just the header field. Here the snapshot is
5282 // captured with no active dictionary; resuming with one applied must be
5283 // rejected before any state is restored.
5284 let (compressed, full, info) = multi_block_fixture();
5285 let nblocks = info.blocks.len() as u32;
5286 let n = (nblocks / 2).max(1);
5287 let st = emit_resume_state_at(&compressed, n); // active_dictionary_id = None
5288 let output_offset = info.decompressed_byte_range(n as usize).unwrap().start as usize;
5289
5290 let raw = std::fs::read("./dict_tests/dictionary").expect("dictionary fixture");
5291 let dict = crate::decoding::dictionary::Dictionary::decode_dict(&raw).expect("parse dict");
5292 let dict_id = dict.id;
5293
5294 let mut header_src = compressed.as_slice();
5295 let mut dec = FrameDecoder::new();
5296 dec.add_dict(dict).unwrap();
5297 dec.reset(&mut header_src).unwrap();
5298 dec.force_dict(dict_id).unwrap(); // active_dictionary_id = Some(dict_id)
5299 let off = info.blocks[n as usize].offset_in_frame as usize;
5300 let mut block_src = &compressed[off..];
5301 let err = dec
5302 .decode_blocks_partial(
5303 &mut block_src,
5304 n,
5305 u32::MAX,
5306 Some(super::ResumeInput {
5307 window_prime: &full[..output_offset],
5308 state: &st,
5309 }),
5310 false,
5311 )
5312 .expect_err("resume with a different active dictionary must be rejected");
5313 assert!(
5314 matches!(
5315 err,
5316 crate::decoding::errors::FrameDecoderError::ResumeFrameMismatch
5317 ),
5318 "expected ResumeFrameMismatch, got {err:?}"
5319 );
5320 }
5321
5322 #[cfg(feature = "lsm")]
5323 #[test]
5324 fn resume_invalid_range_does_not_mutate_decoder_state() {
5325 // An inverted effective range must be rejected WITHOUT priming the
5326 // decoder: no entropy restore, no window prime, no cursor advance. As
5327 // written before the fix, those mutations ran before the range check,
5328 // leaving the decoder in a synthetic resumed state on the error path.
5329 let (compressed, full, info) = multi_block_fixture();
5330 let nblocks = info.blocks.len() as u32;
5331 let n = (nblocks / 2).max(2);
5332 let st = emit_resume_state_at(&compressed, n);
5333 let output_offset = info.decompressed_byte_range(n as usize).unwrap().start as usize;
5334
5335 let mut header_src = compressed.as_slice();
5336 let mut dec = FrameDecoder::new();
5337 dec.reset(&mut header_src).unwrap();
5338 // Freshly reset: cursor at block 0.
5339 assert_eq!(dec.state.as_ref().unwrap().block_counter, 0);
5340
5341 let off = info.blocks[n as usize].offset_in_frame as usize;
5342 let mut block_src = &compressed[off..];
5343 let err = dec
5344 .decode_blocks_partial(
5345 &mut block_src,
5346 0,
5347 n - 1, // below the resume block → inverted range
5348 Some(super::ResumeInput {
5349 window_prime: &full[..output_offset],
5350 state: &st,
5351 }),
5352 false,
5353 )
5354 .expect_err("inverted range must error");
5355 assert!(matches!(
5356 err,
5357 crate::decoding::errors::FrameDecoderError::InvalidBlockRange { .. }
5358 ));
5359 assert_eq!(
5360 dec.state.as_ref().unwrap().block_counter,
5361 0,
5362 "error path must not advance the cursor (validate before priming)"
5363 );
5364 }
5365
5366 #[cfg(feature = "lsm")]
5367 #[test]
5368 fn emit_resume_state_absent_on_terminal_block() {
5369 // When a decode reaches the frame's last block there is no "next block"
5370 // to resume at: the snapshot's block_index would be one past EOF and the
5371 // caller has no offset_in_frame for it. emit_resume must therefore yield
5372 // None on the terminal block, not a dangling snapshot.
5373 let (compressed, _full, info) = multi_block_fixture();
5374 let nblocks = info.blocks.len() as u32;
5375 let mut src = compressed.as_slice();
5376 let mut dec = FrameDecoder::new();
5377 dec.reset(&mut src).unwrap();
5378 let pd = dec
5379 .decode_blocks_partial(&mut src, 0, nblocks, None, true)
5380 .unwrap();
5381 assert!(pd.frame_finished, "decode must reach the last block");
5382 assert!(
5383 pd.resume_state.is_none(),
5384 "no resume state past the frame's last block"
5385 );
5386 }
5387
5388 #[cfg(feature = "lsm")]
5389 #[test]
5390 fn emit_resume_state_absent_when_not_requested() {
5391 // Default partial decode (emit_resume = false) must NOT pay the entropy
5392 // clone: resume_state stays None.
5393 let (compressed, _full, info) = multi_block_fixture();
5394 let nblocks = info.blocks.len() as u32;
5395 let mut src = compressed.as_slice();
5396 let mut dec = FrameDecoder::new();
5397 dec.reset(&mut src).unwrap();
5398 let pd = dec
5399 .decode_blocks_partial(&mut src, 0, nblocks, None, false)
5400 .unwrap();
5401 assert!(
5402 pd.resume_state.is_none(),
5403 "resume_state must be None unless emit_resume is set"
5404 );
5405 }
5406
#[cfg(feature = "lsm")]
#[test]
fn resume_grow_loop_reconstructs_full() {
    // Motivating use case: a grow-loop where every call consumes the
    // ResumeState emitted by the previous call and emits one for the next.
    // Only the new extent is decoded per call; stitching the extents
    // together must yield the full decompressed output.
    let (frame_bytes, expected, frame_info) = multi_block_fixture();
    let block_count = frame_info.blocks.len() as u32;
    assert!(block_count >= 4);

    // Extents are `stride` blocks long (the final one may be shorter).
    let stride = (block_count / 3).max(1);
    let mut stitched: Vec<u8> = Vec::new();
    let mut resume_from: Option<super::ResumeState> = None;
    let mut first_block: u32 = 0;

    while first_block < block_count {
        let last_block = (first_block + stride).min(block_count);

        // Cold resume: each extent uses a fresh decoder that has only
        // been reset against the start of the frame.
        let mut decoder = FrameDecoder::new();
        let mut header_input = frame_bytes.as_slice();
        decoder.reset(&mut header_input).unwrap();

        // Block input begins at this extent's first block within the frame.
        let block_start = frame_info.blocks[first_block as usize].offset_in_frame as usize;
        let mut block_input = &frame_bytes[block_start..];

        let decoded_so_far = frame_info
            .decompressed_byte_range(first_block as usize)
            .unwrap()
            .start;

        // The first extent has no prior state and passes `None`; every later
        // extent pairs the carried state with the output that precedes it.
        let resume_input = resume_from.as_ref().map(|state| super::ResumeInput {
            window_prime: &expected[..decoded_so_far as usize],
            state,
        });

        // Always request a ResumeState so the next extent can continue.
        let extent = decoder
            .decode_blocks_partial(
                &mut block_input,
                first_block,
                last_block,
                resume_input,
                true,
            )
            .unwrap();

        stitched.extend_from_slice(&extent.data);
        resume_from = extent.resume_state;
        first_block = last_block;
    }

    assert_eq!(
        stitched, expected,
        "grow-loop extents must reconstruct the full output"
    );
}
5464
#[cfg(all(feature = "lsm", feature = "hash"))]
#[test]
fn resume_does_not_redecode_prefix_blocks() {
    // Instrumented check that a resumed decode leaves blocks < N alone.
    // The resuming decoder has per-block checksums switched on, so the
    // number of digests it records counts the blocks it decoded: that must
    // be the in-range block count, not the frame's total block count.
    let (frame_bytes, expected, frame_info) = multi_block_fixture();
    let block_count = frame_info.blocks.len() as u32;
    let resume_block = block_count / 2;
    let saved_state = emit_resume_state_at(&frame_bytes, resume_block);
    let prefix_len = frame_info
        .decompressed_byte_range(resume_block as usize)
        .unwrap()
        .start;

    // Checksums are enabled before the decoder is reset on the frame header.
    let mut decoder = FrameDecoder::new();
    decoder.enable_per_block_checksums();
    let mut header_input = frame_bytes.as_slice();
    decoder.reset(&mut header_input).unwrap();

    // Resume at block N: input starts at that block, and the window is
    // primed with the output that precedes it.
    let block_start = frame_info.blocks[resume_block as usize].offset_in_frame as usize;
    let mut block_input = &frame_bytes[block_start..];
    let resume_input = super::ResumeInput {
        window_prime: &expected[..prefix_len as usize],
        state: &saved_state,
    };
    // `u32::MAX` as the end bound: no practical upper limit on the range.
    let _ = decoder
        .decode_blocks_partial(
            &mut block_input,
            resume_block,
            u32::MAX,
            Some(resume_input),
            false,
        )
        .unwrap();

    let digests_recorded = decoder.computed_block_checksums().len() as u32;
    assert_eq!(
        digests_recorded,
        block_count - resume_block,
        "resume must decode only in-range blocks, not re-decode the prefix"
    );
}
5503}