structured_zstd/decoding/frame_decoder.rs
//! [`FrameDecoder`] is the main low-level struct users interact with to decode zstd frames.
2//!
3//! Zstandard compressed data is made of one or more frames. Each frame is independent and can be
4//! decompressed independently of other frames. This module contains structures
5//! and utilities that can be used to decode a frame.
6
7use super::frame;
8use crate::decoding;
9use crate::decoding::block_decoder::BlockDecoder;
10use crate::decoding::buffer_backend::BufferBackend;
11use crate::decoding::decode_buffer::DecodeBuffer;
12use crate::decoding::dictionary::{Dictionary, DictionaryHandle};
13use crate::decoding::errors::{DecodeBlockContentError, FrameDecoderError};
14use crate::decoding::flat_buf::FlatBuf;
15use crate::decoding::ringbuffer::RingBuffer;
16use crate::decoding::scratch::DecoderScratch;
17use crate::io::{Error, Read, Write};
18use alloc::collections::BTreeMap;
19use alloc::vec::Vec;
20use core::convert::TryInto;
21
22use crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE;
23
24/// Low level Zstandard decoder that can be used to decompress frames with fine control over when and how many bytes are decoded.
25///
26/// This decoder is able to decode frames only partially and gives control
27/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once).
28/// It reads bytes as needed from a provided source and can be read from to collect partial results.
29///
30/// If you want to just read the whole frame with an `io::Read` without having to deal with manually calling [FrameDecoder::decode_blocks]
31/// you can use the provided [crate::decoding::StreamingDecoder] wich wraps this FrameDecoder.
32///
33/// Workflow is as follows:
34/// ```
35/// use structured_zstd::decoding::BlockDecodingStrategy;
36///
37/// # #[cfg(feature = "std")]
38/// use std::io::{Read, Write};
39///
40/// // no_std environments can use the crate's own Read traits
41/// # #[cfg(not(feature = "std"))]
42/// use structured_zstd::io::{Read, Write};
43///
44/// fn decode_this(mut file: impl Read) {
45/// //Create a new decoder
46/// let mut frame_dec = structured_zstd::decoding::FrameDecoder::new();
47/// let mut result = Vec::new();
48///
49/// // Use reset or init to make the decoder ready to decode the frame from the io::Read
50/// frame_dec.reset(&mut file).unwrap();
51///
52/// // Loop until the frame has been decoded completely
53/// while !frame_dec.is_finished() {
54/// // decode (roughly) batch_size many bytes
55/// frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap();
56///
57/// // read from the decoder to collect bytes from the internal buffer
58/// let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap();
59///
60/// // then do something with it
61/// do_something(&result[0..bytes_read]);
62/// }
63///
64/// // handle the last chunk of data
65/// while frame_dec.can_collect() > 0 {
66/// let x = frame_dec.read(result.as_mut_slice()).unwrap();
67///
68/// do_something(&result[0..x]);
69/// }
70/// }
71///
72/// fn do_something(data: &[u8]) {
73/// # #[cfg(feature = "std")]
74/// std::io::stdout().write_all(data).unwrap();
75/// }
76/// ```
77pub struct FrameDecoder {
78 state: Option<FrameDecoderState>,
79 owned_dicts: BTreeMap<u32, Dictionary>,
80 #[cfg(target_has_atomic = "ptr")]
81 shared_dicts: BTreeMap<u32, DictionaryHandle>,
82 #[cfg(not(target_has_atomic = "ptr"))]
83 shared_dicts: (),
84 /// `ZSTD_f_zstd1_magicless` — when true, [`init`] / [`reset`]
85 /// expect frames without the 4-byte magic number prefix.
86 /// Default false (standard zstd format).
87 magicless: bool,
88 /// Pinned `Dictionary_ID` expectation set via
89 /// [`Self::expect_dict_id`]. `None` (default) disables the
90 /// check; `Some(0)` matches frames whose header omits the
91 /// optional dict_id (treated as "no dictionary"). Validated in
92 /// [`Self::reset`] AFTER the frame header parses successfully
93 /// and BEFORE any block decode work.
94 #[cfg(feature = "lsm")]
95 expect_dict_id: Option<u32>,
96 /// Pinned `Window_Descriptor` byte expectation set via
97 /// [`Self::expect_window_descriptor`]. `None` (default)
98 /// disables the check. Validated in [`Self::reset`] AFTER the
99 /// frame header parses successfully and BEFORE any block
100 /// decode work. Single-segment frames (which omit the
101 /// `Window_Descriptor` byte from the wire) surface as
102 /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`]
103 /// with `found: None`.
104 #[cfg(feature = "lsm")]
105 expect_window_descriptor: Option<u8>,
106 /// When `true`, the per-block decode loop XXH64-hashes each
107 /// block's decompressed bytes and stores the low-32-bit digest in
108 /// [`Self::computed_block_checksums`]. Default `false` (zero
109 /// cost). Set via [`Self::enable_per_block_checksums`]. Gated on
110 /// `all(lsm, hash)` because XXH64 lives behind the `hash`
111 /// feature.
112 #[cfg(all(feature = "lsm", feature = "hash"))]
113 per_block_checksums_enabled: bool,
114 /// Per-block XXH64 (low 32 bits) digests captured during the
115 /// current frame's decode when `per_block_checksums_enabled` is
116 /// set. Reset at the start of every new frame. Gated on
117 /// `all(lsm, hash)` (see `per_block_checksums_enabled`).
118 #[cfg(all(feature = "lsm", feature = "hash"))]
119 computed_block_checksums: alloc::vec::Vec<u32>,
120}
121
/// Backend-tagged decode scratch — chosen at frame-reset time based
/// on the parsed `FrameHeader.descriptor.single_segment_flag()` and
/// kept stable through the lifetime of the frame. The match in each
/// helper below dispatches **once per call** (e.g. once per block in
/// `decode_block_content`, once per drain in `drain_to_writer`) —
/// never inside the hot push/repeat loop, which is fully
/// monomorphised through the `DecoderScratch<B>` generic.
enum DecoderScratchKind {
    /// Ring-buffer backend, selected when the frame header's
    /// single-segment flag is clear (multi-segment frames).
    Ring(DecoderScratch<RingBuffer>),
    /// Flat-buffer backend, selected for single-segment frames, whose
    /// entire output is written into one buffer sized to the frame
    /// content size.
    Flat(DecoderScratch<FlatBuf>),
}
133
134impl DecoderScratchKind {
135 fn new_ring(window_size: usize) -> Self {
136 // Lazy ring-buffer allocation: do NOT `reserve(window_size)` here.
137 // The direct-decode path (`run_direct_decode`) writes through
138 // `UserSliceBackend` and never touches the ring; allocating it
139 // eagerly wastes one full window of peak memory on the common
140 // direct-eligible frame. On the non-direct path the window is
141 // pre-reserved once at frame entry (`decode_all_impl` and
142 // `decode_blocks` both call `DecoderScratchKind::reserve_buffer`
143 // before any block writes), so multi-block frames pay one
144 // amortised grow instead of repeated `reserve_amortized` steps
145 // per block. Issue #279 round 2.
146 let s = DecoderScratch::<RingBuffer>::new(window_size);
147 Self::Ring(s)
148 }
149
150 /// Construct a flat-backed scratch sized for a single-segment
151 /// frame. `frame_content_size` is the upcoming output size in
152 /// bytes (== `window_size` when the flag is set).
153 fn new_flat(frame_content_size: usize) -> Self {
154 let flat = FlatBuf::with_capacity(frame_content_size);
155 // DecoderScratch's default ctor would discard the pre-sized
156 // FlatBuf — go through from_backend so the buffer carries the
157 // capacity the constructor wants.
158 let mut s = DecoderScratch::<FlatBuf>::new(frame_content_size);
159 s.buffer = DecodeBuffer::from_backend(flat, frame_content_size);
160 Self::Flat(s)
161 }
162
163 /// Reset (or transition between) backends for a new frame.
164 /// Reuses the existing `DecoderScratch` allocations (FSE / HUF
165 /// tables, sequence vec, etc.) when the backend kind is unchanged
166 /// — only the underlying buffer is re-sized for the new frame.
167 /// Building a fresh `DecoderScratch` on every frame would
168 /// re-allocate everything and was measured at +255 % vs ring on
169 /// small frames; reusing it keeps the small-frame cost flat.
170 fn reset(&mut self, frame: &frame::FrameHeader, window_size: usize) {
171 if frame.descriptor.single_segment_flag() {
172 match self {
173 Self::Flat(s) => {
174 s.reset(window_size);
175 // DecodeBuffer::reset clears + reserves
176 // window_size; FlatBuf's reserve grows the
177 // backing Vec if the new FCS is larger than
178 // what's already allocated. No alloc when the
179 // previous flat frame had >= this capacity.
180 }
181 Self::Ring(_) => *self = Self::new_flat(window_size),
182 }
183 } else {
184 match self {
185 Self::Ring(s) => s.reset(window_size),
186 Self::Flat(_) => *self = Self::new_ring(window_size),
187 }
188 }
189 }
190
191 fn init_from_dict(&mut self, dict: &Dictionary) {
192 match self {
193 Self::Ring(s) => s.init_from_dict(dict),
194 Self::Flat(s) => s.init_from_dict(dict),
195 }
196 }
197
198 #[inline]
199 fn buffer_len(&self) -> usize {
200 match self {
201 Self::Ring(s) => s.buffer.len(),
202 Self::Flat(s) => s.buffer.len(),
203 }
204 }
205
206 /// Pre-reserve the backing buffer to `window_size` in a single
207 /// allocation. Called once on the non-direct (`decode_blocks`) path
208 /// after direct-eligibility is ruled out, so multi-segment fallback
209 /// decodes don't pay repeated `reserve_amortized` grow steps
210 /// (128 KiB → 256 KiB → ... → window) as blocks accumulate.
211 ///
212 /// Direct-eligible ring-backed frames never call this and pay zero
213 /// ring allocation for the window. Flat-backed (single-segment)
214 /// frames eagerly size their `FlatBuf` to `frame_content_size` at
215 /// `new_flat` construction time, so a direct-eligible flat frame
216 /// already paid for that capacity before this helper is reachable;
217 /// the "zero buffer allocation" guarantee here applies to the
218 /// ring path only.
219 #[inline]
220 fn reserve_buffer(&mut self, window_size: usize) {
221 match self {
222 Self::Ring(s) => s.buffer.reserve(window_size),
223 Self::Flat(s) => s.buffer.reserve(window_size),
224 }
225 }
226
227 /// Last `n` bytes of the visible buffer as `(s1, s2)` (wrap-aware).
228 /// Routes through whichever backend the current scratch holds.
229 #[cfg(all(feature = "lsm", feature = "hash"))]
230 fn last_n_as_slices(&self, n: usize) -> (&[u8], &[u8]) {
231 match self {
232 Self::Ring(s) => s.buffer.last_n_as_slices(n),
233 Self::Flat(s) => s.buffer.last_n_as_slices(n),
234 }
235 }
236
237 fn buffer_drain(&mut self) -> Vec<u8> {
238 match self {
239 Self::Ring(s) => s.buffer.drain(),
240 Self::Flat(s) => s.buffer.drain(),
241 }
242 }
243
244 fn buffer_drain_to_window_size(&mut self) -> Option<Vec<u8>> {
245 match self {
246 Self::Ring(s) => s.buffer.drain_to_window_size(),
247 Self::Flat(s) => s.buffer.drain_to_window_size(),
248 }
249 }
250
251 fn buffer_drain_to_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
252 match self {
253 Self::Ring(s) => s.buffer.drain_to_writer(sink),
254 Self::Flat(s) => s.buffer.drain_to_writer(sink),
255 }
256 }
257
258 fn buffer_drain_to_window_size_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
259 match self {
260 Self::Ring(s) => s.buffer.drain_to_window_size_writer(sink),
261 Self::Flat(s) => s.buffer.drain_to_window_size_writer(sink),
262 }
263 }
264
265 fn buffer_can_drain(&self) -> usize {
266 match self {
267 Self::Ring(s) => s.buffer.can_drain(),
268 Self::Flat(s) => s.buffer.can_drain(),
269 }
270 }
271
272 fn buffer_can_drain_to_window_size(&self) -> Option<usize> {
273 match self {
274 Self::Ring(s) => s.buffer.can_drain_to_window_size(),
275 Self::Flat(s) => s.buffer.can_drain_to_window_size(),
276 }
277 }
278
279 fn buffer_read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
280 match self {
281 Self::Ring(s) => s.buffer.read(target),
282 Self::Flat(s) => s.buffer.read(target),
283 }
284 }
285
286 fn buffer_read_all(&mut self, target: &mut [u8]) -> Result<usize, Error> {
287 match self {
288 Self::Ring(s) => s.buffer.read_all(target),
289 Self::Flat(s) => s.buffer.read_all(target),
290 }
291 }
292
293 fn decode_block_content<R: Read>(
294 &mut self,
295 decoder: &mut BlockDecoder,
296 header: &crate::blocks::block::BlockHeader,
297 source: R,
298 ) -> Result<u64, DecodeBlockContentError> {
299 match self {
300 Self::Ring(s) => decoder.decode_block_content(header, s, source),
301 Self::Flat(s) => decoder.decode_block_content(header, s, source),
302 }
303 }
304
305 #[cfg(feature = "hash")]
306 fn hash_finish(&self) -> u64 {
307 use core::hash::Hasher;
308 match self {
309 Self::Ring(s) => s.buffer.hash.finish(),
310 Self::Flat(s) => s.buffer.hash.finish(),
311 }
312 }
313}
314
/// Per-frame decoding state owned by [`FrameDecoder`]. Built by
/// `new_with_format` for the first frame and re-initialised in place
/// by `reset_with_format` for later frames, so scratch allocations are
/// reused across frames.
struct FrameDecoderState {
    /// Header of the frame currently being decoded.
    pub frame_header: frame::FrameHeader,
    /// Ring- or flat-backed scratch, selected from the header's
    /// single-segment flag.
    decoder_scratch: DecoderScratchKind,
    /// Cleared to `false` whenever a new frame starts.
    /// NOTE(review): presumably set once the last block is decoded — confirm.
    frame_finished: bool,
    /// Reset to 0 for every new frame.
    /// NOTE(review): presumably counts blocks decoded so far — confirm.
    block_counter: usize,
    /// Starts at the number of frame-header bytes consumed.
    /// NOTE(review): presumably accumulates further bytes read from the source — confirm.
    bytes_read_counter: u64,
    /// `None` at frame start.
    /// NOTE(review): presumably holds the frame's trailing content checksum once read — confirm.
    check_sum: Option<u32>,
    /// ID of the dictionary applied to this frame, if any (set by
    /// `FrameDecoder::reset`, `reset_with_dict_handle` and `force_dict`).
    using_dict: Option<u32>,
}
324
/// How much of the current frame a single
/// [`FrameDecoder::decode_blocks`] call should decode.
pub enum BlockDecodingStrategy {
    /// No per-call limit on the number of blocks or bytes decoded.
    All,
    /// Limit the call to at most this many blocks.
    UptoBlocks(usize),
    /// Limit the call to roughly this many decoded bytes; the limit
    /// is approximate, not exact.
    UptoBytes(usize),
}
330
331impl FrameDecoderState {
332 /// Construct a new frame decoder state, reading the frame header
333 /// from `source`. When `magicless` is `true`, the 4-byte magic
334 /// number prefix is NOT consumed (donor `ZSTD_f_zstd1_magicless`).
335 /// Crate-internal — reached only via `FrameDecoder::init` /
336 /// `FrameDecoder::init_with_dict_handle`. For multi-segment
337 /// (ring-backed) frames the decode buffer is allocated lazily —
338 /// direct-eligible frames pay zero buffer allocation, and the
339 /// non-direct fallback reserves `window_size` once in
340 /// `decode_all_impl` via `reserve_buffer`. Single-segment
341 /// (flat-backed) frames eagerly size the backing `FlatBuf` to
342 /// `frame_content_size` because the flat path writes the entire
343 /// output into the same buffer and cannot defer that allocation.
344 pub(crate) fn new_with_format(
345 source: impl Read,
346 magicless: bool,
347 ) -> Result<FrameDecoderState, FrameDecoderError> {
348 let (frame, header_size) = frame::read_frame_header_with_format(source, magicless)?;
349 let window_size = frame.window_size()?;
350
351 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
352 return Err(FrameDecoderError::WindowSizeTooBig {
353 requested: window_size,
354 });
355 }
356
357 let decoder_scratch = if frame.descriptor.single_segment_flag() {
358 DecoderScratchKind::new_flat(window_size as usize)
359 } else {
360 DecoderScratchKind::new_ring(window_size as usize)
361 };
362 Ok(FrameDecoderState {
363 frame_header: frame,
364 frame_finished: false,
365 block_counter: 0,
366 decoder_scratch,
367 bytes_read_counter: u64::from(header_size),
368 check_sum: None,
369 using_dict: None,
370 })
371 }
372
373 /// Reset this state for a new frame read from `source`, reusing
374 /// existing allocations. When `magicless` is `true`, the frame
375 /// header is read WITHOUT expecting a magic-number prefix
376 /// (donor `ZSTD_f_zstd1_magicless`). Crate-internal — reached
377 /// only via `FrameDecoder::reset`.
378 ///
379 /// `DecodeBuffer::reset` no longer reserves window_size for either
380 /// backend — capacity decisions live one layer up. For ring-backed
381 /// scratch, direct-eligible frames pay zero allocation here and
382 /// the non-direct path is pre-reserved by `decode_all_impl` /
383 /// `decode_blocks` via `DecoderScratchKind::reserve_buffer(window_size)`
384 /// before any block writes. Flat-backed scratch is sized to
385 /// `frame_content_size` by `DecoderScratchKind::new_flat` at first
386 /// construction (and on Ring → Flat transition); a reused flat
387 /// scratch whose new frame fits within the prior FCS reuses the
388 /// existing capacity, and one whose new FCS is larger is also
389 /// pre-reserved by the same `reserve_buffer(window_size)` call at
390 /// frame entry (single-segment frames have window_size == FCS).
391 pub(crate) fn reset_with_format(
392 &mut self,
393 source: impl Read,
394 magicless: bool,
395 ) -> Result<(), FrameDecoderError> {
396 let (frame_header, header_size) = frame::read_frame_header_with_format(source, magicless)?;
397 let window_size = frame_header.window_size()?;
398
399 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
400 return Err(FrameDecoderError::WindowSizeTooBig {
401 requested: window_size,
402 });
403 }
404
405 self.decoder_scratch
406 .reset(&frame_header, window_size as usize);
407 self.frame_header = frame_header;
408 self.frame_finished = false;
409 self.block_counter = 0;
410 self.bytes_read_counter = u64::from(header_size);
411 self.check_sum = None;
412 self.using_dict = None;
413 Ok(())
414 }
415}
416
417impl Default for FrameDecoder {
418 fn default() -> Self {
419 Self::new()
420 }
421}
422
423impl FrameDecoder {
424 /// This will create a new decoder without allocating anything yet.
425 /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used
426 /// else they just reset these buffers with not further allocations
427 pub fn new() -> FrameDecoder {
428 FrameDecoder {
429 state: None,
430 owned_dicts: BTreeMap::new(),
431 #[cfg(target_has_atomic = "ptr")]
432 shared_dicts: BTreeMap::new(),
433 #[cfg(not(target_has_atomic = "ptr"))]
434 shared_dicts: (),
435 magicless: false,
436 #[cfg(feature = "lsm")]
437 expect_dict_id: None,
438 #[cfg(feature = "lsm")]
439 expect_window_descriptor: None,
440 #[cfg(all(feature = "lsm", feature = "hash"))]
441 per_block_checksums_enabled: false,
442 #[cfg(all(feature = "lsm", feature = "hash"))]
443 computed_block_checksums: alloc::vec::Vec::new(),
444 }
445 }
446
447 /// Opt in to per-block XXH64 verification during decode.
448 /// Default off; zero cost when disabled. Each block's decompressed
449 /// bytes are XXH64-hashed (low 32 bits) and appended to
450 /// [`Self::computed_block_checksums`] as the decode progresses.
451 /// Callers compare the captured digests against externally-stored
452 /// expected values (e.g. from a per-block sidecar in the
453 /// containing application protocol).
454 ///
455 /// Behind `all(feature = "lsm", feature = "hash")` — the XXH64
456 /// primitive lives behind the `hash` feature, so this method
457 /// only compiles when both are enabled.
458 #[cfg(all(feature = "lsm", feature = "hash"))]
459 pub fn enable_per_block_checksums(&mut self) {
460 self.per_block_checksums_enabled = true;
461 }
462
463 /// Per-block XXH64 (low 32 bits) digests captured during the
464 /// current frame's decode. Empty unless
465 /// [`Self::enable_per_block_checksums`] was called before
466 /// [`Self::decode_all`] / [`Self::reset`].
467 ///
468 /// Reset at the start of every new frame.
469 ///
470 /// Behind `all(feature = "lsm", feature = "hash")`.
471 #[cfg(all(feature = "lsm", feature = "hash"))]
472 pub fn computed_block_checksums(&self) -> &[u32] {
473 &self.computed_block_checksums
474 }
475
476 /// Pin the expected `Dictionary_ID` for the next frame.
477 ///
478 /// When `expected` is set, [`Self::init`] / [`Self::reset`]
479 /// validate it against the parsed frame header BEFORE any
480 /// block decode work runs. A mismatch returns
481 /// [`crate::decoding::errors::FrameDecoderError::UnexpectedDictId`]
482 /// before any block decode and before any output is produced.
483 /// Scratch buffer allocation / reservation for the decode
484 /// pipeline happens during frame-header parsing, which is
485 /// already complete when this validation fires — the cost of
486 /// scratch sizing is paid even on a mismatched header. The
487 /// guarantee is "no block decode, no XXH64 init, no partial
488 /// output", not "zero allocation".
489 ///
490 /// `Some(0)` is treated as "no dictionary expected": a frame
491 /// whose header omits the optional `Dictionary_ID` field
492 /// (flag value 0) passes the check; a frame that carries an
493 /// explicit non-zero id fails.
494 ///
495 /// `None` (default) disables the check.
496 ///
497 /// Primary use case: post-AEAD-decrypt sanity check in
498 /// wire-format consumers (e.g. lsm-tree's encrypted block
499 /// format pins the `dict_id` baked into the AAD against the
500 /// inner zstd frame's `dict_id` to defeat dict-substitution
501 /// attacks).
502 ///
503 /// NOT a replacement for AEAD authentication. NOT the same
504 /// semantic as donor `ZSTD_d_windowLogMax` (which is a
505 /// ceiling-style limit, separate concern).
506 #[cfg(feature = "lsm")]
507 #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
508 pub fn expect_dict_id(&mut self, expected: Option<u32>) {
509 self.expect_dict_id = expected;
510 }
511
512 /// Pin the expected raw `Window_Descriptor` byte (RFC 8878
513 /// §3.1.1.1.2 layout: `(exp << 3) | mantissa`) for the next
514 /// frame.
515 ///
516 /// When `expected` is set, [`Self::init`] / [`Self::reset`]
517 /// validate it against the parsed frame header BEFORE any
518 /// block decode work runs. A mismatch returns
519 /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`].
520 ///
521 /// Single-segment frames omit the `Window_Descriptor` byte
522 /// from the wire entirely. Setting an expectation while
523 /// receiving a single-segment frame fails the check with
524 /// `found: None` — there is no on-wire byte to match against,
525 /// which is reported explicitly rather than silently passing.
526 ///
527 /// `None` (default) disables the check.
528 ///
529 /// Byte-exact equality, NOT a ceiling. Donor
530 /// `ZSTD_d_windowLogMax` is a separate ceiling-style limit
531 /// available through the C FFI surface; this method is for
532 /// strict equality validation against a pinned expectation
533 /// (e.g. lsm-tree's wire format pins the window descriptor
534 /// from the AAD to defeat decompression-bomb-swap attacks).
535 #[cfg(feature = "lsm")]
536 #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
537 pub fn expect_window_descriptor(&mut self, expected: Option<u8>) {
538 self.expect_window_descriptor = expected;
539 }
540
541 /// Validate the just-parsed frame header against any pinned
542 /// expectations set via [`Self::expect_dict_id`] /
543 /// [`Self::expect_window_descriptor`].
544 ///
545 /// Returns the typed error variant on mismatch and leaves
546 /// `self.state` in a re-resettable shape — a subsequent
547 /// `reset()` will overwrite `frame_header` from the new source
548 /// without needing intermediate cleanup.
549 #[cfg(feature = "lsm")]
550 fn validate_expectations(
551 &self,
552 frame_header: &frame::FrameHeader,
553 ) -> Result<(), FrameDecoderError> {
554 if let Some(expected) = self.expect_dict_id {
555 let found = frame_header.dictionary_id();
556 // `Some(0)` is the "no dictionary expected" sentinel —
557 // matches a frame whose header omits the optional
558 // dict_id field (which is reported as `None` by the
559 // parser). All other values must match exactly.
560 let matches = match (expected, found) {
561 (0, None) => true,
562 (e, Some(f)) => e == f,
563 _ => false,
564 };
565 if !matches {
566 return Err(FrameDecoderError::UnexpectedDictId {
567 expected: Some(expected),
568 found,
569 });
570 }
571 }
572 if let Some(expected) = self.expect_window_descriptor {
573 let found = frame_header.window_descriptor();
574 if found != Some(expected) {
575 return Err(FrameDecoderError::UnexpectedWindowDescriptor { expected, found });
576 }
577 }
578 Ok(())
579 }
580
581 /// Enable or disable magicless frame format
582 /// (`ZSTD_f_zstd1_magicless`). When set to `true`, subsequent
583 /// [`init`] / [`reset`] calls expect the frame header to begin
584 /// directly with the frame-header descriptor — no 4-byte magic
585 /// number prefix. Default false. Must match the encoder's
586 /// magicless setting; the format is unambiguous only when the
587 /// caller knows it out-of-band.
588 ///
589 /// Note: magicless mode also disables skippable-frame detection.
590 /// The `0x184D2A50..=0x184D2A5F` skippable-frame magic range is
591 /// only recognised when the 4-byte magic prefix is consumed, so
592 /// `decode_all` / `init` / `reset` will treat a skippable frame
593 /// at the head of a magicless stream as a malformed frame header
594 /// (bad descriptor / window-size error) instead of skipping it.
595 /// Mixed-format streams that interleave skippable frames must be
596 /// pre-split by the caller; `set_magicless(true)` is only safe
597 /// when the entire stream is known to be magicless zstd frames.
598 pub fn set_magicless(&mut self, magicless: bool) {
599 self.magicless = magicless;
600 }
601
602 #[cfg(target_has_atomic = "ptr")]
603 fn shared_dict_exists(&self, dict_id: u32) -> bool {
604 self.shared_dicts.contains_key(&dict_id)
605 }
606
607 #[cfg(not(target_has_atomic = "ptr"))]
608 fn shared_dict_exists(&self, _dict_id: u32) -> bool {
609 false
610 }
611
612 fn validate_registered_dictionary(dict: &Dictionary) -> Result<(), FrameDecoderError> {
613 use crate::decoding::errors::DictionaryDecodeError as dict_err;
614
615 if dict.id == 0 {
616 return Err(FrameDecoderError::from(dict_err::ZeroDictionaryId));
617 }
618 if let Some(index) = dict.offset_hist.iter().position(|&rep| rep == 0) {
619 return Err(FrameDecoderError::from(
620 dict_err::ZeroRepeatOffsetInDictionary { index: index as u8 },
621 ));
622 }
623 Ok(())
624 }
625
626 /// init() will allocate all needed buffers if it is the first time this decoder is used
627 /// else they just reset these buffers with not further allocations
628 ///
629 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
630 ///
631 /// equivalent to reset()
632 pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
633 self.reset(source)
634 }
635
636 /// Initialize the decoder for a new frame using a pre-parsed dictionary handle.
637 ///
638 /// If the frame header has a dictionary ID, this validates it against
639 /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
640 ///
641 /// If the header omits the optional dictionary ID, this still applies the
642 /// provided dictionary handle.
643 ///
644 /// # Warning
645 ///
646 /// This method always applies `dict` unless the frame header contains a
647 /// non-matching dictionary ID. Callers must only use this API when they
648 /// already know the frame was encoded with the provided dictionary, even if
649 /// the frame header omits the dictionary ID or encodes an explicit
650 /// dictionary ID of `0`.
651 ///
652 /// Passing a dictionary for a frame that was not encoded with it can
653 /// silently corrupt the decoded output.
654 pub fn init_with_dict_handle(
655 &mut self,
656 source: impl Read,
657 dict: &DictionaryHandle,
658 ) -> Result<(), FrameDecoderError> {
659 self.reset_with_dict_handle(source, dict)
660 }
661
662 /// reset() will allocate all needed buffers if it is the first time this decoder is used
663 /// else they just reset these buffers with not further allocations
664 ///
665 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
666 ///
667 /// equivalent to init()
668 pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
669 use FrameDecoderError as err;
670 // Fresh frame → start with an empty per-block checksum vec so
671 // the values for the next frame don't carry over from the
672 // previous one.
673 #[cfg(all(feature = "lsm", feature = "hash"))]
674 self.computed_block_checksums.clear();
675 let magicless = self.magicless;
676 let dict_id = match &mut self.state {
677 Some(s) => {
678 s.reset_with_format(source, magicless)?;
679 s.frame_header.dictionary_id()
680 }
681 None => {
682 self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
683 self.state
684 .as_ref()
685 .and_then(|state| state.frame_header.dictionary_id())
686 }
687 };
688 // Validate any pinned expectations BEFORE block decode work
689 // runs. Catches dict_id substitution / window-descriptor
690 // tampering on inputs already authenticated by an outer
691 // layer (e.g. AEAD). Returning here leaves `self.state` in
692 // a re-resettable shape — next `reset()` re-parses the
693 // frame header without intermediate cleanup.
694 #[cfg(feature = "lsm")]
695 if let Some(state) = self.state.as_ref() {
696 self.validate_expectations(&state.frame_header)?;
697 }
698 if let Some(dict_id) = dict_id {
699 let state = self.state.as_mut().expect("state initialized");
700 let owned_dicts = &self.owned_dicts;
701 #[cfg(target_has_atomic = "ptr")]
702 let shared_dicts = &self.shared_dicts;
703 let dict = owned_dicts
704 .get(&dict_id)
705 .or_else(|| {
706 #[cfg(target_has_atomic = "ptr")]
707 {
708 shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
709 }
710 #[cfg(not(target_has_atomic = "ptr"))]
711 {
712 None
713 }
714 })
715 .ok_or(err::DictNotProvided { dict_id })?;
716 state.decoder_scratch.init_from_dict(dict);
717 state.using_dict = Some(dict_id);
718 }
719 Ok(())
720 }
721
722 /// Reset this decoder for a new frame using a pre-parsed dictionary handle.
723 ///
724 /// If the frame header has a dictionary ID, this validates it against
725 /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
726 ///
727 /// If the header omits the optional dictionary ID, this still applies the
728 /// provided dictionary handle.
729 ///
730 /// # Warning
731 ///
732 /// This method always applies `dict` unless the frame header contains a
733 /// non-matching dictionary ID. Callers must only use this API when they
734 /// already know the frame was encoded with the provided dictionary, even if
735 /// the frame header omits the dictionary ID or encodes an explicit
736 /// dictionary ID of `0`.
737 ///
738 /// Passing a dictionary for a frame that was not encoded with it can
739 /// silently corrupt the decoded output.
740 pub fn reset_with_dict_handle(
741 &mut self,
742 source: impl Read,
743 dict: &DictionaryHandle,
744 ) -> Result<(), FrameDecoderError> {
745 use FrameDecoderError as err;
746 // Fresh frame → drop the previous frame's per-block checksum
747 // digests so the next decode starts with an empty vec.
748 // Mirrors the same clear in `reset()`; reset_with_dict_handle
749 // is a parallel entry point so it needs its own call.
750 #[cfg(all(feature = "lsm", feature = "hash"))]
751 self.computed_block_checksums.clear();
752 Self::validate_registered_dictionary(dict.as_dict())?;
753 let magicless = self.magicless;
754 // Scope the &mut borrow of `self.state` to the header parse
755 // alone, so the subsequent `validate_expectations(&self, ...)`
756 // call below can take a fresh shared borrow of self without
757 // tripping the borrow checker.
758 match &mut self.state {
759 Some(s) => s.reset_with_format(source, magicless)?,
760 None => {
761 self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
762 }
763 }
764 // Single source of truth: route through the same
765 // `validate_expectations` used by `reset()`. Routing through
766 // the helper keeps the two code paths from drifting (e.g.,
767 // if expect-semantics or error wiring changes later).
768 #[cfg(feature = "lsm")]
769 {
770 let header = &self
771 .state
772 .as_ref()
773 .expect("state populated by reset_with_format/new_with_format")
774 .frame_header;
775 self.validate_expectations(header)?;
776 }
777 let state = self
778 .state
779 .as_mut()
780 .expect("state populated by reset_with_format/new_with_format");
781 if let Some(dict_id) = state.frame_header.dictionary_id()
782 && dict_id != dict.id()
783 {
784 return Err(err::DictIdMismatch {
785 expected: dict_id,
786 provided: dict.id(),
787 });
788 }
789 state.decoder_scratch.init_from_dict(dict.as_dict());
790 state.using_dict = Some(dict.id());
791 Ok(())
792 }
793
794 /// Add a dictionary that can be selected dynamically by frame dictionary ID.
795 ///
796 /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
797 /// registered (either as owned or shared).
798 pub fn add_dict(&mut self, dict: Dictionary) -> Result<(), FrameDecoderError> {
799 Self::validate_registered_dictionary(&dict)?;
800 let dict_id = dict.id;
801 if self.owned_dicts.contains_key(&dict_id) || self.shared_dict_exists(dict_id) {
802 return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
803 }
804 self.owned_dicts.insert(dict_id, dict);
805 Ok(())
806 }
807
808 /// Parse and add a serialized dictionary blob.
809 pub fn add_dict_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), FrameDecoderError> {
810 let dict = Dictionary::decode_dict(raw_dictionary)?;
811 self.add_dict(dict)
812 }
813
814 /// Add a pre-parsed dictionary handle for reuse across decoders.
815 ///
816 /// This API is available on targets with pointer-width atomics
817 /// (`target_has_atomic = "ptr"`).
818 ///
819 /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
820 /// registered (either as owned or shared).
821 #[cfg(target_has_atomic = "ptr")]
822 pub fn add_dict_handle(&mut self, dict: DictionaryHandle) -> Result<(), FrameDecoderError> {
823 Self::validate_registered_dictionary(dict.as_dict())?;
824 let dict_id = dict.id();
825 if self.owned_dicts.contains_key(&dict_id) || self.shared_dicts.contains_key(&dict_id) {
826 return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
827 }
828 self.shared_dicts.insert(dict_id, dict);
829 Ok(())
830 }
831
832 pub fn force_dict(&mut self, dict_id: u32) -> Result<(), FrameDecoderError> {
833 use FrameDecoderError as err;
834 let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
835 let owned_dicts = &self.owned_dicts;
836 #[cfg(target_has_atomic = "ptr")]
837 let shared_dicts = &self.shared_dicts;
838
839 let dict = owned_dicts
840 .get(&dict_id)
841 .or_else(|| {
842 #[cfg(target_has_atomic = "ptr")]
843 {
844 shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
845 }
846 #[cfg(not(target_has_atomic = "ptr"))]
847 {
848 None
849 }
850 })
851 .ok_or(err::DictNotProvided { dict_id })?;
852 state.decoder_scratch.init_from_dict(dict);
853 state.using_dict = Some(dict_id);
854
855 Ok(())
856 }
857
858 /// Returns how many bytes the frame contains after decompression
859 pub fn content_size(&self) -> u64 {
860 match &self.state {
861 None => 0,
862 Some(s) => s.frame_header.frame_content_size(),
863 }
864 }
865
866 /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame
867 pub fn get_checksum_from_data(&self) -> Option<u32> {
868 let state = self.state.as_ref()?;
869
870 state.check_sum
871 }
872
873 /// Returns the checksum that was calculated while decoding.
874 /// Only a sensible value after all decoded bytes have been collected/read from the FrameDecoder.
875 /// Returns `None` when the frame header has `content_checksum_flag = 0`:
876 /// no hash is computed for such frames (the post-decode XXH64 pass was a
877 /// 63 % decode-wall hotspot on flag-off frames; skipping it when the
878 /// frame format declares no trailing digest avoids that wasted work).
879 #[cfg(feature = "hash")]
880 pub fn get_calculated_checksum(&self) -> Option<u32> {
881 let state = self.state.as_ref()?;
882 if !state.frame_header.descriptor.content_checksum_flag() {
883 return None;
884 }
885 let cksum_64bit = state.decoder_scratch.hash_finish();
886 //truncate to lower 32bit because reasons...
887 Some(cksum_64bit as u32)
888 }
889
890 /// Counter for how many bytes have been consumed while decoding the frame
891 pub fn bytes_read_from_source(&self) -> u64 {
892 let state = match &self.state {
893 None => return 0,
894 Some(s) => s,
895 };
896 state.bytes_read_counter
897 }
898
899 /// Whether the current frames last block has been decoded yet
900 /// If this returns true you can call the drain* functions to get all content
901 /// (the read() function will drain automatically if this returns true)
902 pub fn is_finished(&self) -> bool {
903 let state = match &self.state {
904 None => return true,
905 Some(s) => s,
906 };
907 if state.frame_header.descriptor.content_checksum_flag() {
908 state.frame_finished && state.check_sum.is_some()
909 } else {
910 state.frame_finished
911 }
912 }
913
914 /// Counter for how many blocks have already been decoded
915 pub fn blocks_decoded(&self) -> usize {
916 let state = match &self.state {
917 None => return 0,
918 Some(s) => s,
919 };
920 state.block_counter
921 }
922
    /// Decodes blocks of the current frame from `source` into the internal decode buffer.
    ///
    /// The decoder must have been initialized for a frame first (for example with
    /// [`FrameDecoder::init`]). `strat` bounds how much work a single call does, which
    /// lets callers manage memory consumption carefully:
    ///
    /// - [`BlockDecodingStrategy::All`] keeps decoding until the frame's last block.
    /// - `BlockDecodingStrategy::UptoBlocks(n)` stops once this call has decoded at least
    ///   `n` blocks.
    /// - `BlockDecodingStrategy::UptoBytes(n)` stops once this call has appended at least
    ///   `n` bytes to the decode buffer.
    ///
    /// The limit is only checked after a block has been fully decoded, so every
    /// successful call decodes at least one block. When the last block is reached and the
    /// frame header sets `content_checksum_flag`, the trailing 4-byte checksum is read
    /// from `source` as well (see [`FrameDecoder::get_checksum_from_data`]).
    ///
    /// Returns `Ok(true)` once the frame's last block has been decoded.
    ///
    /// NOTE(review): there is no guard for a frame that is already finished. Calling this
    /// again would try to parse another block header from `source`, so callers presumably
    /// check [`FrameDecoder::is_finished`] first.
    ///
    /// # Errors
    ///
    /// - [`FrameDecoderError::NotYetInitialized`] if no frame has been initialized.
    /// - `FailedToReadBlockHeader`, `FailedToReadBlockBody` or `FailedToReadChecksum` if
    ///   reading or decoding a block header, a block body, or the checksum fails.
    pub fn decode_blocks(
        &mut self,
        mut source: impl Read,
        strat: BlockDecodingStrategy,
    ) -> Result<bool, FrameDecoderError> {
        use FrameDecoderError as err;
        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;

        // Streaming entry point: pre-reserve the backing buffer to
        // `window_size` so multi-block frames don't pay repeated
        // `reserve_amortized` grow steps (128 KiB → 256 KiB → ... →
        // window) as blocks accumulate. `decode_all` does the same up
        // front in `decode_all_impl`; this mirrors it for callers
        // driving `decode_blocks` directly. Idempotent — the
        // backend's `reserve` early-returns when capacity is already
        // sufficient.
        let window_size = state.frame_header.window_size().unwrap_or(0) as usize;
        state.decoder_scratch.reserve_buffer(window_size);

        let mut block_dec = decoding::block_decoder::new();

        // Baselines for the strategy limits. `UptoBlocks` / `UptoBytes` are measured
        // relative to this call, not to the whole frame.
        let buffer_size_before = state.decoder_scratch.buffer_len();
        let block_counter_before = state.block_counter;
        loop {
            vprintln!("################");
            vprintln!("Next Block: {}", state.block_counter);
            vprintln!("################");
            let (block_header, block_header_size) = block_dec
                .read_block_header(&mut source)
                .map_err(err::FailedToReadBlockHeader)?;
            state.bytes_read_counter += u64::from(block_header_size);

            vprintln!();
            vprintln!(
                "Found {} block with size: {}, which will be of size: {}",
                block_header.block_type,
                block_header.content_size,
                block_header.decompressed_size
            );

            // Buffer length before this block, recorded only when per-block checksums
            // are enabled, so the hash below covers exactly this block's output.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            let len_before_block: Option<usize> = if self.per_block_checksums_enabled {
                Some(state.decoder_scratch.buffer_len())
            } else {
                None
            };
            let bytes_read_in_block_body = state
                .decoder_scratch
                .decode_block_content(&mut block_dec, &block_header, &mut source)
                .map_err(err::FailedToReadBlockBody)?;
            state.bytes_read_counter += bytes_read_in_block_body;

            // Per-block XXH64 (low 32 bits) of the just-decompressed
            // bytes. Hashed from `last_n_as_slices` so RingBuffer wrap
            // is handled in-place, no extra copy.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(len_before_block) = len_before_block {
                let added = state.decoder_scratch.buffer_len() - len_before_block;
                let (s1, s2) = state.decoder_scratch.last_n_as_slices(added);
                let mut h = twox_hash::XxHash64::with_seed(0);
                use core::hash::Hasher;
                h.write(s1);
                h.write(s2);
                self.computed_block_checksums.push(h.finish() as u32);
            }

            state.block_counter += 1;

            vprintln!("Output: {}", state.decoder_scratch.buffer_len());

            if block_header.last_block {
                state.frame_finished = true;
                // The optional 4-byte little-endian content checksum follows the last
                // block.
                if state.frame_header.descriptor.content_checksum_flag() {
                    let mut chksum = [0u8; 4];
                    source
                        .read_exact(&mut chksum)
                        .map_err(err::FailedToReadChecksum)?;
                    state.bytes_read_counter += 4;
                    let chksum = u32::from_le_bytes(chksum);
                    state.check_sum = Some(chksum);
                }
                break;
            }

            // Strategy limits are evaluated only after a whole block has been decoded.
            match strat {
                BlockDecodingStrategy::All => { /* keep going */ }
                BlockDecodingStrategy::UptoBlocks(n) => {
                    if state.block_counter - block_counter_before >= n {
                        break;
                    }
                }
                BlockDecodingStrategy::UptoBytes(n) => {
                    if state.decoder_scratch.buffer_len() - buffer_size_before >= n {
                        break;
                    }
                }
            }
        }

        Ok(state.frame_finished)
    }
