// structured_zstd/decoding/frame_decoder.rs

//! `FrameDecoder` is the main low-level type users interact with to decode zstd frames.
//!
//! Zstandard compressed data is made of one or more frames. Each frame is independent and can be
//! decompressed independently of other frames. This module contains the structures
//! and utilities used to decode a frame.
6
7use super::frame;
8use crate::decoding;
9use crate::decoding::block_decoder::BlockDecoder;
10use crate::decoding::buffer_backend::BufferBackend;
11use crate::decoding::decode_buffer::DecodeBuffer;
12use crate::decoding::dictionary::{Dictionary, DictionaryHandle};
13use crate::decoding::errors::{DecodeBlockContentError, FrameDecoderError};
14use crate::decoding::flat_buf::FlatBuf;
15use crate::decoding::ringbuffer::RingBuffer;
16use crate::decoding::scratch::DecoderScratch;
17use crate::io::{Error, Read, Write};
18use alloc::collections::BTreeMap;
19use alloc::vec::Vec;
20use core::convert::TryInto;
21
22use crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE;
23
24/// Low level Zstandard decoder that can be used to decompress frames with fine control over when and how many bytes are decoded.
25///
26/// This decoder is able to decode frames only partially and gives control
27/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once).
28/// It reads bytes as needed from a provided source and can be read from to collect partial results.
29///
30/// If you want to just read the whole frame with an `io::Read` without having to deal with manually calling [FrameDecoder::decode_blocks]
31/// you can use the provided [crate::decoding::StreamingDecoder] wich wraps this FrameDecoder.
32///
33/// Workflow is as follows:
34/// ```
35/// use structured_zstd::decoding::BlockDecodingStrategy;
36///
37/// # #[cfg(feature = "std")]
38/// use std::io::{Read, Write};
39///
40/// // no_std environments can use the crate's own Read traits
41/// # #[cfg(not(feature = "std"))]
42/// use structured_zstd::io::{Read, Write};
43///
44/// fn decode_this(mut file: impl Read) {
45///     //Create a new decoder
46///     let mut frame_dec = structured_zstd::decoding::FrameDecoder::new();
47///     let mut result = Vec::new();
48///
49///     // Use reset or init to make the decoder ready to decode the frame from the io::Read
50///     frame_dec.reset(&mut file).unwrap();
51///
52///     // Loop until the frame has been decoded completely
53///     while !frame_dec.is_finished() {
54///         // decode (roughly) batch_size many bytes
55///         frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap();
56///
57///         // read from the decoder to collect bytes from the internal buffer
58///         let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap();
59///
60///         // then do something with it
61///         do_something(&result[0..bytes_read]);
62///     }
63///
64///     // handle the last chunk of data
65///     while frame_dec.can_collect() > 0 {
66///         let x = frame_dec.read(result.as_mut_slice()).unwrap();
67///
68///         do_something(&result[0..x]);
69///     }
70/// }
71///
72/// fn do_something(data: &[u8]) {
73/// # #[cfg(feature = "std")]
74///     std::io::stdout().write_all(data).unwrap();
75/// }
76/// ```
77pub struct FrameDecoder {
78    state: Option<FrameDecoderState>,
79    owned_dicts: BTreeMap<u32, Dictionary>,
80    #[cfg(target_has_atomic = "ptr")]
81    shared_dicts: BTreeMap<u32, DictionaryHandle>,
82    #[cfg(not(target_has_atomic = "ptr"))]
83    shared_dicts: (),
84    /// `ZSTD_f_zstd1_magicless` — when true, [`init`] / [`reset`]
85    /// expect frames without the 4-byte magic number prefix.
86    /// Default false (standard zstd format).
87    magicless: bool,
88    /// Pinned `Dictionary_ID` expectation set via
89    /// [`Self::expect_dict_id`]. `None` (default) disables the
90    /// check; `Some(0)` matches frames whose header omits the
91    /// optional dict_id (treated as "no dictionary"). Validated in
92    /// [`Self::reset`] AFTER the frame header parses successfully
93    /// and BEFORE any block decode work.
94    #[cfg(feature = "lsm")]
95    expect_dict_id: Option<u32>,
96    /// Pinned `Window_Descriptor` byte expectation set via
97    /// [`Self::expect_window_descriptor`]. `None` (default)
98    /// disables the check. Validated in [`Self::reset`] AFTER the
99    /// frame header parses successfully and BEFORE any block
100    /// decode work. Single-segment frames (which omit the
101    /// `Window_Descriptor` byte from the wire) surface as
102    /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`]
103    /// with `found: None`.
104    #[cfg(feature = "lsm")]
105    expect_window_descriptor: Option<u8>,
106    /// When `true`, the per-block decode loop XXH64-hashes each
107    /// block's decompressed bytes and stores the low-32-bit digest in
108    /// [`Self::computed_block_checksums`]. Default `false` (zero
109    /// cost). Set via [`Self::enable_per_block_checksums`]. Gated on
110    /// `all(lsm, hash)` because XXH64 lives behind the `hash`
111    /// feature.
112    #[cfg(all(feature = "lsm", feature = "hash"))]
113    per_block_checksums_enabled: bool,
114    /// Per-block XXH64 (low 32 bits) digests captured during the
115    /// current frame's decode when `per_block_checksums_enabled` is
116    /// set. Reset at the start of every new frame. Gated on
117    /// `all(lsm, hash)` (see `per_block_checksums_enabled`).
118    #[cfg(all(feature = "lsm", feature = "hash"))]
119    computed_block_checksums: alloc::vec::Vec<u32>,
120}
121
122/// Backend-tagged decode scratch — chosen at frame-reset time based
123/// on the parsed `FrameHeader.descriptor.single_segment_flag()` and
124/// kept stable through the lifetime of the frame. The match in each
125/// helper below dispatches **once per call** (e.g. once per block in
126/// `decode_block_content`, once per drain in `drain_to_writer`) —
127/// never inside the hot push/repeat loop, which is fully
128/// monomorphised through the `DecoderScratch<B>` generic.
129enum DecoderScratchKind {
130    Ring(DecoderScratch<RingBuffer>),
131    Flat(DecoderScratch<FlatBuf>),
132}
133
134impl DecoderScratchKind {
135    fn new_ring(window_size: usize) -> Self {
136        // Lazy ring-buffer allocation: do NOT `reserve(window_size)` here.
137        // The direct-decode path (`run_direct_decode`) writes through
138        // `UserSliceBackend` and never touches the ring; allocating it
139        // eagerly wastes one full window of peak memory on the common
140        // direct-eligible frame. On the non-direct path the window is
141        // pre-reserved once at frame entry (`decode_all_impl` and
142        // `decode_blocks` both call `DecoderScratchKind::reserve_buffer`
143        // before any block writes), so multi-block frames pay one
144        // amortised grow instead of repeated `reserve_amortized` steps
145        // per block. Issue #279 round 2.
146        let s = DecoderScratch::<RingBuffer>::new(window_size);
147        Self::Ring(s)
148    }
149
150    /// Construct a flat-backed scratch sized for a single-segment
151    /// frame. `frame_content_size` is the upcoming output size in
152    /// bytes (== `window_size` when the flag is set).
153    fn new_flat(frame_content_size: usize) -> Self {
154        let flat = FlatBuf::with_capacity(frame_content_size);
155        // DecoderScratch's default ctor would discard the pre-sized
156        // FlatBuf — go through from_backend so the buffer carries the
157        // capacity the constructor wants.
158        let mut s = DecoderScratch::<FlatBuf>::new(frame_content_size);
159        s.buffer = DecodeBuffer::from_backend(flat, frame_content_size);
160        Self::Flat(s)
161    }
162
163    /// Reset (or transition between) backends for a new frame.
164    /// Reuses the existing `DecoderScratch` allocations (FSE / HUF
165    /// tables, sequence vec, etc.) when the backend kind is unchanged
166    /// — only the underlying buffer is re-sized for the new frame.
167    /// Building a fresh `DecoderScratch` on every frame would
168    /// re-allocate everything and was measured at +255 % vs ring on
169    /// small frames; reusing it keeps the small-frame cost flat.
170    fn reset(&mut self, frame: &frame::FrameHeader, window_size: usize) {
171        if frame.descriptor.single_segment_flag() {
172            match self {
173                Self::Flat(s) => {
174                    s.reset(window_size);
175                    // DecodeBuffer::reset clears + reserves
176                    // window_size; FlatBuf's reserve grows the
177                    // backing Vec if the new FCS is larger than
178                    // what's already allocated. No alloc when the
179                    // previous flat frame had >= this capacity.
180                }
181                Self::Ring(_) => *self = Self::new_flat(window_size),
182            }
183        } else {
184            match self {
185                Self::Ring(s) => s.reset(window_size),
186                Self::Flat(_) => *self = Self::new_ring(window_size),
187            }
188        }
189    }
190
191    fn init_from_dict(&mut self, dict: &Dictionary) {
192        match self {
193            Self::Ring(s) => s.init_from_dict(dict),
194            Self::Flat(s) => s.init_from_dict(dict),
195        }
196    }
197
198    #[inline]
199    fn buffer_len(&self) -> usize {
200        match self {
201            Self::Ring(s) => s.buffer.len(),
202            Self::Flat(s) => s.buffer.len(),
203        }
204    }
205
206    /// Pre-reserve the backing buffer to `window_size` in a single
207    /// allocation. Called once on the non-direct (`decode_blocks`) path
208    /// after direct-eligibility is ruled out, so multi-segment fallback
209    /// decodes don't pay repeated `reserve_amortized` grow steps
210    /// (128 KiB → 256 KiB → ... → window) as blocks accumulate.
211    ///
212    /// Direct-eligible ring-backed frames never call this and pay zero
213    /// ring allocation for the window. Flat-backed (single-segment)
214    /// frames eagerly size their `FlatBuf` to `frame_content_size` at
215    /// `new_flat` construction time, so a direct-eligible flat frame
216    /// already paid for that capacity before this helper is reachable;
217    /// the "zero buffer allocation" guarantee here applies to the
218    /// ring path only.
219    #[inline]
220    fn reserve_buffer(&mut self, window_size: usize) {
221        match self {
222            Self::Ring(s) => s.buffer.reserve(window_size),
223            Self::Flat(s) => s.buffer.reserve(window_size),
224        }
225    }
226
227    /// Last `n` bytes of the visible buffer as `(s1, s2)` (wrap-aware).
228    /// Routes through whichever backend the current scratch holds.
229    #[cfg(all(feature = "lsm", feature = "hash"))]
230    fn last_n_as_slices(&self, n: usize) -> (&[u8], &[u8]) {
231        match self {
232            Self::Ring(s) => s.buffer.last_n_as_slices(n),
233            Self::Flat(s) => s.buffer.last_n_as_slices(n),
234        }
235    }
236
237    fn buffer_drain(&mut self) -> Vec<u8> {
238        match self {
239            Self::Ring(s) => s.buffer.drain(),
240            Self::Flat(s) => s.buffer.drain(),
241        }
242    }
243
244    fn buffer_drain_to_window_size(&mut self) -> Option<Vec<u8>> {
245        match self {
246            Self::Ring(s) => s.buffer.drain_to_window_size(),
247            Self::Flat(s) => s.buffer.drain_to_window_size(),
248        }
249    }
250
251    fn buffer_drain_to_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
252        match self {
253            Self::Ring(s) => s.buffer.drain_to_writer(sink),
254            Self::Flat(s) => s.buffer.drain_to_writer(sink),
255        }
256    }
257
258    fn buffer_drain_to_window_size_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
259        match self {
260            Self::Ring(s) => s.buffer.drain_to_window_size_writer(sink),
261            Self::Flat(s) => s.buffer.drain_to_window_size_writer(sink),
262        }
263    }
264
265    fn buffer_can_drain(&self) -> usize {
266        match self {
267            Self::Ring(s) => s.buffer.can_drain(),
268            Self::Flat(s) => s.buffer.can_drain(),
269        }
270    }
271
272    fn buffer_can_drain_to_window_size(&self) -> Option<usize> {
273        match self {
274            Self::Ring(s) => s.buffer.can_drain_to_window_size(),
275            Self::Flat(s) => s.buffer.can_drain_to_window_size(),
276        }
277    }
278
279    fn buffer_read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
280        match self {
281            Self::Ring(s) => s.buffer.read(target),
282            Self::Flat(s) => s.buffer.read(target),
283        }
284    }
285
286    fn buffer_read_all(&mut self, target: &mut [u8]) -> Result<usize, Error> {
287        match self {
288            Self::Ring(s) => s.buffer.read_all(target),
289            Self::Flat(s) => s.buffer.read_all(target),
290        }
291    }
292
293    fn decode_block_content<R: Read>(
294        &mut self,
295        decoder: &mut BlockDecoder,
296        header: &crate::blocks::block::BlockHeader,
297        source: R,
298    ) -> Result<u64, DecodeBlockContentError> {
299        match self {
300            Self::Ring(s) => decoder.decode_block_content(header, s, source),
301            Self::Flat(s) => decoder.decode_block_content(header, s, source),
302        }
303    }
304
305    #[cfg(feature = "hash")]
306    fn hash_finish(&self) -> u64 {
307        use core::hash::Hasher;
308        match self {
309            Self::Ring(s) => s.buffer.hash.finish(),
310            Self::Flat(s) => s.buffer.hash.finish(),
311        }
312    }
313}
314
315struct FrameDecoderState {
316    pub frame_header: frame::FrameHeader,
317    decoder_scratch: DecoderScratchKind,
318    frame_finished: bool,
319    block_counter: usize,
320    bytes_read_counter: u64,
321    check_sum: Option<u32>,
322    using_dict: Option<u32>,
323}
324
/// How much work a single [`FrameDecoder::decode_blocks`] call should
/// do before returning control to the caller.
///
/// See [`FrameDecoder::decode_blocks`] for the exact stopping rule of
/// each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockDecodingStrategy {
    /// Decode all remaining blocks of the current frame.
    All,
    /// Decode up to the given number of blocks.
    UptoBlocks(usize),
    /// Decode blocks until (roughly) the given number of bytes has
    /// been produced.
    UptoBytes(usize),
}
330
331impl FrameDecoderState {
332    /// Construct a new frame decoder state, reading the frame header
333    /// from `source`. When `magicless` is `true`, the 4-byte magic
334    /// number prefix is NOT consumed (donor `ZSTD_f_zstd1_magicless`).
335    /// Crate-internal — reached only via `FrameDecoder::init` /
336    /// `FrameDecoder::init_with_dict_handle`. For multi-segment
337    /// (ring-backed) frames the decode buffer is allocated lazily —
338    /// direct-eligible frames pay zero buffer allocation, and the
339    /// non-direct fallback reserves `window_size` once in
340    /// `decode_all_impl` via `reserve_buffer`. Single-segment
341    /// (flat-backed) frames eagerly size the backing `FlatBuf` to
342    /// `frame_content_size` because the flat path writes the entire
343    /// output into the same buffer and cannot defer that allocation.
344    pub(crate) fn new_with_format(
345        source: impl Read,
346        magicless: bool,
347    ) -> Result<FrameDecoderState, FrameDecoderError> {
348        let (frame, header_size) = frame::read_frame_header_with_format(source, magicless)?;
349        let window_size = frame.window_size()?;
350
351        if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
352            return Err(FrameDecoderError::WindowSizeTooBig {
353                requested: window_size,
354            });
355        }
356
357        let decoder_scratch = if frame.descriptor.single_segment_flag() {
358            DecoderScratchKind::new_flat(window_size as usize)
359        } else {
360            DecoderScratchKind::new_ring(window_size as usize)
361        };
362        Ok(FrameDecoderState {
363            frame_header: frame,
364            frame_finished: false,
365            block_counter: 0,
366            decoder_scratch,
367            bytes_read_counter: u64::from(header_size),
368            check_sum: None,
369            using_dict: None,
370        })
371    }
372
373    /// Reset this state for a new frame read from `source`, reusing
374    /// existing allocations. When `magicless` is `true`, the frame
375    /// header is read WITHOUT expecting a magic-number prefix
376    /// (donor `ZSTD_f_zstd1_magicless`). Crate-internal — reached
377    /// only via `FrameDecoder::reset`.
378    ///
379    /// `DecodeBuffer::reset` no longer reserves window_size for either
380    /// backend — capacity decisions live one layer up. For ring-backed
381    /// scratch, direct-eligible frames pay zero allocation here and
382    /// the non-direct path is pre-reserved by `decode_all_impl` /
383    /// `decode_blocks` via `DecoderScratchKind::reserve_buffer(window_size)`
384    /// before any block writes. Flat-backed scratch is sized to
385    /// `frame_content_size` by `DecoderScratchKind::new_flat` at first
386    /// construction (and on Ring → Flat transition); a reused flat
387    /// scratch whose new frame fits within the prior FCS reuses the
388    /// existing capacity, and one whose new FCS is larger is also
389    /// pre-reserved by the same `reserve_buffer(window_size)` call at
390    /// frame entry (single-segment frames have window_size == FCS).
391    pub(crate) fn reset_with_format(
392        &mut self,
393        source: impl Read,
394        magicless: bool,
395    ) -> Result<(), FrameDecoderError> {
396        let (frame_header, header_size) = frame::read_frame_header_with_format(source, magicless)?;
397        let window_size = frame_header.window_size()?;
398
399        if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
400            return Err(FrameDecoderError::WindowSizeTooBig {
401                requested: window_size,
402            });
403        }
404
405        self.decoder_scratch
406            .reset(&frame_header, window_size as usize);
407        self.frame_header = frame_header;
408        self.frame_finished = false;
409        self.block_counter = 0;
410        self.bytes_read_counter = u64::from(header_size);
411        self.check_sum = None;
412        self.using_dict = None;
413        Ok(())
414    }
415}
416
417impl Default for FrameDecoder {
418    fn default() -> Self {
419        Self::new()
420    }
421}
422
423impl FrameDecoder {
424    /// This will create a new decoder without allocating anything yet.
425    /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used
426    /// else they just reset these buffers with not further allocations
427    pub fn new() -> FrameDecoder {
428        FrameDecoder {
429            state: None,
430            owned_dicts: BTreeMap::new(),
431            #[cfg(target_has_atomic = "ptr")]
432            shared_dicts: BTreeMap::new(),
433            #[cfg(not(target_has_atomic = "ptr"))]
434            shared_dicts: (),
435            magicless: false,
436            #[cfg(feature = "lsm")]
437            expect_dict_id: None,
438            #[cfg(feature = "lsm")]
439            expect_window_descriptor: None,
440            #[cfg(all(feature = "lsm", feature = "hash"))]
441            per_block_checksums_enabled: false,
442            #[cfg(all(feature = "lsm", feature = "hash"))]
443            computed_block_checksums: alloc::vec::Vec::new(),
444        }
445    }
446
447    /// Opt in to per-block XXH64 verification during decode.
448    /// Default off; zero cost when disabled. Each block's decompressed
449    /// bytes are XXH64-hashed (low 32 bits) and appended to
450    /// [`Self::computed_block_checksums`] as the decode progresses.
451    /// Callers compare the captured digests against externally-stored
452    /// expected values (e.g. from a per-block sidecar in the
453    /// containing application protocol).
454    ///
455    /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
456    /// primitive lives behind the `hash` feature, so this method
457    /// only compiles when both are enabled.
458    #[cfg(all(feature = "lsm", feature = "hash"))]
459    pub fn enable_per_block_checksums(&mut self) {
460        self.per_block_checksums_enabled = true;
461    }
462
463    /// Per-block XXH64 (low 32 bits) digests captured during the
464    /// current frame's decode. Empty unless
465    /// [`Self::enable_per_block_checksums`] was called before
466    /// [`Self::decode_all`] / [`Self::reset`].
467    ///
468    /// Reset at the start of every new frame.
469    ///
470    /// Behind `all(feature = "lsm", feature = "hash")`.
471    #[cfg(all(feature = "lsm", feature = "hash"))]
472    pub fn computed_block_checksums(&self) -> &[u32] {
473        &self.computed_block_checksums
474    }
475
476    /// Pin the expected `Dictionary_ID` for the next frame.
477    ///
478    /// When `expected` is set, [`Self::init`] / [`Self::reset`]
479    /// validate it against the parsed frame header BEFORE any
480    /// block decode work runs. A mismatch returns
481    /// [`crate::decoding::errors::FrameDecoderError::UnexpectedDictId`]
482    /// before any block decode and before any output is produced.
483    /// Scratch buffer allocation / reservation for the decode
484    /// pipeline happens during frame-header parsing, which is
485    /// already complete when this validation fires — the cost of
486    /// scratch sizing is paid even on a mismatched header. The
487    /// guarantee is "no block decode, no XXH64 init, no partial
488    /// output", not "zero allocation".
489    ///
490    /// `Some(0)` is treated as "no dictionary expected": a frame
491    /// whose header omits the optional `Dictionary_ID` field
492    /// (flag value 0) passes the check; a frame that carries an
493    /// explicit non-zero id fails.
494    ///
495    /// `None` (default) disables the check.
496    ///
497    /// Primary use case: post-AEAD-decrypt sanity check in
498    /// wire-format consumers (e.g. lsm-tree's encrypted block
499    /// format pins the `dict_id` baked into the AAD against the
500    /// inner zstd frame's `dict_id` to defeat dict-substitution
501    /// attacks).
502    ///
503    /// NOT a replacement for AEAD authentication. NOT the same
504    /// semantic as donor `ZSTD_d_windowLogMax` (which is a
505    /// ceiling-style limit, separate concern).
506    #[cfg(feature = "lsm")]
507    #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
508    pub fn expect_dict_id(&mut self, expected: Option<u32>) {
509        self.expect_dict_id = expected;
510    }
511
512    /// Pin the expected raw `Window_Descriptor` byte (RFC 8878
513    /// §3.1.1.1.2 layout: `(exp << 3) | mantissa`) for the next
514    /// frame.
515    ///
516    /// When `expected` is set, [`Self::init`] / [`Self::reset`]
517    /// validate it against the parsed frame header BEFORE any
518    /// block decode work runs. A mismatch returns
519    /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`].
520    ///
521    /// Single-segment frames omit the `Window_Descriptor` byte
522    /// from the wire entirely. Setting an expectation while
523    /// receiving a single-segment frame fails the check with
524    /// `found: None` — there is no on-wire byte to match against,
525    /// which is reported explicitly rather than silently passing.
526    ///
527    /// `None` (default) disables the check.
528    ///
529    /// Byte-exact equality, NOT a ceiling. Donor
530    /// `ZSTD_d_windowLogMax` is a separate ceiling-style limit
531    /// available through the C FFI surface; this method is for
532    /// strict equality validation against a pinned expectation
533    /// (e.g. lsm-tree's wire format pins the window descriptor
534    /// from the AAD to defeat decompression-bomb-swap attacks).
535    #[cfg(feature = "lsm")]
536    #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
537    pub fn expect_window_descriptor(&mut self, expected: Option<u8>) {
538        self.expect_window_descriptor = expected;
539    }
540
541    /// Validate the just-parsed frame header against any pinned
542    /// expectations set via [`Self::expect_dict_id`] /
543    /// [`Self::expect_window_descriptor`].
544    ///
545    /// Returns the typed error variant on mismatch and leaves
546    /// `self.state` in a re-resettable shape — a subsequent
547    /// `reset()` will overwrite `frame_header` from the new source
548    /// without needing intermediate cleanup.
549    #[cfg(feature = "lsm")]
550    fn validate_expectations(
551        &self,
552        frame_header: &frame::FrameHeader,
553    ) -> Result<(), FrameDecoderError> {
554        if let Some(expected) = self.expect_dict_id {
555            let found = frame_header.dictionary_id();
556            // `Some(0)` is the "no dictionary expected" sentinel —
557            // matches a frame whose header omits the optional
558            // dict_id field (which is reported as `None` by the
559            // parser). All other values must match exactly.
560            let matches = match (expected, found) {
561                (0, None) => true,
562                (e, Some(f)) => e == f,
563                _ => false,
564            };
565            if !matches {
566                return Err(FrameDecoderError::UnexpectedDictId {
567                    expected: Some(expected),
568                    found,
569                });
570            }
571        }
572        if let Some(expected) = self.expect_window_descriptor {
573            let found = frame_header.window_descriptor();
574            if found != Some(expected) {
575                return Err(FrameDecoderError::UnexpectedWindowDescriptor { expected, found });
576            }
577        }
578        Ok(())
579    }
580
581    /// Enable or disable magicless frame format
582    /// (`ZSTD_f_zstd1_magicless`). When set to `true`, subsequent
583    /// [`init`] / [`reset`] calls expect the frame header to begin
584    /// directly with the frame-header descriptor — no 4-byte magic
585    /// number prefix. Default false. Must match the encoder's
586    /// magicless setting; the format is unambiguous only when the
587    /// caller knows it out-of-band.
588    ///
589    /// Note: magicless mode also disables skippable-frame detection.
590    /// The `0x184D2A50..=0x184D2A5F` skippable-frame magic range is
591    /// only recognised when the 4-byte magic prefix is consumed, so
592    /// `decode_all` / `init` / `reset` will treat a skippable frame
593    /// at the head of a magicless stream as a malformed frame header
594    /// (bad descriptor / window-size error) instead of skipping it.
595    /// Mixed-format streams that interleave skippable frames must be
596    /// pre-split by the caller; `set_magicless(true)` is only safe
597    /// when the entire stream is known to be magicless zstd frames.
598    pub fn set_magicless(&mut self, magicless: bool) {
599        self.magicless = magicless;
600    }
601
602    #[cfg(target_has_atomic = "ptr")]
603    fn shared_dict_exists(&self, dict_id: u32) -> bool {
604        self.shared_dicts.contains_key(&dict_id)
605    }
606
607    #[cfg(not(target_has_atomic = "ptr"))]
608    fn shared_dict_exists(&self, _dict_id: u32) -> bool {
609        false
610    }
611
612    fn validate_registered_dictionary(dict: &Dictionary) -> Result<(), FrameDecoderError> {
613        use crate::decoding::errors::DictionaryDecodeError as dict_err;
614
615        if dict.id == 0 {
616            return Err(FrameDecoderError::from(dict_err::ZeroDictionaryId));
617        }
618        if let Some(index) = dict.offset_hist.iter().position(|&rep| rep == 0) {
619            return Err(FrameDecoderError::from(
620                dict_err::ZeroRepeatOffsetInDictionary { index: index as u8 },
621            ));
622        }
623        Ok(())
624    }
625
626    /// init() will allocate all needed buffers if it is the first time this decoder is used
627    /// else they just reset these buffers with not further allocations
628    ///
629    /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
630    ///
631    /// equivalent to reset()
632    pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
633        self.reset(source)
634    }
635
636    /// Initialize the decoder for a new frame using a pre-parsed dictionary handle.
637    ///
638    /// If the frame header has a dictionary ID, this validates it against
639    /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
640    ///
641    /// If the header omits the optional dictionary ID, this still applies the
642    /// provided dictionary handle.
643    ///
644    /// # Warning
645    ///
646    /// This method always applies `dict` unless the frame header contains a
647    /// non-matching dictionary ID. Callers must only use this API when they
648    /// already know the frame was encoded with the provided dictionary, even if
649    /// the frame header omits the dictionary ID or encodes an explicit
650    /// dictionary ID of `0`.
651    ///
652    /// Passing a dictionary for a frame that was not encoded with it can
653    /// silently corrupt the decoded output.
654    pub fn init_with_dict_handle(
655        &mut self,
656        source: impl Read,
657        dict: &DictionaryHandle,
658    ) -> Result<(), FrameDecoderError> {
659        self.reset_with_dict_handle(source, dict)
660    }
661
662    /// reset() will allocate all needed buffers if it is the first time this decoder is used
663    /// else they just reset these buffers with not further allocations
664    ///
665    /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
666    ///
667    /// equivalent to init()
668    pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
669        use FrameDecoderError as err;
670        // Fresh frame → start with an empty per-block checksum vec so
671        // the values for the next frame don't carry over from the
672        // previous one.
673        #[cfg(all(feature = "lsm", feature = "hash"))]
674        self.computed_block_checksums.clear();
675        let magicless = self.magicless;
676        let dict_id = match &mut self.state {
677            Some(s) => {
678                s.reset_with_format(source, magicless)?;
679                s.frame_header.dictionary_id()
680            }
681            None => {
682                self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
683                self.state
684                    .as_ref()
685                    .and_then(|state| state.frame_header.dictionary_id())
686            }
687        };
688        // Validate any pinned expectations BEFORE block decode work
689        // runs. Catches dict_id substitution / window-descriptor
690        // tampering on inputs already authenticated by an outer
691        // layer (e.g. AEAD). Returning here leaves `self.state` in
692        // a re-resettable shape — next `reset()` re-parses the
693        // frame header without intermediate cleanup.
694        #[cfg(feature = "lsm")]
695        if let Some(state) = self.state.as_ref() {
696            self.validate_expectations(&state.frame_header)?;
697        }
698        if let Some(dict_id) = dict_id {
699            let state = self.state.as_mut().expect("state initialized");
700            let owned_dicts = &self.owned_dicts;
701            #[cfg(target_has_atomic = "ptr")]
702            let shared_dicts = &self.shared_dicts;
703            let dict = owned_dicts
704                .get(&dict_id)
705                .or_else(|| {
706                    #[cfg(target_has_atomic = "ptr")]
707                    {
708                        shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
709                    }
710                    #[cfg(not(target_has_atomic = "ptr"))]
711                    {
712                        None
713                    }
714                })
715                .ok_or(err::DictNotProvided { dict_id })?;
716            state.decoder_scratch.init_from_dict(dict);
717            state.using_dict = Some(dict_id);
718        }
719        Ok(())
720    }
721
722    /// Reset this decoder for a new frame using a pre-parsed dictionary handle.
723    ///
724    /// If the frame header has a dictionary ID, this validates it against
725    /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
726    ///
727    /// If the header omits the optional dictionary ID, this still applies the
728    /// provided dictionary handle.
729    ///
730    /// # Warning
731    ///
732    /// This method always applies `dict` unless the frame header contains a
733    /// non-matching dictionary ID. Callers must only use this API when they
734    /// already know the frame was encoded with the provided dictionary, even if
735    /// the frame header omits the dictionary ID or encodes an explicit
736    /// dictionary ID of `0`.
737    ///
738    /// Passing a dictionary for a frame that was not encoded with it can
739    /// silently corrupt the decoded output.
740    pub fn reset_with_dict_handle(
741        &mut self,
742        source: impl Read,
743        dict: &DictionaryHandle,
744    ) -> Result<(), FrameDecoderError> {
745        use FrameDecoderError as err;
746        // Fresh frame → drop the previous frame's per-block checksum
747        // digests so the next decode starts with an empty vec.
748        // Mirrors the same clear in `reset()`; reset_with_dict_handle
749        // is a parallel entry point so it needs its own call.
750        #[cfg(all(feature = "lsm", feature = "hash"))]
751        self.computed_block_checksums.clear();
752        Self::validate_registered_dictionary(dict.as_dict())?;
753        let magicless = self.magicless;
754        // Scope the &mut borrow of `self.state` to the header parse
755        // alone, so the subsequent `validate_expectations(&self, ...)`
756        // call below can take a fresh shared borrow of self without
757        // tripping the borrow checker.
758        match &mut self.state {
759            Some(s) => s.reset_with_format(source, magicless)?,
760            None => {
761                self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
762            }
763        }
764        // Single source of truth: route through the same
765        // `validate_expectations` used by `reset()`. Routing through
766        // the helper keeps the two code paths from drifting (e.g.,
767        // if expect-semantics or error wiring changes later).
768        #[cfg(feature = "lsm")]
769        {
770            let header = &self
771                .state
772                .as_ref()
773                .expect("state populated by reset_with_format/new_with_format")
774                .frame_header;
775            self.validate_expectations(header)?;
776        }
777        let state = self
778            .state
779            .as_mut()
780            .expect("state populated by reset_with_format/new_with_format");
781        if let Some(dict_id) = state.frame_header.dictionary_id()
782            && dict_id != dict.id()
783        {
784            return Err(err::DictIdMismatch {
785                expected: dict_id,
786                provided: dict.id(),
787            });
788        }
789        state.decoder_scratch.init_from_dict(dict.as_dict());
790        state.using_dict = Some(dict.id());
791        Ok(())
792    }
793
794    /// Add a dictionary that can be selected dynamically by frame dictionary ID.
795    ///
796    /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
797    /// registered (either as owned or shared).
798    pub fn add_dict(&mut self, dict: Dictionary) -> Result<(), FrameDecoderError> {
799        Self::validate_registered_dictionary(&dict)?;
800        let dict_id = dict.id;
801        if self.owned_dicts.contains_key(&dict_id) || self.shared_dict_exists(dict_id) {
802            return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
803        }
804        self.owned_dicts.insert(dict_id, dict);
805        Ok(())
806    }
807
808    /// Parse and add a serialized dictionary blob.
809    pub fn add_dict_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), FrameDecoderError> {
810        let dict = Dictionary::decode_dict(raw_dictionary)?;
811        self.add_dict(dict)
812    }
813
814    /// Add a pre-parsed dictionary handle for reuse across decoders.
815    ///
816    /// This API is available on targets with pointer-width atomics
817    /// (`target_has_atomic = "ptr"`).
818    ///
819    /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
820    /// registered (either as owned or shared).
821    #[cfg(target_has_atomic = "ptr")]
822    pub fn add_dict_handle(&mut self, dict: DictionaryHandle) -> Result<(), FrameDecoderError> {
823        Self::validate_registered_dictionary(dict.as_dict())?;
824        let dict_id = dict.id();
825        if self.owned_dicts.contains_key(&dict_id) || self.shared_dicts.contains_key(&dict_id) {
826            return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
827        }
828        self.shared_dicts.insert(dict_id, dict);
829        Ok(())
830    }
831
832    pub fn force_dict(&mut self, dict_id: u32) -> Result<(), FrameDecoderError> {
833        use FrameDecoderError as err;
834        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
835        let owned_dicts = &self.owned_dicts;
836        #[cfg(target_has_atomic = "ptr")]
837        let shared_dicts = &self.shared_dicts;
838
839        let dict = owned_dicts
840            .get(&dict_id)
841            .or_else(|| {
842                #[cfg(target_has_atomic = "ptr")]
843                {
844                    shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
845                }
846                #[cfg(not(target_has_atomic = "ptr"))]
847                {
848                    None
849                }
850            })
851            .ok_or(err::DictNotProvided { dict_id })?;
852        state.decoder_scratch.init_from_dict(dict);
853        state.using_dict = Some(dict_id);
854
855        Ok(())
856    }
857
858    /// Returns how many bytes the frame contains after decompression
859    pub fn content_size(&self) -> u64 {
860        match &self.state {
861            None => 0,
862            Some(s) => s.frame_header.frame_content_size(),
863        }
864    }
865
866    /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame
867    pub fn get_checksum_from_data(&self) -> Option<u32> {
868        let state = self.state.as_ref()?;
869
870        state.check_sum
871    }
872
873    /// Returns the checksum that was calculated while decoding.
874    /// Only a sensible value after all decoded bytes have been collected/read from the FrameDecoder.
875    /// Returns `None` when the frame header has `content_checksum_flag = 0`:
876    /// no hash is computed for such frames (the post-decode XXH64 pass was a
877    /// 63 % decode-wall hotspot on flag-off frames; skipping it when the
878    /// frame format declares no trailing digest avoids that wasted work).
879    #[cfg(feature = "hash")]
880    pub fn get_calculated_checksum(&self) -> Option<u32> {
881        let state = self.state.as_ref()?;
882        if !state.frame_header.descriptor.content_checksum_flag() {
883            return None;
884        }
885        let cksum_64bit = state.decoder_scratch.hash_finish();
886        //truncate to lower 32bit because reasons...
887        Some(cksum_64bit as u32)
888    }
889
890    /// Counter for how many bytes have been consumed while decoding the frame
891    pub fn bytes_read_from_source(&self) -> u64 {
892        let state = match &self.state {
893            None => return 0,
894            Some(s) => s,
895        };
896        state.bytes_read_counter
897    }
898
899    /// Whether the current frames last block has been decoded yet
900    /// If this returns true you can call the drain* functions to get all content
901    /// (the read() function will drain automatically if this returns true)
902    pub fn is_finished(&self) -> bool {
903        let state = match &self.state {
904            None => return true,
905            Some(s) => s,
906        };
907        if state.frame_header.descriptor.content_checksum_flag() {
908            state.frame_finished && state.check_sum.is_some()
909        } else {
910            state.frame_finished
911        }
912    }
913
914    /// Counter for how many blocks have already been decoded
915    pub fn blocks_decoded(&self) -> usize {
916        let state = match &self.state {
917            None => return 0,
918            Some(s) => s,
919        };
920        state.block_counter
921    }
922
923    /// Decodes blocks from a reader. It requires that the framedecoder has been initialized first.
924    /// The Strategy influences how many blocks will be decoded before the function returns
925    /// This is important if you want to manage memory consumption carefully. If you don't care
926    /// about that you can just choose the strategy "All" and have all blocks of the frame decoded into the buffer
927    pub fn decode_blocks(
928        &mut self,
929        mut source: impl Read,
930        strat: BlockDecodingStrategy,
931    ) -> Result<bool, FrameDecoderError> {
932        use FrameDecoderError as err;
933        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
934
935        // Streaming entry point: pre-reserve the backing buffer to
936        // `window_size` so multi-block frames don't pay repeated
937        // `reserve_amortized` grow steps (128 KiB → 256 KiB → ... →
938        // window) as blocks accumulate. `decode_all` does the same up
939        // front in `decode_all_impl`; this mirrors it for callers
940        // driving `decode_blocks` directly. Idempotent — the
941        // backend's `reserve` early-returns when capacity is already
942        // sufficient.
943        let window_size = state.frame_header.window_size().unwrap_or(0) as usize;
944        state.decoder_scratch.reserve_buffer(window_size);
945
946        let mut block_dec = decoding::block_decoder::new();
947
948        let buffer_size_before = state.decoder_scratch.buffer_len();
949        let block_counter_before = state.block_counter;
950        loop {
951            vprintln!("################");
952            vprintln!("Next Block: {}", state.block_counter);
953            vprintln!("################");
954            let (block_header, block_header_size) = block_dec
955                .read_block_header(&mut source)
956                .map_err(err::FailedToReadBlockHeader)?;
957            state.bytes_read_counter += u64::from(block_header_size);
958
959            vprintln!();
960            vprintln!(
961                "Found {} block with size: {}, which will be of size: {}",
962                block_header.block_type,
963                block_header.content_size,
964                block_header.decompressed_size
965            );
966
967            #[cfg(all(feature = "lsm", feature = "hash"))]
968            let len_before_block: Option<usize> = if self.per_block_checksums_enabled {
969                Some(state.decoder_scratch.buffer_len())
970            } else {
971                None
972            };
973            let bytes_read_in_block_body = state
974                .decoder_scratch
975                .decode_block_content(&mut block_dec, &block_header, &mut source)
976                .map_err(err::FailedToReadBlockBody)?;
977            state.bytes_read_counter += bytes_read_in_block_body;
978
979            // Per-block XXH64 (low 32 bits) of the just-decompressed
980            // bytes. Hashed from `last_n_as_slices` so RingBuffer wrap
981            // is handled in-place, no extra copy.
982            #[cfg(all(feature = "lsm", feature = "hash"))]
983            if let Some(len_before_block) = len_before_block {
984                let added = state.decoder_scratch.buffer_len() - len_before_block;
985                let (s1, s2) = state.decoder_scratch.last_n_as_slices(added);
986                let mut h = twox_hash::XxHash64::with_seed(0);
987                use core::hash::Hasher;
988                h.write(s1);
989                h.write(s2);
990                self.computed_block_checksums.push(h.finish() as u32);
991            }
992
993            state.block_counter += 1;
994
995            vprintln!("Output: {}", state.decoder_scratch.buffer_len());
996
997            if block_header.last_block {
998                state.frame_finished = true;
999                if state.frame_header.descriptor.content_checksum_flag() {
1000                    let mut chksum = [0u8; 4];
1001                    source
1002                        .read_exact(&mut chksum)
1003                        .map_err(err::FailedToReadChecksum)?;
1004                    state.bytes_read_counter += 4;
1005                    let chksum = u32::from_le_bytes(chksum);
1006                    state.check_sum = Some(chksum);
1007                }
1008                break;
1009            }
1010
1011            match strat {
1012                BlockDecodingStrategy::All => { /* keep going */ }
1013                BlockDecodingStrategy::UptoBlocks(n) => {
1014                    if state.block_counter - block_counter_before >= n {
1015                        break;
1016                    }
1017                }
1018                BlockDecodingStrategy::UptoBytes(n) => {
1019                    if state.decoder_scratch.buffer_len() - buffer_size_before >= n {
1020                        break;
1021                    }
1022                }
1023            }
1024        }
1025
1026        Ok(state.frame_finished)
1027    }
1028
1029    /// Collect bytes and retain window_size bytes while decoding is still going on.
1030    /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
1031    pub fn collect(&mut self) -> Option<Vec<u8>> {
1032        let finished = self.is_finished();
1033        let state = self.state.as_mut()?;
1034        if finished {
1035            Some(state.decoder_scratch.buffer_drain())
1036        } else {
1037            state.decoder_scratch.buffer_drain_to_window_size()
1038        }
1039    }
1040
1041    /// Collect bytes and retain window_size bytes while decoding is still going on.
1042    /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
1043    pub fn collect_to_writer(&mut self, w: impl Write) -> Result<usize, Error> {
1044        let finished = self.is_finished();
1045        let state = match &mut self.state {
1046            None => return Ok(0),
1047            Some(s) => s,
1048        };
1049        if finished {
1050            state.decoder_scratch.buffer_drain_to_writer(w)
1051        } else {
1052            state.decoder_scratch.buffer_drain_to_window_size_writer(w)
1053        }
1054    }
1055
1056    /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size
1057    /// because window_size bytes need to be retained for decoding.
1058    /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes
1059    pub fn can_collect(&self) -> usize {
1060        let finished = self.is_finished();
1061        let state = match &self.state {
1062            None => return 0,
1063            Some(s) => s,
1064        };
1065        if finished {
1066            state.decoder_scratch.buffer_can_drain()
1067        } else {
1068            state
1069                .decoder_scratch
1070                .buffer_can_drain_to_window_size()
1071                .unwrap_or(0)
1072        }
1073    }
1074
1075    /// Decodes as many blocks as possible from the source slice and reads from the decodebuffer into the target slice
1076    /// The source slice may contain only parts of a frame but must contain at least one full block to make progress
1077    ///
1078    /// By all means use decode_blocks if you have a io.Reader available. This is just for compatibility with other decompressors
1079    /// which try to serve an old-style c api
1080    ///
1081    /// Returns (read, written), if read == 0 then the source did not contain a full block and further calls with the same
1082    /// input will not make any progress!
1083    ///
1084    /// Note that no kind of block can be bigger than 128kb.
1085    /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer
1086    ///
1087    /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just call decoder.read(&mut target)
1088    pub fn decode_from_to(
1089        &mut self,
1090        source: &[u8],
1091        target: &mut [u8],
1092    ) -> Result<(usize, usize), FrameDecoderError> {
1093        use FrameDecoderError as err;
1094        let bytes_read_at_start = match &self.state {
1095            Some(s) => s.bytes_read_counter,
1096            None => 0,
1097        };
1098
1099        if !self.is_finished() || self.state.is_none() {
1100            let mut mt_source = source;
1101
1102            if self.state.is_none() {
1103                self.init(&mut mt_source)?;
1104            }
1105
1106            //pseudo block to scope "state" so we can borrow self again after the block
1107            {
1108                let state = match &mut self.state {
1109                    Some(s) => s,
1110                    None => panic!("Bug in library"),
1111                };
1112                let mut block_dec = decoding::block_decoder::new();
1113
1114                if state.frame_header.descriptor.content_checksum_flag()
1115                    && state.frame_finished
1116                    && state.check_sum.is_none()
1117                {
1118                    //this block is needed if the checksum were the only 4 bytes that were not included in the last decode_from_to call for a frame
1119                    if mt_source.len() >= 4 {
1120                        let chksum = mt_source[..4].try_into().expect("optimized away");
1121                        state.bytes_read_counter += 4;
1122                        let chksum = u32::from_le_bytes(chksum);
1123                        state.check_sum = Some(chksum);
1124                    }
1125                    return Ok((4, 0));
1126                }
1127
1128                loop {
1129                    //check if there are enough bytes for the next header
1130                    if mt_source.len() < 3 {
1131                        break;
1132                    }
1133                    let (block_header, block_header_size) = block_dec
1134                        .read_block_header(&mut mt_source)
1135                        .map_err(err::FailedToReadBlockHeader)?;
1136
1137                    // check the needed size for the block before updating counters.
1138                    // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place
1139                    if mt_source.len() < block_header.content_size as usize {
1140                        break;
1141                    }
1142                    state.bytes_read_counter += u64::from(block_header_size);
1143
1144                    let bytes_read_in_block_body = state
1145                        .decoder_scratch
1146                        .decode_block_content(&mut block_dec, &block_header, &mut mt_source)
1147                        .map_err(err::FailedToReadBlockBody)?;
1148                    state.bytes_read_counter += bytes_read_in_block_body;
1149                    state.block_counter += 1;
1150
1151                    if block_header.last_block {
1152                        state.frame_finished = true;
1153                        if state.frame_header.descriptor.content_checksum_flag() {
1154                            //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call
1155                            if mt_source.len() >= 4 {
1156                                let chksum = mt_source[..4].try_into().expect("optimized away");
1157                                state.bytes_read_counter += 4;
1158                                let chksum = u32::from_le_bytes(chksum);
1159                                state.check_sum = Some(chksum);
1160                            }
1161                        }
1162                        break;
1163                    }
1164                }
1165            }
1166        }
1167
1168        let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?;
1169        let bytes_read_at_end = match &mut self.state {
1170            Some(s) => s.bytes_read_counter,
1171            None => panic!("Bug in library"),
1172        };
1173        let read_len = bytes_read_at_end - bytes_read_at_start;
1174        Ok((read_len as usize, result_len))
1175    }
1176
1177    /// Decode multiple frames into the output slice.
1178    ///
1179    /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
1180    /// skipped during decode.
1181    ///
1182    /// `output` must be large enough to hold the decompressed data. If you don't know
1183    /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
1184    ///
1185    /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
1186    ///
1187    /// Returns the number of bytes written to `output`.
1188    pub fn decode_all(
1189        &mut self,
1190        input: &[u8],
1191        output: &mut [u8],
1192    ) -> Result<usize, FrameDecoderError> {
1193        #[cfg(not(feature = "lsm"))]
1194        {
1195            self.decode_all_impl(input, output, |this, src| this.init(src))
1196        }
1197        #[cfg(feature = "lsm")]
1198        {
1199            self.decode_all_impl(input, output, |this, src| this.init(src), None)
1200        }
1201    }
1202
1203    /// Decode multiple frames into the output slice, invoking `visitor`
1204    /// for every skippable frame encountered before advancing past it.
1205    ///
1206    /// `input` must contain an exact number of frames. Skippable frames
1207    /// (RFC 8878 §3.1.2 magic numbers `0x184D2A50..=0x184D2A5F`) are
1208    /// allowed and will be both visited AND skipped: the visitor gets
1209    /// `(magic_variant, payload)` where `magic_variant` is the low
1210    /// nibble of the magic (`magic - 0x184D2A50`, range `0..=15`) and
1211    /// `payload` is a borrowed slice of the on-wire payload bytes (the
1212    /// skippable frame's `Frame_Size` field worth of data) into
1213    /// `input` — no allocation.
1214    ///
1215    /// The visitor sees skippable frames in stream order; interleaved
1216    /// regular zstd frames continue to decompress into `output` exactly
1217    /// as `decode_all` does.
1218    ///
1219    /// `output` must be large enough to hold the decompressed data.
1220    /// Returns the number of bytes written to `output`.
1221    ///
1222    /// # Example
1223    ///
1224    /// ```ignore
1225    /// use structured_zstd::decoding::FrameDecoder;
1226    ///
1227    /// let mut decoder = FrameDecoder::new();
1228    /// let mut output = vec![0u8; 1024];
1229    /// let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
1230    /// let n = decoder.decode_all_with_skippable_visitor(
1231    ///     input,
1232    ///     &mut output,
1233    ///     |variant, payload| collected.push((variant, payload.to_vec())),
1234    /// )?;
1235    /// ```
1236    #[cfg(feature = "lsm")]
1237    #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
1238    pub fn decode_all_with_skippable_visitor<F>(
1239        &mut self,
1240        input: &[u8],
1241        output: &mut [u8],
1242        mut visitor: F,
1243    ) -> Result<usize, FrameDecoderError>
1244    where
1245        F: FnMut(u8, &[u8]),
1246    {
1247        self.decode_all_impl(
1248            input,
1249            output,
1250            |this, src| this.init(src),
1251            Some(&mut visitor),
1252        )
1253    }
1254
1255    /// Decode multiple frames into the output slice using a pre-parsed dictionary handle.
1256    ///
1257    /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
1258    /// skipped during decode.
1259    ///
1260    /// `output` must be large enough to hold the decompressed data. If you don't know
1261    /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
1262    ///
1263    /// This calls [`FrameDecoder::init_with_dict_handle`], and all bytes currently in the
1264    /// decoder will be lost.
1265    ///
1266    /// # Warning
1267    ///
1268    /// Each decoded frame is initialized with `dict`, even when a frame header
1269    /// omits the optional dictionary ID. Callers must only use this API when
1270    /// they already know the input frames were encoded with the provided
1271    /// dictionary; otherwise decoded output can be silently corrupted.
1272    pub fn decode_all_with_dict_handle(
1273        &mut self,
1274        input: &[u8],
1275        output: &mut [u8],
1276        dict: &DictionaryHandle,
1277    ) -> Result<usize, FrameDecoderError> {
1278        #[cfg(not(feature = "lsm"))]
1279        {
1280            self.decode_all_impl(input, output, |this, src| {
1281                this.init_with_dict_handle(src, dict)
1282            })
1283        }
1284        #[cfg(feature = "lsm")]
1285        {
1286            self.decode_all_impl(
1287                input,
1288                output,
1289                |this, src| this.init_with_dict_handle(src, dict),
1290                None,
1291            )
1292        }
1293    }
1294
1295    /// Default-feature decode_all_impl: no visitor parameter so the
1296    /// no-lsm build's call surface and codegen are byte-identical to
1297    /// the pre-#172 implementation. Compiles only when `lsm` is OFF.
1298    #[cfg(not(feature = "lsm"))]
1299    fn decode_all_impl(
1300        &mut self,
1301        mut input: &[u8],
1302        mut output: &mut [u8],
1303        mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
1304    ) -> Result<usize, FrameDecoderError> {
1305        use super::buffer_backend::WILDCOPY_OVERLENGTH;
1306        let mut total_bytes_written = 0;
1307        while !input.is_empty() {
1308            match init_frame(self, &mut input) {
1309                Ok(_) => {}
1310                Err(FrameDecoderError::ReadFrameHeaderError(
1311                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
1312                )) => {
1313                    input = input
1314                        .get(length as usize..)
1315                        .ok_or(FrameDecoderError::FailedToSkipFrame)?;
1316                    continue;
1317                }
1318                Err(e) => return Err(e),
1319            };
1320            // Per-frame direct-path dispatch. Now safe to route the
1321            // public `decode_all` here because
1322            // `UserSliceBackend::exec_sequence_inline` returns
1323            // `Result<(), ExecuteSequencesError>` instead of
1324            // panicking on capacity overflow; the error propagates
1325            // up as `FrameDecoderError`. Eligibility (FCS > 0, no
1326            // active dict, remaining `output` slice has WILDCOPY
1327            // slack) puts the frame on the fast path that bypasses
1328            // the FlatBuf/Ring -> `read()` drain copy. Ineligible
1329            // frames (no FCS, active dict, output too small for
1330            // slack) fall through to the legacy `decode_blocks` +
1331            // `read` drain loop below.
1332            let (content_size, fcs_declared, dict_active, window_size) = {
1333                let state_ref = self.state.as_ref().expect("init populated state");
1334                (
1335                    state_ref.frame_header.frame_content_size(),
1336                    state_ref.frame_header.fcs_declared(),
1337                    state_ref.using_dict.is_some(),
1338                    state_ref.frame_header.window_size().unwrap_or(0),
1339                )
1340            };
1341            let needed = content_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
1342            // Per-block checksums collected inside `run_direct_decode`
1343            // post-loop (over recorded (start, end) ranges of `output`)
1344            // so the direct path stays eligible AND keeps the
1345            // window-size cap (`drop_to_window_size`) between blocks
1346            // that the spec relies on for `offset <= window_size`
1347            // validation. Path choice no longer alters checksum
1348            // semantics.
1349            let direct_eligible =
1350                content_size > 0 && !dict_active && (output.len() as u64) >= needed;
1351            if direct_eligible {
1352                let written = self.run_direct_decode(&mut input, output, content_size)?;
1353                output = &mut output[written..];
1354                total_bytes_written += written;
1355                continue;
1356            }
1357            // Non-direct fallback: pre-reserve the backing buffer to
1358            // `window_size` in a single allocation before block decode
1359            // starts, so multi-segment frames don't pay repeated
1360            // `reserve_amortized` grow steps as blocks accumulate (each
1361            // block only reserves MAX_BLOCK_SIZE = 128 KiB, so a window
1362            // > 128 KiB otherwise grows through several intermediate
1363            // sizes with `alloc_zeroed + memcpy` each time).
1364            if let Some(state) = self.state.as_mut() {
1365                state.decoder_scratch.reserve_buffer(window_size as usize);
1366            }
1367            let frame_start_total = total_bytes_written;
1368            loop {
1369                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
1370                let bytes_written = self
1371                    .read(output)
1372                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
1373                output = &mut output[bytes_written..];
1374                total_bytes_written += bytes_written;
1375                if self.can_collect() != 0 {
1376                    return Err(FrameDecoderError::TargetTooSmall);
1377                }
1378                if self.is_finished() {
1379                    break;
1380                }
1381            }
1382            // Per-frame FCS validation on the legacy fallback path.
1383            // Use `fcs_declared()` (NOT `content_size > 0`) so an
1384            // empty frame with explicit FCS=0 on the wire still gets
1385            // validated.
1386            if fcs_declared {
1387                let produced = (total_bytes_written - frame_start_total) as u64;
1388                if produced != content_size {
1389                    return Err(FrameDecoderError::FrameContentSizeMismatch {
1390                        declared: content_size,
1391                        produced,
1392                    });
1393                }
1394            }
1395        }
1396
1397        Ok(total_bytes_written)
1398    }
1399
    /// `lsm`-feature variant of `decode_all_impl`: decodes every frame in
    /// `input` into `output` and returns the total number of bytes written.
    ///
    /// Mirrors the no-lsm variant, including the direct-path dispatch and the
    /// per-frame FCS validation and their rationale comments. The two
    /// functions are kept textually parallel on purpose so they stay in sync.
    /// The only behavioral difference is the `SkipFrame` arm:
    /// - It bounds-checks once and uses `split_at(length)` instead of two
    ///   separate `get(..length)` / `get(length..)` slices.
    /// - When `skippable_visitor` is `Some`, it invokes the visitor on the
    ///   borrowed payload before advancing past it.
    ///
    /// Parameters:
    /// - `init_frame` is called at the start of each frame. It receives
    ///   `&mut input` and must leave `self.state` populated on `Ok`.
    ///   A `SkipFrame` header error is handled here; any other error is
    ///   returned unchanged.
    /// - `skippable_visitor` is the optional callback consumed by
    ///   [`Self::decode_all_with_skippable_visitor`]. It receives the
    ///   skippable-frame variant (low nibble of the magic number) and the
    ///   payload.
    ///
    /// # Errors
    /// - `FailedToSkipFrame` if a skippable frame's declared length runs past
    ///   the end of `input`.
    /// - `FrameContentSizeMismatch` if a frame with a declared FCS produced a
    ///   different byte count on the fallback path.
    /// - `TargetTooSmall` if decoded bytes remain collectable after `output`
    ///   has been filled on the fallback path.
    /// - Any error from `init_frame`, `run_direct_decode` or `decode_blocks`.
    /// - Drain failures from `read`, wrapped as `FailedToDrainDecodebuffer`.
    #[cfg(feature = "lsm")]
    // NOTE(review): presumably triggered by the closure-typed parameters
    // (`init_frame`, `skippable_visitor`); confirm before replacing with an
    // alias.
    #[allow(clippy::type_complexity)]
    fn decode_all_impl(
        &mut self,
        mut input: &[u8],
        mut output: &mut [u8],
        mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
        mut skippable_visitor: Option<&mut dyn FnMut(u8, &[u8])>,
    ) -> Result<usize, FrameDecoderError> {
        use super::buffer_backend::WILDCOPY_OVERLENGTH;
        let mut total_bytes_written = 0;
        // One iteration per frame (regular or skippable) until `input` is
        // fully consumed.
        while !input.is_empty() {
            match init_frame(self, &mut input) {
                Ok(_) => {}
                Err(FrameDecoderError::ReadFrameHeaderError(
                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame {
                        magic_number,
                        length,
                    },
                )) => {
                    let length = length as usize;
                    // The visitor sees the payload slice BEFORE we advance
                    // past it. The slice is borrowed, so there is no
                    // allocation.
                    //
                    // The variant is the low nibble of the magic number
                    // (RFC 8878 §3.1.2). `read_frame_header` only emits
                    // SkipFrame for magic in 0x184D2A50..=0x184D2A5F, so the
                    // subtraction fits in 0..=15.
                    if input.len() < length {
                        return Err(FrameDecoderError::FailedToSkipFrame);
                    }
                    let (payload, rest) = input.split_at(length);
                    if let Some(visitor) = skippable_visitor.as_mut() {
                        let variant = (magic_number - 0x184D2A50) as u8;
                        visitor(variant, payload);
                    }
                    input = rest;
                    continue;
                }
                Err(e) => return Err(e),
            };
            // Per-frame direct-path dispatch.
            //
            // Routing the public `decode_all` here is safe because
            // `UserSliceBackend::exec_sequence_inline` returns
            // `Result<(), ExecuteSequencesError>` instead of panicking on
            // capacity overflow. The error propagates up as a
            // `FrameDecoderError`.
            //
            // A frame is eligible when all of these hold:
            // - it declares FCS > 0;
            // - no dictionary is active;
            // - the remaining `output` slice has room for FCS plus
            //   WILDCOPY_OVERLENGTH bytes of slack.
            // Eligible frames take the fast path, which bypasses the
            // FlatBuf/Ring -> `read()` drain copy. Ineligible frames (no
            // FCS, active dict, output too small for the slack) fall
            // through to the legacy `decode_blocks` + `read` drain loop
            // below.
            let (content_size, fcs_declared, dict_active, window_size) = {
                let state_ref = self.state.as_ref().expect("init populated state");
                (
                    state_ref.frame_header.frame_content_size(),
                    state_ref.frame_header.fcs_declared(),
                    state_ref.using_dict.is_some(),
                    // Falls back to 0 when the header's window size is
                    // unavailable.
                    state_ref.frame_header.window_size().unwrap_or(0),
                )
            };
            // Saturating: an absurd declared FCS must simply fail the length
            // comparison rather than overflow.
            let needed = content_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
            let direct_eligible =
                content_size > 0 && !dict_active && (output.len() as u64) >= needed;
            if direct_eligible {
                let written = self.run_direct_decode(&mut input, output, content_size)?;
                output = &mut output[written..];
                total_bytes_written += written;
                continue;
            }
            // Non-direct fallback: pre-reserve the backing buffer to
            // `window_size` once, so the per-block growth cycle is skipped
            // (see the same comment on the no-lsm path above).
            if let Some(state) = self.state.as_mut() {
                state.decoder_scratch.reserve_buffer(window_size as usize);
            }
            let frame_start_total = total_bytes_written;
            // Drain loop:
            // 1. Decode up to 1 MiB per `decode_blocks` call.
            // 2. Copy whatever the buffer releases into `output`.
            // 3. If decoded bytes are still collectable after the copy,
            //    `output` is exhausted: fail with `TargetTooSmall`.
            loop {
                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
                let bytes_written = self
                    .read(output)
                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
                output = &mut output[bytes_written..];
                total_bytes_written += bytes_written;
                if self.can_collect() != 0 {
                    return Err(FrameDecoderError::TargetTooSmall);
                }
                if self.is_finished() {
                    break;
                }
            }
            // Per-frame FCS validation on the legacy fallback path.
            // Use `fcs_declared()` (NOT `content_size > 0`), so an empty
            // frame with an explicit FCS=0 on the wire still gets validated.
            if fcs_declared {
                let produced = (total_bytes_written - frame_start_total) as u64;
                if produced != content_size {
                    return Err(FrameDecoderError::FrameContentSizeMismatch {
                        declared: content_size,
                        produced,
                    });
                }
            }
        }

        Ok(total_bytes_written)
    }
