Skip to main content

structured_zstd/decoding/
frame_decoder.rs

//! [`FrameDecoder`] is the main low-level struct users interact with to decode zstd frames.
//!
//! Zstandard compressed data is made of one or more frames. Each frame is independent and can be
//! decompressed independently of other frames. This module contains structures
//! and utilities that can be used to decode a frame.
6
7use super::frame;
8use crate::decoding;
9use crate::decoding::block_decoder::BlockDecoder;
10use crate::decoding::buffer_backend::BufferBackend;
11use crate::decoding::decode_buffer::DecodeBuffer;
12use crate::decoding::dictionary::{Dictionary, DictionaryHandle};
13use crate::decoding::errors::{DecodeBlockContentError, FrameDecoderError};
14use crate::decoding::flat_buf::FlatBuf;
15use crate::decoding::ringbuffer::RingBuffer;
16use crate::decoding::scratch::DecoderScratch;
17use crate::io::{Error, Read, Write};
18use alloc::collections::BTreeMap;
19use alloc::vec::Vec;
20use core::convert::TryInto;
21
22use crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE;
23
24/// Low level Zstandard decoder that can be used to decompress frames with fine control over when and how many bytes are decoded.
25///
26/// This decoder is able to decode frames only partially and gives control
27/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once).
28/// It reads bytes as needed from a provided source and can be read from to collect partial results.
29///
30/// If you want to just read the whole frame with an `io::Read` without having to deal with manually calling [FrameDecoder::decode_blocks]
31/// you can use the provided [crate::decoding::StreamingDecoder] wich wraps this FrameDecoder.
32///
33/// Workflow is as follows:
34/// ```
35/// use structured_zstd::decoding::BlockDecodingStrategy;
36///
37/// # #[cfg(feature = "std")]
38/// use std::io::{Read, Write};
39///
40/// // no_std environments can use the crate's own Read traits
41/// # #[cfg(not(feature = "std"))]
42/// use structured_zstd::io::{Read, Write};
43///
44/// fn decode_this(mut file: impl Read) {
45///     //Create a new decoder
46///     let mut frame_dec = structured_zstd::decoding::FrameDecoder::new();
47///     let mut result = Vec::new();
48///
49///     // Use reset or init to make the decoder ready to decode the frame from the io::Read
50///     frame_dec.reset(&mut file).unwrap();
51///
52///     // Loop until the frame has been decoded completely
53///     while !frame_dec.is_finished() {
54///         // decode (roughly) batch_size many bytes
55///         frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap();
56///
57///         // read from the decoder to collect bytes from the internal buffer
58///         let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap();
59///
60///         // then do something with it
61///         do_something(&result[0..bytes_read]);
62///     }
63///
64///     // handle the last chunk of data
65///     while frame_dec.can_collect() > 0 {
66///         let x = frame_dec.read(result.as_mut_slice()).unwrap();
67///
68///         do_something(&result[0..x]);
69///     }
70/// }
71///
72/// fn do_something(data: &[u8]) {
73/// # #[cfg(feature = "std")]
74///     std::io::stdout().write_all(data).unwrap();
75/// }
76/// ```
77pub struct FrameDecoder {
78    state: Option<FrameDecoderState>,
79    owned_dicts: BTreeMap<u32, Dictionary>,
80    #[cfg(target_has_atomic = "ptr")]
81    shared_dicts: BTreeMap<u32, DictionaryHandle>,
82    #[cfg(not(target_has_atomic = "ptr"))]
83    shared_dicts: (),
84    /// `ZSTD_f_zstd1_magicless` — when true, [`init`] / [`reset`]
85    /// expect frames without the 4-byte magic number prefix.
86    /// Default false (standard zstd format).
87    magicless: bool,
88    /// Pinned `Dictionary_ID` expectation set via
89    /// [`Self::expect_dict_id`]. `None` (default) disables the
90    /// check; `Some(0)` matches frames whose header omits the
91    /// optional dict_id (treated as "no dictionary"). Validated in
92    /// [`Self::reset`] AFTER the frame header parses successfully
93    /// and BEFORE any block decode work.
94    #[cfg(feature = "lsm")]
95    expect_dict_id: Option<u32>,
96    /// Pinned `Window_Descriptor` byte expectation set via
97    /// [`Self::expect_window_descriptor`]. `None` (default)
98    /// disables the check. Validated in [`Self::reset`] AFTER the
99    /// frame header parses successfully and BEFORE any block
100    /// decode work. Single-segment frames (which omit the
101    /// `Window_Descriptor` byte from the wire) surface as
102    /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`]
103    /// with `found: None`.
104    #[cfg(feature = "lsm")]
105    expect_window_descriptor: Option<u8>,
106    /// When `true`, the per-block decode loop XXH64-hashes each
107    /// block's decompressed bytes and stores the low-32-bit digest in
108    /// [`Self::computed_block_checksums`]. Default `false` (zero
109    /// cost). Set via [`Self::enable_per_block_checksums`]. Gated on
110    /// `all(lsm, hash)` because XXH64 lives behind the `hash`
111    /// feature.
112    #[cfg(all(feature = "lsm", feature = "hash"))]
113    per_block_checksums_enabled: bool,
114    /// Per-block XXH64 (low 32 bits) digests captured during the
115    /// current frame's decode when `per_block_checksums_enabled` is
116    /// set. Reset at the start of every new frame. Gated on
117    /// `all(lsm, hash)` (see `per_block_checksums_enabled`).
118    #[cfg(all(feature = "lsm", feature = "hash"))]
119    computed_block_checksums: alloc::vec::Vec<u32>,
120}
121
122/// Backend-tagged decode scratch — chosen at frame-reset time based
123/// on the parsed `FrameHeader.descriptor.single_segment_flag()` and
124/// kept stable through the lifetime of the frame. The match in each
125/// helper below dispatches **once per call** (e.g. once per block in
126/// `decode_block_content`, once per drain in `drain_to_writer`) —
127/// never inside the hot push/repeat loop, which is fully
128/// monomorphised through the `DecoderScratch<B>` generic.
129enum DecoderScratchKind {
130    Ring(DecoderScratch<RingBuffer>),
131    Flat(DecoderScratch<FlatBuf>),
132}
133
134impl DecoderScratchKind {
135    fn new_ring(window_size: usize) -> Self {
136        let mut s = DecoderScratch::<RingBuffer>::new(window_size);
137        s.buffer.reserve(window_size);
138        Self::Ring(s)
139    }
140
141    /// Construct a flat-backed scratch sized for a single-segment
142    /// frame. `frame_content_size` is the upcoming output size in
143    /// bytes (== `window_size` when the flag is set).
144    fn new_flat(frame_content_size: usize) -> Self {
145        let flat = FlatBuf::with_capacity(frame_content_size);
146        // DecoderScratch's default ctor would discard the pre-sized
147        // FlatBuf — go through from_backend so the buffer carries the
148        // capacity the constructor wants.
149        let mut s = DecoderScratch::<FlatBuf>::new(frame_content_size);
150        s.buffer = DecodeBuffer::from_backend(flat, frame_content_size);
151        Self::Flat(s)
152    }
153
154    /// Reset (or transition between) backends for a new frame.
155    /// Reuses the existing `DecoderScratch` allocations (FSE / HUF
156    /// tables, sequence vec, etc.) when the backend kind is unchanged
157    /// — only the underlying buffer is re-sized for the new frame.
158    /// Building a fresh `DecoderScratch` on every frame would
159    /// re-allocate everything and was measured at +255 % vs ring on
160    /// small frames; reusing it keeps the small-frame cost flat.
161    fn reset(&mut self, frame: &frame::FrameHeader, window_size: usize) {
162        if frame.descriptor.single_segment_flag() {
163            match self {
164                Self::Flat(s) => {
165                    s.reset(window_size);
166                    // DecodeBuffer::reset clears + reserves
167                    // window_size; FlatBuf's reserve grows the
168                    // backing Vec if the new FCS is larger than
169                    // what's already allocated. No alloc when the
170                    // previous flat frame had >= this capacity.
171                }
172                Self::Ring(_) => *self = Self::new_flat(window_size),
173            }
174        } else {
175            match self {
176                Self::Ring(s) => s.reset(window_size),
177                Self::Flat(_) => *self = Self::new_ring(window_size),
178            }
179        }
180    }
181
182    fn init_from_dict(&mut self, dict: &Dictionary) {
183        match self {
184            Self::Ring(s) => s.init_from_dict(dict),
185            Self::Flat(s) => s.init_from_dict(dict),
186        }
187    }
188
189    #[inline]
190    fn buffer_len(&self) -> usize {
191        match self {
192            Self::Ring(s) => s.buffer.len(),
193            Self::Flat(s) => s.buffer.len(),
194        }
195    }
196
197    /// Last `n` bytes of the visible buffer as `(s1, s2)` (wrap-aware).
198    /// Routes through whichever backend the current scratch holds.
199    #[cfg(all(feature = "lsm", feature = "hash"))]
200    fn last_n_as_slices(&self, n: usize) -> (&[u8], &[u8]) {
201        match self {
202            Self::Ring(s) => s.buffer.last_n_as_slices(n),
203            Self::Flat(s) => s.buffer.last_n_as_slices(n),
204        }
205    }
206
207    fn buffer_drain(&mut self) -> Vec<u8> {
208        match self {
209            Self::Ring(s) => s.buffer.drain(),
210            Self::Flat(s) => s.buffer.drain(),
211        }
212    }
213
214    fn buffer_drain_to_window_size(&mut self) -> Option<Vec<u8>> {
215        match self {
216            Self::Ring(s) => s.buffer.drain_to_window_size(),
217            Self::Flat(s) => s.buffer.drain_to_window_size(),
218        }
219    }
220
221    fn buffer_drain_to_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
222        match self {
223            Self::Ring(s) => s.buffer.drain_to_writer(sink),
224            Self::Flat(s) => s.buffer.drain_to_writer(sink),
225        }
226    }
227
228    fn buffer_drain_to_window_size_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
229        match self {
230            Self::Ring(s) => s.buffer.drain_to_window_size_writer(sink),
231            Self::Flat(s) => s.buffer.drain_to_window_size_writer(sink),
232        }
233    }
234
235    fn buffer_can_drain(&self) -> usize {
236        match self {
237            Self::Ring(s) => s.buffer.can_drain(),
238            Self::Flat(s) => s.buffer.can_drain(),
239        }
240    }
241
242    fn buffer_can_drain_to_window_size(&self) -> Option<usize> {
243        match self {
244            Self::Ring(s) => s.buffer.can_drain_to_window_size(),
245            Self::Flat(s) => s.buffer.can_drain_to_window_size(),
246        }
247    }
248
249    fn buffer_read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
250        match self {
251            Self::Ring(s) => s.buffer.read(target),
252            Self::Flat(s) => s.buffer.read(target),
253        }
254    }
255
256    fn buffer_read_all(&mut self, target: &mut [u8]) -> Result<usize, Error> {
257        match self {
258            Self::Ring(s) => s.buffer.read_all(target),
259            Self::Flat(s) => s.buffer.read_all(target),
260        }
261    }
262
263    fn decode_block_content<R: Read>(
264        &mut self,
265        decoder: &mut BlockDecoder,
266        header: &crate::blocks::block::BlockHeader,
267        source: R,
268    ) -> Result<u64, DecodeBlockContentError> {
269        match self {
270            Self::Ring(s) => decoder.decode_block_content(header, s, source),
271            Self::Flat(s) => decoder.decode_block_content(header, s, source),
272        }
273    }
274
275    #[cfg(feature = "hash")]
276    fn hash_finish(&self) -> u64 {
277        use core::hash::Hasher;
278        match self {
279            Self::Ring(s) => s.buffer.hash.finish(),
280            Self::Flat(s) => s.buffer.hash.finish(),
281        }
282    }
283}
284
285struct FrameDecoderState {
286    pub frame_header: frame::FrameHeader,
287    decoder_scratch: DecoderScratchKind,
288    frame_finished: bool,
289    block_counter: usize,
290    bytes_read_counter: u64,
291    check_sum: Option<u32>,
292    using_dict: Option<u32>,
293}
294
/// Selects how much work a single [`FrameDecoder::decode_blocks`] call should do.
///
/// NOTE(review): the exact stopping rules live in `decode_blocks`, which is
/// not part of this excerpt; the per-variant notes below follow the variant
/// names and the usage example on [`FrameDecoder`] — confirm against the
/// implementation.
pub enum BlockDecodingStrategy {
    /// Presumably: keep decoding until the frame is finished.
    All,
    /// Presumably: stop after (up to) this many blocks.
    UptoBlocks(usize),
    /// Decode (roughly) this many bytes, as in the [`FrameDecoder`] usage example.
    UptoBytes(usize),
}
300
301impl FrameDecoderState {
302    /// Construct a new frame decoder state, reading the frame header
303    /// from `source`. When `magicless` is `true`, the 4-byte magic
304    /// number prefix is NOT consumed (donor `ZSTD_f_zstd1_magicless`).
305    /// Crate-internal — reached only via `FrameDecoder::init` /
306    /// `FrameDecoder::init_with_dict_handle`. Pre-allocates the
307    /// decode buffer to `window_size` so the first block does not
308    /// trigger incremental growth from zero capacity.
309    pub(crate) fn new_with_format(
310        source: impl Read,
311        magicless: bool,
312    ) -> Result<FrameDecoderState, FrameDecoderError> {
313        let (frame, header_size) = frame::read_frame_header_with_format(source, magicless)?;
314        let window_size = frame.window_size()?;
315
316        if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
317            return Err(FrameDecoderError::WindowSizeTooBig {
318                requested: window_size,
319            });
320        }
321
322        let decoder_scratch = if frame.descriptor.single_segment_flag() {
323            DecoderScratchKind::new_flat(window_size as usize)
324        } else {
325            DecoderScratchKind::new_ring(window_size as usize)
326        };
327        Ok(FrameDecoderState {
328            frame_header: frame,
329            frame_finished: false,
330            block_counter: 0,
331            decoder_scratch,
332            bytes_read_counter: u64::from(header_size),
333            check_sum: None,
334            using_dict: None,
335        })
336    }
337
338    /// Reset this state for a new frame read from `source`, reusing
339    /// existing allocations. When `magicless` is `true`, the frame
340    /// header is read WITHOUT expecting a magic-number prefix
341    /// (donor `ZSTD_f_zstd1_magicless`). Crate-internal — reached
342    /// only via `FrameDecoder::reset`.
343    ///
344    /// `DecodeBuffer::reset` reserves `window_size` internally, so
345    /// no additional frame-level reservation is needed here.
346    /// Further buffer growth during decoding is performed on demand
347    /// by the active block path.
348    pub(crate) fn reset_with_format(
349        &mut self,
350        source: impl Read,
351        magicless: bool,
352    ) -> Result<(), FrameDecoderError> {
353        let (frame_header, header_size) = frame::read_frame_header_with_format(source, magicless)?;
354        let window_size = frame_header.window_size()?;
355
356        if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
357            return Err(FrameDecoderError::WindowSizeTooBig {
358                requested: window_size,
359            });
360        }
361
362        self.decoder_scratch
363            .reset(&frame_header, window_size as usize);
364        self.frame_header = frame_header;
365        self.frame_finished = false;
366        self.block_counter = 0;
367        self.bytes_read_counter = u64::from(header_size);
368        self.check_sum = None;
369        self.using_dict = None;
370        Ok(())
371    }
372}
373
374impl Default for FrameDecoder {
375    fn default() -> Self {
376        Self::new()
377    }
378}
379
380impl FrameDecoder {
381    /// This will create a new decoder without allocating anything yet.
382    /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used
383    /// else they just reset these buffers with not further allocations
384    pub fn new() -> FrameDecoder {
385        FrameDecoder {
386            state: None,
387            owned_dicts: BTreeMap::new(),
388            #[cfg(target_has_atomic = "ptr")]
389            shared_dicts: BTreeMap::new(),
390            #[cfg(not(target_has_atomic = "ptr"))]
391            shared_dicts: (),
392            magicless: false,
393            #[cfg(feature = "lsm")]
394            expect_dict_id: None,
395            #[cfg(feature = "lsm")]
396            expect_window_descriptor: None,
397            #[cfg(all(feature = "lsm", feature = "hash"))]
398            per_block_checksums_enabled: false,
399            #[cfg(all(feature = "lsm", feature = "hash"))]
400            computed_block_checksums: alloc::vec::Vec::new(),
401        }
402    }
403
404    /// Opt in to per-block XXH64 verification during decode.
405    /// Default off; zero cost when disabled. Each block's decompressed
406    /// bytes are XXH64-hashed (low 32 bits) and appended to
407    /// [`Self::computed_block_checksums`] as the decode progresses.
408    /// Callers compare the captured digests against externally-stored
409    /// expected values (e.g. from a per-block sidecar in the
410    /// containing application protocol).
411    ///
412    /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
413    /// primitive lives behind the `hash` feature, so this method
414    /// only compiles when both are enabled.
415    #[cfg(all(feature = "lsm", feature = "hash"))]
416    pub fn enable_per_block_checksums(&mut self) {
417        self.per_block_checksums_enabled = true;
418    }
419
420    /// Per-block XXH64 (low 32 bits) digests captured during the
421    /// current frame's decode. Empty unless
422    /// [`Self::enable_per_block_checksums`] was called before
423    /// [`Self::decode_all`] / [`Self::reset`].
424    ///
425    /// Reset at the start of every new frame.
426    ///
427    /// Behind `all(feature = "lsm", feature = "hash")`.
428    #[cfg(all(feature = "lsm", feature = "hash"))]
429    pub fn computed_block_checksums(&self) -> &[u32] {
430        &self.computed_block_checksums
431    }
432
433    /// Pin the expected `Dictionary_ID` for the next frame.
434    ///
435    /// When `expected` is set, [`Self::init`] / [`Self::reset`]
436    /// validate it against the parsed frame header BEFORE any
437    /// block decode work runs. A mismatch returns
438    /// [`crate::decoding::errors::FrameDecoderError::UnexpectedDictId`]
439    /// before any block decode and before any output is produced.
440    /// Scratch buffer allocation / reservation for the decode
441    /// pipeline happens during frame-header parsing, which is
442    /// already complete when this validation fires — the cost of
443    /// scratch sizing is paid even on a mismatched header. The
444    /// guarantee is "no block decode, no XXH64 init, no partial
445    /// output", not "zero allocation".
446    ///
447    /// `Some(0)` is treated as "no dictionary expected": a frame
448    /// whose header omits the optional `Dictionary_ID` field
449    /// (flag value 0) passes the check; a frame that carries an
450    /// explicit non-zero id fails.
451    ///
452    /// `None` (default) disables the check.
453    ///
454    /// Primary use case: post-AEAD-decrypt sanity check in
455    /// wire-format consumers (e.g. lsm-tree's encrypted block
456    /// format pins the `dict_id` baked into the AAD against the
457    /// inner zstd frame's `dict_id` to defeat dict-substitution
458    /// attacks).
459    ///
460    /// NOT a replacement for AEAD authentication. NOT the same
461    /// semantic as donor `ZSTD_d_windowLogMax` (which is a
462    /// ceiling-style limit, separate concern).
463    #[cfg(feature = "lsm")]
464    #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
465    pub fn expect_dict_id(&mut self, expected: Option<u32>) {
466        self.expect_dict_id = expected;
467    }
468
469    /// Pin the expected raw `Window_Descriptor` byte (RFC 8878
470    /// §3.1.1.1.2 layout: `(exp << 3) | mantissa`) for the next
471    /// frame.
472    ///
473    /// When `expected` is set, [`Self::init`] / [`Self::reset`]
474    /// validate it against the parsed frame header BEFORE any
475    /// block decode work runs. A mismatch returns
476    /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`].
477    ///
478    /// Single-segment frames omit the `Window_Descriptor` byte
479    /// from the wire entirely. Setting an expectation while
480    /// receiving a single-segment frame fails the check with
481    /// `found: None` — there is no on-wire byte to match against,
482    /// which is reported explicitly rather than silently passing.
483    ///
484    /// `None` (default) disables the check.
485    ///
486    /// Byte-exact equality, NOT a ceiling. Donor
487    /// `ZSTD_d_windowLogMax` is a separate ceiling-style limit
488    /// available through the C FFI surface; this method is for
489    /// strict equality validation against a pinned expectation
490    /// (e.g. lsm-tree's wire format pins the window descriptor
491    /// from the AAD to defeat decompression-bomb-swap attacks).
492    #[cfg(feature = "lsm")]
493    #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
494    pub fn expect_window_descriptor(&mut self, expected: Option<u8>) {
495        self.expect_window_descriptor = expected;
496    }
497
498    /// Validate the just-parsed frame header against any pinned
499    /// expectations set via [`Self::expect_dict_id`] /
500    /// [`Self::expect_window_descriptor`].
501    ///
502    /// Returns the typed error variant on mismatch and leaves
503    /// `self.state` in a re-resettable shape — a subsequent
504    /// `reset()` will overwrite `frame_header` from the new source
505    /// without needing intermediate cleanup.
506    #[cfg(feature = "lsm")]
507    fn validate_expectations(
508        &self,
509        frame_header: &frame::FrameHeader,
510    ) -> Result<(), FrameDecoderError> {
511        if let Some(expected) = self.expect_dict_id {
512            let found = frame_header.dictionary_id();
513            // `Some(0)` is the "no dictionary expected" sentinel —
514            // matches a frame whose header omits the optional
515            // dict_id field (which is reported as `None` by the
516            // parser). All other values must match exactly.
517            let matches = match (expected, found) {
518                (0, None) => true,
519                (e, Some(f)) => e == f,
520                _ => false,
521            };
522            if !matches {
523                return Err(FrameDecoderError::UnexpectedDictId {
524                    expected: Some(expected),
525                    found,
526                });
527            }
528        }
529        if let Some(expected) = self.expect_window_descriptor {
530            let found = frame_header.window_descriptor();
531            if found != Some(expected) {
532                return Err(FrameDecoderError::UnexpectedWindowDescriptor { expected, found });
533            }
534        }
535        Ok(())
536    }
537
538    /// Enable or disable magicless frame format
539    /// (`ZSTD_f_zstd1_magicless`). When set to `true`, subsequent
540    /// [`init`] / [`reset`] calls expect the frame header to begin
541    /// directly with the frame-header descriptor — no 4-byte magic
542    /// number prefix. Default false. Must match the encoder's
543    /// magicless setting; the format is unambiguous only when the
544    /// caller knows it out-of-band.
545    ///
546    /// Note: magicless mode also disables skippable-frame detection.
547    /// The `0x184D2A50..=0x184D2A5F` skippable-frame magic range is
548    /// only recognised when the 4-byte magic prefix is consumed, so
549    /// `decode_all` / `init` / `reset` will treat a skippable frame
550    /// at the head of a magicless stream as a malformed frame header
551    /// (bad descriptor / window-size error) instead of skipping it.
552    /// Mixed-format streams that interleave skippable frames must be
553    /// pre-split by the caller; `set_magicless(true)` is only safe
554    /// when the entire stream is known to be magicless zstd frames.
555    pub fn set_magicless(&mut self, magicless: bool) {
556        self.magicless = magicless;
557    }
558
559    #[cfg(target_has_atomic = "ptr")]
560    fn shared_dict_exists(&self, dict_id: u32) -> bool {
561        self.shared_dicts.contains_key(&dict_id)
562    }
563
564    #[cfg(not(target_has_atomic = "ptr"))]
565    fn shared_dict_exists(&self, _dict_id: u32) -> bool {
566        false
567    }
568
569    fn validate_registered_dictionary(dict: &Dictionary) -> Result<(), FrameDecoderError> {
570        use crate::decoding::errors::DictionaryDecodeError as dict_err;
571
572        if dict.id == 0 {
573            return Err(FrameDecoderError::from(dict_err::ZeroDictionaryId));
574        }
575        if let Some(index) = dict.offset_hist.iter().position(|&rep| rep == 0) {
576            return Err(FrameDecoderError::from(
577                dict_err::ZeroRepeatOffsetInDictionary { index: index as u8 },
578            ));
579        }
580        Ok(())
581    }
582
583    /// init() will allocate all needed buffers if it is the first time this decoder is used
584    /// else they just reset these buffers with not further allocations
585    ///
586    /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
587    ///
588    /// equivalent to reset()
589    pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
590        self.reset(source)
591    }
592
593    /// Initialize the decoder for a new frame using a pre-parsed dictionary handle.
594    ///
595    /// If the frame header has a dictionary ID, this validates it against
596    /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
597    ///
598    /// If the header omits the optional dictionary ID, this still applies the
599    /// provided dictionary handle.
600    ///
601    /// # Warning
602    ///
603    /// This method always applies `dict` unless the frame header contains a
604    /// non-matching dictionary ID. Callers must only use this API when they
605    /// already know the frame was encoded with the provided dictionary, even if
606    /// the frame header omits the dictionary ID or encodes an explicit
607    /// dictionary ID of `0`.
608    ///
609    /// Passing a dictionary for a frame that was not encoded with it can
610    /// silently corrupt the decoded output.
611    pub fn init_with_dict_handle(
612        &mut self,
613        source: impl Read,
614        dict: &DictionaryHandle,
615    ) -> Result<(), FrameDecoderError> {
616        self.reset_with_dict_handle(source, dict)
617    }
618
619    /// reset() will allocate all needed buffers if it is the first time this decoder is used
620    /// else they just reset these buffers with not further allocations
621    ///
622    /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
623    ///
624    /// equivalent to init()
625    pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
626        use FrameDecoderError as err;
627        // Fresh frame → start with an empty per-block checksum vec so
628        // the values for the next frame don't carry over from the
629        // previous one.
630        #[cfg(all(feature = "lsm", feature = "hash"))]
631        self.computed_block_checksums.clear();
632        let magicless = self.magicless;
633        let dict_id = match &mut self.state {
634            Some(s) => {
635                s.reset_with_format(source, magicless)?;
636                s.frame_header.dictionary_id()
637            }
638            None => {
639                self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
640                self.state
641                    .as_ref()
642                    .and_then(|state| state.frame_header.dictionary_id())
643            }
644        };
645        // Validate any pinned expectations BEFORE block decode work
646        // runs. Catches dict_id substitution / window-descriptor
647        // tampering on inputs already authenticated by an outer
648        // layer (e.g. AEAD). Returning here leaves `self.state` in
649        // a re-resettable shape — next `reset()` re-parses the
650        // frame header without intermediate cleanup.
651        #[cfg(feature = "lsm")]
652        if let Some(state) = self.state.as_ref() {
653            self.validate_expectations(&state.frame_header)?;
654        }
655        if let Some(dict_id) = dict_id {
656            let state = self.state.as_mut().expect("state initialized");
657            let owned_dicts = &self.owned_dicts;
658            #[cfg(target_has_atomic = "ptr")]
659            let shared_dicts = &self.shared_dicts;
660            let dict = owned_dicts
661                .get(&dict_id)
662                .or_else(|| {
663                    #[cfg(target_has_atomic = "ptr")]
664                    {
665                        shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
666                    }
667                    #[cfg(not(target_has_atomic = "ptr"))]
668                    {
669                        None
670                    }
671                })
672                .ok_or(err::DictNotProvided { dict_id })?;
673            state.decoder_scratch.init_from_dict(dict);
674            state.using_dict = Some(dict_id);
675        }
676        Ok(())
677    }
678
679    /// Reset this decoder for a new frame using a pre-parsed dictionary handle.
680    ///
681    /// If the frame header has a dictionary ID, this validates it against
682    /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
683    ///
684    /// If the header omits the optional dictionary ID, this still applies the
685    /// provided dictionary handle.
686    ///
687    /// # Warning
688    ///
689    /// This method always applies `dict` unless the frame header contains a
690    /// non-matching dictionary ID. Callers must only use this API when they
691    /// already know the frame was encoded with the provided dictionary, even if
692    /// the frame header omits the dictionary ID or encodes an explicit
693    /// dictionary ID of `0`.
694    ///
695    /// Passing a dictionary for a frame that was not encoded with it can
696    /// silently corrupt the decoded output.
697    pub fn reset_with_dict_handle(
698        &mut self,
699        source: impl Read,
700        dict: &DictionaryHandle,
701    ) -> Result<(), FrameDecoderError> {
702        use FrameDecoderError as err;
703        // Fresh frame → drop the previous frame's per-block checksum
704        // digests so the next decode starts with an empty vec.
705        // Mirrors the same clear in `reset()`; reset_with_dict_handle
706        // is a parallel entry point so it needs its own call.
707        #[cfg(all(feature = "lsm", feature = "hash"))]
708        self.computed_block_checksums.clear();
709        Self::validate_registered_dictionary(dict.as_dict())?;
710        let magicless = self.magicless;
711        // Scope the &mut borrow of `self.state` to the header parse
712        // alone, so the subsequent `validate_expectations(&self, ...)`
713        // call below can take a fresh shared borrow of self without
714        // tripping the borrow checker.
715        match &mut self.state {
716            Some(s) => s.reset_with_format(source, magicless)?,
717            None => {
718                self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
719            }
720        }
721        // Single source of truth: route through the same
722        // `validate_expectations` used by `reset()`. Routing through
723        // the helper keeps the two code paths from drifting (e.g.,
724        // if expect-semantics or error wiring changes later).
725        #[cfg(feature = "lsm")]
726        {
727            let header = &self
728                .state
729                .as_ref()
730                .expect("state populated by reset_with_format/new_with_format")
731                .frame_header;
732            self.validate_expectations(header)?;
733        }
734        let state = self
735            .state
736            .as_mut()
737            .expect("state populated by reset_with_format/new_with_format");
738        if let Some(dict_id) = state.frame_header.dictionary_id()
739            && dict_id != dict.id()
740        {
741            return Err(err::DictIdMismatch {
742                expected: dict_id,
743                provided: dict.id(),
744            });
745        }
746        state.decoder_scratch.init_from_dict(dict.as_dict());
747        state.using_dict = Some(dict.id());
748        Ok(())
749    }
750
751    /// Add a dictionary that can be selected dynamically by frame dictionary ID.
752    ///
753    /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
754    /// registered (either as owned or shared).
755    pub fn add_dict(&mut self, dict: Dictionary) -> Result<(), FrameDecoderError> {
756        Self::validate_registered_dictionary(&dict)?;
757        let dict_id = dict.id;
758        if self.owned_dicts.contains_key(&dict_id) || self.shared_dict_exists(dict_id) {
759            return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
760        }
761        self.owned_dicts.insert(dict_id, dict);
762        Ok(())
763    }
764
765    /// Parse and add a serialized dictionary blob.
766    pub fn add_dict_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), FrameDecoderError> {
767        let dict = Dictionary::decode_dict(raw_dictionary)?;
768        self.add_dict(dict)
769    }
770
771    /// Add a pre-parsed dictionary handle for reuse across decoders.
772    ///
773    /// This API is available on targets with pointer-width atomics
774    /// (`target_has_atomic = "ptr"`).
775    ///
776    /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
777    /// registered (either as owned or shared).
778    #[cfg(target_has_atomic = "ptr")]
779    pub fn add_dict_handle(&mut self, dict: DictionaryHandle) -> Result<(), FrameDecoderError> {
780        Self::validate_registered_dictionary(dict.as_dict())?;
781        let dict_id = dict.id();
782        if self.owned_dicts.contains_key(&dict_id) || self.shared_dicts.contains_key(&dict_id) {
783            return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
784        }
785        self.shared_dicts.insert(dict_id, dict);
786        Ok(())
787    }
788
789    pub fn force_dict(&mut self, dict_id: u32) -> Result<(), FrameDecoderError> {
790        use FrameDecoderError as err;
791        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
792        let owned_dicts = &self.owned_dicts;
793        #[cfg(target_has_atomic = "ptr")]
794        let shared_dicts = &self.shared_dicts;
795
796        let dict = owned_dicts
797            .get(&dict_id)
798            .or_else(|| {
799                #[cfg(target_has_atomic = "ptr")]
800                {
801                    shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
802                }
803                #[cfg(not(target_has_atomic = "ptr"))]
804                {
805                    None
806                }
807            })
808            .ok_or(err::DictNotProvided { dict_id })?;
809        state.decoder_scratch.init_from_dict(dict);
810        state.using_dict = Some(dict_id);
811
812        Ok(())
813    }
814
815    /// Returns how many bytes the frame contains after decompression
816    pub fn content_size(&self) -> u64 {
817        match &self.state {
818            None => 0,
819            Some(s) => s.frame_header.frame_content_size(),
820        }
821    }
822
823    /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame
824    pub fn get_checksum_from_data(&self) -> Option<u32> {
825        let state = self.state.as_ref()?;
826
827        state.check_sum
828    }
829
/// Returns the checksum computed over the bytes decoded so far.
///
/// The value is only meaningful after all decoded bytes have been
/// collected/read from the `FrameDecoder`. Returns `None` when no frame has
/// been initialized.
#[cfg(feature = "hash")]
pub fn get_calculated_checksum(&self) -> Option<u32> {
    // The hash is 64 bits wide; only its low 32 bits are returned, which is
    // the width of the checksum read from the frame data.
    self.state
        .as_ref()
        .map(|state| state.decoder_scratch.hash_finish() as u32)
}
839
840    /// Counter for how many bytes have been consumed while decoding the frame
841    pub fn bytes_read_from_source(&self) -> u64 {
842        let state = match &self.state {
843            None => return 0,
844            Some(s) => s,
845        };
846        state.bytes_read_counter
847    }
848
849    /// Whether the current frames last block has been decoded yet
850    /// If this returns true you can call the drain* functions to get all content
851    /// (the read() function will drain automatically if this returns true)
852    pub fn is_finished(&self) -> bool {
853        let state = match &self.state {
854            None => return true,
855            Some(s) => s,
856        };
857        if state.frame_header.descriptor.content_checksum_flag() {
858            state.frame_finished && state.check_sum.is_some()
859        } else {
860            state.frame_finished
861        }
862    }
863
864    /// Counter for how many blocks have already been decoded
865    pub fn blocks_decoded(&self) -> usize {
866        let state = match &self.state {
867            None => return 0,
868            Some(s) => s,
869        };
870        state.block_counter
871    }
872
873    /// Decodes blocks from a reader. It requires that the framedecoder has been initialized first.
874    /// The Strategy influences how many blocks will be decoded before the function returns
875    /// This is important if you want to manage memory consumption carefully. If you don't care
876    /// about that you can just choose the strategy "All" and have all blocks of the frame decoded into the buffer
877    pub fn decode_blocks(
878        &mut self,
879        mut source: impl Read,
880        strat: BlockDecodingStrategy,
881    ) -> Result<bool, FrameDecoderError> {
882        use FrameDecoderError as err;
883        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
884
885        let mut block_dec = decoding::block_decoder::new();
886
887        let buffer_size_before = state.decoder_scratch.buffer_len();
888        let block_counter_before = state.block_counter;
889        loop {
890            vprintln!("################");
891            vprintln!("Next Block: {}", state.block_counter);
892            vprintln!("################");
893            let (block_header, block_header_size) = block_dec
894                .read_block_header(&mut source)
895                .map_err(err::FailedToReadBlockHeader)?;
896            state.bytes_read_counter += u64::from(block_header_size);
897
898            vprintln!();
899            vprintln!(
900                "Found {} block with size: {}, which will be of size: {}",
901                block_header.block_type,
902                block_header.content_size,
903                block_header.decompressed_size
904            );
905
906            #[cfg(all(feature = "lsm", feature = "hash"))]
907            let len_before_block: Option<usize> = if self.per_block_checksums_enabled {
908                Some(state.decoder_scratch.buffer_len())
909            } else {
910                None
911            };
912            let bytes_read_in_block_body = state
913                .decoder_scratch
914                .decode_block_content(&mut block_dec, &block_header, &mut source)
915                .map_err(err::FailedToReadBlockBody)?;
916            state.bytes_read_counter += bytes_read_in_block_body;
917
918            // Per-block XXH64 (low 32 bits) of the just-decompressed
919            // bytes. Hashed from `last_n_as_slices` so RingBuffer wrap
920            // is handled in-place, no extra copy.
921            #[cfg(all(feature = "lsm", feature = "hash"))]
922            if let Some(len_before_block) = len_before_block {
923                let added = state.decoder_scratch.buffer_len() - len_before_block;
924                let (s1, s2) = state.decoder_scratch.last_n_as_slices(added);
925                let mut h = twox_hash::XxHash64::with_seed(0);
926                use core::hash::Hasher;
927                h.write(s1);
928                h.write(s2);
929                self.computed_block_checksums.push(h.finish() as u32);
930            }
931
932            state.block_counter += 1;
933
934            vprintln!("Output: {}", state.decoder_scratch.buffer_len());
935
936            if block_header.last_block {
937                state.frame_finished = true;
938                if state.frame_header.descriptor.content_checksum_flag() {
939                    let mut chksum = [0u8; 4];
940                    source
941                        .read_exact(&mut chksum)
942                        .map_err(err::FailedToReadChecksum)?;
943                    state.bytes_read_counter += 4;
944                    let chksum = u32::from_le_bytes(chksum);
945                    state.check_sum = Some(chksum);
946                }
947                break;
948            }
949
950            match strat {
951                BlockDecodingStrategy::All => { /* keep going */ }
952                BlockDecodingStrategy::UptoBlocks(n) => {
953                    if state.block_counter - block_counter_before >= n {
954                        break;
955                    }
956                }
957                BlockDecodingStrategy::UptoBytes(n) => {
958                    if state.decoder_scratch.buffer_len() - buffer_size_before >= n {
959                        break;
960                    }
961                }
962            }
963        }
964
965        Ok(state.frame_finished)
966    }
967
968    /// Collect bytes and retain window_size bytes while decoding is still going on.
969    /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
970    pub fn collect(&mut self) -> Option<Vec<u8>> {
971        let finished = self.is_finished();
972        let state = self.state.as_mut()?;
973        if finished {
974            Some(state.decoder_scratch.buffer_drain())
975        } else {
976            state.decoder_scratch.buffer_drain_to_window_size()
977        }
978    }
979
980    /// Collect bytes and retain window_size bytes while decoding is still going on.
981    /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
982    pub fn collect_to_writer(&mut self, w: impl Write) -> Result<usize, Error> {
983        let finished = self.is_finished();
984        let state = match &mut self.state {
985            None => return Ok(0),
986            Some(s) => s,
987        };
988        if finished {
989            state.decoder_scratch.buffer_drain_to_writer(w)
990        } else {
991            state.decoder_scratch.buffer_drain_to_window_size_writer(w)
992        }
993    }
994
995    /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size
996    /// because window_size bytes need to be retained for decoding.
997    /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes
998    pub fn can_collect(&self) -> usize {
999        let finished = self.is_finished();
1000        let state = match &self.state {
1001            None => return 0,
1002            Some(s) => s,
1003        };
1004        if finished {
1005            state.decoder_scratch.buffer_can_drain()
1006        } else {
1007            state
1008                .decoder_scratch
1009                .buffer_can_drain_to_window_size()
1010                .unwrap_or(0)
1011        }
1012    }
1013
1014    /// Decodes as many blocks as possible from the source slice and reads from the decodebuffer into the target slice
1015    /// The source slice may contain only parts of a frame but must contain at least one full block to make progress
1016    ///
1017    /// By all means use decode_blocks if you have a io.Reader available. This is just for compatibility with other decompressors
1018    /// which try to serve an old-style c api
1019    ///
1020    /// Returns (read, written), if read == 0 then the source did not contain a full block and further calls with the same
1021    /// input will not make any progress!
1022    ///
1023    /// Note that no kind of block can be bigger than 128kb.
1024    /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer
1025    ///
1026    /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just call decoder.read(&mut target)
1027    pub fn decode_from_to(
1028        &mut self,
1029        source: &[u8],
1030        target: &mut [u8],
1031    ) -> Result<(usize, usize), FrameDecoderError> {
1032        use FrameDecoderError as err;
1033        let bytes_read_at_start = match &self.state {
1034            Some(s) => s.bytes_read_counter,
1035            None => 0,
1036        };
1037
1038        if !self.is_finished() || self.state.is_none() {
1039            let mut mt_source = source;
1040
1041            if self.state.is_none() {
1042                self.init(&mut mt_source)?;
1043            }
1044
1045            //pseudo block to scope "state" so we can borrow self again after the block
1046            {
1047                let state = match &mut self.state {
1048                    Some(s) => s,
1049                    None => panic!("Bug in library"),
1050                };
1051                let mut block_dec = decoding::block_decoder::new();
1052
1053                if state.frame_header.descriptor.content_checksum_flag()
1054                    && state.frame_finished
1055                    && state.check_sum.is_none()
1056                {
1057                    //this block is needed if the checksum were the only 4 bytes that were not included in the last decode_from_to call for a frame
1058                    if mt_source.len() >= 4 {
1059                        let chksum = mt_source[..4].try_into().expect("optimized away");
1060                        state.bytes_read_counter += 4;
1061                        let chksum = u32::from_le_bytes(chksum);
1062                        state.check_sum = Some(chksum);
1063                    }
1064                    return Ok((4, 0));
1065                }
1066
1067                loop {
1068                    //check if there are enough bytes for the next header
1069                    if mt_source.len() < 3 {
1070                        break;
1071                    }
1072                    let (block_header, block_header_size) = block_dec
1073                        .read_block_header(&mut mt_source)
1074                        .map_err(err::FailedToReadBlockHeader)?;
1075
1076                    // check the needed size for the block before updating counters.
1077                    // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place
1078                    if mt_source.len() < block_header.content_size as usize {
1079                        break;
1080                    }
1081                    state.bytes_read_counter += u64::from(block_header_size);
1082
1083                    let bytes_read_in_block_body = state
1084                        .decoder_scratch
1085                        .decode_block_content(&mut block_dec, &block_header, &mut mt_source)
1086                        .map_err(err::FailedToReadBlockBody)?;
1087                    state.bytes_read_counter += bytes_read_in_block_body;
1088                    state.block_counter += 1;
1089
1090                    if block_header.last_block {
1091                        state.frame_finished = true;
1092                        if state.frame_header.descriptor.content_checksum_flag() {
1093                            //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call
1094                            if mt_source.len() >= 4 {
1095                                let chksum = mt_source[..4].try_into().expect("optimized away");
1096                                state.bytes_read_counter += 4;
1097                                let chksum = u32::from_le_bytes(chksum);
1098                                state.check_sum = Some(chksum);
1099                            }
1100                        }
1101                        break;
1102                    }
1103                }
1104            }
1105        }
1106
1107        let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?;
1108        let bytes_read_at_end = match &mut self.state {
1109            Some(s) => s.bytes_read_counter,
1110            None => panic!("Bug in library"),
1111        };
1112        let read_len = bytes_read_at_end - bytes_read_at_start;
1113        Ok((read_len as usize, result_len))
1114    }
1115
1116    /// Decode multiple frames into the output slice.
1117    ///
1118    /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
1119    /// skipped during decode.
1120    ///
1121    /// `output` must be large enough to hold the decompressed data. If you don't know
1122    /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
1123    ///
1124    /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
1125    ///
1126    /// Returns the number of bytes written to `output`.
1127    pub fn decode_all(
1128        &mut self,
1129        input: &[u8],
1130        output: &mut [u8],
1131    ) -> Result<usize, FrameDecoderError> {
1132        #[cfg(not(feature = "lsm"))]
1133        {
1134            self.decode_all_impl(input, output, |this, src| this.init(src))
1135        }
1136        #[cfg(feature = "lsm")]
1137        {
1138            self.decode_all_impl(input, output, |this, src| this.init(src), None)
1139        }
1140    }
1141
/// Decodes every frame in `input` into `output` and calls `visitor` for
/// each skippable frame before moving past it.
///
/// `input` must contain an exact number of frames. Skippable frames
/// (RFC 8878 §3.1.2, magic numbers `0x184D2A50..=0x184D2A5F`) are both
/// visited and skipped. The visitor receives `(magic_variant, payload)`:
///
/// * `magic_variant` is the low nibble of the magic number
///   (`magic - 0x184D2A50`, range `0..=15`);
/// * `payload` is a slice borrowed from `input` covering the frame's
///   `Frame_Size` bytes of payload, so no allocation takes place.
///
/// Skippable frames are visited in stream order. Regular zstd frames in
/// between are decompressed into `output` exactly as `decode_all` does.
///
/// `output` must be large enough to hold the decompressed data.
/// Returns the number of bytes written to `output`.
///
/// # Example
///
/// ```ignore
/// use structured_zstd::decoding::FrameDecoder;
///
/// let mut decoder = FrameDecoder::new();
/// let mut output = vec![0u8; 1024];
/// let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
/// let n = decoder.decode_all_with_skippable_visitor(
///     input,
///     &mut output,
///     |variant, payload| collected.push((variant, payload.to_vec())),
/// )?;
/// ```
#[cfg(feature = "lsm")]
#[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
pub fn decode_all_with_skippable_visitor<F>(
    &mut self,
    input: &[u8],
    output: &mut [u8],
    mut visitor: F,
) -> Result<usize, FrameDecoderError>
where
    F: FnMut(u8, &[u8]),
{
    self.decode_all_impl(
        input,
        output,
        |decoder, frame_source| decoder.init(frame_source),
        Some(&mut visitor),
    )
}
1193
1194    /// Decode multiple frames into the output slice using a pre-parsed dictionary handle.
1195    ///
1196    /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
1197    /// skipped during decode.
1198    ///
1199    /// `output` must be large enough to hold the decompressed data. If you don't know
1200    /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
1201    ///
1202    /// This calls [`FrameDecoder::init_with_dict_handle`], and all bytes currently in the
1203    /// decoder will be lost.
1204    ///
1205    /// # Warning
1206    ///
1207    /// Each decoded frame is initialized with `dict`, even when a frame header
1208    /// omits the optional dictionary ID. Callers must only use this API when
1209    /// they already know the input frames were encoded with the provided
1210    /// dictionary; otherwise decoded output can be silently corrupted.
1211    pub fn decode_all_with_dict_handle(
1212        &mut self,
1213        input: &[u8],
1214        output: &mut [u8],
1215        dict: &DictionaryHandle,
1216    ) -> Result<usize, FrameDecoderError> {
1217        #[cfg(not(feature = "lsm"))]
1218        {
1219            self.decode_all_impl(input, output, |this, src| {
1220                this.init_with_dict_handle(src, dict)
1221            })
1222        }
1223        #[cfg(feature = "lsm")]
1224        {
1225            self.decode_all_impl(
1226                input,
1227                output,
1228                |this, src| this.init_with_dict_handle(src, dict),
1229                None,
1230            )
1231        }
1232    }
1233
1234    /// Default-feature decode_all_impl: no visitor parameter so the
1235    /// no-lsm build's call surface and codegen are byte-identical to
1236    /// the pre-#172 implementation. Compiles only when `lsm` is OFF.
1237    #[cfg(not(feature = "lsm"))]
1238    fn decode_all_impl(
1239        &mut self,
1240        mut input: &[u8],
1241        mut output: &mut [u8],
1242        mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
1243    ) -> Result<usize, FrameDecoderError> {
1244        use super::buffer_backend::WILDCOPY_OVERLENGTH;
1245        let mut total_bytes_written = 0;
1246        while !input.is_empty() {
1247            match init_frame(self, &mut input) {
1248                Ok(_) => {}
1249                Err(FrameDecoderError::ReadFrameHeaderError(
1250                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
1251                )) => {
1252                    input = input
1253                        .get(length as usize..)
1254                        .ok_or(FrameDecoderError::FailedToSkipFrame)?;
1255                    continue;
1256                }
1257                Err(e) => return Err(e),
1258            };
1259            // Per-frame direct-path dispatch. Now safe to route the
1260            // public `decode_all` here because
1261            // `UserSliceBackend::exec_sequence_inline` returns
1262            // `Result<(), ExecuteSequencesError>` instead of
1263            // panicking on capacity overflow; the error propagates
1264            // up as `FrameDecoderError`. Eligibility (FCS > 0, no
1265            // active dict, remaining `output` slice has WILDCOPY
1266            // slack) puts the frame on the fast path that bypasses
1267            // the FlatBuf/Ring -> `read()` drain copy. Ineligible
1268            // frames (no FCS, active dict, output too small for
1269            // slack) fall through to the legacy `decode_blocks` +
1270            // `read` drain loop below.
1271            let state_ref = self.state.as_ref().expect("init populated state");
1272            let content_size = state_ref.frame_header.frame_content_size();
1273            let fcs_declared = state_ref.frame_header.fcs_declared();
1274            let dict_active = state_ref.using_dict.is_some();
1275            let needed = content_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
1276            // Per-block checksums collected inside `run_direct_decode`
1277            // post-loop (over recorded (start, end) ranges of `output`)
1278            // so the direct path stays eligible AND keeps the
1279            // window-size cap (`drop_to_window_size`) between blocks
1280            // that the spec relies on for `offset <= window_size`
1281            // validation. Path choice no longer alters checksum
1282            // semantics.
1283            let direct_eligible =
1284                content_size > 0 && !dict_active && (output.len() as u64) >= needed;
1285            if direct_eligible {
1286                let written = self.run_direct_decode(&mut input, output, content_size)?;
1287                output = &mut output[written..];
1288                total_bytes_written += written;
1289                continue;
1290            }
1291            let frame_start_total = total_bytes_written;
1292            loop {
1293                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
1294                let bytes_written = self
1295                    .read(output)
1296                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
1297                output = &mut output[bytes_written..];
1298                total_bytes_written += bytes_written;
1299                if self.can_collect() != 0 {
1300                    return Err(FrameDecoderError::TargetTooSmall);
1301                }
1302                if self.is_finished() {
1303                    break;
1304                }
1305            }
1306            // Per-frame FCS validation on the legacy fallback path.
1307            // Use `fcs_declared()` (NOT `content_size > 0`) so an
1308            // empty frame with explicit FCS=0 on the wire still gets
1309            // validated.
1310            if fcs_declared {
1311                let produced = (total_bytes_written - frame_start_total) as u64;
1312                if produced != content_size {
1313                    return Err(FrameDecoderError::FrameContentSizeMismatch {
1314                        declared: content_size,
1315                        produced,
1316                    });
1317                }
1318            }
1319        }
1320
1321        Ok(total_bytes_written)
1322    }
1323
/// `decode_all_impl` for builds with the `lsm` feature.
///
/// Adds the optional skippable-frame visitor consumed by
/// [`Self::decode_all_with_skippable_visitor`]. Apart from that it mirrors
/// the variant compiled without `lsm`, including the direct-path dispatch
/// and the frame-content-size validation. The only behavioral difference is
/// the skippable-frame arm: the payload is split off with a single bounds
/// check and handed to the visitor (when one is given) before the input
/// advances past it.
///
/// `init_frame` initializes the decoder for the next frame from the
/// remaining input. Returns the total number of bytes written to `output`.
#[cfg(feature = "lsm")]
#[allow(clippy::type_complexity)]
fn decode_all_impl(
    &mut self,
    mut input: &[u8],
    mut output: &mut [u8],
    mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
    mut skippable_visitor: Option<&mut dyn FnMut(u8, &[u8])>,
) -> Result<usize, FrameDecoderError> {
    use super::buffer_backend::WILDCOPY_OVERLENGTH;
    use crate::decoding::errors::ReadFrameHeaderError;

    let mut written_total = 0;
    while !input.is_empty() {
        match init_frame(self, &mut input) {
            Err(FrameDecoderError::ReadFrameHeaderError(ReadFrameHeaderError::SkipFrame {
                magic_number,
                length,
            })) => {
                // Split the payload off before advancing so the visitor
                // gets a borrowed slice and nothing is allocated.
                let Some((payload, rest)) = input.split_at_checked(length as usize) else {
                    return Err(FrameDecoderError::FailedToSkipFrame);
                };
                if let Some(visit) = skippable_visitor.as_mut() {
                    // The variant is the low nibble of the magic number
                    // (RFC 8878 §3.1.2). `read_frame_header` only emits
                    // SkipFrame for magic in 0x184D2A50..=0x184D2A5F, so
                    // the subtraction fits in 0..=15.
                    visit((magic_number - 0x184D2A50) as u8, payload);
                }
                input = rest;
                continue;
            }
            Err(other) => return Err(other),
            Ok(()) => {}
        }

        let (declared_size, has_declared_size, dict_in_use) = {
            let state = self.state.as_ref().expect("init populated state");
            (
                state.frame_header.frame_content_size(),
                state.frame_header.fcs_declared(),
                state.using_dict.is_some(),
            )
        };

        // Direct path, chosen per frame: decode straight into `output`,
        // bypassing the FlatBuf/Ring -> `read()` drain copy. A frame
        // qualifies when it has a non-zero content size, no dictionary is
        // active, and the remaining `output` has room for the content plus
        // the WILDCOPY slack. Capacity overflow on this path comes back as
        // an error (`UserSliceBackend::exec_sequence_inline` returns a
        // `Result` instead of panicking), which is what makes it safe to
        // reach from the public `decode_all`.
        let direct_eligible = declared_size > 0
            && !dict_in_use
            && (output.len() as u64) >= declared_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
        if direct_eligible {
            let written = self.run_direct_decode(&mut input, output, declared_size)?;
            output = &mut output[written..];
            written_total += written;
            continue;
        }

        // Legacy fallback: decode into the internal buffer in rounds and
        // drain each round into `output`.
        let mut frame_written = 0;
        loop {
            self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
            let drained = self
                .read(output)
                .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
            output = &mut output[drained..];
            frame_written += drained;
            // Bytes left over after draining mean `output` is full.
            if self.can_collect() != 0 {
                return Err(FrameDecoderError::TargetTooSmall);
            }
            if self.is_finished() {
                break;
            }
        }
        written_total += frame_written;

        // Validate the frame content size on the fallback path. The check
        // is keyed on `fcs_declared()` rather than on a non-zero size, so
        // an empty frame with an explicit FCS of 0 is validated too.
        if has_declared_size {
            let produced = frame_written as u64;
            if produced != declared_size {
                return Err(FrameDecoderError::FrameContentSizeMismatch {
                    declared: declared_size,
                    produced,
                });
            }
        }
    }

    Ok(written_total)
}
1431
1432    /// Decode multiple frames into the output slice using a serialized dictionary.
1433    ///
1434    /// # Warning
1435    ///
1436    /// Each decoded frame is initialized with the parsed dictionary, even when a
1437    /// frame header omits the optional dictionary ID. Callers must only use this
1438    /// API when they already know the input frames were encoded with that
1439    /// dictionary; otherwise decoded output can be silently corrupted.
1440    pub fn decode_all_with_dict_bytes(
1441        &mut self,
1442        input: &[u8],
1443        output: &mut [u8],
1444        raw_dictionary: &[u8],
1445    ) -> Result<usize, FrameDecoderError> {
1446        let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
1447        self.decode_all_with_dict_handle(input, output, &dict)
1448    }
1449
1450    /// Decode multiple frames into the extra capacity of the output vector.
1451    ///
1452    /// `input` must contain an exact number of frames.
1453    ///
1454    /// `output` must have enough extra capacity to hold the decompressed data.
1455    /// This function reserves an additional [`WILDCOPY_OVERLENGTH`]
1456    /// bytes on top of the caller's capacity so the per-frame direct
1457    /// decode path stays eligible — that may grow the vector by up
1458    /// to that fixed amount via `Vec::reserve`. It will NOT grow
1459    /// further to fit the decompressed payload itself; the caller's
1460    /// pre-allocated capacity must already cover the data. If you
1461    /// don't know how large the output will be, use
1462    /// [`FrameDecoder::decode_blocks`] instead.
1463    ///
1464    /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
1465    ///
1466    /// The length of the output vector is updated to include the decompressed data.
1467    /// The length is not changed if an error occurs. The
1468    /// `WILDCOPY_OVERLENGTH` slack is internal — `output.len()` on
1469    /// return is the actual decompressed size, NOT the inflated
1470    /// capacity. Callers who pre-sized the Vec with
1471    /// `Vec::with_capacity(fcs)` see no functional change beyond
1472    /// the small one-time capacity bump.
1473    pub fn decode_all_to_vec(
1474        &mut self,
1475        input: &[u8],
1476        output: &mut Vec<u8>,
1477    ) -> Result<(), FrameDecoderError> {
1478        use super::buffer_backend::WILDCOPY_OVERLENGTH;
1479        let len = output.len();
1480        // Reserve WILDCOPY slack on top of the caller's capacity so
1481        // `decode_all` can land on the direct path even when the
1482        // caller didn't pre-allocate slack themselves.
1483        output.reserve(WILDCOPY_OVERLENGTH);
1484        let cap = output.capacity();
1485        output.resize(cap, 0);
1486        match self.decode_all(input, &mut output[len..]) {
1487            Ok(bytes_written) => {
1488                let new_len = core::cmp::min(len + bytes_written, cap); // Sanitizes `bytes_written`.
1489                output.resize(new_len, 0);
1490                Ok(())
1491            }
1492            Err(e) => {
1493                output.resize(len, 0);
1494                Err(e)
1495            }
1496        }
1497    }
1498
1499    /// Single-frame direct-decode path. Decodes one zstd frame into
1500    /// `output[..content_size]` via a stack-local
1501    /// `DecodeBuffer<UserSliceBackend>`, bypassing the per-block
1502    /// FlatBuf/Ring -> `read()` drain copy.
1503    ///
1504    /// # Preconditions (caller-enforced)
1505    ///
1506    /// - `self.init` (or `init_with_dict_handle`) was called for
1507    ///   this frame so `self.state` is populated.
1508    /// - `content_size` matches `self.state.frame_header
1509    ///   .frame_content_size()` and is `> 0` (caller already passed
1510    ///   the eligibility gate).
1511    /// - `output.len() >= content_size + WILDCOPY_OVERLENGTH`.
1512    /// - No active dictionary
1513    ///   (`self.state.using_dict.is_none()`).
1514    ///
1515    /// On return, `input` points at the byte immediately after the
1516    /// frame's checksum (or after the last block, when the frame
1517    /// has `content_checksum_flag = 0`). `self.state.frame_finished`
1518    /// is set so [`Self::is_finished`] reports `true`.
1519    fn run_direct_decode(
1520        &mut self,
1521        input: &mut &[u8],
1522        output: &mut [u8],
1523        content_size: u64,
1524    ) -> Result<usize, FrameDecoderError> {
1525        use super::block_decoder;
1526        use super::decode_buffer::DecodeBuffer;
1527        use super::scratch::DirectScratch;
1528        use super::user_slice_buf::UserSliceBackend;
1529        use crate::io::Read;
1530        use FrameDecoderError as err;
1531
1532        let state = self
1533            .state
1534            .as_mut()
1535            .expect("caller ensures init populated state");
1536
1537        // Borrow persistent fields out of whichever scratch variant
1538        // `init` produced (Flat for single_segment, Ring for
1539        // multi-segment) — both expose the same HUF/FSE/Vec
1540        // fields; only `buffer` differs and we don't use that here.
1541        // Macro-style binding avoids the closure / generic
1542        // gymnastics of returning multiple `&mut` from a match arm.
1543        let (huf, fse, offset_hist, literals_buffer, sequences, block_content_buffer, window_size) =
1544            match &mut state.decoder_scratch {
1545                DecoderScratchKind::Flat(s) => (
1546                    &mut s.huf,
1547                    &mut s.fse,
1548                    &mut s.offset_hist,
1549                    &mut s.literals_buffer,
1550                    &mut s.sequences,
1551                    &mut s.block_content_buffer,
1552                    s.buffer.window_size,
1553                ),
1554                DecoderScratchKind::Ring(s) => (
1555                    &mut s.huf,
1556                    &mut s.fse,
1557                    &mut s.offset_hist,
1558                    &mut s.literals_buffer,
1559                    &mut s.sequences,
1560                    &mut s.block_content_buffer,
1561                    s.buffer.window_size,
1562                ),
1563            };
1564        let backend = UserSliceBackend::from_slice(output);
1565        let buffer = DecodeBuffer::from_backend(backend, window_size);
1566        let mut direct = DirectScratch {
1567            huf,
1568            fse,
1569            offset_hist,
1570            literals_buffer,
1571            sequences,
1572            block_content_buffer,
1573            buffer,
1574        };
1575
1576        // Block loop. Mirrors `decode_blocks` (without the
1577        // strategy-bounded early exit — we always decode the whole
1578        // frame in one shot for the direct path). Keeps
1579        // `state.bytes_read_counter` / `state.block_counter` in
1580        // sync with `decode_blocks` so post-call accessors
1581        // (`bytes_read_from_source`, `blocks_decoded`) return
1582        // accurate values.
1583        let mut block_dec = block_decoder::new();
1584        // Track total output bytes against the declared
1585        // `frame_content_size` via the buffer's actual write
1586        // counter — `BlockHeader.decompressed_size` is 0 for
1587        // Compressed blocks (the header parser can't know the
1588        // expanded size before decoding the body), so per-header
1589        // tracking would always count 0 for those blocks and
1590        // miscount frames that aren't pure Raw/RLE.
1591        let mut produced: u64 = 0;
1592        // Per-block output ranges captured during the direct-path
1593        // loop. After the loop we re-borrow `output` (post-drop of
1594        // `direct`) and XXH64 each range into
1595        // `self.computed_block_checksums`, so the digests vector
1596        // stays consistent with the legacy `decode_blocks` path
1597        // regardless of which dispatch the frame took.
1598        // `Vec::new()` does not allocate, so this stays free when
1599        // `per_block_checksums_enabled` is false: the `push` and the
1600        // post-loop hashing loop are both gated by the same flag.
1601        #[cfg(all(feature = "lsm", feature = "hash"))]
1602        let mut block_ranges: alloc::vec::Vec<(usize, usize)> = alloc::vec::Vec::new();
1603        loop {
1604            #[cfg(all(feature = "lsm", feature = "hash"))]
1605            let produced_before: Option<usize> = if self.per_block_checksums_enabled {
1606                Some(produced as usize)
1607            } else {
1608                None
1609            };
1610            let (block_header, hsize) = block_dec
1611                .read_block_header(&mut *input)
1612                .map_err(err::FailedToReadBlockHeader)?;
1613            state.bytes_read_counter += u64::from(hsize);
1614            // Pre-flight FCS check ONLY for Raw / RLE blocks where
1615            // `decompressed_size` is the actual block output size.
1616            // For Compressed blocks the header field is 0; the
1617            // post-decode check below catches overflow via the
1618            // backend's actual write counter delta.
1619            let block_upper = u64::from(block_header.decompressed_size);
1620            if block_upper > 0 && produced + block_upper > content_size {
1621                // Frame is corrupt — Raw/RLE block headers claim
1622                // more output than the FCS allows.
1623                return Err(err::FrameContentSizeMismatch {
1624                    declared: content_size,
1625                    produced: produced + block_upper,
1626                });
1627            }
1628            // Slice-source fast path: consume the block body
1629            // straight from `input` without copying into the
1630            // persistent `block_content_buffer`.
1631            let body_consumed = match block_dec.decode_block_content_from_slice(
1632                &block_header,
1633                &mut direct,
1634                &mut *input,
1635            ) {
1636                Ok(n) => n,
1637                // Defense-in-depth: RLE / Raw block whose declared
1638                // `decompressed_size` slipped past the per-block
1639                // pre-flight above and tripped the backend's
1640                // fallible write surface.
1641                Err(crate::decoding::errors::DecodeBlockContentError::BackendOverflow {
1642                    ..
1643                }) => {
1644                    // Use saturating_add on the
1645                    // `produced + decompressed_size` sum. Each block
1646                    // is bounded by 128 KiB (MAX_BLOCK_SIZE), but
1647                    // accumulated `produced` can grow toward
1648                    // u64::MAX across adversarial frames. Saturating
1649                    // avoids a panic on the error path itself.
1650                    return Err(err::FrameContentSizeMismatch {
1651                        declared: content_size,
1652                        produced: produced
1653                            .saturating_add(u64::from(block_header.decompressed_size)),
1654                    });
1655                }
1656                Err(e) => return Err(err::FailedToReadBlockBody(e)),
1657            };
1658            produced = direct.buffer.buffer_ref().tail() as u64;
1659            // Post-decode FCS overflow check.
1660            if produced > content_size {
1661                return Err(err::FrameContentSizeMismatch {
1662                    declared: content_size,
1663                    produced,
1664                });
1665            }
1666            state.bytes_read_counter += body_consumed;
1667            state.block_counter += 1;
1668            #[cfg(all(feature = "lsm", feature = "hash"))]
1669            if let Some(produced_before) = produced_before {
1670                block_ranges.push((produced_before, produced as usize));
1671            }
1672            // Cap the visible buffer at window_size between blocks
1673            // so the next block's match-offset validation matches
1674            // the spec's `offset <= window_size` rule.
1675            direct.buffer.drop_to_window_size();
1676            if block_header.last_block {
1677                if state.frame_header.descriptor.content_checksum_flag() {
1678                    let mut chksum = [0u8; 4];
1679                    input
1680                        .read_exact(&mut chksum)
1681                        .map_err(err::FailedToReadChecksum)?;
1682                    state.bytes_read_counter += 4;
1683                    state.check_sum = Some(u32::from_le_bytes(chksum));
1684                }
1685                break;
1686            }
1687        }
1688        // Final sanity: blocks summed to exactly `content_size`.
1689        if produced != content_size {
1690            return Err(err::FrameContentSizeMismatch {
1691                declared: content_size,
1692                produced,
1693            });
1694        }
1695
1696        let written = content_size as usize;
1697        state.frame_finished = true;
1698        // Drop the stack-local DirectScratch (and its DecodeBuffer
1699        // borrow on `output`) so we can re-borrow `output` for the
1700        // hash pass below.
1701        drop(direct);
1702        // Per-block XXH64 (low 32 bits) over the captured ranges.
1703        // Mirrors `decode_blocks`' per-block hashing so the digests
1704        // vector stays identical regardless of which dispatch path
1705        // the frame took. Ranges were recorded inside the loop while
1706        // `direct` held a mutable borrow on `output`; now that the
1707        // borrow is dropped we can read the slices directly.
1708        #[cfg(all(feature = "lsm", feature = "hash"))]
1709        if self.per_block_checksums_enabled {
1710            use core::hash::Hasher;
1711            for (start, end) in &block_ranges {
1712                let mut h = twox_hash::XxHash64::with_seed(0);
1713                h.write(&output[*start..*end]);
1714                self.computed_block_checksums.push(h.finish() as u32);
1715            }
1716        }
1717        #[cfg(feature = "hash")]
1718        {
1719            // Direct path bypasses the per-write hash accounting
1720            // (DecodeBuffer hashes during drain; the direct path
1721            // never drains because the user slice IS the buffer).
1722            // Walk the decoded output once and propagate the
1723            // resulting hasher state into the persistent scratch's
1724            // buffer so `get_calculated_checksum()` returns the
1725            // right value path-independently.
1726            use core::hash::Hasher;
1727            let mut hasher = twox_hash::XxHash64::with_seed(0);
1728            hasher.write(&output[..written]);
1729            match &mut state.decoder_scratch {
1730                DecoderScratchKind::Flat(s) => s.buffer.hash = hasher,
1731                DecoderScratchKind::Ring(s) => s.buffer.hash = hasher,
1732            }
1733        }
1734        Ok(written)
1735    }
1736}
1737
1738/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished
1739/// this will retain window_size bytes, else it will drain it completely
1740impl Read for FrameDecoder {
1741    fn read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
1742        let state = match &mut self.state {
1743            None => return Ok(0),
1744            Some(s) => s,
1745        };
1746        if state.frame_finished {
1747            state.decoder_scratch.buffer_read_all(target)
1748        } else {
1749            state.decoder_scratch.buffer_read(target)
1750        }
1751    }
1752}
1753
1754#[cfg(test)]
1755mod tests {
1756    extern crate std;
1757
1758    use super::{DictionaryHandle, FrameDecoder};
1759    use crate::encoding::{CompressionLevel, FrameCompressor};
1760    use alloc::vec::Vec;
1761
1762    #[test]
1763    fn decode_all_legacy_drain_matches_direct_path_on_single_segment_frame() {
1764        // Roundtrip a small payload through the encoder, then decode
1765        // it via `decode_all` on two output shapes that select
1766        // different internal paths:
1767        //   1. Tight output (no WILDCOPY_OVERLENGTH slack) → legacy
1768        //      `decode_blocks` + `read()` drain path.
1769        //   2. Output with WILDCOPY slack → direct
1770        //      `run_direct_decode` + `UserSliceBackend` path.
1771        // Both paths must produce identical output bytes — the only
1772        // difference is the internal buffer/drain shape, not the
1773        // decoded semantics. This is the regression gate for the
1774        // direct-decode wiring.
1775        let payload: Vec<u8> = (0..4096u32).map(|i| (i & 0xFF) as u8).collect();
1776        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1777        compressor.set_source(payload.as_slice());
1778        let mut compressed = Vec::new();
1779        compressor.set_drain(&mut compressed);
1780        compressor.compress();
1781
1782        // Baseline: tight output → legacy drain path.
1783        let mut dec_a = FrameDecoder::new();
1784        let mut out_a = alloc::vec![0u8; payload.len()];
1785        let n_a = dec_a
1786            .decode_all(compressed.as_slice(), &mut out_a)
1787            .expect("decode_all (legacy drain) should succeed");
1788        assert_eq!(n_a, payload.len());
1789        assert_eq!(&out_a[..n_a], payload.as_slice());
1790
1791        // Direct: output with WILDCOPY slack → direct path.
1792        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1793        let mut dec_b = FrameDecoder::new();
1794        let mut out_b = alloc::vec![0u8; payload.len() + slack];
1795        let n_b = dec_b
1796            .decode_all(compressed.as_slice(), &mut out_b)
1797            .expect("decode_all (direct path) should succeed");
1798        assert_eq!(
1799            n_b,
1800            payload.len(),
1801            "direct decode produced wrong byte count"
1802        );
1803        assert_eq!(&out_b[..n_b], payload.as_slice());
1804    }
1805
1806    #[test]
1807    fn decode_all_multi_segment_frame_decodes_correctly() {
1808        // Multi-segment frame: payload large enough that the
1809        // encoder's default frame layout has `single_segment_flag =
1810        // false` and `window_size < frame_content_size`. The direct
1811        // path must cap the visible buffer at window_size after each
1812        // block (drop_to_window_size) so match-offset validation
1813        // matches the spec rule `offset <= window_size`, and still
1814        // produce the same bytes as decode_all on the
1815        // FlatBuf/Ring-backed path.
1816        //
1817        // Make the payload structured so multi-segment behavior
1818        // actually kicks in: 2 MiB of repeating + random-ish bytes
1819        // forces window_size lower than content_size at the encoder.
1820        let mut payload: Vec<u8> = Vec::with_capacity(2 * 1024 * 1024);
1821        for i in 0..payload.capacity() {
1822            payload.push((i.wrapping_mul(2_654_435_761) & 0xFF) as u8);
1823        }
1824        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1825        compressor.set_source(payload.as_slice());
1826        let mut compressed = Vec::new();
1827        compressor.set_drain(&mut compressed);
1828        compressor.compress();
1829
1830        // Baseline: decode_all through the FlatBuf+drain path.
1831        let mut dec_a = FrameDecoder::new();
1832        let mut out_a = alloc::vec![0u8; payload.len()];
1833        let n_a = dec_a
1834            .decode_all(compressed.as_slice(), &mut out_a)
1835            .expect("decode_all should succeed");
1836        assert_eq!(n_a, payload.len());
1837        assert_eq!(&out_a[..n_a], payload.as_slice());
1838
1839        // Direct path: must give identical bytes via UserSliceBackend
1840        // + per-block drop_to_window_size.
1841        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1842        let mut dec_b = FrameDecoder::new();
1843        let mut out_b = alloc::vec![0u8; payload.len() + slack];
1844        let n_b = dec_b
1845            .decode_all(compressed.as_slice(), &mut out_b)
1846            .expect("decode_all should succeed on multi-segment frame");
1847        assert_eq!(n_b, payload.len(), "wrong byte count on direct path");
1848        assert_eq!(&out_b[..n_b], payload.as_slice());
1849
1850        // Sanity-check: confirm the encoded frame really IS
1851        // multi-segment. If a future encoder default changes,
1852        // catching the assumption here is better than silently
1853        // testing single_segment on this name.
1854        let mut sanity = FrameDecoder::new();
1855        sanity.init(&mut compressed.as_slice()).unwrap();
1856        assert!(
1857            !sanity
1858                .state
1859                .as_ref()
1860                .unwrap()
1861                .frame_header
1862                .descriptor
1863                .single_segment_flag(),
1864            "test precondition violated: frame is single-segment, rename or resize"
1865        );
1866    }
1867
1868    #[cfg(feature = "hash")]
1869    #[test]
1870    fn decode_all_propagates_checksum_into_persistent_scratch() {
1871        // Direct path on a checksum-flagged frame: the FrameCompressor
1872        // under `feature = "hash"` sets content_checksum_flag, so the
1873        // decoded frame has a recorded checksum. After
1874        // decode_all we must be able to verify it matches via
1875        // the public get_calculated_checksum() accessor — the digest
1876        // is computed by walking output at end of decode and stored
1877        // into the persistent scratch's hasher.
1878        let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
1879        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1880        compressor.set_source(payload.as_slice());
1881        let mut compressed = Vec::new();
1882        compressor.set_drain(&mut compressed);
1883        compressor.compress();
1884
1885        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1886        let mut dec = FrameDecoder::new();
1887        let mut out = alloc::vec![0u8; payload.len() + slack];
1888        let n = dec
1889            .decode_all(compressed.as_slice(), &mut out)
1890            .expect("decode_all with checksum must succeed");
1891        assert_eq!(n, payload.len());
1892        assert_eq!(&out[..n], payload.as_slice());
1893
1894        // Both sides must report the same checksum: the frame header
1895        // carries the stored u32, and get_calculated_checksum reads
1896        // the running digest the direct path just propagated.
1897        let stored = dec.get_checksum_from_data();
1898        let calculated = dec.get_calculated_checksum();
1899        assert!(stored.is_some(), "frame must carry stored checksum");
1900        assert!(
1901            calculated.is_some(),
1902            "direct path must propagate calculated checksum"
1903        );
1904        assert_eq!(
1905            stored, calculated,
1906            "stored vs calculated checksum mismatch on direct path"
1907        );
1908    }
1909
1910    #[test]
1911    fn decode_all_fcs_overflow_via_corrupt_frame_returns_structured_error() {
1912        // Hand-build a corrupt frame that declares
1913        // frame_content_size = 4 but the (last) block carries a
1914        // larger Raw payload. The pre-flight FCS check inside the
1915        // direct path's block loop catches this and returns the
1916        // structured FrameContentSizeMismatch variant — not a
1917        // panic, not a generic TargetTooSmall.
1918        //
1919        // Frame layout (single_segment, FCS=4):
1920        //   magic            4 bytes  0xFD2FB528
1921        //   FHD              1 byte   single_segment=1, no checksum,
1922        //                              FCS field size = 0 (-> 1-byte FCS)
1923        //   FCS              1 byte   0x04
1924        //   block_header     3 bytes  last=1, type=Raw, block_size=10
1925        //   block_payload    10 bytes 0xAA repeated
1926        let mut frame = alloc::vec::Vec::new();
1927        // magic
1928        frame.extend_from_slice(&0xFD2FB528u32.to_le_bytes());
1929        // FHD: single_segment=1, fcs_flag=0 (1-byte FCS), no checksum,
1930        // no dict. Bit layout: FCS(7-6)=0, single_segment(5)=1,
1931        // reserved/uncs(4)=0, content_checksum(2)=0, dict(0-1)=00.
1932        frame.push(0b0010_0000);
1933        // FCS: 1 byte
1934        frame.push(4);
1935        // Block header: cBlockSize=10, type=Raw (0), last=1
1936        // 3-byte LE: bit0=last, bits1-2=type(2 bits), bits3-23=size
1937        let cblock_size: u32 = 10;
1938        let bh: u32 = 1 | (cblock_size << 3); // last=1, type=Raw=0
1939        frame.push((bh & 0xFF) as u8);
1940        frame.push((bh >> 8) as u8);
1941        frame.push((bh >> 16) as u8);
1942        // Payload — 10 bytes that, if decoded, would exceed FCS=4.
1943        frame.extend(core::iter::repeat_n(0xAAu8, 10));
1944
1945        let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1946        let mut dec = FrameDecoder::new();
1947        let mut out = alloc::vec![0u8; 4 + slack];
1948        let err = dec
1949            .decode_all(&frame, &mut out)
1950            .expect_err("FCS-overflow frame must fail decode");
1951        assert!(
1952            matches!(
1953                err,
1954                super::FrameDecoderError::FrameContentSizeMismatch { .. }
1955            ),
1956            "expected FrameContentSizeMismatch, got {:?}",
1957            err
1958        );
1959    }
1960
1961    #[test]
1962    fn decode_all_falls_back_when_output_too_small_for_wildcopy_slack() {
1963        // Output sized exactly to frame_content_size (no
1964        // WILDCOPY_OVERLENGTH slack) must NOT trigger the direct
1965        // path — the burst's `extend_from_within_unchecked` writes
1966        // past `tail` into the slack region. Direct dispatcher
1967        // recognises this and falls back to the FlatBuf + drain
1968        // path which still produces the right output.
1969        let payload: Vec<u8> = (0..2048u32)
1970            .map(|i| (i.wrapping_mul(31) & 0xFF) as u8)
1971            .collect();
1972        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1973        compressor.set_source(payload.as_slice());
1974        let mut compressed = Vec::new();
1975        compressor.set_drain(&mut compressed);
1976        compressor.compress();
1977
1978        let mut dec = FrameDecoder::new();
1979        // Exactly payload.len(), no slack — direct path is gated out.
1980        let mut out = alloc::vec![0u8; payload.len()];
1981        let n = dec
1982            .decode_all(compressed.as_slice(), &mut out)
1983            .expect("decode_all should still succeed via fallback");
1984        assert_eq!(n, payload.len());
1985        assert_eq!(&out[..n], payload.as_slice());
1986    }
1987
1988    #[test]
1989    fn decode_all_fallback_validates_fcs_against_total_output() {
1990        // Synthetic single-segment frame: FCS = 20 bytes, but the
1991        // last-block flag fires after only 4 bytes of raw payload.
1992        // On the direct path this would trip the post-block
1993        // `produced > content_size` check; the fallback path
1994        // (eligible=false because output is sized exactly to FCS,
1995        // no WILDCOPY slack) used to silently return Ok(4). With
1996        // the fix it now surfaces `FrameContentSizeMismatch`
1997        // matching the direct path.
1998        //
1999        // Frame layout: 4 B magic | 1 B FHD (single_segment=1,
2000        // FCS_flag=3 → 8-byte FCS) | 8 B FCS=20 | block header
2001        // (Raw, last, size=4) | 4 raw bytes.
2002        let mut wire = Vec::new();
2003        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes()); // magic
2004        // FHD: FCS_flag=3 (8-byte FCS) <<6 | single_segment=1 <<5.
2005        wire.push(0b1110_0000);
2006        wire.extend_from_slice(&20u64.to_le_bytes()); // declared FCS
2007        // Block header: (size << 3) | (block_type << 1) | last_block.
2008        // Raw block (block_type=0), last_block=1, size=4 → 0b00100001 = 0x21.
2009        wire.push(0x21);
2010        wire.push(0x00);
2011        wire.push(0x00);
2012        wire.extend_from_slice(&[1u8, 2, 3, 4]);
2013
2014        let mut dec = FrameDecoder::new();
2015        // Size output exactly at declared FCS (no WILDCOPY slack)
2016        // so the eligibility check gates the direct path out.
2017        let mut out = alloc::vec![0u8; 20];
2018        let err = dec
2019            .decode_all(wire.as_slice(), &mut out)
2020            .expect_err("fallback must reject corrupt FCS underflow");
2021        match err {
2022            crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
2023                declared,
2024                produced,
2025            } => {
2026                assert_eq!(declared, 20);
2027                assert_eq!(produced, 4);
2028            }
2029            other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
2030        }
2031    }
2032
2033    #[test]
2034    fn decode_all_fallback_treats_explicit_fcs_zero_as_declared() {
2035        // Synthetic multi-segment frame with FCS_flag=2 (4-byte
2036        // FCS) explicitly set to 0. The header DECLARES zero
2037        // content, but the body carries a 5-byte raw last-block.
2038        // `fcs_declared()` must return true (the field is on the
2039        // wire) so the fallback's post-decode size check sees the
2040        // mismatch — even though `frame_content_size == 0`. This
2041        // is exactly the FCS=0 edge case where the previous
2042        // `content_size > 0` proxy would have silently accepted
2043        // the corrupt frame.
2044        //
2045        // Frame layout:
2046        //   4 B magic            — 28 B5 2F FD
2047        //   1 B FHD              — FCS_flag=2 (bits 7-6), no
2048        //                          single_segment, content_checksum=0,
2049        //                          dict_id_flag=0 → 0b1000_0000
2050        //   1 B window_descriptor — exp=10, mantissa=0 → window=1 MiB
2051        //   4 B FCS              — 0 LE
2052        //   3 B block header     — raw, last, size=5 → 0x29 0x00 0x00
2053        //   5 B raw payload      — anything non-empty
2054        let mut wire = Vec::new();
2055        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2056        wire.push(0b1000_0000); // FHD: FCS_flag=2, others 0.
2057        wire.push(0x50); // window_descriptor: exp=10, mantissa=0.
2058        wire.extend_from_slice(&0u32.to_le_bytes()); // FCS = 0.
2059        // Block header (24-bit LE): (size << 3) | (block_type << 1) | last_block
2060        // = (5 << 3) | (0 << 1) | 1 = 0x29.
2061        wire.push(0x29);
2062        wire.push(0x00);
2063        wire.push(0x00);
2064        wire.extend_from_slice(&[1u8, 2, 3, 4, 5]);
2065
2066        let mut dec = FrameDecoder::new();
2067        // FCS=0 declared, so eligibility (`content_size > 0`)
2068        // false — falls through to the drain loop. Output buffer
2069        // size doesn't matter for the eligibility check here;
2070        // give it some room so `read()` can drain the block.
2071        let mut out = alloc::vec![0u8; 16];
2072        let err = dec
2073            .decode_all(wire.as_slice(), &mut out)
2074            .expect_err("corrupt FCS=0 + 5-byte block must error");
2075        match err {
2076            crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
2077                declared,
2078                produced,
2079            } => {
2080                assert_eq!(declared, 0);
2081                assert_eq!(produced, 5);
2082            }
2083            other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
2084        }
2085    }
2086
2087    #[test]
2088    fn decode_all_fallback_accepts_honest_explicit_fcs_zero() {
2089        // Companion to the corrupt-FCS=0 test above: an HONEST
2090        // empty frame with FCS_flag=2 (4-byte FCS) explicitly set
2091        // to 0 AND a 0-byte raw last-block. `fcs_declared()`
2092        // returns true and `content_size == 0 == total_written`,
2093        // so the fallback validation accepts the frame instead of
2094        // misreporting a mismatch.
2095        //
2096        // (Single-segment FCS=0 would test a similar invariant
2097        // but trips header-stage validation: `window_size =
2098        // frame_content_size = 0 < MIN_WINDOW_SIZE` fails the
2099        // window-size sanity check before decode runs. Use the
2100        // multi-segment shape where `window_size` comes from
2101        // `window_descriptor` independently of FCS.)
2102        //
2103        // Frame layout:
2104        //   4 B magic
2105        //   1 B FHD              — FCS_flag=2, others 0 → 0x80
2106        //   1 B window_descriptor — exp=10 → 1 MiB window
2107        //   4 B FCS              — 0 LE
2108        //   3 B block header     — raw, last, size=0 → 0x01 0x00 0x00
2109        let mut wire = Vec::new();
2110        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2111        wire.push(0b1000_0000);
2112        wire.push(0x50);
2113        wire.extend_from_slice(&0u32.to_le_bytes());
2114        // Block header: (0 << 3) | (0 << 1) | 1 = 0x01.
2115        wire.push(0x01);
2116        wire.push(0x00);
2117        wire.push(0x00);
2118
2119        let mut dec = FrameDecoder::new();
2120        let mut out = alloc::vec![0u8; 16];
2121        let n = dec
2122            .decode_all(wire.as_slice(), &mut out)
2123            .expect("honest FCS=0 + empty block must succeed");
2124        assert_eq!(n, 0);
2125    }
2126
2127    #[test]
2128    fn reset_with_dict_handle_applies_dict_when_no_dict_id() {
2129        let payload = b"reset-without-dict-id";
2130        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2131        compressor.set_source(payload.as_slice());
2132        let mut compressed = Vec::new();
2133        compressor.set_drain(&mut compressed);
2134        compressor.compress();
2135
2136        let dict_raw = include_bytes!("../../dict_tests/dictionary");
2137        let handle = DictionaryHandle::decode_dict(dict_raw).expect("dictionary should parse");
2138
2139        let mut decoder = FrameDecoder::new();
2140        decoder
2141            .reset_with_dict_handle(compressed.as_slice(), &handle)
2142            .expect("reset should succeed");
2143        let state = decoder.state.as_ref().expect("state should be initialized");
2144        assert!(state.frame_header.dictionary_id().is_none());
2145        assert_eq!(state.using_dict, Some(handle.id()));
2146    }
2147
2148    #[cfg(feature = "lsm")]
2149    mod expect_validation {
2150        use super::*;
2151        use crate::decoding::errors::FrameDecoderError;
2152
2153        fn compress(payload: &[u8]) -> Vec<u8> {
2154            let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2155            compressor.set_source(payload);
2156            let mut compressed = Vec::new();
2157            compressor.set_drain(&mut compressed);
2158            compressor.compress();
2159            compressed
2160        }
2161
2162        fn compress_with_dict(payload: &[u8], dict_raw: &[u8]) -> Vec<u8> {
2163            let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2164            compressor
2165                .set_dictionary_from_bytes(dict_raw)
2166                .expect("dict load");
2167            compressor.set_source(payload);
2168            let mut compressed = Vec::new();
2169            compressor.set_drain(&mut compressed);
2170            compressor.compress();
2171            compressed
2172        }
2173
2174        #[test]
2175        fn expect_dict_id_none_default_allows_anything() {
2176            let compressed = compress(b"hello-no-expect");
2177            let mut decoder = FrameDecoder::new();
2178            decoder
2179                .reset(compressed.as_slice())
2180                .expect("default None passes");
2181        }
2182
2183        #[test]
2184        fn expect_dict_id_zero_matches_frame_without_dict_id() {
2185            // Default-encoded frame has no dict_id; pinning Some(0)
2186            // ("no dictionary expected") must accept it.
2187            let compressed = compress(b"payload");
2188            let mut decoder = FrameDecoder::new();
2189            decoder.expect_dict_id(Some(0));
2190            decoder
2191                .reset(compressed.as_slice())
2192                .expect("Some(0) ~ None");
2193        }
2194
2195        #[test]
2196        fn expect_dict_id_matching_value_passes() {
2197            let dict_raw = include_bytes!("../../dict_tests/dictionary");
2198            let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
2199            let actual_id = handle.id();
2200
2201            let compressed = compress_with_dict(b"payload-with-dict", dict_raw);
2202
2203            let mut decoder = FrameDecoder::new();
2204            decoder.expect_dict_id(Some(actual_id));
2205            // Decode requires the dict to be registered; using
2206            // reset_with_dict_handle for that.
2207            decoder
2208                .reset_with_dict_handle(compressed.as_slice(), &handle)
2209                .expect("matching dict_id passes");
2210        }
2211
2212        #[test]
2213        fn expect_dict_id_mismatching_value_fails_before_decode() {
2214            let dict_raw = include_bytes!("../../dict_tests/dictionary");
2215            let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
2216            let actual_id = handle.id();
2217            let wrong_id = actual_id.wrapping_add(1);
2218
2219            let compressed = compress_with_dict(b"payload-with-dict", dict_raw);
2220
2221            let mut decoder = FrameDecoder::new();
2222            decoder.expect_dict_id(Some(wrong_id));
2223            let err = decoder
2224                .reset_with_dict_handle(compressed.as_slice(), &handle)
2225                .expect_err("mismatch must fail");
2226            match err {
2227                FrameDecoderError::UnexpectedDictId { expected, found } => {
2228                    assert_eq!(expected, Some(wrong_id));
2229                    assert_eq!(found, Some(actual_id));
2230                }
2231                other => panic!("expected UnexpectedDictId, got {other:?}"),
2232            }
2233        }
2234
2235        #[test]
2236        fn expect_dict_id_nonzero_fails_on_frame_without_dict_id() {
2237            // Frame has no dict_id; expecting Some(42) (non-zero)
2238            // must fail with found = None.
2239            let compressed = compress(b"no-dict-frame");
2240            let mut decoder = FrameDecoder::new();
2241            decoder.expect_dict_id(Some(42));
2242            let err = decoder
2243                .reset(compressed.as_slice())
2244                .expect_err("nonzero expectation on dictless frame must fail");
2245            match err {
2246                FrameDecoderError::UnexpectedDictId { expected, found } => {
2247                    assert_eq!(expected, Some(42));
2248                    assert_eq!(found, None);
2249                }
2250                other => panic!("expected UnexpectedDictId, got {other:?}"),
2251            }
2252        }
2253
2254        #[test]
2255        fn expect_window_descriptor_none_default_allows_anything() {
2256            let compressed = compress(b"hello-no-wd-expect");
2257            let mut decoder = FrameDecoder::new();
2258            decoder
2259                .reset(compressed.as_slice())
2260                .expect("default None passes");
2261        }
2262
2263        #[test]
2264        fn expect_window_descriptor_mismatch_fails_before_decode() {
2265            // Compress a payload large enough to force a
2266            // multi-segment frame (window_descriptor on wire).
2267            // Default compression at >256 KiB produces multi-
2268            // segment frames with a real window_descriptor byte.
2269            let payload = alloc::vec![0xABu8; 512 * 1024];
2270            let compressed = compress(&payload);
2271
2272            // Read the actual window_descriptor by decoding once
2273            // without expectations, then pin a wrong value.
2274            let mut probe_decoder = FrameDecoder::new();
2275            probe_decoder.reset(compressed.as_slice()).unwrap();
2276            let probe_state = probe_decoder.state.as_ref().unwrap();
2277            let actual_wd = probe_state
2278                .frame_header
2279                .window_descriptor()
2280                .expect("multi-segment frame should expose window_descriptor");
2281            let wrong_wd = actual_wd.wrapping_add(0x10); // bump exponent
2282
2283            let mut decoder = FrameDecoder::new();
2284            decoder.expect_window_descriptor(Some(wrong_wd));
2285            let err = decoder
2286                .reset(compressed.as_slice())
2287                .expect_err("wrong window_descriptor must fail");
2288            match err {
2289                FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
2290                    assert_eq!(expected, wrong_wd);
2291                    assert_eq!(found, Some(actual_wd));
2292                }
2293                other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
2294            }
2295        }
2296
2297        /// Build a minimal synthetic single-segment zstd frame
2298        /// carrying a 4-byte raw payload. RFC 8878 §3.1.1.1
2299        /// layout, hand-rolled because our default
2300        /// `FrameCompressor` settings don't emit
2301        /// `single_segment_flag` for tiny inputs.
2302        ///
2303        /// Wire bytes (13 total for 4-byte payload):
2304        /// ```text
2305        /// 28 B5 2F FD       magic
2306        /// 20                FHD: single_segment=1, FCS_flag=0
2307        /// 04                FCS (single byte, value = payload.len())
2308        /// 21 00 00          block header: raw, last, size=4
2309        /// .. .. .. ..       payload bytes
2310        /// ```
2311        fn synth_single_segment_frame(payload: &[u8]) -> Vec<u8> {
2312            assert!(payload.len() <= 255, "1-byte FCS field caps at 255");
2313            assert!(payload.len() < (1usize << 21), "block size 21-bit max");
2314            let mut out = Vec::new();
2315            // Magic 0xFD2FB528 LE.
2316            out.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2317            // FHD: single_segment_flag (bit 5) set, everything
2318            // else zero. With single_segment + FCS_flag=0 the FCS
2319            // field is 1 byte. No window_descriptor on wire.
2320            out.push(0b0010_0000);
2321            // 1-byte FCS = payload length.
2322            out.push(payload.len() as u8);
2323            // Block header (3 bytes LE):
2324            // last_block=1, block_type=0 (Raw), block_size=payload.len().
2325            // Encoded: (size << 3) | (block_type << 1) | last_block.
2326            // Block header: last_block flag in bit 0, block_type
2327            // (0 = Raw) in bits 1-2, block size in bits 3+.
2328            let bh: u32 = ((payload.len() as u32) << 3) | 1;
2329            out.push((bh & 0xFF) as u8);
2330            out.push(((bh >> 8) & 0xFF) as u8);
2331            out.push(((bh >> 16) & 0xFF) as u8);
2332            // Raw payload.
2333            out.extend_from_slice(payload);
2334            out
2335        }
2336
2337        #[test]
2338        fn expect_window_descriptor_on_single_segment_frame_fails_with_found_none() {
2339            // Single-segment frames omit the window_descriptor
2340            // byte from the wire entirely. Setting an expectation
2341            // here must surface `found: None` so callers
2342            // distinguish "wrong descriptor" from "no descriptor
2343            // on the wire" — never silently pass.
2344            let compressed = synth_single_segment_frame(b"tiny");
2345
2346            // First sanity-check: the synthetic frame decodes
2347            // cleanly without any expectation.
2348            {
2349                let mut probe = FrameDecoder::new();
2350                probe
2351                    .reset(compressed.as_slice())
2352                    .expect("synth frame parses");
2353                let probe_state = probe.state.as_ref().unwrap();
2354                assert!(
2355                    probe_state.frame_header.window_descriptor().is_none(),
2356                    "synth frame must be single-segment"
2357                );
2358            }
2359
2360            let mut decoder = FrameDecoder::new();
2361            decoder.expect_window_descriptor(Some(0x40));
2362            let err = decoder
2363                .reset(compressed.as_slice())
2364                .expect_err("single-segment + expectation must fail");
2365            match err {
2366                FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
2367                    assert_eq!(expected, 0x40);
2368                    assert_eq!(found, None);
2369                }
2370                other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
2371            }
2372        }
2373
2374        #[test]
2375        fn validation_failure_leaves_decoder_re_resettable() {
2376            // After UnexpectedDictId on a wrong-expectation reset,
2377            // clearing the expectation and re-calling reset must
2378            // succeed on the same source — no lingering failed
2379            // state.
2380            let compressed = compress(b"re-resettable");
2381
2382            let mut decoder = FrameDecoder::new();
2383            decoder.expect_dict_id(Some(42));
2384            let err = decoder
2385                .reset(compressed.as_slice())
2386                .expect_err("first reset fails");
2387            assert!(matches!(err, FrameDecoderError::UnexpectedDictId { .. }));
2388
2389            // Clear expectation and retry on a fresh source.
2390            decoder.expect_dict_id(None);
2391            decoder
2392                .reset(compressed.as_slice())
2393                .expect("retry after clearing expectation should succeed");
2394        }
2395    }
2396
2397    /// Build a skippable frame on the wire: 4-byte LE magic + 4-byte LE
2398    /// length + payload bytes. RFC 8878 §3.1.2 restricts the magic
2399    /// variant to `0..=15`; assert here so accidental misuse of the
2400    /// helper can't smuggle a non-skippable magic past the tests.
2401    #[cfg(feature = "lsm")]
2402    fn build_skippable_frame(variant: u8, payload: &[u8]) -> Vec<u8> {
2403        assert!(
2404            variant <= 15,
2405            "skippable-frame variant {variant} outside RFC 8878 0..=15 range",
2406        );
2407        let mut out = Vec::with_capacity(8 + payload.len());
2408        let magic: u32 = 0x184D2A50 + u32::from(variant);
2409        out.extend_from_slice(&magic.to_le_bytes());
2410        out.extend_from_slice(&u32::try_from(payload.len()).unwrap().to_le_bytes());
2411        out.extend_from_slice(payload);
2412        out
2413    }
2414
2415    #[cfg(feature = "lsm")]
2416    #[test]
2417    fn decode_all_with_skippable_visitor_sees_payloads_in_order() {
2418        // Build a stream: skippable(v0, "alpha") + zstd_frame +
2419        // skippable(v3, "beta") + zstd_frame + skippable(v15, "")
2420        // and verify the visitor is invoked exactly three times with
2421        // the correct (variant, payload) pairs in stream order while
2422        // the zstd frames decode normally.
2423        let payload_a: Vec<u8> = (0..256u16).map(|i| i as u8).collect();
2424        let payload_b: Vec<u8> = (0..256u16).map(|i| (i ^ 0xAA) as u8).collect();
2425
2426        let mut comp_a = Vec::new();
2427        let mut c = FrameCompressor::new(CompressionLevel::Default);
2428        c.set_source(payload_a.as_slice());
2429        c.set_drain(&mut comp_a);
2430        c.compress();
2431
2432        let mut comp_b = Vec::new();
2433        let mut c = FrameCompressor::new(CompressionLevel::Default);
2434        c.set_source(payload_b.as_slice());
2435        c.set_drain(&mut comp_b);
2436        c.compress();
2437
2438        let skip0 = build_skippable_frame(0, b"alpha");
2439        let skip3 = build_skippable_frame(3, b"beta");
2440        let skip15 = build_skippable_frame(15, &[]);
2441
2442        let mut stream = Vec::new();
2443        stream.extend_from_slice(&skip0);
2444        stream.extend_from_slice(&comp_a);
2445        stream.extend_from_slice(&skip3);
2446        stream.extend_from_slice(&comp_b);
2447        stream.extend_from_slice(&skip15);
2448
2449        let mut decoder = FrameDecoder::new();
2450        let mut out = alloc::vec![0u8; payload_a.len() + payload_b.len()];
2451        let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
2452        let n = decoder
2453            .decode_all_with_skippable_visitor(stream.as_slice(), &mut out, |variant, payload| {
2454                collected.push((variant, payload.to_vec()));
2455            })
2456            .expect("decode_all_with_skippable_visitor should succeed");
2457
2458        // All three skippables visited in stream order.
2459        assert_eq!(collected.len(), 3);
2460        assert_eq!(collected[0], (0u8, b"alpha".to_vec()));
2461        assert_eq!(collected[1], (3u8, b"beta".to_vec()));
2462        assert_eq!(collected[2], (15u8, Vec::<u8>::new()));
2463
2464        // Both zstd frames decoded into `out` back-to-back.
2465        assert_eq!(n, payload_a.len() + payload_b.len());
2466        assert_eq!(&out[..payload_a.len()], payload_a.as_slice());
2467        assert_eq!(&out[payload_a.len()..n], payload_b.as_slice());
2468    }
2469
2470    #[cfg(feature = "lsm")]
2471    #[test]
2472    fn decode_all_silently_skips_when_no_visitor() {
2473        // Regression gate: plain decode_all must still silently skip
2474        // skippable frames (RFC 8878 mandated behavior) with no
2475        // behavioral change after the visitor refactor.
2476        let payload: Vec<u8> = (0..512u16).map(|i| i as u8).collect();
2477        let mut comp = Vec::new();
2478        let mut c = FrameCompressor::new(CompressionLevel::Default);
2479        c.set_source(payload.as_slice());
2480        c.set_drain(&mut comp);
2481        c.compress();
2482
2483        let skip = build_skippable_frame(7, b"ignored sidecar");
2484        let mut stream = Vec::new();
2485        stream.extend_from_slice(&skip);
2486        stream.extend_from_slice(&comp);
2487
2488        let mut decoder = FrameDecoder::new();
2489        let mut out = alloc::vec![0u8; payload.len()];
2490        let n = decoder
2491            .decode_all(stream.as_slice(), &mut out)
2492            .expect("decode_all should succeed on skippable + zstd stream");
2493        assert_eq!(n, payload.len());
2494        assert_eq!(&out[..n], payload.as_slice());
2495    }
2496
2497    #[cfg(feature = "lsm")]
2498    #[test]
2499    fn frame_emit_info_describes_emitted_block_layout() {
2500        // Encode a payload large enough to force >1 block, fetch
2501        // FrameEmitInfo, walk blocks[] and verify each block's
2502        // (offset_in_frame, header_size, body_size) matches the bytes
2503        // actually emitted into the drain buffer.
2504        let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
2505        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2506        compressor.set_source(payload.as_slice());
2507        let mut compressed = Vec::new();
2508        compressor.set_drain(&mut compressed);
2509        compressor.compress();
2510
2511        let info = compressor
2512            .last_frame_emit_info()
2513            .expect("last_frame_emit_info populated after compress")
2514            .clone();
2515        drop(compressor);
2516
2517        // Frame header range starts at 0 and is non-empty.
2518        assert_eq!(info.frame_header_range.start, 0);
2519        assert!(info.frame_header_range.end > 0);
2520        // Total size matches what was written to the drain.
2521        assert_eq!(info.total_size as usize, compressed.len());
2522        // At least one block, and the last entry has last_block=true.
2523        assert!(!info.blocks.is_empty());
2524        assert!(info.blocks.last().unwrap().last_block);
2525        // All non-final blocks have last_block=false.
2526        for b in &info.blocks[..info.blocks.len() - 1] {
2527            assert!(!b.last_block);
2528        }
2529        // Walk and verify each block's header bytes match the
2530        // recorded type / size by re-decoding the 3-byte header.
2531        // Walking arithmetic: offset_in_frame + header_size + body_size
2532        // must land exactly on the next block's offset_in_frame (or,
2533        // for the last block, on the checksum / end of frame).
2534        for (i, b) in info.blocks.iter().enumerate() {
2535            let off = b.offset_in_frame as usize;
2536            assert_eq!(b.header_size, 3);
2537            let mut hdr = [0u8; 4];
2538            hdr[..3].copy_from_slice(&compressed[off..off + 3]);
2539            let raw = u32::from_le_bytes(hdr);
2540            let last = (raw & 1) != 0;
2541            let ty = (raw >> 1) & 0b11;
2542            let sz = raw >> 3;
2543            assert_eq!(last, b.last_block);
2544            assert_eq!(sz, b.block_size_field);
2545            // body_size is the PHYSICAL length on the wire: spec's
2546            // Block_Size for Raw/Compressed, always 1 for RLE.
2547            let expected_physical = match b.block_type {
2548                crate::encoding::frame_emit_info::BlockType::RLE => 1,
2549                _ => sz,
2550            };
2551            assert_eq!(b.body_size, expected_physical);
2552            let expected_ty = match b.block_type {
2553                crate::encoding::frame_emit_info::BlockType::Raw => 0,
2554                crate::encoding::frame_emit_info::BlockType::RLE => 1,
2555                crate::encoding::frame_emit_info::BlockType::Compressed => 2,
2556                crate::encoding::frame_emit_info::BlockType::Reserved => 3,
2557            };
2558            assert_eq!(ty, expected_ty);
2559            // Walking-arithmetic invariant.
2560            let next_off = b.offset_in_frame + b.header_size as u32 + b.body_size;
2561            if let Some(next) = info.blocks.get(i + 1) {
2562                assert_eq!(
2563                    next_off, next.offset_in_frame,
2564                    "block {i} body_size doesn't reach next block's offset_in_frame",
2565                );
2566            } else if let Some(cs) = info.checksum_range.as_ref() {
2567                assert_eq!(
2568                    next_off, cs.start,
2569                    "last block body_size doesn't reach checksum_range.start",
2570                );
2571            } else {
2572                assert_eq!(
2573                    next_off, info.total_size,
2574                    "last block body_size doesn't reach total_size",
2575                );
2576            }
2577        }
2578        // Checksum range present iff `feature = "hash"` is enabled.
2579        assert_eq!(info.checksum_range.is_some(), cfg!(feature = "hash"));
2580    }
2581
2582    #[cfg(all(feature = "lsm", feature = "hash"))]
2583    #[test]
2584    fn per_block_checksum_round_trip() {
2585        // Encode with per-block checksums enabled. Decode with
2586        // per-block verification. Both sides emit exactly 1
2587        // checksum per physical block written to / read from the
2588        // wire (encoder hashes per emission site, including each
2589        // post-split partition; decoder hashes each decoded block).
2590        // Cardinality and element-wise contents must match
2591        // round-trip.
2592        let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
2593        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2594        compressor.set_source(payload.as_slice());
2595        compressor.enable_per_block_checksums();
2596        let mut compressed = Vec::new();
2597        compressor.set_drain(&mut compressed);
2598        compressor.compress();
2599
2600        let encoder_checksums = compressor
2601            .last_frame_block_checksums()
2602            .expect("checksums populated after enable + compress")
2603            .to_vec();
2604        drop(compressor);
2605        assert!(!encoder_checksums.is_empty());
2606
2607        // Decode side: enable verification, decode, compare.
2608        let mut decoder = FrameDecoder::new();
2609        decoder.enable_per_block_checksums();
2610        let mut output = alloc::vec![0u8; payload.len()];
2611        let n = decoder
2612            .decode_all(compressed.as_slice(), &mut output)
2613            .expect("decode_all should succeed");
2614        assert_eq!(n, payload.len());
2615        assert_eq!(&output[..n], payload.as_slice());
2616
2617        let decoder_checksums = decoder.computed_block_checksums();
2618        assert_eq!(decoder_checksums, encoder_checksums.as_slice());
2619    }
2620}