1028
1029 /// Collect bytes and retain window_size bytes while decoding is still going on.
1030 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
1031 pub fn collect(&mut self) -> Option<Vec<u8>> {
1032 let finished = self.is_finished();
1033 let state = self.state.as_mut()?;
1034 if finished {
1035 Some(state.decoder_scratch.buffer_drain())
1036 } else {
1037 state.decoder_scratch.buffer_drain_to_window_size()
1038 }
1039 }
1040
1041 /// Collect bytes and retain window_size bytes while decoding is still going on.
1042 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
1043 pub fn collect_to_writer(&mut self, w: impl Write) -> Result<usize, Error> {
1044 let finished = self.is_finished();
1045 let state = match &mut self.state {
1046 None => return Ok(0),
1047 Some(s) => s,
1048 };
1049 if finished {
1050 state.decoder_scratch.buffer_drain_to_writer(w)
1051 } else {
1052 state.decoder_scratch.buffer_drain_to_window_size_writer(w)
1053 }
1054 }
1055
1056 /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size
1057 /// because window_size bytes need to be retained for decoding.
1058 /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes
1059 pub fn can_collect(&self) -> usize {
1060 let finished = self.is_finished();
1061 let state = match &self.state {
1062 None => return 0,
1063 Some(s) => s,
1064 };
1065 if finished {
1066 state.decoder_scratch.buffer_can_drain()
1067 } else {
1068 state
1069 .decoder_scratch
1070 .buffer_can_drain_to_window_size()
1071 .unwrap_or(0)
1072 }
1073 }
1074
1075 /// Decodes as many blocks as possible from the source slice and reads from the decodebuffer into the target slice
1076 /// The source slice may contain only parts of a frame but must contain at least one full block to make progress
1077 ///
1078 /// By all means use decode_blocks if you have a io.Reader available. This is just for compatibility with other decompressors
1079 /// which try to serve an old-style c api
1080 ///
1081 /// Returns (read, written), if read == 0 then the source did not contain a full block and further calls with the same
1082 /// input will not make any progress!
1083 ///
1084 /// Note that no kind of block can be bigger than 128kb.
1085 /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer
1086 ///
1087 /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just call decoder.read(&mut target)
1088 pub fn decode_from_to(
1089 &mut self,
1090 source: &[u8],
1091 target: &mut [u8],
1092 ) -> Result<(usize, usize), FrameDecoderError> {
1093 use FrameDecoderError as err;
1094 let bytes_read_at_start = match &self.state {
1095 Some(s) => s.bytes_read_counter,
1096 None => 0,
1097 };
1098
1099 if !self.is_finished() || self.state.is_none() {
1100 let mut mt_source = source;
1101
1102 if self.state.is_none() {
1103 self.init(&mut mt_source)?;
1104 }
1105
1106 //pseudo block to scope "state" so we can borrow self again after the block
1107 {
1108 let state = match &mut self.state {
1109 Some(s) => s,
1110 None => panic!("Bug in library"),
1111 };
1112 let mut block_dec = decoding::block_decoder::new();
1113
1114 if state.frame_header.descriptor.content_checksum_flag()
1115 && state.frame_finished
1116 && state.check_sum.is_none()
1117 {
1118 //this block is needed if the checksum were the only 4 bytes that were not included in the last decode_from_to call for a frame
1119 if mt_source.len() >= 4 {
1120 let chksum = mt_source[..4].try_into().expect("optimized away");
1121 state.bytes_read_counter += 4;
1122 let chksum = u32::from_le_bytes(chksum);
1123 state.check_sum = Some(chksum);
1124 }
1125 return Ok((4, 0));
1126 }
1127
1128 loop {
1129 //check if there are enough bytes for the next header
1130 if mt_source.len() < 3 {
1131 break;
1132 }
1133 let (block_header, block_header_size) = block_dec
1134 .read_block_header(&mut mt_source)
1135 .map_err(err::FailedToReadBlockHeader)?;
1136
1137 // check the needed size for the block before updating counters.
1138 // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place
1139 if mt_source.len() < block_header.content_size as usize {
1140 break;
1141 }
1142 state.bytes_read_counter += u64::from(block_header_size);
1143
1144 let bytes_read_in_block_body = state
1145 .decoder_scratch
1146 .decode_block_content(&mut block_dec, &block_header, &mut mt_source)
1147 .map_err(err::FailedToReadBlockBody)?;
1148 state.bytes_read_counter += bytes_read_in_block_body;
1149 state.block_counter += 1;
1150
1151 if block_header.last_block {
1152 state.frame_finished = true;
1153 if state.frame_header.descriptor.content_checksum_flag() {
1154 //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call
1155 if mt_source.len() >= 4 {
1156 let chksum = mt_source[..4].try_into().expect("optimized away");
1157 state.bytes_read_counter += 4;
1158 let chksum = u32::from_le_bytes(chksum);
1159 state.check_sum = Some(chksum);
1160 }
1161 }
1162 break;
1163 }
1164 }
1165 }
1166 }
1167
1168 let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?;
1169 let bytes_read_at_end = match &mut self.state {
1170 Some(s) => s.bytes_read_counter,
1171 None => panic!("Bug in library"),
1172 };
1173 let read_len = bytes_read_at_end - bytes_read_at_start;
1174 Ok((read_len as usize, result_len))
1175 }
1176
    /// Decode every frame in `input` into `output`, returning the number of bytes written.
    ///
    /// `input` must contain an exact number of frames. Skippable frames are allowed and
    /// will be skipped during decode.
    ///
    /// `output` must be large enough to hold the decompressed data; when it is not, the
    /// call fails (for example with [`FrameDecoderError::TargetTooSmall`]). If you don't
    /// know how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
    ///
    /// Every frame is set up with [`FrameDecoder::init`], so all bytes currently held by
    /// the decoder will be lost.
    pub fn decode_all(
        &mut self,
        input: &[u8],
        output: &mut [u8],
    ) -> Result<usize, FrameDecoderError> {
        // The `lsm` build's driver takes an extra skippable-frame visitor argument.
        // Passing `None` steps over skippable frames silently, matching the default
        // build.
        #[cfg(not(feature = "lsm"))]
        {
            self.decode_all_impl(input, output, |this, src| this.init(src))
        }
        #[cfg(feature = "lsm")]
        {
            self.decode_all_impl(input, output, |this, src| this.init(src), None)
        }
    }