1518
1519    /// Decode multiple frames into the output slice using a serialized dictionary.
1520    ///
1521    /// # Warning
1522    ///
1523    /// Each decoded frame is initialized with the parsed dictionary, even when a
1524    /// frame header omits the optional dictionary ID. Callers must only use this
1525    /// API when they already know the input frames were encoded with that
1526    /// dictionary; otherwise decoded output can be silently corrupted.
1527    pub fn decode_all_with_dict_bytes(
1528        &mut self,
1529        input: &[u8],
1530        output: &mut [u8],
1531        raw_dictionary: &[u8],
1532    ) -> Result<usize, FrameDecoderError> {
1533        let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
1534        self.decode_all_with_dict_handle(input, output, &dict)
1535    }
1536
1537    /// Decode multiple frames into the extra capacity of the output vector.
1538    ///
1539    /// `input` must contain an exact number of frames.
1540    ///
1541    /// `output` must have enough extra capacity to hold the decompressed data.
1542    /// This function reserves an additional [`WILDCOPY_OVERLENGTH`]
1543    /// bytes on top of the caller's capacity so the per-frame direct
1544    /// decode path stays eligible — that may grow the vector by up
1545    /// to that fixed amount via `Vec::reserve`. It will NOT grow
1546    /// further to fit the decompressed payload itself; the caller's
1547    /// pre-allocated capacity must already cover the data. If you
1548    /// don't know how large the output will be, use
1549    /// [`FrameDecoder::decode_blocks`] instead.
1550    ///
1551    /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
1552    ///
1553    /// The length of the output vector is updated to include the decompressed data.
1554    /// The length is not changed if an error occurs. The
1555    /// `WILDCOPY_OVERLENGTH` slack is internal — `output.len()` on
1556    /// return is the actual decompressed size, NOT the inflated
1557    /// capacity. Callers who pre-sized the Vec with
1558    /// `Vec::with_capacity(fcs)` see no functional change beyond
1559    /// the small one-time capacity bump.
1560    pub fn decode_all_to_vec(
1561        &mut self,
1562        input: &[u8],
1563        output: &mut Vec<u8>,
1564    ) -> Result<(), FrameDecoderError> {
1565        use super::buffer_backend::WILDCOPY_OVERLENGTH;
1566        let len = output.len();
1567        // Reserve WILDCOPY slack on top of the caller's capacity so
1568        // `decode_all` can land on the direct path even when the
1569        // caller didn't pre-allocate slack themselves.
1570        output.reserve(WILDCOPY_OVERLENGTH);
1571        let cap = output.capacity();
1572        output.resize(cap, 0);
1573        match self.decode_all(input, &mut output[len..]) {
1574            Ok(bytes_written) => {
1575                let new_len = core::cmp::min(len + bytes_written, cap); // Sanitizes `bytes_written`.
1576                output.resize(new_len, 0);
1577                Ok(())
1578            }
1579            Err(e) => {
1580                output.resize(len, 0);
1581                Err(e)
1582            }
1583        }
1584    }
1585
    /// Single-frame direct-decode path. Decodes one zstd frame into
    /// `output[..content_size]` via a stack-local
    /// `DecodeBuffer<UserSliceBackend>`, bypassing the per-block
    /// FlatBuf/Ring -> `read()` drain copy. Returns the number of bytes
    /// written, which on success is always `content_size`.
    ///
    /// # Preconditions (caller-enforced)
    ///
    /// - `self.init` (or `init_with_dict_handle`) was called for this frame,
    ///   so `self.state` is populated.
    /// - `content_size` matches `self.state.frame_header.frame_content_size()`
    ///   and is `> 0` (the caller already passed the eligibility gate).
    /// - `output.len() >= content_size + WILDCOPY_OVERLENGTH`.
    /// - No active dictionary (`self.state.using_dict.is_none()`).
    ///
    /// # Effects on success
    ///
    /// - `input` points at the byte immediately after the frame's checksum,
    ///   or after the last block when the frame has
    ///   `content_checksum_flag = 0`.
    /// - `self.state.frame_finished` is set, so [`Self::is_finished`]
    ///   reports `true`.
    /// - With `feature = "hash"` and the content-checksum flag set, the
    ///   persistent scratch buffer's hasher is replaced by an XXH64 hasher
    ///   fed `output[..content_size]`.
    /// - With `lsm` + `hash` and per-block checksums enabled, one truncated
    ///   XXH64 digest per block is appended to
    ///   `self.computed_block_checksums`.
    ///
    /// # Errors
    ///
    /// - `FailedToReadBlockHeader`, `FailedToReadBlockBody` or
    ///   `FailedToReadChecksum` when `input` is truncated or malformed.
    /// - `FrameContentSizeMismatch` when the blocks produce more or fewer
    ///   bytes than `content_size`, including a backend write overflow.
    ///
    /// On error, `self.state.bytes_read_counter` / `block_counter` may have
    /// been partially advanced, and `frame_finished` is left unset.
    fn run_direct_decode(
        &mut self,
        input: &mut &[u8],
        output: &mut [u8],
        content_size: u64,
    ) -> Result<usize, FrameDecoderError> {
        use super::block_decoder;
        use super::decode_buffer::DecodeBuffer;
        use super::scratch::DirectScratch;
        use super::user_slice_buf::UserSliceBackend;
        use crate::io::Read;
        use FrameDecoderError as err;

        let state = self
            .state
            .as_mut()
            .expect("caller ensures init populated state");

        // Borrow the persistent fields out of whichever scratch variant
        // `init` produced (Flat for single_segment, Ring for
        // multi-segment). Both expose the same HUF/FSE/Vec fields. Only
        // `buffer` differs, and only its `window_size` is read here.
        // Destructuring the match into a tuple of disjoint `&mut` field
        // borrows avoids needing a helper that returns several `&mut`
        // references at once.
        let (huf, fse, offset_hist, literals_buffer, sequences, block_content_buffer, window_size) =
            match &mut state.decoder_scratch {
                DecoderScratchKind::Flat(s) => (
                    &mut s.huf,
                    &mut s.fse,
                    &mut s.offset_hist,
                    &mut s.literals_buffer,
                    &mut s.sequences,
                    &mut s.block_content_buffer,
                    s.buffer.window_size,
                ),
                DecoderScratchKind::Ring(s) => (
                    &mut s.huf,
                    &mut s.fse,
                    &mut s.offset_hist,
                    &mut s.literals_buffer,
                    &mut s.sequences,
                    &mut s.block_content_buffer,
                    s.buffer.window_size,
                ),
            };
        // The caller's slice becomes the decode buffer itself, so decoded
        // bytes land directly in `output` with no drain copy.
        let backend = UserSliceBackend::from_slice(output);
        let buffer = DecodeBuffer::from_backend(backend, window_size);
        let mut direct = DirectScratch {
            huf,
            fse,
            offset_hist,
            literals_buffer,
            sequences,
            block_content_buffer,
            buffer,
        };

        // Block loop. Mirrors `decode_blocks`, but without the
        // strategy-bounded early exit: the direct path always decodes the
        // whole frame in one shot.
        //
        // Keeps `state.bytes_read_counter` / `state.block_counter` in sync
        // with `decode_blocks`, so the post-call accessors
        // (`bytes_read_from_source`, `blocks_decoded`) return accurate
        // values.
        let mut block_dec = block_decoder::new();
        // Track total output bytes against the declared
        // `frame_content_size` via the buffer's actual write counter.
        // `BlockHeader.decompressed_size` is 0 for Compressed blocks (the
        // header parser can't know the expanded size before decoding the
        // body). Per-header tracking would therefore count 0 for those
        // blocks and miscount frames that aren't pure Raw/RLE.
        let mut produced: u64 = 0;
        // Per-block output ranges captured during the direct-path loop.
        //
        // After the loop we re-borrow `output` (once `direct` is dropped)
        // and XXH64 each range into `self.computed_block_checksums`. This
        // keeps the digests vector consistent with the legacy
        // `decode_blocks` path, regardless of which dispatch the frame took.
        //
        // `Vec::new()` does not allocate, so this stays free when
        // `per_block_checksums_enabled` is false. The `push` and the
        // post-loop hashing loop are both gated by the same flag.
        #[cfg(all(feature = "lsm", feature = "hash"))]
        let mut block_ranges: alloc::vec::Vec<(usize, usize)> = alloc::vec::Vec::new();
        loop {
            // Start offset of this block's output, recorded only when
            // per-block checksums are requested.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            let produced_before: Option<usize> = if self.per_block_checksums_enabled {
                Some(produced as usize)
            } else {
                None
            };
            let (block_header, hsize) = block_dec
                .read_block_header(&mut *input)
                .map_err(err::FailedToReadBlockHeader)?;
            state.bytes_read_counter += u64::from(hsize);
            // Pre-flight FCS check, ONLY for Raw / RLE blocks, where
            // `decompressed_size` is the actual block output size. For
            // Compressed blocks the header field is 0; the post-decode
            // check below catches overflow via the backend's actual write
            // counter delta.
            let block_upper = u64::from(block_header.decompressed_size);
            if block_upper > 0 && produced + block_upper > content_size {
                // Frame is corrupt: Raw/RLE block headers claim more output
                // than the FCS allows.
                return Err(err::FrameContentSizeMismatch {
                    declared: content_size,
                    produced: produced + block_upper,
                });
            }
            // Slice-source fast path: consume the block body straight from
            // `input` without copying it into the persistent
            // `block_content_buffer`.
            let body_consumed = match block_dec.decode_block_content_from_slice(
                &block_header,
                &mut direct,
                &mut *input,
            ) {
                Ok(n) => n,
                // Defense-in-depth: a block tripped the backend's fallible
                // write surface. Typically this is an RLE / Raw block whose
                // declared `decompressed_size` slipped past the per-block
                // pre-flight above.
                Err(crate::decoding::errors::DecodeBlockContentError::BackendOverflow {
                    ..
                }) => {
                    // `produced` is already <= `content_size` here (checked
                    // after every block). `saturating_add` is purely
                    // defensive, so the error path itself can never panic.
                    //
                    // NOTE(review): for Compressed blocks `decompressed_size`
                    // is 0, so the reported `produced` is only a lower bound
                    // of the attempted output.
                    return Err(err::FrameContentSizeMismatch {
                        declared: content_size,
                        produced: produced
                            .saturating_add(u64::from(block_header.decompressed_size)),
                    });
                }
                Err(e) => return Err(err::FailedToReadBlockBody(e)),
            };
            produced = direct.buffer.buffer_ref().tail() as u64;
            // Post-decode FCS overflow check.
            if produced > content_size {
                return Err(err::FrameContentSizeMismatch {
                    declared: content_size,
                    produced,
                });
            }
            state.bytes_read_counter += body_consumed;
            state.block_counter += 1;
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(produced_before) = produced_before {
                block_ranges.push((produced_before, produced as usize));
            }
            // Cap the visible buffer at window_size between blocks, so the
            // next block's match-offset validation matches the spec's
            // `offset <= window_size` rule.
            direct.buffer.drop_to_window_size();
            if block_header.last_block {
                // Only the stored checksum is read here. It is recorded in
                // `state.check_sum` for later comparison, not verified.
                if state.frame_header.descriptor.content_checksum_flag() {
                    let mut chksum = [0u8; 4];
                    input
                        .read_exact(&mut chksum)
                        .map_err(err::FailedToReadChecksum)?;
                    state.bytes_read_counter += 4;
                    state.check_sum = Some(u32::from_le_bytes(chksum));
                }
                break;
            }
        }
        // Final sanity check: the blocks summed to exactly `content_size`.
        if produced != content_size {
            return Err(err::FrameContentSizeMismatch {
                declared: content_size,
                produced,
            });
        }

        let written = content_size as usize;
        state.frame_finished = true;
        // Drop the stack-local DirectScratch (and its DecodeBuffer borrow
        // on `output`), so we can re-borrow `output` for the hash pass
        // below.
        drop(direct);
        // Per-block XXH64 (low 32 bits) over the captured ranges.
        //
        // Mirrors the per-block hashing in `decode_blocks`, so the digests
        // vector stays identical regardless of which dispatch path the
        // frame took. The ranges were recorded inside the loop while
        // `direct` held a mutable borrow on `output`; now that the borrow
        // is dropped we can read the slices directly.
        #[cfg(all(feature = "lsm", feature = "hash"))]
        if self.per_block_checksums_enabled {
            use core::hash::Hasher;
            for (start, end) in &block_ranges {
                let mut h = twox_hash::XxHash64::with_seed(0);
                h.write(&output[*start..*end]);
                self.computed_block_checksums.push(h.finish() as u32);
            }
        }
        #[cfg(feature = "hash")]
        if state.frame_header.descriptor.content_checksum_flag() {
            // The direct path bypasses the per-write hash accounting. The
            // DecodeBuffer hashes during drain, but the direct path never
            // drains because the user slice IS the buffer.
            //
            // Walk the decoded output once and install the resulting
            // hasher in the persistent scratch buffer. That way the
            // calculated digest (`get_calculated_checksum()`) can be
            // compared against the stored checksum captured in
            // `state.check_sum` above.
            //
            // Gated on `content_checksum_flag`: frames without a trailing
            // checksum have nothing to verify, and the 1 MiB+ second-pass
            // scan dominated decode wall time (63 % of total on z000033 L-3
            // in the standalone perf loop). `get_calculated_checksum()` now
            // returns `None` for flag-off frames, matching this skip.
            use core::hash::Hasher;
            let mut hasher = twox_hash::XxHash64::with_seed(0);
            hasher.write(&output[..written]);
            match &mut state.decoder_scratch {
                DecoderScratchKind::Flat(s) => s.buffer.hash = hasher,
                DecoderScratchKind::Ring(s) => s.buffer.hash = hasher,
            }
        }
        Ok(written)
    }