1202
1203 /// Decode multiple frames into the output slice, invoking `visitor`
1204 /// for every skippable frame encountered before advancing past it.
1205 ///
1206 /// `input` must contain an exact number of frames. Skippable frames
1207 /// (RFC 8878 §3.1.2 magic numbers `0x184D2A50..=0x184D2A5F`) are
1208 /// allowed and will be both visited AND skipped: the visitor gets
1209 /// `(magic_variant, payload)` where `magic_variant` is the low
1210 /// nibble of the magic (`magic - 0x184D2A50`, range `0..=15`) and
1211 /// `payload` is a borrowed slice of the on-wire payload bytes (the
1212 /// skippable frame's `Frame_Size` field worth of data) into
1213 /// `input` — no allocation.
1214 ///
1215 /// The visitor sees skippable frames in stream order; interleaved
1216 /// regular zstd frames continue to decompress into `output` exactly
1217 /// as `decode_all` does.
1218 ///
1219 /// `output` must be large enough to hold the decompressed data.
1220 /// Returns the number of bytes written to `output`.
1221 ///
1222 /// # Example
1223 ///
1224 /// ```ignore
1225 /// use structured_zstd::decoding::FrameDecoder;
1226 ///
1227 /// let mut decoder = FrameDecoder::new();
1228 /// let mut output = vec![0u8; 1024];
1229 /// let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
1230 /// let n = decoder.decode_all_with_skippable_visitor(
1231 /// input,
1232 /// &mut output,
1233 /// |variant, payload| collected.push((variant, payload.to_vec())),
1234 /// )?;
1235 /// ```
1236 #[cfg(feature = "lsm")]
1237 #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
1238 pub fn decode_all_with_skippable_visitor<F>(
1239 &mut self,
1240 input: &[u8],
1241 output: &mut [u8],
1242 mut visitor: F,
1243 ) -> Result<usize, FrameDecoderError>
1244 where
1245 F: FnMut(u8, &[u8]),
1246 {
1247 self.decode_all_impl(
1248 input,
1249 output,
1250 |this, src| this.init(src),
1251 Some(&mut visitor),
1252 )
1253 }
1254
    /// Decode every frame in `input` into `output` using a pre-parsed dictionary handle,
    /// returning the number of bytes written.
    ///
    /// `input` must contain an exact number of frames. Skippable frames are allowed and
    /// will be skipped during decode.
    ///
    /// `output` must be large enough to hold the decompressed data. If you don't know how
    /// large the output will be, use [`FrameDecoder::decode_blocks`] instead.
    ///
    /// Every frame is set up with [`FrameDecoder::init_with_dict_handle`], so all bytes
    /// currently held by the decoder will be lost.
    ///
    /// # Warning
    ///
    /// Each decoded frame is initialized with `dict`, even when a frame header omits the
    /// optional dictionary ID. Callers must only use this API when they already know the
    /// input frames were encoded with the provided dictionary; otherwise decoded output
    /// can be silently corrupted.
    pub fn decode_all_with_dict_handle(
        &mut self,
        input: &[u8],
        output: &mut [u8],
        dict: &DictionaryHandle,
    ) -> Result<usize, FrameDecoderError> {
        // Same dispatch as `decode_all`: the `lsm` driver takes an extra visitor
        // argument, and `None` keeps skippable frames silently skipped.
        #[cfg(not(feature = "lsm"))]
        {
            self.decode_all_impl(input, output, |this, src| {
                this.init_with_dict_handle(src, dict)
            })
        }
        #[cfg(feature = "lsm")]
        {
            self.decode_all_impl(
                input,
                output,
                |this, src| this.init_with_dict_handle(src, dict),
                None,
            )
        }
    }
1294
    /// Shared multi-frame driver behind `decode_all` and `decode_all_with_dict_handle`,
    /// compiled only when the `lsm` feature is OFF.
    ///
    /// The loop runs until `input` is exhausted. For each frame:
    /// - `init_frame` parses the frame header from `input` (advancing it) and sets up
    ///   the decoder state.
    /// - A `SkipFrame` header error is handled by stepping `length` bytes further into
    ///   `input`.
    /// - A regular frame is decoded either on the direct path, straight into `output`,
    ///   or via the buffered `decode_blocks` + `read` fallback.
    ///
    /// Unlike the `lsm` variant, this takes no skippable-frame visitor, so the default
    /// build's call surface carries no extra parameter (the pre-#172 shape). Returns the
    /// total number of bytes written to `output`.
    ///
    /// # Errors
    ///
    /// Propagates frame-initialization and block-decoding errors. Also returns:
    /// - `FailedToSkipFrame` when a skippable frame's length exceeds the remaining
    ///   input;
    /// - `TargetTooSmall` when `output` cannot hold a frame on the fallback path;
    /// - `FrameContentSizeMismatch` when a frame with a declared content size produces a
    ///   different amount on the fallback path.
    #[cfg(not(feature = "lsm"))]
    fn decode_all_impl(
        &mut self,
        mut input: &[u8],
        mut output: &mut [u8],
        mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
    ) -> Result<usize, FrameDecoderError> {
        use super::buffer_backend::WILDCOPY_OVERLENGTH;
        let mut total_bytes_written = 0;
        while !input.is_empty() {
            match init_frame(self, &mut input) {
                Ok(_) => {}
                // Skippable frame: `length` bytes of payload remain at the front of
                // `input`; step past them and continue with the next frame.
                Err(FrameDecoderError::ReadFrameHeaderError(
                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
                )) => {
                    input = input
                        .get(length as usize..)
                        .ok_or(FrameDecoderError::FailedToSkipFrame)?;
                    continue;
                }
                Err(e) => return Err(e),
            };
            // Per-frame direct-path dispatch. Now safe to route the
            // public `decode_all` here because
            // `UserSliceBackend::exec_sequence_inline` returns
            // `Result<(), ExecuteSequencesError>` instead of
            // panicking on capacity overflow; the error propagates
            // up as `FrameDecoderError`. Eligibility (FCS > 0, no
            // active dict, remaining `output` slice has WILDCOPY
            // slack) puts the frame on the fast path that bypasses
            // the FlatBuf/Ring -> `read()` drain copy. Ineligible
            // frames (no FCS, active dict, output too small for
            // slack) fall through to the legacy `decode_blocks` +
            // `read` drain loop below.
            let (content_size, fcs_declared, dict_active, window_size) = {
                let state_ref = self.state.as_ref().expect("init populated state");
                (
                    state_ref.frame_header.frame_content_size(),
                    state_ref.frame_header.fcs_declared(),
                    state_ref.using_dict.is_some(),
                    state_ref.frame_header.window_size().unwrap_or(0),
                )
            };
            let needed = content_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
            // Per-block checksums collected inside `run_direct_decode`
            // post-loop (over recorded (start, end) ranges of `output`)
            // so the direct path stays eligible AND keeps the
            // window-size cap (`drop_to_window_size`) between blocks
            // that the spec relies on for `offset <= window_size`
            // validation. Path choice no longer alters checksum
            // semantics.
            let direct_eligible =
                content_size > 0 && !dict_active && (output.len() as u64) >= needed;
            if direct_eligible {
                let written = self.run_direct_decode(&mut input, output, content_size)?;
                output = &mut output[written..];
                total_bytes_written += written;
                continue;
            }
            // Non-direct fallback: pre-reserve the backing buffer to
            // `window_size` in a single allocation before block decode
            // starts, so multi-segment frames don't pay repeated
            // `reserve_amortized` grow steps as blocks accumulate (each
            // block only reserves MAX_BLOCK_SIZE = 128 KiB, so a window
            // > 128 KiB otherwise grows through several intermediate
            // sizes with `alloc_zeroed + memcpy` each time).
            if let Some(state) = self.state.as_mut() {
                state.decoder_scratch.reserve_buffer(window_size as usize);
            }
            // Where this frame's output starts, so its produced size can be checked
            // against the declared FCS below.
            let frame_start_total = total_bytes_written;
            loop {
                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
                let bytes_written = self
                    .read(output)
                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
                output = &mut output[bytes_written..];
                total_bytes_written += bytes_written;
                // Anything still collectable after draining into `output` means the
                // remaining output slice ran out of room.
                if self.can_collect() != 0 {
                    return Err(FrameDecoderError::TargetTooSmall);
                }
                if self.is_finished() {
                    break;
                }
            }
            // Per-frame FCS validation on the legacy fallback path.
            // Use `fcs_declared()` (NOT `content_size > 0`) so an
            // empty frame with explicit FCS=0 on the wire still gets
            // validated.
            if fcs_declared {
                let produced = (total_bytes_written - frame_start_total) as u64;
                if produced != content_size {
                    return Err(FrameDecoderError::FrameContentSizeMismatch {
                        declared: content_size,
                        produced,
                    });
                }
            }
        }

        Ok(total_bytes_written)
    }