1829}
1830
1831/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished
1832/// this will retain window_size bytes, else it will drain it completely
1833impl Read for FrameDecoder {
1834    fn read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
1835        let state = match &mut self.state {
1836            None => return Ok(0),
1837            Some(s) => s,
1838        };
1839        if state.frame_finished {
1840            state.decoder_scratch.buffer_read_all(target)
1841        } else {
1842            state.decoder_scratch.buffer_read(target)
1843        }
1844    }
1845}
1846
1847#[cfg(test)]
1848mod tests {
1849    extern crate std;
1850
1851    use super::{DictionaryHandle, FrameDecoder};
1852    use crate::encoding::{CompressionLevel, FrameCompressor};
1853    use alloc::vec::Vec;
1854
1855    #[test]
1856    fn decode_all_legacy_drain_matches_direct_path_on_single_segment_frame() {
1857        // Roundtrip a small payload through the encoder, then decode
1858        // it via `decode_all` on two output shapes that select
1859        // different internal paths:
1860        //   1. Tight output (no WILDCOPY_OVERLENGTH slack) → legacy
1861        //      `decode_blocks` + `read()` drain path.
1862        //   2. Output with WILDCOPY slack → direct
1863        //      `run_direct_decode` + `UserSliceBackend` path.
1864        // Both paths must produce identical output bytes — the only
1865        // difference is the internal buffer/drain shape, not the
1866        // decoded semantics. This is the regression gate for the
1867        // direct-decode wiring.
1868        let payload: Vec<u8> = (0..4096u32).map(|i| (i & 0xFF) as u8).collect();
1869        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1870        compressor.set_source(payload.as_slice());
1871        let mut compressed = Vec::new();
1872        compressor.set_drain(&mut compressed);
1873        compressor.compress();
1874
1875        // Baseline: tight output → legacy drain path.
1876        let mut dec_a = FrameDecoder::new();
1877        let mut out_a = alloc::vec![0u8; payload.len()];
1878        let n_a = dec_a
1879            .decode_all(compressed.as_slice(), &mut out_a)
1880            .expect("decode_all (legacy drain) should succeed");
1881        assert_eq!(n_a, payload.len());
1882        assert_eq!(&out_a[..n_a], payload.as_slice());
1883
1884        // Direct: output with WILDCOPY slack → direct path.
1885        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1886        let mut dec_b = FrameDecoder::new();
1887        let mut out_b = alloc::vec![0u8; payload.len() + slack];
1888        let n_b = dec_b
1889            .decode_all(compressed.as_slice(), &mut out_b)
1890            .expect("decode_all (direct path) should succeed");
1891        assert_eq!(
1892            n_b,
1893            payload.len(),
1894            "direct decode produced wrong byte count"
1895        );
1896        assert_eq!(&out_b[..n_b], payload.as_slice());
1897    }
1898
1899    #[test]
1900    fn decode_all_multi_segment_frame_decodes_correctly() {
1901        // Multi-segment frame: payload large enough that the
1902        // encoder's default frame layout has `single_segment_flag =
1903        // false` and `window_size < frame_content_size`. The direct
1904        // path must cap the visible buffer at window_size after each
1905        // block (drop_to_window_size) so match-offset validation
1906        // matches the spec rule `offset <= window_size`, and still
1907        // produce the same bytes as decode_all on the
1908        // FlatBuf/Ring-backed path.
1909        //
1910        // Make the payload structured so multi-segment behavior
1911        // actually kicks in: 2 MiB of repeating + random-ish bytes
1912        // forces window_size lower than content_size at the encoder.
1913        let mut payload: Vec<u8> = Vec::with_capacity(2 * 1024 * 1024);
1914        for i in 0..payload.capacity() {
1915            payload.push((i.wrapping_mul(2_654_435_761) & 0xFF) as u8);
1916        }
1917        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1918        compressor.set_source(payload.as_slice());
1919        let mut compressed = Vec::new();
1920        compressor.set_drain(&mut compressed);
1921        compressor.compress();
1922
1923        // Baseline: decode_all through the FlatBuf+drain path.
1924        let mut dec_a = FrameDecoder::new();
1925        let mut out_a = alloc::vec![0u8; payload.len()];
1926        let n_a = dec_a
1927            .decode_all(compressed.as_slice(), &mut out_a)
1928            .expect("decode_all should succeed");
1929        assert_eq!(n_a, payload.len());
1930        assert_eq!(&out_a[..n_a], payload.as_slice());
1931
1932        // Direct path: must give identical bytes via UserSliceBackend
1933        // + per-block drop_to_window_size.
1934        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1935        let mut dec_b = FrameDecoder::new();
1936        let mut out_b = alloc::vec![0u8; payload.len() + slack];
1937        let n_b = dec_b
1938            .decode_all(compressed.as_slice(), &mut out_b)
1939            .expect("decode_all should succeed on multi-segment frame");
1940        assert_eq!(n_b, payload.len(), "wrong byte count on direct path");
1941        assert_eq!(&out_b[..n_b], payload.as_slice());
1942
1943        // Sanity-check: confirm the encoded frame really IS
1944        // multi-segment. If a future encoder default changes,
1945        // catching the assumption here is better than silently
1946        // testing single_segment on this name.
1947        let mut sanity = FrameDecoder::new();
1948        sanity.init(&mut compressed.as_slice()).unwrap();
1949        assert!(
1950            !sanity
1951                .state
1952                .as_ref()
1953                .unwrap()
1954                .frame_header
1955                .descriptor
1956                .single_segment_flag(),
1957            "test precondition violated: frame is single-segment, rename or resize"
1958        );
1959    }
1960
1961    #[cfg(feature = "hash")]
1962    #[test]
1963    fn decode_all_propagates_checksum_into_persistent_scratch() {
1964        // Direct path on a checksum-flagged frame: the FrameCompressor
1965        // under `feature = "hash"` sets content_checksum_flag, so the
1966        // decoded frame has a recorded checksum. After
1967        // decode_all we must be able to verify it matches via
1968        // the public get_calculated_checksum() accessor — the digest
1969        // is computed by walking output at end of decode and stored
1970        // into the persistent scratch's hasher.
1971        let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
1972        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1973        compressor.set_source(payload.as_slice());
1974        let mut compressed = Vec::new();
1975        compressor.set_drain(&mut compressed);
1976        compressor.compress();
1977
1978        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1979        let mut dec = FrameDecoder::new();
1980        let mut out = alloc::vec![0u8; payload.len() + slack];
1981        let n = dec
1982            .decode_all(compressed.as_slice(), &mut out)
1983            .expect("decode_all with checksum must succeed");
1984        assert_eq!(n, payload.len());
1985        assert_eq!(&out[..n], payload.as_slice());
1986
1987        // Both sides must report the same checksum: the frame header
1988        // carries the stored u32, and get_calculated_checksum reads
1989        // the running digest the direct path just propagated.
1990        let stored = dec.get_checksum_from_data();
1991        let calculated = dec.get_calculated_checksum();
1992        assert!(stored.is_some(), "frame must carry stored checksum");
1993        assert!(
1994            calculated.is_some(),
1995            "direct path must propagate calculated checksum"
1996        );
1997        assert_eq!(
1998            stored, calculated,
1999            "stored vs calculated checksum mismatch on direct path"
2000        );
2001    }
2002
2003    #[test]
2004    fn decode_all_fcs_overflow_via_corrupt_frame_returns_structured_error() {
2005        // Hand-build a corrupt frame that declares
2006        // frame_content_size = 4 but the (last) block carries a
2007        // larger Raw payload. The pre-flight FCS check inside the
2008        // direct path's block loop catches this and returns the
2009        // structured FrameContentSizeMismatch variant — not a
2010        // panic, not a generic TargetTooSmall.
2011        //
2012        // Frame layout (single_segment, FCS=4):
2013        //   magic            4 bytes  0xFD2FB528
2014        //   FHD              1 byte   single_segment=1, no checksum,
2015        //                              FCS field size = 0 (-> 1-byte FCS)
2016        //   FCS              1 byte   0x04
2017        //   block_header     3 bytes  last=1, type=Raw, block_size=10
2018        //   block_payload    10 bytes 0xAA repeated
2019        let mut frame = alloc::vec::Vec::new();
2020        // magic
2021        frame.extend_from_slice(&0xFD2FB528u32.to_le_bytes());
2022        // FHD: single_segment=1, fcs_flag=0 (1-byte FCS), no checksum,
2023        // no dict. Bit layout: FCS(7-6)=0, single_segment(5)=1,
2024        // reserved/uncs(4)=0, content_checksum(2)=0, dict(0-1)=00.
2025        frame.push(0b0010_0000);
2026        // FCS: 1 byte
2027        frame.push(4);
2028        // Block header: cBlockSize=10, type=Raw (0), last=1
2029        // 3-byte LE: bit0=last, bits1-2=type(2 bits), bits3-23=size
2030        let cblock_size: u32 = 10;
2031        let bh: u32 = 1 | (cblock_size << 3); // last=1, type=Raw=0
2032        frame.push((bh & 0xFF) as u8);
2033        frame.push((bh >> 8) as u8);
2034        frame.push((bh >> 16) as u8);
2035        // Payload — 10 bytes that, if decoded, would exceed FCS=4.
2036        frame.extend(core::iter::repeat_n(0xAAu8, 10));
2037
2038        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
2039        let mut dec = FrameDecoder::new();
2040        let mut out = alloc::vec![0u8; 4 + slack];
2041        let err = dec
2042            .decode_all(&frame, &mut out)
2043            .expect_err("FCS-overflow frame must fail decode");
2044        assert!(
2045            matches!(
2046                err,
2047                super::FrameDecoderError::FrameContentSizeMismatch { .. }
2048            ),
2049            "expected FrameContentSizeMismatch, got {:?}",
2050            err
2051        );
2052    }
2053
2054    #[test]
2055    fn decode_all_falls_back_when_output_too_small_for_wildcopy_slack() {
2056        // Output sized exactly to frame_content_size (no
2057        // WILDCOPY_OVERLENGTH slack) must NOT trigger the direct
2058        // path — the burst's `extend_from_within_unchecked` writes
2059        // past `tail` into the slack region. Direct dispatcher
2060        // recognises this and falls back to the FlatBuf + drain
2061        // path which still produces the right output.
2062        let payload: Vec<u8> = (0..2048u32)
2063            .map(|i| (i.wrapping_mul(31) & 0xFF) as u8)
2064            .collect();
2065        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2066        compressor.set_source(payload.as_slice());
2067        let mut compressed = Vec::new();
2068        compressor.set_drain(&mut compressed);
2069        compressor.compress();
2070
2071        let mut dec = FrameDecoder::new();
2072        // Exactly payload.len(), no slack — direct path is gated out.
2073        let mut out = alloc::vec![0u8; payload.len()];
2074        let n = dec
2075            .decode_all(compressed.as_slice(), &mut out)
2076            .expect("decode_all should still succeed via fallback");
2077        assert_eq!(n, payload.len());
2078        assert_eq!(&out[..n], payload.as_slice());
2079    }
2080
2081    #[test]
2082    fn decode_all_fallback_validates_fcs_against_total_output() {
2083        // Synthetic single-segment frame: FCS = 20 bytes, but the
2084        // last-block flag fires after only 4 bytes of raw payload.
2085        // On the direct path this would trip the post-block
2086        // `produced > content_size` check; the fallback path
2087        // (eligible=false because output is sized exactly to FCS,
2088        // no WILDCOPY slack) used to silently return Ok(4). With
2089        // the fix it now surfaces `FrameContentSizeMismatch`
2090        // matching the direct path.
2091        //
2092        // Frame layout: 4 B magic | 1 B FHD (single_segment=1,
2093        // FCS_flag=3 → 8-byte FCS) | 8 B FCS=20 | block header
2094        // (Raw, last, size=4) | 4 raw bytes.
2095        let mut wire = Vec::new();
2096        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes()); // magic
2097        // FHD: FCS_flag=3 (8-byte FCS) <<6 | single_segment=1 <<5.
2098        wire.push(0b1110_0000);
2099        wire.extend_from_slice(&20u64.to_le_bytes()); // declared FCS
2100        // Block header: (size << 3) | (block_type << 1) | last_block.
2101        // Raw block (block_type=0), last_block=1, size=4 → 0b00100001 = 0x21.
2102        wire.push(0x21);
2103        wire.push(0x00);
2104        wire.push(0x00);
2105        wire.extend_from_slice(&[1u8, 2, 3, 4]);
2106
2107        let mut dec = FrameDecoder::new();
2108        // Size output exactly at declared FCS (no WILDCOPY slack)
2109        // so the eligibility check gates the direct path out.
2110        let mut out = alloc::vec![0u8; 20];
2111        let err = dec
2112            .decode_all(wire.as_slice(), &mut out)
2113            .expect_err("fallback must reject corrupt FCS underflow");
2114        match err {
2115            crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
2116                declared,
2117                produced,
2118            } => {
2119                assert_eq!(declared, 20);
2120                assert_eq!(produced, 4);
2121            }
2122            other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
2123        }
2124    }
2125
2126    #[test]
2127    fn decode_all_fallback_treats_explicit_fcs_zero_as_declared() {
2128        // Synthetic multi-segment frame with FCS_flag=2 (4-byte
2129        // FCS) explicitly set to 0. The header DECLARES zero
2130        // content, but the body carries a 5-byte raw last-block.
2131        // `fcs_declared()` must return true (the field is on the
2132        // wire) so the fallback's post-decode size check sees the
2133        // mismatch — even though `frame_content_size == 0`. This
2134        // is exactly the FCS=0 edge case where the previous
2135        // `content_size > 0` proxy would have silently accepted
2136        // the corrupt frame.
2137        //
2138        // Frame layout:
2139        //   4 B magic            — 28 B5 2F FD
2140        //   1 B FHD              — FCS_flag=2 (bits 7-6), no
2141        //                          single_segment, content_checksum=0,
2142        //                          dict_id_flag=0 → 0b1000_0000
2143        //   1 B window_descriptor — exp=10, mantissa=0 → window=1 MiB
2144        //   4 B FCS              — 0 LE
2145        //   3 B block header     — raw, last, size=5 → 0x29 0x00 0x00
2146        //   5 B raw payload      — anything non-empty
2147        let mut wire = Vec::new();
2148        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2149        wire.push(0b1000_0000); // FHD: FCS_flag=2, others 0.
2150        wire.push(0x50); // window_descriptor: exp=10, mantissa=0.
2151        wire.extend_from_slice(&0u32.to_le_bytes()); // FCS = 0.
2152        // Block header (24-bit LE): (size << 3) | (block_type << 1) | last_block
2153        // = (5 << 3) | (0 << 1) | 1 = 0x29.
2154        wire.push(0x29);
2155        wire.push(0x00);
2156        wire.push(0x00);
2157        wire.extend_from_slice(&[1u8, 2, 3, 4, 5]);
2158
2159        let mut dec = FrameDecoder::new();
2160        // FCS=0 declared, so eligibility (`content_size > 0`)
2161        // false — falls through to the drain loop. Output buffer
2162        // size doesn't matter for the eligibility check here;
2163        // give it some room so `read()` can drain the block.
2164        let mut out = alloc::vec![0u8; 16];
2165        let err = dec
2166            .decode_all(wire.as_slice(), &mut out)
2167            .expect_err("corrupt FCS=0 + 5-byte block must error");
2168        match err {
2169            crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
2170                declared,
2171                produced,
2172            } => {
2173                assert_eq!(declared, 0);
2174                assert_eq!(produced, 5);
2175            }
2176            other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
2177        }
2178    }
2179
2180    #[test]
2181    fn decode_all_fallback_accepts_honest_explicit_fcs_zero() {
2182        // Companion to the corrupt-FCS=0 test above: an HONEST
2183        // empty frame with FCS_flag=2 (4-byte FCS) explicitly set
2184        // to 0 AND a 0-byte raw last-block. `fcs_declared()`
2185        // returns true and `content_size == 0 == total_written`,
2186        // so the fallback validation accepts the frame instead of
2187        // misreporting a mismatch.
2188        //
2189        // (Single-segment FCS=0 would test a similar invariant
2190        // but trips header-stage validation: `window_size =
2191        // frame_content_size = 0 < MIN_WINDOW_SIZE` fails the
2192        // window-size sanity check before decode runs. Use the
2193        // multi-segment shape where `window_size` comes from
2194        // `window_descriptor` independently of FCS.)
2195        //
2196        // Frame layout:
2197        //   4 B magic
2198        //   1 B FHD              — FCS_flag=2, others 0 → 0x80
2199        //   1 B window_descriptor — exp=10 → 1 MiB window
2200        //   4 B FCS              — 0 LE
2201        //   3 B block header     — raw, last, size=0 → 0x01 0x00 0x00
2202        let mut wire = Vec::new();
2203        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2204        wire.push(0b1000_0000);
2205        wire.push(0x50);
2206        wire.extend_from_slice(&0u32.to_le_bytes());
2207        // Block header: (0 << 3) | (0 << 1) | 1 = 0x01.
2208        wire.push(0x01);
2209        wire.push(0x00);
2210        wire.push(0x00);
2211
2212        let mut dec = FrameDecoder::new();
2213        let mut out = alloc::vec![0u8; 16];
2214        let n = dec
2215            .decode_all(wire.as_slice(), &mut out)
2216            .expect("honest FCS=0 + empty block must succeed");
2217        assert_eq!(n, 0);
2218    }
2219
2220    #[test]
2221    fn reset_with_dict_handle_applies_dict_when_no_dict_id() {
2222        let payload = b"reset-without-dict-id";
2223        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2224        compressor.set_source(payload.as_slice());
2225        let mut compressed = Vec::new();
2226        compressor.set_drain(&mut compressed);
2227        compressor.compress();
2228
2229        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2230        let handle = DictionaryHandle::decode_dict(dict_raw).expect("dictionary should parse");
2231
2232        let mut decoder = FrameDecoder::new();
2233        decoder
2234            .reset_with_dict_handle(compressed.as_slice(), &handle)
2235            .expect("reset should succeed");
2236        let state = decoder.state.as_ref().expect("state should be initialized");
2237        assert!(state.frame_header.dictionary_id().is_none());
2238        assert_eq!(state.using_dict, Some(handle.id()));
2239    }
2240
2241    #[cfg(feature = "lsm")]
2242    mod expect_validation {
2243        use super::*;
2244        use crate::decoding::errors::FrameDecoderError;
2245
2246        fn compress(payload: &[u8]) -> Vec<u8> {
2247            let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2248            compressor.set_source(payload);
2249            let mut compressed = Vec::new();
2250            compressor.set_drain(&mut compressed);
2251            compressor.compress();
2252            compressed
2253        }
2254
2255        fn compress_with_dict(payload: &[u8], dict_raw: &[u8]) -> Vec<u8> {
2256            let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2257            compressor
2258                .set_dictionary_from_bytes(dict_raw)
2259                .expect("dict load");
2260            compressor.set_source(payload);
2261            let mut compressed = Vec::new();
2262            compressor.set_drain(&mut compressed);
2263            compressor.compress();
2264            compressed
2265        }
2266
2267        #[test]
2268        fn expect_dict_id_none_default_allows_anything() {
2269            let compressed = compress(b"hello-no-expect");
2270            let mut decoder = FrameDecoder::new();
2271            decoder
2272                .reset(compressed.as_slice())
2273                .expect("default None passes");
2274        }
2275
2276        #[test]
2277        fn expect_dict_id_zero_matches_frame_without_dict_id() {
2278            // Default-encoded frame has no dict_id; pinning Some(0)
2279            // ("no dictionary expected") must accept it.
2280            let compressed = compress(b"payload");
2281            let mut decoder = FrameDecoder::new();
2282            decoder.expect_dict_id(Some(0));
2283            decoder
2284                .reset(compressed.as_slice())
2285                .expect("Some(0) ~ None");
2286        }
2287
2288        #[test]
2289        fn expect_dict_id_matching_value_passes() {
2290            let dict_raw = include_bytes!("../../dict_tests/dictionary");
2291            let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
2292            let actual_id = handle.id();
2293
2294            let compressed = compress_with_dict(b"payload-with-dict", dict_raw);
2295
2296            let mut decoder = FrameDecoder::new();
2297            decoder.expect_dict_id(Some(actual_id));
2298            // Decode requires the dict to be registered; using
2299            // reset_with_dict_handle for that.
2300            decoder
2301                .reset_with_dict_handle(compressed.as_slice(), &handle)
2302                .expect("matching dict_id passes");
2303        }
2304
2305        #[test]
2306        fn expect_dict_id_mismatching_value_fails_before_decode() {
2307            let dict_raw = include_bytes!("../../dict_tests/dictionary");
2308            let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
2309            let actual_id = handle.id();
2310            let wrong_id = actual_id.wrapping_add(1);
2311
2312            let compressed = compress_with_dict(b"payload-with-dict", dict_raw);
2313
2314            let mut decoder = FrameDecoder::new();
2315            decoder.expect_dict_id(Some(wrong_id));
2316            let err = decoder
2317                .reset_with_dict_handle(compressed.as_slice(), &handle)
2318                .expect_err("mismatch must fail");
2319            match err {
2320                FrameDecoderError::UnexpectedDictId { expected, found } => {
2321                    assert_eq!(expected, Some(wrong_id));
2322                    assert_eq!(found, Some(actual_id));
2323                }
2324                other => panic!("expected UnexpectedDictId, got {other:?}"),
2325            }
2326        }
2327
2328        #[test]
2329        fn expect_dict_id_nonzero_fails_on_frame_without_dict_id() {
2330            // Frame has no dict_id; expecting Some(42) (non-zero)
2331            // must fail with found = None.
2332            let compressed = compress(b"no-dict-frame");
2333            let mut decoder = FrameDecoder::new();
2334            decoder.expect_dict_id(Some(42));
2335            let err = decoder
2336                .reset(compressed.as_slice())
2337                .expect_err("nonzero expectation on dictless frame must fail");
2338            match err {
2339                FrameDecoderError::UnexpectedDictId { expected, found } => {
2340                    assert_eq!(expected, Some(42));
2341                    assert_eq!(found, None);
2342                }
2343                other => panic!("expected UnexpectedDictId, got {other:?}"),
2344            }
2345        }
2346
2347        #[test]
2348        fn expect_window_descriptor_none_default_allows_anything() {
2349            let compressed = compress(b"hello-no-wd-expect");
2350            let mut decoder = FrameDecoder::new();
2351            decoder
2352                .reset(compressed.as_slice())
2353                .expect("default None passes");
2354        }
2355
2356        #[test]
2357        fn expect_window_descriptor_mismatch_fails_before_decode() {
2358            // Compress a payload large enough to force a
2359            // multi-segment frame (window_descriptor on wire).
2360            // Default compression at >256 KiB produces multi-
2361            // segment frames with a real window_descriptor byte.
2362            let payload = alloc::vec![0xABu8; 512 * 1024];
2363            let compressed = compress(&payload);
2364
2365            // Read the actual window_descriptor by decoding once
2366            // without expectations, then pin a wrong value.
2367            let mut probe_decoder = FrameDecoder::new();
2368            probe_decoder.reset(compressed.as_slice()).unwrap();
2369            let probe_state = probe_decoder.state.as_ref().unwrap();
2370            let actual_wd = probe_state
2371                .frame_header
2372                .window_descriptor()
2373                .expect("multi-segment frame should expose window_descriptor");
2374            let wrong_wd = actual_wd.wrapping_add(0x10); // bump exponent
2375
2376            let mut decoder = FrameDecoder::new();
2377            decoder.expect_window_descriptor(Some(wrong_wd));
2378            let err = decoder
2379                .reset(compressed.as_slice())
2380                .expect_err("wrong window_descriptor must fail");
2381            match err {
2382                FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
2383                    assert_eq!(expected, wrong_wd);
2384                    assert_eq!(found, Some(actual_wd));
2385                }
2386                other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
2387            }
2388        }
2389
2390        /// Build a minimal synthetic single-segment zstd frame
2391        /// carrying a 4-byte raw payload. RFC 8878 §3.1.1.1
2392        /// layout, hand-rolled because our default
2393        /// `FrameCompressor` settings don't emit
2394        /// `single_segment_flag` for tiny inputs.
2395        ///
2396        /// Wire bytes (13 total for 4-byte payload):
2397        /// ```text
2398        /// 28 B5 2F FD       magic
2399        /// 20                FHD: single_segment=1, FCS_flag=0
2400        /// 04                FCS (single byte, value = payload.len())
2401        /// 21 00 00          block header: raw, last, size=4
2402        /// .. .. .. ..       payload bytes
2403        /// ```
2404        fn synth_single_segment_frame(payload: &[u8]) -> Vec<u8> {
2405            assert!(payload.len() <= 255, "1-byte FCS field caps at 255");
2406            assert!(payload.len() < (1usize << 21), "block size 21-bit max");
2407            let mut out = Vec::new();
2408            // Magic 0xFD2FB528 LE.
2409            out.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2410            // FHD: single_segment_flag (bit 5) set, everything
2411            // else zero. With single_segment + FCS_flag=0 the FCS
2412            // field is 1 byte. No window_descriptor on wire.
2413            out.push(0b0010_0000);
2414            // 1-byte FCS = payload length.
2415            out.push(payload.len() as u8);
2416            // Block header (3 bytes LE):
2417            // last_block=1, block_type=0 (Raw), block_size=payload.len().
2418            // Encoded: (size << 3) | (block_type << 1) | last_block.
2419            // Block header: last_block flag in bit 0, block_type
2420            // (0 = Raw) in bits 1-2, block size in bits 3+.
2421            let bh: u32 = ((payload.len() as u32) << 3) | 1;
2422            out.push((bh & 0xFF) as u8);
2423            out.push(((bh >> 8) & 0xFF) as u8);
2424            out.push(((bh >> 16) & 0xFF) as u8);
2425            // Raw payload.
2426            out.extend_from_slice(payload);
2427            out
2428        }
2429
2430        #[test]
2431        fn expect_window_descriptor_on_single_segment_frame_fails_with_found_none() {
2432            // Single-segment frames omit the window_descriptor
2433            // byte from the wire entirely. Setting an expectation
2434            // here must surface `found: None` so callers
2435            // distinguish "wrong descriptor" from "no descriptor
2436            // on the wire" — never silently pass.
2437            let compressed = synth_single_segment_frame(b"tiny");
2438
2439            // First sanity-check: the synthetic frame decodes
2440            // cleanly without any expectation.
2441            {
2442                let mut probe = FrameDecoder::new();
2443                probe
2444                    .reset(compressed.as_slice())
2445                    .expect("synth frame parses");
2446                let probe_state = probe.state.as_ref().unwrap();
2447                assert!(
2448                    probe_state.frame_header.window_descriptor().is_none(),
2449                    "synth frame must be single-segment"
2450                );
2451            }
2452
2453            let mut decoder = FrameDecoder::new();
2454            decoder.expect_window_descriptor(Some(0x40));
2455            let err = decoder
2456                .reset(compressed.as_slice())
2457                .expect_err("single-segment + expectation must fail");
2458            match err {
2459                FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
2460                    assert_eq!(expected, 0x40);
2461                    assert_eq!(found, None);
2462                }
2463                other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
2464            }
2465        }
2466
2467        #[test]
2468        fn validation_failure_leaves_decoder_re_resettable() {
2469            // After UnexpectedDictId on a wrong-expectation reset,
2470            // clearing the expectation and re-calling reset must
2471            // succeed on the same source — no lingering failed
2472            // state.
2473            let compressed = compress(b"re-resettable");
2474
2475            let mut decoder = FrameDecoder::new();
2476            decoder.expect_dict_id(Some(42));
2477            let err = decoder
2478                .reset(compressed.as_slice())
2479                .expect_err("first reset fails");
2480            assert!(matches!(err, FrameDecoderError::UnexpectedDictId { .. }));
2481
2482            // Clear expectation and retry on a fresh source.
2483            decoder.expect_dict_id(None);
2484            decoder
2485                .reset(compressed.as_slice())
2486                .expect("retry after clearing expectation should succeed");
2487        }
2488    }
2489
2490    /// Build a skippable frame on the wire: 4-byte LE magic + 4-byte LE
2491    /// length + payload bytes. RFC 8878 §3.1.2 restricts the magic
2492    /// variant to `0..=15`; assert here so accidental misuse of the
2493    /// helper can't smuggle a non-skippable magic past the tests.
2494    #[cfg(feature = "lsm")]
2495    fn build_skippable_frame(variant: u8, payload: &[u8]) -> Vec<u8> {
2496        assert!(
2497            variant <= 15,
2498            "skippable-frame variant {variant} outside RFC 8878 0..=15 range",
2499        );
2500        let mut out = Vec::with_capacity(8 + payload.len());
2501        let magic: u32 = 0x184D2A50 + u32::from(variant);
2502        out.extend_from_slice(&magic.to_le_bytes());
2503        out.extend_from_slice(&u32::try_from(payload.len()).unwrap().to_le_bytes());
2504        out.extend_from_slice(payload);
2505        out
2506    }
2507
2508    #[cfg(feature = "lsm")]
2509    #[test]
2510    fn decode_all_with_skippable_visitor_sees_payloads_in_order() {
2511        // Build a stream: skippable(v0, "alpha") + zstd_frame +
2512        // skippable(v3, "beta") + zstd_frame + skippable(v15, "")
2513        // and verify the visitor is invoked exactly three times with
2514        // the correct (variant, payload) pairs in stream order while
2515        // the zstd frames decode normally.
2516        let payload_a: Vec<u8> = (0..256u16).map(|i| i as u8).collect();
2517        let payload_b: Vec<u8> = (0..256u16).map(|i| (i ^ 0xAA) as u8).collect();
2518
2519        let mut comp_a = Vec::new();
2520        let mut c = FrameCompressor::new(CompressionLevel::Default);
2521        c.set_source(payload_a.as_slice());
2522        c.set_drain(&mut comp_a);
2523        c.compress();
2524
2525        let mut comp_b = Vec::new();
2526        let mut c = FrameCompressor::new(CompressionLevel::Default);
2527        c.set_source(payload_b.as_slice());
2528        c.set_drain(&mut comp_b);
2529        c.compress();
2530
2531        let skip0 = build_skippable_frame(0, b"alpha");
2532        let skip3 = build_skippable_frame(3, b"beta");
2533        let skip15 = build_skippable_frame(15, &[]);
2534
2535        let mut stream = Vec::new();
2536        stream.extend_from_slice(&skip0);
2537        stream.extend_from_slice(&comp_a);
2538        stream.extend_from_slice(&skip3);
2539        stream.extend_from_slice(&comp_b);
2540        stream.extend_from_slice(&skip15);
2541
2542        let mut decoder = FrameDecoder::new();
2543        let mut out = alloc::vec![0u8; payload_a.len() + payload_b.len()];
2544        let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
2545        let n = decoder
2546            .decode_all_with_skippable_visitor(stream.as_slice(), &mut out, |variant, payload| {
2547                collected.push((variant, payload.to_vec()));
2548            })
2549            .expect("decode_all_with_skippable_visitor should succeed");
2550
2551        // All three skippables visited in stream order.
2552        assert_eq!(collected.len(), 3);
2553        assert_eq!(collected[0], (0u8, b"alpha".to_vec()));
2554        assert_eq!(collected[1], (3u8, b"beta".to_vec()));
2555        assert_eq!(collected[2], (15u8, Vec::<u8>::new()));
2556
2557        // Both zstd frames decoded into `out` back-to-back.
2558        assert_eq!(n, payload_a.len() + payload_b.len());
2559        assert_eq!(&out[..payload_a.len()], payload_a.as_slice());
2560        assert_eq!(&out[payload_a.len()..n], payload_b.as_slice());
2561    }
2562
2563    #[cfg(feature = "lsm")]
2564    #[test]
2565    fn decode_all_silently_skips_when_no_visitor() {
2566        // Regression gate: plain decode_all must still silently skip
2567        // skippable frames (RFC 8878 mandated behavior) with no
2568        // behavioral change after the visitor refactor.
2569        let payload: Vec<u8> = (0..512u16).map(|i| i as u8).collect();
2570        let mut comp = Vec::new();
2571        let mut c = FrameCompressor::new(CompressionLevel::Default);
2572        c.set_source(payload.as_slice());
2573        c.set_drain(&mut comp);
2574        c.compress();
2575
2576        let skip = build_skippable_frame(7, b"ignored sidecar");
2577        let mut stream = Vec::new();
2578        stream.extend_from_slice(&skip);
2579        stream.extend_from_slice(&comp);
2580
2581        let mut decoder = FrameDecoder::new();
2582        let mut out = alloc::vec![0u8; payload.len()];
2583        let n = decoder
2584            .decode_all(stream.as_slice(), &mut out)
2585            .expect("decode_all should succeed on skippable + zstd stream");
2586        assert_eq!(n, payload.len());
2587        assert_eq!(&out[..n], payload.as_slice());
2588    }
2589
2590    #[cfg(feature = "lsm")]
2591    #[test]
2592    fn frame_emit_info_describes_emitted_block_layout() {
2593        // Encode a payload large enough to force >1 block, fetch
2594        // FrameEmitInfo, walk blocks[] and verify each block's
2595        // (offset_in_frame, header_size, body_size) matches the bytes
2596        // actually emitted into the drain buffer.
2597        let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
2598        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2599        compressor.set_source(payload.as_slice());
2600        let mut compressed = Vec::new();
2601        compressor.set_drain(&mut compressed);
2602        compressor.compress();
2603
2604        let info = compressor
2605            .last_frame_emit_info()
2606            .expect("last_frame_emit_info populated after compress")
2607            .clone();
2608        drop(compressor);
2609
2610        // Frame header range starts at 0 and is non-empty.
2611        assert_eq!(info.frame_header_range.start, 0);
2612        assert!(info.frame_header_range.end > 0);
2613        // Total size matches what was written to the drain.
2614        assert_eq!(info.total_size as usize, compressed.len());
2615        // At least one block, and the last entry has last_block=true.
2616        assert!(!info.blocks.is_empty());
2617        assert!(info.blocks.last().unwrap().last_block);
2618        // All non-final blocks have last_block=false.
2619        for b in &info.blocks[..info.blocks.len() - 1] {
2620            assert!(!b.last_block);
2621        }
2622        // Walk and verify each block's header bytes match the
2623        // recorded type / size by re-decoding the 3-byte header.
2624        // Walking arithmetic: offset_in_frame + header_size + body_size
2625        // must land exactly on the next block's offset_in_frame (or,
2626        // for the last block, on the checksum / end of frame).
2627        for (i, b) in info.blocks.iter().enumerate() {
2628            let off = b.offset_in_frame as usize;
2629            assert_eq!(b.header_size, 3);
2630            let mut hdr = [0u8; 4];
2631            hdr[..3].copy_from_slice(&compressed[off..off + 3]);
2632            let raw = u32::from_le_bytes(hdr);
2633            let last = (raw & 1) != 0;
2634            let ty = (raw >> 1) & 0b11;
2635            let sz = raw >> 3;
2636            assert_eq!(last, b.last_block);
2637            assert_eq!(sz, b.block_size_field);
2638            // body_size is the PHYSICAL length on the wire: spec's
2639            // Block_Size for Raw/Compressed, always 1 for RLE.
2640            let expected_physical = match b.block_type {
2641                crate::encoding::frame_emit_info::BlockType::RLE => 1,
2642                _ => sz,
2643            };
2644            assert_eq!(b.body_size, expected_physical);
2645            let expected_ty = match b.block_type {
2646                crate::encoding::frame_emit_info::BlockType::Raw => 0,
2647                crate::encoding::frame_emit_info::BlockType::RLE => 1,
2648                crate::encoding::frame_emit_info::BlockType::Compressed => 2,
2649                crate::encoding::frame_emit_info::BlockType::Reserved => 3,
2650            };
2651            assert_eq!(ty, expected_ty);
2652            // Walking-arithmetic invariant.
2653            let next_off = b.offset_in_frame + b.header_size as u32 + b.body_size;
2654            if let Some(next) = info.blocks.get(i + 1) {
2655                assert_eq!(
2656                    next_off, next.offset_in_frame,
2657                    "block {i} body_size doesn't reach next block's offset_in_frame",
2658                );
2659            } else if let Some(cs) = info.checksum_range.as_ref() {
2660                assert_eq!(
2661                    next_off, cs.start,
2662                    "last block body_size doesn't reach checksum_range.start",
2663                );
2664            } else {
2665                assert_eq!(
2666                    next_off, info.total_size,
2667                    "last block body_size doesn't reach total_size",
2668                );
2669            }
2670        }
2671        // Checksum range present iff `feature = "hash"` is enabled.
2672        assert_eq!(info.checksum_range.is_some(), cfg!(feature = "hash"));
2673    }
2674
2675    #[cfg(all(feature = "lsm", feature = "hash"))]
2676    #[test]
2677    fn per_block_checksum_round_trip() {
2678        // Encode with per-block checksums enabled. Decode with
2679        // per-block verification. Both sides emit exactly 1
2680        // checksum per physical block written to / read from the
2681        // wire (encoder hashes per emission site, including each
2682        // post-split partition; decoder hashes each decoded block).
2683        // Cardinality and element-wise contents must match
2684        // round-trip.
2685        let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
2686        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2687        compressor.set_source(payload.as_slice());
2688        compressor.enable_per_block_checksums();
2689        let mut compressed = Vec::new();
2690        compressor.set_drain(&mut compressed);
2691        compressor.compress();
2692
2693        let encoder_checksums = compressor
2694            .last_frame_block_checksums()
2695            .expect("checksums populated after enable + compress")
2696            .to_vec();
2697        drop(compressor);
2698        assert!(!encoder_checksums.is_empty());
2699
2700        // Decode side: enable verification, decode, compare.
2701        let mut decoder = FrameDecoder::new();
2702        decoder.enable_per_block_checksums();
2703        let mut output = alloc::vec![0u8; payload.len()];
2704        let n = decoder
2705            .decode_all(compressed.as_slice(), &mut output)
2706            .expect("decode_all should succeed");
2707        assert_eq!(n, payload.len());
2708        assert_eq!(&output[..n], payload.as_slice());
2709
2710        let decoder_checksums = decoder.computed_block_checksums();
2711        assert_eq!(decoder_checksums, encoder_checksums.as_slice());
2712    }
2713}