1399
    /// `lsm`-feature variant of the shared multi-frame driver behind `decode_all`,
    /// `decode_all_with_dict_handle` and [`Self::decode_all_with_skippable_visitor`].
    ///
    /// It mirrors the no-`lsm` variant: the same direct-path dispatch, buffered fallback
    /// and per-frame FCS validation. It additionally accepts `skippable_visitor`. The only
    /// behavioral difference is the `SkipFrame` arm:
    /// - the remaining input is checked against `length` once and then divided with a
    ///   single `split_at(length)`;
    /// - the visitor (when `Some`) is invoked on the borrowed payload before advancing
    ///   past it.
    ///
    /// Passing `None` gives the same results as the no-`lsm` variant, including
    /// `FailedToSkipFrame` for truncated skippable payloads.
    ///
    /// Returns the total number of bytes written to `output`.
    #[cfg(feature = "lsm")]
    // Presumably silences clippy for the `skippable_visitor: Option<&mut dyn FnMut(..)>`
    // parameter type; a type alias would add indirection for a private helper.
    #[allow(clippy::type_complexity)]
    fn decode_all_impl(
        &mut self,
        mut input: &[u8],
        mut output: &mut [u8],
        mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
        mut skippable_visitor: Option<&mut dyn FnMut(u8, &[u8])>,
    ) -> Result<usize, FrameDecoderError> {
        use super::buffer_backend::WILDCOPY_OVERLENGTH;
        let mut total_bytes_written = 0;
        while !input.is_empty() {
            match init_frame(self, &mut input) {
                Ok(_) => {}
                Err(FrameDecoderError::ReadFrameHeaderError(
                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame {
                        magic_number,
                        length,
                    },
                )) => {
                    let length = length as usize;
                    // Visitor sees the payload slice BEFORE we advance
                    // past it. Borrowed slice — no allocation. The
                    // variant is the low nibble of the magic number
                    // (RFC 8878 §3.1.2). `read_frame_header` only emits
                    // SkipFrame for magic in 0x184D2A50..=0x184D2A5F, so
                    // the subtraction fits in 0..=15.
                    if input.len() < length {
                        return Err(FrameDecoderError::FailedToSkipFrame);
                    }
                    let (payload, rest) = input.split_at(length);
                    if let Some(visitor) = skippable_visitor.as_mut() {
                        let variant = (magic_number - 0x184D2A50) as u8;
                        visitor(variant, payload);
                    }
                    input = rest;
                    continue;
                }
                Err(e) => return Err(e),
            };
            // Per-frame direct-path dispatch. Now safe to route the
            // public `decode_all` here because
            // `UserSliceBackend::exec_sequence_inline` returns
            // `Result<(), ExecuteSequencesError>` instead of
            // panicking on capacity overflow; the error propagates
            // up as `FrameDecoderError`. Eligibility (FCS > 0, no
            // active dict, remaining `output` slice has WILDCOPY
            // slack) puts the frame on the fast path that bypasses
            // the FlatBuf/Ring -> `read()` drain copy. Ineligible
            // frames (no FCS, active dict, output too small for
            // slack) fall through to the legacy `decode_blocks` +
            // `read` drain loop below.
            let (content_size, fcs_declared, dict_active, window_size) = {
                let state_ref = self.state.as_ref().expect("init populated state");
                (
                    state_ref.frame_header.frame_content_size(),
                    state_ref.frame_header.fcs_declared(),
                    state_ref.using_dict.is_some(),
                    state_ref.frame_header.window_size().unwrap_or(0),
                )
            };
            let needed = content_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
            // Per-block checksums are collected inside `run_direct_decode`
            // over recorded ranges of `output`, so taking the direct path
            // does not change per-block checksum semantics.
            let direct_eligible =
                content_size > 0 && !dict_active && (output.len() as u64) >= needed;
            if direct_eligible {
                let written = self.run_direct_decode(&mut input, output, content_size)?;
                output = &mut output[written..];
                total_bytes_written += written;
                continue;
            }
            // Non-direct fallback: pre-reserve the backing buffer to
            // `window_size` once so the per-block growth cycle is
            // skipped (same rationale as the no-lsm variant).
            if let Some(state) = self.state.as_mut() {
                state.decoder_scratch.reserve_buffer(window_size as usize);
            }
            // Where this frame's output starts, so its produced size can be checked
            // against the declared FCS below.
            let frame_start_total = total_bytes_written;
            loop {
                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
                let bytes_written = self
                    .read(output)
                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
                output = &mut output[bytes_written..];
                total_bytes_written += bytes_written;
                // Anything still collectable after draining into `output` means the
                // remaining output slice ran out of room.
                if self.can_collect() != 0 {
                    return Err(FrameDecoderError::TargetTooSmall);
                }
                if self.is_finished() {
                    break;
                }
            }
            // Per-frame FCS validation on the legacy fallback path.
            // Use `fcs_declared()` (NOT `content_size > 0`) so an
            // empty frame with explicit FCS=0 on the wire still gets
            // validated.
            if fcs_declared {
                let produced = (total_bytes_written - frame_start_total) as u64;
                if produced != content_size {
                    return Err(FrameDecoderError::FrameContentSizeMismatch {
                        declared: content_size,
                        produced,
                    });
                }
            }
        }

        Ok(total_bytes_written)
    }
1518
1519 /// Decode multiple frames into the output slice using a serialized dictionary.
1520 ///
1521 /// # Warning
1522 ///
1523 /// Each decoded frame is initialized with the parsed dictionary, even when a
1524 /// frame header omits the optional dictionary ID. Callers must only use this
1525 /// API when they already know the input frames were encoded with that
1526 /// dictionary; otherwise decoded output can be silently corrupted.
1527 pub fn decode_all_with_dict_bytes(
1528 &mut self,
1529 input: &[u8],
1530 output: &mut [u8],
1531 raw_dictionary: &[u8],
1532 ) -> Result<usize, FrameDecoderError> {
1533 let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
1534 self.decode_all_with_dict_handle(input, output, &dict)
1535 }
1536
1537 /// Decode multiple frames into the extra capacity of the output vector.
1538 ///
1539 /// `input` must contain an exact number of frames.
1540 ///
1541 /// `output` must have enough extra capacity to hold the decompressed data.
1542 /// This function reserves an additional [`WILDCOPY_OVERLENGTH`]
1543 /// bytes on top of the caller's capacity so the per-frame direct
1544 /// decode path stays eligible — that may grow the vector by up
1545 /// to that fixed amount via `Vec::reserve`. It will NOT grow
1546 /// further to fit the decompressed payload itself; the caller's
1547 /// pre-allocated capacity must already cover the data. If you
1548 /// don't know how large the output will be, use
1549 /// [`FrameDecoder::decode_blocks`] instead.
1550 ///
1551 /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
1552 ///
1553 /// The length of the output vector is updated to include the decompressed data.
1554 /// The length is not changed if an error occurs. The
1555 /// `WILDCOPY_OVERLENGTH` slack is internal — `output.len()` on
1556 /// return is the actual decompressed size, NOT the inflated
1557 /// capacity. Callers who pre-sized the Vec with
1558 /// `Vec::with_capacity(fcs)` see no functional change beyond
1559 /// the small one-time capacity bump.
1560 pub fn decode_all_to_vec(
1561 &mut self,
1562 input: &[u8],
1563 output: &mut Vec<u8>,
1564 ) -> Result<(), FrameDecoderError> {
1565 use super::buffer_backend::WILDCOPY_OVERLENGTH;
1566 let len = output.len();
1567 // Reserve WILDCOPY slack on top of the caller's capacity so
1568 // `decode_all` can land on the direct path even when the
1569 // caller didn't pre-allocate slack themselves.
1570 output.reserve(WILDCOPY_OVERLENGTH);
1571 let cap = output.capacity();
1572 output.resize(cap, 0);
1573 match self.decode_all(input, &mut output[len..]) {
1574 Ok(bytes_written) => {
1575 let new_len = core::cmp::min(len + bytes_written, cap); // Sanitizes `bytes_written`.
1576 output.resize(new_len, 0);
1577 Ok(())
1578 }
1579 Err(e) => {
1580 output.resize(len, 0);
1581 Err(e)
1582 }
1583 }
1584 }
1585
1586 /// Single-frame direct-decode path. Decodes one zstd frame into
1587 /// `output[..content_size]` via a stack-local
1588 /// `DecodeBuffer<UserSliceBackend>`, bypassing the per-block
1589 /// FlatBuf/Ring -> `read()` drain copy.
1590 ///
1591 /// # Preconditions (caller-enforced)
1592 ///
1593 /// - `self.init` (or `init_with_dict_handle`) was called for
1594 /// this frame so `self.state` is populated.
1595 /// - `content_size` matches `self.state.frame_header
1596 /// .frame_content_size()` and is `> 0` (caller already passed
1597 /// the eligibility gate).
1598 /// - `output.len() >= content_size + WILDCOPY_OVERLENGTH`.
1599 /// - No active dictionary
1600 /// (`self.state.using_dict.is_none()`).
1601 ///
1602 /// On return, `input` points at the byte immediately after the
1603 /// frame's checksum (or after the last block, when the frame
1604 /// has `content_checksum_flag = 0`). `self.state.frame_finished`
1605 /// is set so [`Self::is_finished`] reports `true`.
1606 fn run_direct_decode(
1607 &mut self,
1608 input: &mut &[u8],
1609 output: &mut [u8],
1610 content_size: u64,
1611 ) -> Result<usize, FrameDecoderError> {
1612 use super::block_decoder;
1613 use super::decode_buffer::DecodeBuffer;
1614 use super::scratch::DirectScratch;
1615 use super::user_slice_buf::UserSliceBackend;
1616 use crate::io::Read;
1617 use FrameDecoderError as err;
1618
1619 let state = self
1620 .state
1621 .as_mut()
1622 .expect("caller ensures init populated state");
1623
1624 // Borrow persistent fields out of whichever scratch variant
1625 // `init` produced (Flat for single_segment, Ring for
1626 // multi-segment) — both expose the same HUF/FSE/Vec
1627 // fields; only `buffer` differs and we don't use that here.
1628 // Macro-style binding avoids the closure / generic
1629 // gymnastics of returning multiple `&mut` from a match arm.
1630 let (huf, fse, offset_hist, literals_buffer, sequences, block_content_buffer, window_size) =
1631 match &mut state.decoder_scratch {
1632 DecoderScratchKind::Flat(s) => (
1633 &mut s.huf,
1634 &mut s.fse,
1635 &mut s.offset_hist,
1636 &mut s.literals_buffer,
1637 &mut s.sequences,
1638 &mut s.block_content_buffer,
1639 s.buffer.window_size,
1640 ),
1641 DecoderScratchKind::Ring(s) => (
1642 &mut s.huf,
1643 &mut s.fse,
1644 &mut s.offset_hist,
1645 &mut s.literals_buffer,
1646 &mut s.sequences,
1647 &mut s.block_content_buffer,
1648 s.buffer.window_size,
1649 ),
1650 };
1651 let backend = UserSliceBackend::from_slice(output);
1652 let buffer = DecodeBuffer::from_backend(backend, window_size);
1653 let mut direct = DirectScratch {
1654 huf,
1655 fse,
1656 offset_hist,
1657 literals_buffer,
1658 sequences,
1659 block_content_buffer,
1660 buffer,
1661 };
1662
1663 // Block loop. Mirrors `decode_blocks` (without the
1664 // strategy-bounded early exit — we always decode the whole
1665 // frame in one shot for the direct path). Keeps
1666 // `state.bytes_read_counter` / `state.block_counter` in
1667 // sync with `decode_blocks` so post-call accessors
1668 // (`bytes_read_from_source`, `blocks_decoded`) return
1669 // accurate values.
1670 let mut block_dec = block_decoder::new();
1671 // Track total output bytes against the declared
1672 // `frame_content_size` via the buffer's actual write
1673 // counter — `BlockHeader.decompressed_size` is 0 for
1674 // Compressed blocks (the header parser can't know the
1675 // expanded size before decoding the body), so per-header
1676 // tracking would always count 0 for those blocks and
1677 // miscount frames that aren't pure Raw/RLE.
1678 let mut produced: u64 = 0;
1679 // Per-block output ranges captured during the direct-path
1680 // loop. After the loop we re-borrow `output` (post-drop of
1681 // `direct`) and XXH64 each range into
1682 // `self.computed_block_checksums`, so the digests vector
1683 // stays consistent with the legacy `decode_blocks` path
1684 // regardless of which dispatch the frame took.
1685 // `Vec::new()` does not allocate, so this stays free when
1686 // `per_block_checksums_enabled` is false: the `push` and the
1687 // post-loop hashing loop are both gated by the same flag.
1688 #[cfg(all(feature = "lsm", feature = "hash"))]
1689 let mut block_ranges: alloc::vec::Vec<(usize, usize)> = alloc::vec::Vec::new();
1690 loop {
1691 #[cfg(all(feature = "lsm", feature = "hash"))]
1692 let produced_before: Option<usize> = if self.per_block_checksums_enabled {
1693 Some(produced as usize)
1694 } else {
1695 None
1696 };
1697 let (block_header, hsize) = block_dec
1698 .read_block_header(&mut *input)
1699 .map_err(err::FailedToReadBlockHeader)?;
1700 state.bytes_read_counter += u64::from(hsize);
1701 // Pre-flight FCS check ONLY for Raw / RLE blocks where
1702 // `decompressed_size` is the actual block output size.
1703 // For Compressed blocks the header field is 0; the
1704 // post-decode check below catches overflow via the
1705 // backend's actual write counter delta.
1706 let block_upper = u64::from(block_header.decompressed_size);
1707 if block_upper > 0 && produced + block_upper > content_size {
1708 // Frame is corrupt — Raw/RLE block headers claim
1709 // more output than the FCS allows.
1710 return Err(err::FrameContentSizeMismatch {
1711 declared: content_size,
1712 produced: produced + block_upper,
1713 });
1714 }
1715 // Slice-source fast path: consume the block body
1716 // straight from `input` without copying into the
1717 // persistent `block_content_buffer`.
1718 let body_consumed = match block_dec.decode_block_content_from_slice(
1719 &block_header,
1720 &mut direct,
1721 &mut *input,
1722 ) {
1723 Ok(n) => n,
1724 // Defense-in-depth: RLE / Raw block whose declared
1725 // `decompressed_size` slipped past the per-block
1726 // pre-flight above and tripped the backend's
1727 // fallible write surface.
1728 Err(crate::decoding::errors::DecodeBlockContentError::BackendOverflow {
1729 ..
1730 }) => {
1731 // Use saturating_add on the
1732 // `produced + decompressed_size` sum. Each block
1733 // is bounded by 128 KiB (MAX_BLOCK_SIZE), but
1734 // accumulated `produced` can grow toward
1735 // u64::MAX across adversarial frames. Saturating
1736 // avoids a panic on the error path itself.
1737 return Err(err::FrameContentSizeMismatch {
1738 declared: content_size,
1739 produced: produced
1740 .saturating_add(u64::from(block_header.decompressed_size)),
1741 });
1742 }
1743 Err(e) => return Err(err::FailedToReadBlockBody(e)),
1744 };
1745 produced = direct.buffer.buffer_ref().tail() as u64;
1746 // Post-decode FCS overflow check.
1747 if produced > content_size {
1748 return Err(err::FrameContentSizeMismatch {
1749 declared: content_size,
1750 produced,
1751 });
1752 }
1753 state.bytes_read_counter += body_consumed;
1754 state.block_counter += 1;
1755 #[cfg(all(feature = "lsm", feature = "hash"))]
1756 if let Some(produced_before) = produced_before {
1757 block_ranges.push((produced_before, produced as usize));
1758 }
1759 // Cap the visible buffer at window_size between blocks
1760 // so the next block's match-offset validation matches
1761 // the spec's `offset <= window_size` rule.
1762 direct.buffer.drop_to_window_size();
1763 if block_header.last_block {
1764 if state.frame_header.descriptor.content_checksum_flag() {
1765 let mut chksum = [0u8; 4];
1766 input
1767 .read_exact(&mut chksum)
1768 .map_err(err::FailedToReadChecksum)?;
1769 state.bytes_read_counter += 4;
1770 state.check_sum = Some(u32::from_le_bytes(chksum));
1771 }
1772 break;
1773 }
1774 }
1775 // Final sanity: blocks summed to exactly `content_size`.
1776 if produced != content_size {
1777 return Err(err::FrameContentSizeMismatch {
1778 declared: content_size,
1779 produced,
1780 });
1781 }
1782
1783 let written = content_size as usize;
1784 state.frame_finished = true;
1785 // Drop the stack-local DirectScratch (and its DecodeBuffer
1786 // borrow on `output`) so we can re-borrow `output` for the
1787 // hash pass below.
1788 drop(direct);
1789 // Per-block XXH64 (low 32 bits) over the captured ranges.
1790 // Mirrors `decode_blocks`' per-block hashing so the digests
1791 // vector stays identical regardless of which dispatch path
1792 // the frame took. Ranges were recorded inside the loop while
1793 // `direct` held a mutable borrow on `output`; now that the
1794 // borrow is dropped we can read the slices directly.
1795 #[cfg(all(feature = "lsm", feature = "hash"))]
1796 if self.per_block_checksums_enabled {
1797 use core::hash::Hasher;
1798 for (start, end) in &block_ranges {
1799 let mut h = twox_hash::XxHash64::with_seed(0);
1800 h.write(&output[*start..*end]);
1801 self.computed_block_checksums.push(h.finish() as u32);
1802 }
1803 }
1804 #[cfg(feature = "hash")]
1805 if state.frame_header.descriptor.content_checksum_flag() {
1806 // Direct path bypasses the per-write hash accounting
1807 // (DecodeBuffer hashes during drain; the direct path
1808 // never drains because the user slice IS the buffer).
1809 // Walk the decoded output once and propagate the
1810 // resulting hasher state so the frame-tail XXH64 check
1811 // (in the block loop above) can verify the digest.
1812 //
1813 // Gated on `content_checksum_flag`: frames without a
1814 // trailing checksum byte have nothing to verify, and the
1815 // 1 MiB+ second-pass scan dominated decode wall time
1816 // (63 % of total on z000033 L-3 in the standalone perf
1817 // loop). `get_calculated_checksum()` now returns `None`
1818 // for flag-off frames, matching this skip.
1819 use core::hash::Hasher;
1820 let mut hasher = twox_hash::XxHash64::with_seed(0);
1821 hasher.write(&output[..written]);
1822 match &mut state.decoder_scratch {
1823 DecoderScratchKind::Flat(s) => s.buffer.hash = hasher,
1824 DecoderScratchKind::Ring(s) => s.buffer.hash = hasher,
1825 }
1826 }
1827 Ok(written)
1828 }
1829}
1830
1831/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished
1832/// this will retain window_size bytes, else it will drain it completely
1833impl Read for FrameDecoder {
1834 fn read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
1835 let state = match &mut self.state {
1836 None => return Ok(0),
1837 Some(s) => s,
1838 };
1839 if state.frame_finished {
1840 state.decoder_scratch.buffer_read_all(target)
1841 } else {
1842 state.decoder_scratch.buffer_read(target)
1843 }
1844 }
1845}
1846
1847#[cfg(test)]
1848mod tests {
1849 extern crate std;
1850
1851 use super::{DictionaryHandle, FrameDecoder};
1852 use crate::encoding::{CompressionLevel, FrameCompressor};
1853 use alloc::vec::Vec;
1854
1855 #[test]
1856 fn decode_all_legacy_drain_matches_direct_path_on_single_segment_frame() {
1857 // Roundtrip a small payload through the encoder, then decode
1858 // it via `decode_all` on two output shapes that select
1859 // different internal paths:
1860 // 1. Tight output (no WILDCOPY_OVERLENGTH slack) → legacy
1861 // `decode_blocks` + `read()` drain path.
1862 // 2. Output with WILDCOPY slack → direct
1863 // `run_direct_decode` + `UserSliceBackend` path.
1864 // Both paths must produce identical output bytes — the only
1865 // difference is the internal buffer/drain shape, not the
1866 // decoded semantics. This is the regression gate for the
1867 // direct-decode wiring.
1868 let payload: Vec<u8> = (0..4096u32).map(|i| (i & 0xFF) as u8).collect();
1869 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1870 compressor.set_source(payload.as_slice());
1871 let mut compressed = Vec::new();
1872 compressor.set_drain(&mut compressed);
1873 compressor.compress();
1874
1875 // Baseline: tight output → legacy drain path.
1876 let mut dec_a = FrameDecoder::new();
1877 let mut out_a = alloc::vec![0u8; payload.len()];
1878 let n_a = dec_a
1879 .decode_all(compressed.as_slice(), &mut out_a)
1880 .expect("decode_all (legacy drain) should succeed");
1881 assert_eq!(n_a, payload.len());
1882 assert_eq!(&out_a[..n_a], payload.as_slice());
1883
1884 // Direct: output with WILDCOPY slack → direct path.
1885 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1886 let mut dec_b = FrameDecoder::new();
1887 let mut out_b = alloc::vec![0u8; payload.len() + slack];
1888 let n_b = dec_b
1889 .decode_all(compressed.as_slice(), &mut out_b)
1890 .expect("decode_all (direct path) should succeed");
1891 assert_eq!(
1892 n_b,
1893 payload.len(),
1894 "direct decode produced wrong byte count"
1895 );
1896 assert_eq!(&out_b[..n_b], payload.as_slice());
1897 }
1898
1899 #[test]
1900 fn decode_all_multi_segment_frame_decodes_correctly() {
1901 // Multi-segment frame: payload large enough that the
1902 // encoder's default frame layout has `single_segment_flag =
1903 // false` and `window_size < frame_content_size`. The direct
1904 // path must cap the visible buffer at window_size after each
1905 // block (drop_to_window_size) so match-offset validation
1906 // matches the spec rule `offset <= window_size`, and still
1907 // produce the same bytes as decode_all on the
1908 // FlatBuf/Ring-backed path.
1909 //
1910 // Make the payload structured so multi-segment behavior
1911 // actually kicks in: 2 MiB of repeating + random-ish bytes
1912 // forces window_size lower than content_size at the encoder.
1913 let mut payload: Vec<u8> = Vec::with_capacity(2 * 1024 * 1024);
1914 for i in 0..payload.capacity() {
1915 payload.push((i.wrapping_mul(2_654_435_761) & 0xFF) as u8);
1916 }
1917 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1918 compressor.set_source(payload.as_slice());
1919 let mut compressed = Vec::new();
1920 compressor.set_drain(&mut compressed);
1921 compressor.compress();
1922
1923 // Baseline: decode_all through the FlatBuf+drain path.
1924 let mut dec_a = FrameDecoder::new();
1925 let mut out_a = alloc::vec![0u8; payload.len()];
1926 let n_a = dec_a
1927 .decode_all(compressed.as_slice(), &mut out_a)
1928 .expect("decode_all should succeed");
1929 assert_eq!(n_a, payload.len());
1930 assert_eq!(&out_a[..n_a], payload.as_slice());
1931
1932 // Direct path: must give identical bytes via UserSliceBackend
1933 // + per-block drop_to_window_size.
1934 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1935 let mut dec_b = FrameDecoder::new();
1936 let mut out_b = alloc::vec![0u8; payload.len() + slack];
1937 let n_b = dec_b
1938 .decode_all(compressed.as_slice(), &mut out_b)
1939 .expect("decode_all should succeed on multi-segment frame");
1940 assert_eq!(n_b, payload.len(), "wrong byte count on direct path");
1941 assert_eq!(&out_b[..n_b], payload.as_slice());
1942
1943 // Sanity-check: confirm the encoded frame really IS
1944 // multi-segment. If a future encoder default changes,
1945 // catching the assumption here is better than silently
1946 // testing single_segment on this name.
1947 let mut sanity = FrameDecoder::new();
1948 sanity.init(&mut compressed.as_slice()).unwrap();
1949 assert!(
1950 !sanity
1951 .state
1952 .as_ref()
1953 .unwrap()
1954 .frame_header
1955 .descriptor
1956 .single_segment_flag(),
1957 "test precondition violated: frame is single-segment, rename or resize"
1958 );
1959 }
1960
1961 #[cfg(feature = "hash")]
1962 #[test]
1963 fn decode_all_propagates_checksum_into_persistent_scratch() {
1964 // Direct path on a checksum-flagged frame: the FrameCompressor
1965 // under `feature = "hash"` sets content_checksum_flag, so the
1966 // decoded frame has a recorded checksum. After
1967 // decode_all we must be able to verify it matches via
1968 // the public get_calculated_checksum() accessor — the digest
1969 // is computed by walking output at end of decode and stored
1970 // into the persistent scratch's hasher.
1971 let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
1972 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1973 compressor.set_source(payload.as_slice());
1974 let mut compressed = Vec::new();
1975 compressor.set_drain(&mut compressed);
1976 compressor.compress();
1977
1978 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1979 let mut dec = FrameDecoder::new();
1980 let mut out = alloc::vec![0u8; payload.len() + slack];
1981 let n = dec
1982 .decode_all(compressed.as_slice(), &mut out)
1983 .expect("decode_all with checksum must succeed");
1984 assert_eq!(n, payload.len());
1985 assert_eq!(&out[..n], payload.as_slice());
1986
1987 // Both sides must report the same checksum: the frame header
1988 // carries the stored u32, and get_calculated_checksum reads
1989 // the running digest the direct path just propagated.
1990 let stored = dec.get_checksum_from_data();
1991 let calculated = dec.get_calculated_checksum();
1992 assert!(stored.is_some(), "frame must carry stored checksum");
1993 assert!(
1994 calculated.is_some(),
1995 "direct path must propagate calculated checksum"
1996 );
1997 assert_eq!(
1998 stored, calculated,
1999 "stored vs calculated checksum mismatch on direct path"
2000 );
2001 }
2002
2003 #[test]
2004 fn decode_all_fcs_overflow_via_corrupt_frame_returns_structured_error() {
2005 // Hand-build a corrupt frame that declares
2006 // frame_content_size = 4 but the (last) block carries a
2007 // larger Raw payload. The pre-flight FCS check inside the
2008 // direct path's block loop catches this and returns the
2009 // structured FrameContentSizeMismatch variant — not a
2010 // panic, not a generic TargetTooSmall.
2011 //
2012 // Frame layout (single_segment, FCS=4):
2013 // magic 4 bytes 0xFD2FB528
2014 // FHD 1 byte single_segment=1, no checksum,
2015 // FCS field size = 0 (-> 1-byte FCS)
2016 // FCS 1 byte 0x04
2017 // block_header 3 bytes last=1, type=Raw, block_size=10
2018 // block_payload 10 bytes 0xAA repeated
2019 let mut frame = alloc::vec::Vec::new();
2020 // magic
2021 frame.extend_from_slice(&0xFD2FB528u32.to_le_bytes());
2022 // FHD: single_segment=1, fcs_flag=0 (1-byte FCS), no checksum,
2023 // no dict. Bit layout: FCS(7-6)=0, single_segment(5)=1,
2024 // reserved/uncs(4)=0, content_checksum(2)=0, dict(0-1)=00.
2025 frame.push(0b0010_0000);
2026 // FCS: 1 byte
2027 frame.push(4);
2028 // Block header: cBlockSize=10, type=Raw (0), last=1
2029 // 3-byte LE: bit0=last, bits1-2=type(2 bits), bits3-23=size
2030 let cblock_size: u32 = 10;
2031 let bh: u32 = 1 | (cblock_size << 3); // last=1, type=Raw=0
2032 frame.push((bh & 0xFF) as u8);
2033 frame.push((bh >> 8) as u8);
2034 frame.push((bh >> 16) as u8);
2035 // Payload — 10 bytes that, if decoded, would exceed FCS=4.
2036 frame.extend(core::iter::repeat_n(0xAAu8, 10));
2037
2038 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
2039 let mut dec = FrameDecoder::new();
2040 let mut out = alloc::vec![0u8; 4 + slack];
2041 let err = dec
2042 .decode_all(&frame, &mut out)
2043 .expect_err("FCS-overflow frame must fail decode");
2044 assert!(
2045 matches!(
2046 err,
2047 super::FrameDecoderError::FrameContentSizeMismatch { .. }
2048 ),
2049 "expected FrameContentSizeMismatch, got {:?}",
2050 err
2051 );
2052 }
2053
2054 #[test]
2055 fn decode_all_falls_back_when_output_too_small_for_wildcopy_slack() {
2056 // Output sized exactly to frame_content_size (no
2057 // WILDCOPY_OVERLENGTH slack) must NOT trigger the direct
2058 // path — the burst's `extend_from_within_unchecked` writes
2059 // past `tail` into the slack region. Direct dispatcher
2060 // recognises this and falls back to the FlatBuf + drain
2061 // path which still produces the right output.
2062 let payload: Vec<u8> = (0..2048u32)
2063 .map(|i| (i.wrapping_mul(31) & 0xFF) as u8)
2064 .collect();
2065 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2066 compressor.set_source(payload.as_slice());
2067 let mut compressed = Vec::new();
2068 compressor.set_drain(&mut compressed);
2069 compressor.compress();
2070
2071 let mut dec = FrameDecoder::new();
2072 // Exactly payload.len(), no slack — direct path is gated out.
2073 let mut out = alloc::vec![0u8; payload.len()];
2074 let n = dec
2075 .decode_all(compressed.as_slice(), &mut out)
2076 .expect("decode_all should still succeed via fallback");
2077 assert_eq!(n, payload.len());
2078 assert_eq!(&out[..n], payload.as_slice());
2079 }
2080
2081 #[test]
2082 fn decode_all_fallback_validates_fcs_against_total_output() {
2083 // Synthetic single-segment frame: FCS = 20 bytes, but the
2084 // last-block flag fires after only 4 bytes of raw payload.
2085 // On the direct path this would trip the post-block
2086 // `produced > content_size` check; the fallback path
2087 // (eligible=false because output is sized exactly to FCS,
2088 // no WILDCOPY slack) used to silently return Ok(4). With
2089 // the fix it now surfaces `FrameContentSizeMismatch`
2090 // matching the direct path.
2091 //
2092 // Frame layout: 4 B magic | 1 B FHD (single_segment=1,
2093 // FCS_flag=3 → 8-byte FCS) | 8 B FCS=20 | block header
2094 // (Raw, last, size=4) | 4 raw bytes.
2095 let mut wire = Vec::new();
2096 wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes()); // magic
2097 // FHD: FCS_flag=3 (8-byte FCS) <<6 | single_segment=1 <<5.
2098 wire.push(0b1110_0000);
2099 wire.extend_from_slice(&20u64.to_le_bytes()); // declared FCS
2100 // Block header: (size << 3) | (block_type << 1) | last_block.
2101 // Raw block (block_type=0), last_block=1, size=4 → 0b00100001 = 0x21.
2102 wire.push(0x21);
2103 wire.push(0x00);
2104 wire.push(0x00);
2105 wire.extend_from_slice(&[1u8, 2, 3, 4]);
2106
2107 let mut dec = FrameDecoder::new();
2108 // Size output exactly at declared FCS (no WILDCOPY slack)
2109 // so the eligibility check gates the direct path out.
2110 let mut out = alloc::vec![0u8; 20];
2111 let err = dec
2112 .decode_all(wire.as_slice(), &mut out)
2113 .expect_err("fallback must reject corrupt FCS underflow");
2114 match err {
2115 crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
2116 declared,
2117 produced,
2118 } => {
2119 assert_eq!(declared, 20);
2120 assert_eq!(produced, 4);
2121 }
2122 other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
2123 }
2124 }
2125
2126 #[test]
2127 fn decode_all_fallback_treats_explicit_fcs_zero_as_declared() {
2128 // Synthetic multi-segment frame with FCS_flag=2 (4-byte
2129 // FCS) explicitly set to 0. The header DECLARES zero
2130 // content, but the body carries a 5-byte raw last-block.
2131 // `fcs_declared()` must return true (the field is on the
2132 // wire) so the fallback's post-decode size check sees the
2133 // mismatch — even though `frame_content_size == 0`. This
2134 // is exactly the FCS=0 edge case where the previous
2135 // `content_size > 0` proxy would have silently accepted
2136 // the corrupt frame.
2137 //
2138 // Frame layout:
2139 // 4 B magic — 28 B5 2F FD
2140 // 1 B FHD — FCS_flag=2 (bits 7-6), no
2141 // single_segment, content_checksum=0,
2142 // dict_id_flag=0 → 0b1000_0000
2143 // 1 B window_descriptor — exp=10, mantissa=0 → window=1 MiB
2144 // 4 B FCS — 0 LE
2145 // 3 B block header — raw, last, size=5 → 0x29 0x00 0x00
2146 // 5 B raw payload — anything non-empty
2147 let mut wire = Vec::new();
2148 wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2149 wire.push(0b1000_0000); // FHD: FCS_flag=2, others 0.
2150 wire.push(0x50); // window_descriptor: exp=10, mantissa=0.
2151 wire.extend_from_slice(&0u32.to_le_bytes()); // FCS = 0.
2152 // Block header (24-bit LE): (size << 3) | (block_type << 1) | last_block
2153 // = (5 << 3) | (0 << 1) | 1 = 0x29.
2154 wire.push(0x29);
2155 wire.push(0x00);
2156 wire.push(0x00);
2157 wire.extend_from_slice(&[1u8, 2, 3, 4, 5]);
2158
2159 let mut dec = FrameDecoder::new();
2160 // FCS=0 declared, so eligibility (`content_size > 0`)
2161 // false — falls through to the drain loop. Output buffer
2162 // size doesn't matter for the eligibility check here;
2163 // give it some room so `read()` can drain the block.
2164 let mut out = alloc::vec![0u8; 16];
2165 let err = dec
2166 .decode_all(wire.as_slice(), &mut out)
2167 .expect_err("corrupt FCS=0 + 5-byte block must error");
2168 match err {
2169 crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
2170 declared,
2171 produced,
2172 } => {
2173 assert_eq!(declared, 0);
2174 assert_eq!(produced, 5);
2175 }
2176 other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
2177 }
2178 }
2179
2180 #[test]
2181 fn decode_all_fallback_accepts_honest_explicit_fcs_zero() {
2182 // Companion to the corrupt-FCS=0 test above: an HONEST
2183 // empty frame with FCS_flag=2 (4-byte FCS) explicitly set
2184 // to 0 AND a 0-byte raw last-block. `fcs_declared()`
2185 // returns true and `content_size == 0 == total_written`,
2186 // so the fallback validation accepts the frame instead of
2187 // misreporting a mismatch.
2188 //
2189 // (Single-segment FCS=0 would test a similar invariant
2190 // but trips header-stage validation: `window_size =
2191 // frame_content_size = 0 < MIN_WINDOW_SIZE` fails the
2192 // window-size sanity check before decode runs. Use the
2193 // multi-segment shape where `window_size` comes from
2194 // `window_descriptor` independently of FCS.)
2195 //
2196 // Frame layout:
2197 // 4 B magic
2198 // 1 B FHD — FCS_flag=2, others 0 → 0x80
2199 // 1 B window_descriptor — exp=10 → 1 MiB window
2200 // 4 B FCS — 0 LE
2201 // 3 B block header — raw, last, size=0 → 0x01 0x00 0x00
2202 let mut wire = Vec::new();
2203 wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2204 wire.push(0b1000_0000);
2205 wire.push(0x50);
2206 wire.extend_from_slice(&0u32.to_le_bytes());
2207 // Block header: (0 << 3) | (0 << 1) | 1 = 0x01.
2208 wire.push(0x01);
2209 wire.push(0x00);
2210 wire.push(0x00);
2211
2212 let mut dec = FrameDecoder::new();
2213 let mut out = alloc::vec![0u8; 16];
2214 let n = dec
2215 .decode_all(wire.as_slice(), &mut out)
2216 .expect("honest FCS=0 + empty block must succeed");
2217 assert_eq!(n, 0);
2218 }
2219
2220 #[test]
2221 fn reset_with_dict_handle_applies_dict_when_no_dict_id() {
2222 let payload = b"reset-without-dict-id";
2223 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2224 compressor.set_source(payload.as_slice());
2225 let mut compressed = Vec::new();
2226 compressor.set_drain(&mut compressed);
2227 compressor.compress();
2228
2229 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2230 let handle = DictionaryHandle::decode_dict(dict_raw).expect("dictionary should parse");
2231
2232 let mut decoder = FrameDecoder::new();
2233 decoder
2234 .reset_with_dict_handle(compressed.as_slice(), &handle)
2235 .expect("reset should succeed");
2236 let state = decoder.state.as_ref().expect("state should be initialized");
2237 assert!(state.frame_header.dictionary_id().is_none());
2238 assert_eq!(state.using_dict, Some(handle.id()));
2239 }
2240
2241 #[cfg(feature = "lsm")]
2242 mod expect_validation {
2243 use super::*;
2244 use crate::decoding::errors::FrameDecoderError;
2245
2246 fn compress(payload: &[u8]) -> Vec<u8> {
2247 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2248 compressor.set_source(payload);
2249 let mut compressed = Vec::new();
2250 compressor.set_drain(&mut compressed);
2251 compressor.compress();
2252 compressed
2253 }
2254
2255 fn compress_with_dict(payload: &[u8], dict_raw: &[u8]) -> Vec<u8> {
2256 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2257 compressor
2258 .set_dictionary_from_bytes(dict_raw)
2259 .expect("dict load");
2260 compressor.set_source(payload);
2261 let mut compressed = Vec::new();
2262 compressor.set_drain(&mut compressed);
2263 compressor.compress();
2264 compressed
2265 }
2266
2267 #[test]
2268 fn expect_dict_id_none_default_allows_anything() {
2269 let compressed = compress(b"hello-no-expect");
2270 let mut decoder = FrameDecoder::new();
2271 decoder
2272 .reset(compressed.as_slice())
2273 .expect("default None passes");
2274 }
2275
2276 #[test]
2277 fn expect_dict_id_zero_matches_frame_without_dict_id() {
2278 // Default-encoded frame has no dict_id; pinning Some(0)
2279 // ("no dictionary expected") must accept it.
2280 let compressed = compress(b"payload");
2281 let mut decoder = FrameDecoder::new();
2282 decoder.expect_dict_id(Some(0));
2283 decoder
2284 .reset(compressed.as_slice())
2285 .expect("Some(0) ~ None");
2286 }
2287
2288 #[test]
2289 fn expect_dict_id_matching_value_passes() {
2290 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2291 let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
2292 let actual_id = handle.id();
2293
2294 let compressed = compress_with_dict(b"payload-with-dict", dict_raw);
2295
2296 let mut decoder = FrameDecoder::new();
2297 decoder.expect_dict_id(Some(actual_id));
2298 // Decode requires the dict to be registered; using
2299 // reset_with_dict_handle for that.
2300 decoder
2301 .reset_with_dict_handle(compressed.as_slice(), &handle)
2302 .expect("matching dict_id passes");
2303 }
2304
2305 #[test]
2306 fn expect_dict_id_mismatching_value_fails_before_decode() {
2307 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2308 let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
2309 let actual_id = handle.id();
2310 let wrong_id = actual_id.wrapping_add(1);
2311
2312 let compressed = compress_with_dict(b"payload-with-dict", dict_raw);
2313
2314 let mut decoder = FrameDecoder::new();
2315 decoder.expect_dict_id(Some(wrong_id));
2316 let err = decoder
2317 .reset_with_dict_handle(compressed.as_slice(), &handle)
2318 .expect_err("mismatch must fail");
2319 match err {
2320 FrameDecoderError::UnexpectedDictId { expected, found } => {
2321 assert_eq!(expected, Some(wrong_id));
2322 assert_eq!(found, Some(actual_id));
2323 }
2324 other => panic!("expected UnexpectedDictId, got {other:?}"),
2325 }
2326 }
2327
2328 #[test]
2329 fn expect_dict_id_nonzero_fails_on_frame_without_dict_id() {
2330 // Frame has no dict_id; expecting Some(42) (non-zero)
2331 // must fail with found = None.
2332 let compressed = compress(b"no-dict-frame");
2333 let mut decoder = FrameDecoder::new();
2334 decoder.expect_dict_id(Some(42));
2335 let err = decoder
2336 .reset(compressed.as_slice())
2337 .expect_err("nonzero expectation on dictless frame must fail");
2338 match err {
2339 FrameDecoderError::UnexpectedDictId { expected, found } => {
2340 assert_eq!(expected, Some(42));
2341 assert_eq!(found, None);
2342 }
2343 other => panic!("expected UnexpectedDictId, got {other:?}"),
2344 }
2345 }
2346
2347 #[test]
2348 fn expect_window_descriptor_none_default_allows_anything() {
2349 let compressed = compress(b"hello-no-wd-expect");
2350 let mut decoder = FrameDecoder::new();
2351 decoder
2352 .reset(compressed.as_slice())
2353 .expect("default None passes");
2354 }
2355
2356 #[test]
2357 fn expect_window_descriptor_mismatch_fails_before_decode() {
2358 // Compress a payload large enough to force a
2359 // multi-segment frame (window_descriptor on wire).
2360 // Default compression at >256 KiB produces multi-
2361 // segment frames with a real window_descriptor byte.
2362 let payload = alloc::vec![0xABu8; 512 * 1024];
2363 let compressed = compress(&payload);
2364
2365 // Read the actual window_descriptor by decoding once
2366 // without expectations, then pin a wrong value.
2367 let mut probe_decoder = FrameDecoder::new();
2368 probe_decoder.reset(compressed.as_slice()).unwrap();
2369 let probe_state = probe_decoder.state.as_ref().unwrap();
2370 let actual_wd = probe_state
2371 .frame_header
2372 .window_descriptor()
2373 .expect("multi-segment frame should expose window_descriptor");
2374 let wrong_wd = actual_wd.wrapping_add(0x10); // bump exponent
2375
2376 let mut decoder = FrameDecoder::new();
2377 decoder.expect_window_descriptor(Some(wrong_wd));
2378 let err = decoder
2379 .reset(compressed.as_slice())
2380 .expect_err("wrong window_descriptor must fail");
2381 match err {
2382 FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
2383 assert_eq!(expected, wrong_wd);
2384 assert_eq!(found, Some(actual_wd));
2385 }
2386 other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
2387 }
2388 }
2389
2390 /// Build a minimal synthetic single-segment zstd frame
2391 /// carrying a 4-byte raw payload. RFC 8878 §3.1.1.1
2392 /// layout, hand-rolled because our default
2393 /// `FrameCompressor` settings don't emit
2394 /// `single_segment_flag` for tiny inputs.
2395 ///
2396 /// Wire bytes (13 total for 4-byte payload):
2397 /// ```text
2398 /// 28 B5 2F FD magic
2399 /// 20 FHD: single_segment=1, FCS_flag=0
2400 /// 04 FCS (single byte, value = payload.len())
2401 /// 21 00 00 block header: raw, last, size=4
2402 /// .. .. .. .. payload bytes
2403 /// ```
2404 fn synth_single_segment_frame(payload: &[u8]) -> Vec<u8> {
2405 assert!(payload.len() <= 255, "1-byte FCS field caps at 255");
2406 assert!(payload.len() < (1usize << 21), "block size 21-bit max");
2407 let mut out = Vec::new();
2408 // Magic 0xFD2FB528 LE.
2409 out.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2410 // FHD: single_segment_flag (bit 5) set, everything
2411 // else zero. With single_segment + FCS_flag=0 the FCS
2412 // field is 1 byte. No window_descriptor on wire.
2413 out.push(0b0010_0000);
2414 // 1-byte FCS = payload length.
2415 out.push(payload.len() as u8);
2416 // Block header (3 bytes LE):
2417 // last_block=1, block_type=0 (Raw), block_size=payload.len().
2418 // Encoded: (size << 3) | (block_type << 1) | last_block.
2419 // Block header: last_block flag in bit 0, block_type
2420 // (0 = Raw) in bits 1-2, block size in bits 3+.
2421 let bh: u32 = ((payload.len() as u32) << 3) | 1;
2422 out.push((bh & 0xFF) as u8);
2423 out.push(((bh >> 8) & 0xFF) as u8);
2424 out.push(((bh >> 16) & 0xFF) as u8);
2425 // Raw payload.
2426 out.extend_from_slice(payload);
2427 out
2428 }
2429
#[test]
fn expect_window_descriptor_on_single_segment_frame_fails_with_found_none() {
    // A single-segment frame has no window_descriptor byte on the wire.
    // An expectation set against such a frame must fail with
    // `found: None`, so callers can tell "descriptor mismatch" apart
    // from "descriptor absent" instead of the check silently passing.
    let frame = synth_single_segment_frame(b"tiny");

    // Precondition: with no expectation configured, the synthetic frame
    // parses and its parsed header reports no window descriptor.
    let mut baseline = FrameDecoder::new();
    baseline
        .reset(frame.as_slice())
        .expect("synth frame parses");
    let parsed = baseline.state.as_ref().unwrap();
    assert!(
        parsed.frame_header.window_descriptor().is_none(),
        "synth frame must be single-segment"
    );

    // Under test: the same bytes, now with an expectation installed.
    let mut guarded = FrameDecoder::new();
    guarded.expect_window_descriptor(Some(0x40));
    let outcome = guarded
        .reset(frame.as_slice())
        .expect_err("single-segment + expectation must fail");
    match outcome {
        FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
            assert_eq!(expected, 0x40);
            assert_eq!(found, None);
        }
        other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
    }
}
2466
#[test]
fn validation_failure_leaves_decoder_re_resettable() {
    // A reset rejected with UnexpectedDictId must not leave the decoder
    // stuck in a failed state. Once the bad expectation is cleared,
    // resetting again over the same compressed bytes (through a new
    // slice reader) has to succeed.
    let frame = compress(b"re-resettable");
    let mut decoder = FrameDecoder::new();

    // Step 1: a deliberately wrong dict-id expectation rejects the reset.
    decoder.expect_dict_id(Some(42));
    let err = decoder
        .reset(frame.as_slice())
        .expect_err("first reset fails");
    assert!(matches!(err, FrameDecoderError::UnexpectedDictId { .. }));

    // Step 2: with the expectation removed, the same decoder accepts
    // the frame.
    decoder.expect_dict_id(None);
    decoder
        .reset(frame.as_slice())
        .expect("retry after clearing expectation should succeed");
}
2488 }
2489
/// Assemble a skippable frame as it appears on the wire (RFC 8878
/// §3.1.2). The layout is a little-endian `u32` magic
/// `0x184D2A50 + variant`, then a little-endian `u32` payload length,
/// then the payload bytes.
///
/// # Panics
///
/// Panics in either of two cases:
/// - `variant > 15`. The RFC only defines variants `0..=15`, and rejecting
///   anything else stops a test from smuggling in a non-skippable magic.
/// - `payload.len()` does not fit in a `u32`.
#[cfg(feature = "lsm")]
fn build_skippable_frame(variant: u8, payload: &[u8]) -> Vec<u8> {
    assert!(
        variant <= 15,
        "skippable-frame variant {variant} outside RFC 8878 0..=15 range",
    );
    // 8 header bytes (magic + length) followed by the payload.
    let mut frame = Vec::with_capacity(8 + payload.len());
    let magic = 0x184D2A50u32 + u32::from(variant);
    let size = u32::try_from(payload.len()).unwrap();
    for word in [magic, size] {
        frame.extend_from_slice(&word.to_le_bytes());
    }
    frame.extend_from_slice(payload);
    frame
}
2507
#[cfg(feature = "lsm")]
#[test]
fn decode_all_with_skippable_visitor_sees_payloads_in_order() {
    // Stream under test, in order:
    //   skippable(v0, "alpha") | zstd(payload_a) | skippable(v3, "beta")
    //   | zstd(payload_b) | skippable(v15, "")
    // The visitor must fire exactly three times, receiving the
    // (variant, payload) pairs in stream order, while both zstd frames
    // still decode normally.
    let payload_a: Vec<u8> = (0..256u16).map(|i| i as u8).collect();
    let payload_b: Vec<u8> = (0..256u16).map(|i| (i ^ 0xAA) as u8).collect();

    // One-shot default-level compression into a fresh buffer.
    let compress_default = |src: &[u8]| -> Vec<u8> {
        let mut frame = Vec::new();
        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
        compressor.set_source(src);
        compressor.set_drain(&mut frame);
        compressor.compress();
        drop(compressor);
        frame
    };
    let comp_a = compress_default(payload_a.as_slice());
    let comp_b = compress_default(payload_b.as_slice());

    let stream: Vec<u8> = [
        build_skippable_frame(0, b"alpha"),
        comp_a,
        build_skippable_frame(3, b"beta"),
        comp_b,
        build_skippable_frame(15, &[]),
    ]
    .concat();

    let total = payload_a.len() + payload_b.len();
    let mut out = alloc::vec![0u8; total];
    let mut seen: Vec<(u8, Vec<u8>)> = Vec::new();
    let mut decoder = FrameDecoder::new();
    let n = decoder
        .decode_all_with_skippable_visitor(stream.as_slice(), &mut out, |variant, payload| {
            seen.push((variant, payload.to_vec()));
        })
        .expect("decode_all_with_skippable_visitor should succeed");

    // Every skippable frame was reported, in stream order.
    assert_eq!(seen.len(), 3);
    assert_eq!(seen[0], (0u8, b"alpha".to_vec()));
    assert_eq!(seen[1], (3u8, b"beta".to_vec()));
    assert_eq!(seen[2], (15u8, Vec::<u8>::new()));

    // Both zstd frames landed back-to-back in `out`.
    assert_eq!(n, total);
    let (decoded_a, decoded_b) = out[..n].split_at(payload_a.len());
    assert_eq!(decoded_a, payload_a.as_slice());
    assert_eq!(decoded_b, payload_b.as_slice());
}
2562
#[cfg(feature = "lsm")]
#[test]
fn decode_all_silently_skips_when_no_visitor() {
    // Regression gate: without a visitor, plain decode_all keeps the
    // RFC 8878 behavior of silently discarding skippable frames. The
    // visitor refactor must not change that.
    let payload: Vec<u8> = (0..512u16).map(|i| i as u8).collect();

    let mut zstd_frame = Vec::new();
    {
        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut zstd_frame);
        compressor.compress();
    }

    // Sidecar skippable frame first, real zstd frame second.
    let mut stream = build_skippable_frame(7, b"ignored sidecar");
    stream.extend_from_slice(&zstd_frame);

    let mut out = alloc::vec![0u8; payload.len()];
    let mut decoder = FrameDecoder::new();
    let written = decoder
        .decode_all(stream.as_slice(), &mut out)
        .expect("decode_all should succeed on skippable + zstd stream");
    assert_eq!(written, payload.len());
    assert_eq!(&out[..written], payload.as_slice());
}
2589
#[cfg(feature = "lsm")]
#[test]
fn frame_emit_info_describes_emitted_block_layout() {
    // Encode a payload large enough to force more than one block, fetch
    // FrameEmitInfo, then walk blocks[]. Each block's
    // (offset_in_frame, header_size, body_size) must match the bytes
    // actually emitted into the drain buffer.
    use crate::encoding::frame_emit_info::BlockType as EmitBlockType;

    let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
    let mut compressor = FrameCompressor::new(CompressionLevel::Default);
    compressor.set_source(payload.as_slice());
    let mut compressed = Vec::new();
    compressor.set_drain(&mut compressed);
    compressor.compress();

    let info = compressor
        .last_frame_emit_info()
        .expect("last_frame_emit_info populated after compress")
        .clone();
    drop(compressor);

    // Frame header range starts at 0 and is non-empty.
    assert_eq!(info.frame_header_range.start, 0);
    assert!(info.frame_header_range.end > 0);
    // Total size matches what was written to the drain.
    assert_eq!(info.total_size as usize, compressed.len());
    // 200_000 bytes exceed the 128 KiB Block_Maximum_Size, so a conformant
    // encoder must emit at least two blocks. Pin that explicitly.
    // Otherwise the non-final-block check and the block-to-block walking
    // invariant below would pass vacuously.
    assert!(
        info.blocks.len() >= 2,
        "expected a multi-block frame, got {} block(s)",
        info.blocks.len(),
    );
    // Only the final entry carries last_block=true.
    let (final_block, non_final) = info
        .blocks
        .split_last()
        .expect("at least two blocks asserted above");
    assert!(final_block.last_block);
    for b in non_final {
        assert!(!b.last_block);
    }

    // Re-decode each recorded 3-byte block header and check it against
    // the recorded type / size. Walking arithmetic
    // (offset_in_frame + header_size + body_size) must land exactly on
    // the next block's offset_in_frame or, for the last block, on the
    // checksum start / end of frame.
    for (i, b) in info.blocks.iter().enumerate() {
        let off = b.offset_in_frame as usize;
        assert_eq!(b.header_size, 3);
        let mut hdr = [0u8; 4];
        hdr[..3].copy_from_slice(&compressed[off..off + 3]);
        let raw = u32::from_le_bytes(hdr);
        let last = (raw & 1) != 0;
        let ty = (raw >> 1) & 0b11;
        let sz = raw >> 3;
        assert_eq!(last, b.last_block);
        assert_eq!(sz, b.block_size_field);
        // body_size is the PHYSICAL length on the wire: the spec's
        // Block_Size for Raw/Compressed, always 1 for RLE.
        let expected_physical = match b.block_type {
            EmitBlockType::RLE => 1,
            _ => sz,
        };
        assert_eq!(b.body_size, expected_physical);
        let expected_ty = match b.block_type {
            EmitBlockType::Raw => 0,
            EmitBlockType::RLE => 1,
            EmitBlockType::Compressed => 2,
            EmitBlockType::Reserved => 3,
        };
        assert_eq!(ty, expected_ty);
        // Walking-arithmetic invariant.
        let next_off = b.offset_in_frame + b.header_size as u32 + b.body_size;
        if let Some(next) = info.blocks.get(i + 1) {
            assert_eq!(
                next_off, next.offset_in_frame,
                "block {i} body_size doesn't reach next block's offset_in_frame",
            );
        } else if let Some(cs) = info.checksum_range.as_ref() {
            assert_eq!(
                next_off, cs.start,
                "last block body_size doesn't reach checksum_range.start",
            );
        } else {
            assert_eq!(
                next_off, info.total_size,
                "last block body_size doesn't reach total_size",
            );
        }
    }
    // Checksum range present iff `feature = "hash"` is enabled.
    assert_eq!(info.checksum_range.is_some(), cfg!(feature = "hash"));
}
2674
#[cfg(all(feature = "lsm", feature = "hash"))]
#[test]
fn per_block_checksum_round_trip() {
    // Per-block checksum round trip. The encoder records one checksum per
    // physical block it writes to the wire, hashing at each emission site
    // including every post-split partition. The verifying decoder records
    // one per block it decodes. Both sequences must agree in length and
    // element-wise.
    let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();

    // Encode side.
    let mut frame = Vec::new();
    let mut compressor = FrameCompressor::new(CompressionLevel::Default);
    compressor.set_source(payload.as_slice());
    compressor.enable_per_block_checksums();
    compressor.set_drain(&mut frame);
    compressor.compress();
    let encoder_checksums: Vec<_> = compressor
        .last_frame_block_checksums()
        .expect("checksums populated after enable + compress")
        .to_vec();
    drop(compressor);
    assert!(!encoder_checksums.is_empty());

    // Decode side: turn on verification, decode, and check the payload.
    let mut decoder = FrameDecoder::new();
    decoder.enable_per_block_checksums();
    let mut output = alloc::vec![0u8; payload.len()];
    let n = decoder
        .decode_all(frame.as_slice(), &mut output)
        .expect("decode_all should succeed");
    assert_eq!(n, payload.len());
    assert_eq!(&output[..n], payload.as_slice());

    // Decoder-side checksums mirror the encoder's, one per block.
    assert_eq!(
        decoder.computed_block_checksums(),
        encoder_checksums.as_slice()
    );
}
2713}