structured_zstd/decoding/frame_decoder.rs
//! [`FrameDecoder`] is the main low-level struct users interact with to decode zstd frames.
//!
//! Zstandard compressed data is made of one or more frames. Each frame is independent and can be
//! decompressed independently of other frames. This module contains structures
//! and utilities that can be used to decode a frame.
6
7use super::frame;
8use crate::decoding;
9use crate::decoding::block_decoder::BlockDecoder;
10use crate::decoding::buffer_backend::BufferBackend;
11use crate::decoding::decode_buffer::DecodeBuffer;
12use crate::decoding::dictionary::{Dictionary, DictionaryHandle};
13use crate::decoding::errors::{DecodeBlockContentError, FrameDecoderError};
14use crate::decoding::flat_buf::FlatBuf;
15use crate::decoding::ringbuffer::RingBuffer;
16use crate::decoding::scratch::DecoderScratch;
17use crate::io::{Error, Read, Write};
18use alloc::collections::BTreeMap;
19use alloc::vec::Vec;
20use core::convert::TryInto;
21
22use crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE;
23
24/// Low level Zstandard decoder that can be used to decompress frames with fine control over when and how many bytes are decoded.
25///
26/// This decoder is able to decode frames only partially and gives control
27/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once).
28/// It reads bytes as needed from a provided source and can be read from to collect partial results.
29///
30/// If you want to just read the whole frame with an `io::Read` without having to deal with manually calling [FrameDecoder::decode_blocks]
31/// you can use the provided [crate::decoding::StreamingDecoder] wich wraps this FrameDecoder.
32///
33/// Workflow is as follows:
34/// ```
35/// use structured_zstd::decoding::BlockDecodingStrategy;
36///
37/// # #[cfg(feature = "std")]
38/// use std::io::{Read, Write};
39///
40/// // no_std environments can use the crate's own Read traits
41/// # #[cfg(not(feature = "std"))]
42/// use structured_zstd::io::{Read, Write};
43///
44/// fn decode_this(mut file: impl Read) {
45/// //Create a new decoder
46/// let mut frame_dec = structured_zstd::decoding::FrameDecoder::new();
47/// let mut result = Vec::new();
48///
49/// // Use reset or init to make the decoder ready to decode the frame from the io::Read
50/// frame_dec.reset(&mut file).unwrap();
51///
52/// // Loop until the frame has been decoded completely
53/// while !frame_dec.is_finished() {
54/// // decode (roughly) batch_size many bytes
55/// frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap();
56///
57/// // read from the decoder to collect bytes from the internal buffer
58/// let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap();
59///
60/// // then do something with it
61/// do_something(&result[0..bytes_read]);
62/// }
63///
64/// // handle the last chunk of data
65/// while frame_dec.can_collect() > 0 {
66/// let x = frame_dec.read(result.as_mut_slice()).unwrap();
67///
68/// do_something(&result[0..x]);
69/// }
70/// }
71///
72/// fn do_something(data: &[u8]) {
73/// # #[cfg(feature = "std")]
74/// std::io::stdout().write_all(data).unwrap();
75/// }
76/// ```
77pub struct FrameDecoder {
78 state: Option<FrameDecoderState>,
79 owned_dicts: BTreeMap<u32, Dictionary>,
80 #[cfg(target_has_atomic = "ptr")]
81 shared_dicts: BTreeMap<u32, DictionaryHandle>,
82 #[cfg(not(target_has_atomic = "ptr"))]
83 shared_dicts: (),
84 /// `ZSTD_f_zstd1_magicless` — when true, [`init`] / [`reset`]
85 /// expect frames without the 4-byte magic number prefix.
86 /// Default false (standard zstd format).
87 magicless: bool,
88 /// Pinned `Dictionary_ID` expectation set via
89 /// [`Self::expect_dict_id`]. `None` (default) disables the
90 /// check; `Some(0)` matches frames whose header omits the
91 /// optional dict_id (treated as "no dictionary"). Validated in
92 /// [`Self::reset`] AFTER the frame header parses successfully
93 /// and BEFORE any block decode work.
94 #[cfg(feature = "lsm")]
95 expect_dict_id: Option<u32>,
96 /// Pinned `Window_Descriptor` byte expectation set via
97 /// [`Self::expect_window_descriptor`]. `None` (default)
98 /// disables the check. Validated in [`Self::reset`] AFTER the
99 /// frame header parses successfully and BEFORE any block
100 /// decode work. Single-segment frames (which omit the
101 /// `Window_Descriptor` byte from the wire) surface as
102 /// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`]
103 /// with `found: None`.
104 #[cfg(feature = "lsm")]
105 expect_window_descriptor: Option<u8>,
106 /// When `true`, the per-block decode loop XXH64-hashes each
107 /// block's decompressed bytes and stores the low-32-bit digest in
108 /// [`Self::computed_block_checksums`]. Default `false` (zero
109 /// cost). Set via [`Self::enable_per_block_checksums`]. Gated on
110 /// `all(lsm, hash)` because XXH64 lives behind the `hash`
111 /// feature.
112 #[cfg(all(feature = "lsm", feature = "hash"))]
113 per_block_checksums_enabled: bool,
114 /// Per-block XXH64 (low 32 bits) digests captured during the
115 /// current frame's decode when `per_block_checksums_enabled` is
116 /// set. Reset at the start of every new frame. Gated on
117 /// `all(lsm, hash)` (see `per_block_checksums_enabled`).
118 #[cfg(all(feature = "lsm", feature = "hash"))]
119 computed_block_checksums: alloc::vec::Vec<u32>,
120}
121
122/// Backend-tagged decode scratch — chosen at frame-reset time based
123/// on the parsed `FrameHeader.descriptor.single_segment_flag()` and
124/// kept stable through the lifetime of the frame. The match in each
125/// helper below dispatches **once per call** (e.g. once per block in
126/// `decode_block_content`, once per drain in `drain_to_writer`) —
127/// never inside the hot push/repeat loop, which is fully
128/// monomorphised through the `DecoderScratch<B>` generic.
129enum DecoderScratchKind {
130 Ring(DecoderScratch<RingBuffer>),
131 Flat(DecoderScratch<FlatBuf>),
132}
133
134impl DecoderScratchKind {
135 fn new_ring(window_size: usize) -> Self {
136 let mut s = DecoderScratch::<RingBuffer>::new(window_size);
137 s.buffer.reserve(window_size);
138 Self::Ring(s)
139 }
140
141 /// Construct a flat-backed scratch sized for a single-segment
142 /// frame. `frame_content_size` is the upcoming output size in
143 /// bytes (== `window_size` when the flag is set).
144 fn new_flat(frame_content_size: usize) -> Self {
145 let flat = FlatBuf::with_capacity(frame_content_size);
146 // DecoderScratch's default ctor would discard the pre-sized
147 // FlatBuf — go through from_backend so the buffer carries the
148 // capacity the constructor wants.
149 let mut s = DecoderScratch::<FlatBuf>::new(frame_content_size);
150 s.buffer = DecodeBuffer::from_backend(flat, frame_content_size);
151 Self::Flat(s)
152 }
153
154 /// Reset (or transition between) backends for a new frame.
155 /// Reuses the existing `DecoderScratch` allocations (FSE / HUF
156 /// tables, sequence vec, etc.) when the backend kind is unchanged
157 /// — only the underlying buffer is re-sized for the new frame.
158 /// Building a fresh `DecoderScratch` on every frame would
159 /// re-allocate everything and was measured at +255 % vs ring on
160 /// small frames; reusing it keeps the small-frame cost flat.
161 fn reset(&mut self, frame: &frame::FrameHeader, window_size: usize) {
162 if frame.descriptor.single_segment_flag() {
163 match self {
164 Self::Flat(s) => {
165 s.reset(window_size);
166 // DecodeBuffer::reset clears + reserves
167 // window_size; FlatBuf's reserve grows the
168 // backing Vec if the new FCS is larger than
169 // what's already allocated. No alloc when the
170 // previous flat frame had >= this capacity.
171 }
172 Self::Ring(_) => *self = Self::new_flat(window_size),
173 }
174 } else {
175 match self {
176 Self::Ring(s) => s.reset(window_size),
177 Self::Flat(_) => *self = Self::new_ring(window_size),
178 }
179 }
180 }
181
182 fn init_from_dict(&mut self, dict: &Dictionary) {
183 match self {
184 Self::Ring(s) => s.init_from_dict(dict),
185 Self::Flat(s) => s.init_from_dict(dict),
186 }
187 }
188
189 #[inline]
190 fn buffer_len(&self) -> usize {
191 match self {
192 Self::Ring(s) => s.buffer.len(),
193 Self::Flat(s) => s.buffer.len(),
194 }
195 }
196
197 /// Last `n` bytes of the visible buffer as `(s1, s2)` (wrap-aware).
198 /// Routes through whichever backend the current scratch holds.
199 #[cfg(all(feature = "lsm", feature = "hash"))]
200 fn last_n_as_slices(&self, n: usize) -> (&[u8], &[u8]) {
201 match self {
202 Self::Ring(s) => s.buffer.last_n_as_slices(n),
203 Self::Flat(s) => s.buffer.last_n_as_slices(n),
204 }
205 }
206
207 fn buffer_drain(&mut self) -> Vec<u8> {
208 match self {
209 Self::Ring(s) => s.buffer.drain(),
210 Self::Flat(s) => s.buffer.drain(),
211 }
212 }
213
214 fn buffer_drain_to_window_size(&mut self) -> Option<Vec<u8>> {
215 match self {
216 Self::Ring(s) => s.buffer.drain_to_window_size(),
217 Self::Flat(s) => s.buffer.drain_to_window_size(),
218 }
219 }
220
221 fn buffer_drain_to_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
222 match self {
223 Self::Ring(s) => s.buffer.drain_to_writer(sink),
224 Self::Flat(s) => s.buffer.drain_to_writer(sink),
225 }
226 }
227
228 fn buffer_drain_to_window_size_writer(&mut self, sink: impl Write) -> Result<usize, Error> {
229 match self {
230 Self::Ring(s) => s.buffer.drain_to_window_size_writer(sink),
231 Self::Flat(s) => s.buffer.drain_to_window_size_writer(sink),
232 }
233 }
234
235 fn buffer_can_drain(&self) -> usize {
236 match self {
237 Self::Ring(s) => s.buffer.can_drain(),
238 Self::Flat(s) => s.buffer.can_drain(),
239 }
240 }
241
242 fn buffer_can_drain_to_window_size(&self) -> Option<usize> {
243 match self {
244 Self::Ring(s) => s.buffer.can_drain_to_window_size(),
245 Self::Flat(s) => s.buffer.can_drain_to_window_size(),
246 }
247 }
248
249 fn buffer_read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
250 match self {
251 Self::Ring(s) => s.buffer.read(target),
252 Self::Flat(s) => s.buffer.read(target),
253 }
254 }
255
256 fn buffer_read_all(&mut self, target: &mut [u8]) -> Result<usize, Error> {
257 match self {
258 Self::Ring(s) => s.buffer.read_all(target),
259 Self::Flat(s) => s.buffer.read_all(target),
260 }
261 }
262
263 fn decode_block_content<R: Read>(
264 &mut self,
265 decoder: &mut BlockDecoder,
266 header: &crate::blocks::block::BlockHeader,
267 source: R,
268 ) -> Result<u64, DecodeBlockContentError> {
269 match self {
270 Self::Ring(s) => decoder.decode_block_content(header, s, source),
271 Self::Flat(s) => decoder.decode_block_content(header, s, source),
272 }
273 }
274
275 #[cfg(feature = "hash")]
276 fn hash_finish(&self) -> u64 {
277 use core::hash::Hasher;
278 match self {
279 Self::Ring(s) => s.buffer.hash.finish(),
280 Self::Flat(s) => s.buffer.hash.finish(),
281 }
282 }
283}
284
285struct FrameDecoderState {
286 pub frame_header: frame::FrameHeader,
287 decoder_scratch: DecoderScratchKind,
288 frame_finished: bool,
289 block_counter: usize,
290 bytes_read_counter: u64,
291 check_sum: Option<u32>,
292 using_dict: Option<u32>,
293}
294
/// How much work a single `FrameDecoder::decode_blocks` call should do
/// before returning control to the caller.
///
/// Derives the common traits so callers can log it, copy it across loop
/// iterations and compare strategies.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BlockDecodingStrategy {
    /// Decode every remaining block of the frame into the internal buffer.
    All,
    /// Limit the call by a number of blocks (see `decode_blocks` for the
    /// exact stopping rule).
    UptoBlocks(usize),
    /// Limit the call by (roughly) this many decoded bytes.
    UptoBytes(usize),
}
300
301impl FrameDecoderState {
302 /// Construct a new frame decoder state, reading the frame header
303 /// from `source`. When `magicless` is `true`, the 4-byte magic
304 /// number prefix is NOT consumed (donor `ZSTD_f_zstd1_magicless`).
305 /// Crate-internal — reached only via `FrameDecoder::init` /
306 /// `FrameDecoder::init_with_dict_handle`. Pre-allocates the
307 /// decode buffer to `window_size` so the first block does not
308 /// trigger incremental growth from zero capacity.
309 pub(crate) fn new_with_format(
310 source: impl Read,
311 magicless: bool,
312 ) -> Result<FrameDecoderState, FrameDecoderError> {
313 let (frame, header_size) = frame::read_frame_header_with_format(source, magicless)?;
314 let window_size = frame.window_size()?;
315
316 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
317 return Err(FrameDecoderError::WindowSizeTooBig {
318 requested: window_size,
319 });
320 }
321
322 let decoder_scratch = if frame.descriptor.single_segment_flag() {
323 DecoderScratchKind::new_flat(window_size as usize)
324 } else {
325 DecoderScratchKind::new_ring(window_size as usize)
326 };
327 Ok(FrameDecoderState {
328 frame_header: frame,
329 frame_finished: false,
330 block_counter: 0,
331 decoder_scratch,
332 bytes_read_counter: u64::from(header_size),
333 check_sum: None,
334 using_dict: None,
335 })
336 }
337
338 /// Reset this state for a new frame read from `source`, reusing
339 /// existing allocations. When `magicless` is `true`, the frame
340 /// header is read WITHOUT expecting a magic-number prefix
341 /// (donor `ZSTD_f_zstd1_magicless`). Crate-internal — reached
342 /// only via `FrameDecoder::reset`.
343 ///
344 /// `DecodeBuffer::reset` reserves `window_size` internally, so
345 /// no additional frame-level reservation is needed here.
346 /// Further buffer growth during decoding is performed on demand
347 /// by the active block path.
348 pub(crate) fn reset_with_format(
349 &mut self,
350 source: impl Read,
351 magicless: bool,
352 ) -> Result<(), FrameDecoderError> {
353 let (frame_header, header_size) = frame::read_frame_header_with_format(source, magicless)?;
354 let window_size = frame_header.window_size()?;
355
356 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
357 return Err(FrameDecoderError::WindowSizeTooBig {
358 requested: window_size,
359 });
360 }
361
362 self.decoder_scratch
363 .reset(&frame_header, window_size as usize);
364 self.frame_header = frame_header;
365 self.frame_finished = false;
366 self.block_counter = 0;
367 self.bytes_read_counter = u64::from(header_size);
368 self.check_sum = None;
369 self.using_dict = None;
370 Ok(())
371 }
372}
373
374impl Default for FrameDecoder {
375 fn default() -> Self {
376 Self::new()
377 }
378}
379
380impl FrameDecoder {
381 /// This will create a new decoder without allocating anything yet.
382 /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used
383 /// else they just reset these buffers with not further allocations
384 pub fn new() -> FrameDecoder {
385 FrameDecoder {
386 state: None,
387 owned_dicts: BTreeMap::new(),
388 #[cfg(target_has_atomic = "ptr")]
389 shared_dicts: BTreeMap::new(),
390 #[cfg(not(target_has_atomic = "ptr"))]
391 shared_dicts: (),
392 magicless: false,
393 #[cfg(feature = "lsm")]
394 expect_dict_id: None,
395 #[cfg(feature = "lsm")]
396 expect_window_descriptor: None,
397 #[cfg(all(feature = "lsm", feature = "hash"))]
398 per_block_checksums_enabled: false,
399 #[cfg(all(feature = "lsm", feature = "hash"))]
400 computed_block_checksums: alloc::vec::Vec::new(),
401 }
402 }
403
/// Turns on per-block XXH64 capture for subsequent decoding.
///
/// This is off by default. While enabled, each decoded block's bytes are
/// XXH64-hashed and the low 32 bits of the digest are appended to
/// [`Self::computed_block_checksums`]. Compare those digests against
/// expected values stored outside the zstd frame, for example a per-block
/// sidecar in the surrounding application protocol.
///
/// Only compiled with both the `lsm` and `hash` features, because the
/// XXH64 implementation is gated behind `hash`.
#[cfg(all(feature = "lsm", feature = "hash"))]
pub fn enable_per_block_checksums(&mut self) {
    self.per_block_checksums_enabled = true;
}
418 }
419
/// Low-32-bit XXH64 digests of the blocks decoded so far in the current frame.
///
/// This stays empty unless [`Self::enable_per_block_checksums`] was called
/// before [`Self::decode_all`] / [`Self::reset`]. The list is cleared
/// whenever a new frame is started.
///
/// Only compiled with both the `lsm` and `hash` features.
#[cfg(all(feature = "lsm", feature = "hash"))]
pub fn computed_block_checksums(&self) -> &[u32] {
    self.computed_block_checksums.as_slice()
}
432
/// Pin the expected `Dictionary_ID` for the next frame.
///
/// When `expected` is set, [`Self::init`] / [`Self::reset`] and
/// [`Self::init_with_dict_handle`] / [`Self::reset_with_dict_handle`]
/// compare it against the parsed frame header before any block is decoded.
/// A mismatch returns
/// [`crate::decoding::errors::FrameDecoderError::UnexpectedDictId`]
/// without decoding any block or producing any output.
///
/// The decode scratch has already been sized/allocated for the new frame
/// by the time this check runs, so a rejected header still pays that cost.
/// The guarantee is "no block decode, no partial output", not "zero
/// allocation".
///
/// `Some(0)` is treated as "no dictionary expected": a frame whose header
/// omits the optional `Dictionary_ID` field (flag value 0) passes the
/// check, and a frame that carries an explicit non-zero id fails.
///
/// `None` (default) disables the check.
///
/// Primary use case: a post-AEAD-decrypt sanity check in wire-format
/// consumers. For example, lsm-tree's encrypted block format pins the
/// `dict_id` baked into the AAD against the inner zstd frame's `dict_id`
/// to defeat dict-substitution attacks.
///
/// This is a consistency check, NOT a replacement for AEAD
/// authentication. It does not select or load a dictionary; it only
/// rejects frames whose header names a different one.
#[cfg(feature = "lsm")]
#[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
pub fn expect_dict_id(&mut self, expected: Option<u32>) {
    self.expect_dict_id = expected;
}
468
/// Pin the raw `Window_Descriptor` byte the next frame must carry.
///
/// The byte uses the RFC 8878 §3.1.1.1.2 layout `(exp << 3) | mantissa`.
/// When `expected` is `Some`, [`Self::init`] / [`Self::reset`] (and the
/// dictionary-handle variants) compare it with the parsed frame header
/// before any block is decoded. A mismatch returns
/// [`crate::decoding::errors::FrameDecoderError::UnexpectedWindowDescriptor`].
///
/// Single-segment frames carry no `Window_Descriptor` byte on the wire.
/// Such a frame fails the check with `found: None` rather than passing
/// silently, because there is nothing to compare against.
///
/// `None` (default) disables the check.
///
/// This is byte-exact equality, not a ceiling. Donor `ZSTD_d_windowLogMax`
/// is a separate ceiling-style limit exposed through the C FFI surface.
/// This method is for strict equality against a pinned value. For example,
/// lsm-tree's wire format pins the window descriptor from the AAD to
/// defeat decompression-bomb-swap attacks.
#[cfg(feature = "lsm")]
#[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
pub fn expect_window_descriptor(&mut self, expected: Option<u8>) {
    self.expect_window_descriptor = expected;
}
497
/// Checks a freshly parsed frame header against the expectations pinned
/// with [`Self::expect_dict_id`] and [`Self::expect_window_descriptor`].
///
/// The dictionary id is checked first, then the window descriptor. The
/// first mismatch is returned as its typed error. `self` is only read, so
/// a subsequent `reset()` can overwrite the header without any cleanup.
#[cfg(feature = "lsm")]
fn validate_expectations(
    &self,
    frame_header: &frame::FrameHeader,
) -> Result<(), FrameDecoderError> {
    if let Some(expected) = self.expect_dict_id {
        let found = frame_header.dictionary_id();
        // A header without the optional dict_id field parses as `None`.
        // Counting that as id 0 makes `Some(0)` mean "no dictionary
        // expected"; any other expectation needs an exactly equal id.
        if found.unwrap_or(0) != expected {
            return Err(FrameDecoderError::UnexpectedDictId {
                expected: Some(expected),
                found,
            });
        }
    }

    let Some(expected) = self.expect_window_descriptor else {
        return Ok(());
    };
    let found = frame_header.window_descriptor();
    if found == Some(expected) {
        Ok(())
    } else {
        Err(FrameDecoderError::UnexpectedWindowDescriptor { expected, found })
    }
}
537
538 /// Enable or disable magicless frame format
539 /// (`ZSTD_f_zstd1_magicless`). When set to `true`, subsequent
540 /// [`init`] / [`reset`] calls expect the frame header to begin
541 /// directly with the frame-header descriptor — no 4-byte magic
542 /// number prefix. Default false. Must match the encoder's
543 /// magicless setting; the format is unambiguous only when the
544 /// caller knows it out-of-band.
545 ///
546 /// Note: magicless mode also disables skippable-frame detection.
547 /// The `0x184D2A50..=0x184D2A5F` skippable-frame magic range is
548 /// only recognised when the 4-byte magic prefix is consumed, so
549 /// `decode_all` / `init` / `reset` will treat a skippable frame
550 /// at the head of a magicless stream as a malformed frame header
551 /// (bad descriptor / window-size error) instead of skipping it.
552 /// Mixed-format streams that interleave skippable frames must be
553 /// pre-split by the caller; `set_magicless(true)` is only safe
554 /// when the entire stream is known to be magicless zstd frames.
555 pub fn set_magicless(&mut self, magicless: bool) {
556 self.magicless = magicless;
557 }
558
559 #[cfg(target_has_atomic = "ptr")]
560 fn shared_dict_exists(&self, dict_id: u32) -> bool {
561 self.shared_dicts.contains_key(&dict_id)
562 }
563
564 #[cfg(not(target_has_atomic = "ptr"))]
565 fn shared_dict_exists(&self, _dict_id: u32) -> bool {
566 false
567 }
568
569 fn validate_registered_dictionary(dict: &Dictionary) -> Result<(), FrameDecoderError> {
570 use crate::decoding::errors::DictionaryDecodeError as dict_err;
571
572 if dict.id == 0 {
573 return Err(FrameDecoderError::from(dict_err::ZeroDictionaryId));
574 }
575 if let Some(index) = dict.offset_hist.iter().position(|&rep| rep == 0) {
576 return Err(FrameDecoderError::from(
577 dict_err::ZeroRepeatOffsetInDictionary { index: index as u8 },
578 ));
579 }
580 Ok(())
581 }
582
583 /// init() will allocate all needed buffers if it is the first time this decoder is used
584 /// else they just reset these buffers with not further allocations
585 ///
586 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
587 ///
588 /// equivalent to reset()
589 pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
590 self.reset(source)
591 }
592
593 /// Initialize the decoder for a new frame using a pre-parsed dictionary handle.
594 ///
595 /// If the frame header has a dictionary ID, this validates it against
596 /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
597 ///
598 /// If the header omits the optional dictionary ID, this still applies the
599 /// provided dictionary handle.
600 ///
601 /// # Warning
602 ///
603 /// This method always applies `dict` unless the frame header contains a
604 /// non-matching dictionary ID. Callers must only use this API when they
605 /// already know the frame was encoded with the provided dictionary, even if
606 /// the frame header omits the dictionary ID or encodes an explicit
607 /// dictionary ID of `0`.
608 ///
609 /// Passing a dictionary for a frame that was not encoded with it can
610 /// silently corrupt the decoded output.
611 pub fn init_with_dict_handle(
612 &mut self,
613 source: impl Read,
614 dict: &DictionaryHandle,
615 ) -> Result<(), FrameDecoderError> {
616 self.reset_with_dict_handle(source, dict)
617 }
618
619 /// reset() will allocate all needed buffers if it is the first time this decoder is used
620 /// else they just reset these buffers with not further allocations
621 ///
622 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
623 ///
624 /// equivalent to init()
625 pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
626 use FrameDecoderError as err;
627 // Fresh frame → start with an empty per-block checksum vec so
628 // the values for the next frame don't carry over from the
629 // previous one.
630 #[cfg(all(feature = "lsm", feature = "hash"))]
631 self.computed_block_checksums.clear();
632 let magicless = self.magicless;
633 let dict_id = match &mut self.state {
634 Some(s) => {
635 s.reset_with_format(source, magicless)?;
636 s.frame_header.dictionary_id()
637 }
638 None => {
639 self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
640 self.state
641 .as_ref()
642 .and_then(|state| state.frame_header.dictionary_id())
643 }
644 };
645 // Validate any pinned expectations BEFORE block decode work
646 // runs. Catches dict_id substitution / window-descriptor
647 // tampering on inputs already authenticated by an outer
648 // layer (e.g. AEAD). Returning here leaves `self.state` in
649 // a re-resettable shape — next `reset()` re-parses the
650 // frame header without intermediate cleanup.
651 #[cfg(feature = "lsm")]
652 if let Some(state) = self.state.as_ref() {
653 self.validate_expectations(&state.frame_header)?;
654 }
655 if let Some(dict_id) = dict_id {
656 let state = self.state.as_mut().expect("state initialized");
657 let owned_dicts = &self.owned_dicts;
658 #[cfg(target_has_atomic = "ptr")]
659 let shared_dicts = &self.shared_dicts;
660 let dict = owned_dicts
661 .get(&dict_id)
662 .or_else(|| {
663 #[cfg(target_has_atomic = "ptr")]
664 {
665 shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
666 }
667 #[cfg(not(target_has_atomic = "ptr"))]
668 {
669 None
670 }
671 })
672 .ok_or(err::DictNotProvided { dict_id })?;
673 state.decoder_scratch.init_from_dict(dict);
674 state.using_dict = Some(dict_id);
675 }
676 Ok(())
677 }
678
679 /// Reset this decoder for a new frame using a pre-parsed dictionary handle.
680 ///
681 /// If the frame header has a dictionary ID, this validates it against
682 /// `dict.id()` and returns [`FrameDecoderError::DictIdMismatch`] on mismatch.
683 ///
684 /// If the header omits the optional dictionary ID, this still applies the
685 /// provided dictionary handle.
686 ///
687 /// # Warning
688 ///
689 /// This method always applies `dict` unless the frame header contains a
690 /// non-matching dictionary ID. Callers must only use this API when they
691 /// already know the frame was encoded with the provided dictionary, even if
692 /// the frame header omits the dictionary ID or encodes an explicit
693 /// dictionary ID of `0`.
694 ///
695 /// Passing a dictionary for a frame that was not encoded with it can
696 /// silently corrupt the decoded output.
697 pub fn reset_with_dict_handle(
698 &mut self,
699 source: impl Read,
700 dict: &DictionaryHandle,
701 ) -> Result<(), FrameDecoderError> {
702 use FrameDecoderError as err;
703 // Fresh frame → drop the previous frame's per-block checksum
704 // digests so the next decode starts with an empty vec.
705 // Mirrors the same clear in `reset()`; reset_with_dict_handle
706 // is a parallel entry point so it needs its own call.
707 #[cfg(all(feature = "lsm", feature = "hash"))]
708 self.computed_block_checksums.clear();
709 Self::validate_registered_dictionary(dict.as_dict())?;
710 let magicless = self.magicless;
711 // Scope the &mut borrow of `self.state` to the header parse
712 // alone, so the subsequent `validate_expectations(&self, ...)`
713 // call below can take a fresh shared borrow of self without
714 // tripping the borrow checker.
715 match &mut self.state {
716 Some(s) => s.reset_with_format(source, magicless)?,
717 None => {
718 self.state = Some(FrameDecoderState::new_with_format(source, magicless)?);
719 }
720 }
721 // Single source of truth: route through the same
722 // `validate_expectations` used by `reset()`. Routing through
723 // the helper keeps the two code paths from drifting (e.g.,
724 // if expect-semantics or error wiring changes later).
725 #[cfg(feature = "lsm")]
726 {
727 let header = &self
728 .state
729 .as_ref()
730 .expect("state populated by reset_with_format/new_with_format")
731 .frame_header;
732 self.validate_expectations(header)?;
733 }
734 let state = self
735 .state
736 .as_mut()
737 .expect("state populated by reset_with_format/new_with_format");
738 if let Some(dict_id) = state.frame_header.dictionary_id()
739 && dict_id != dict.id()
740 {
741 return Err(err::DictIdMismatch {
742 expected: dict_id,
743 provided: dict.id(),
744 });
745 }
746 state.decoder_scratch.init_from_dict(dict.as_dict());
747 state.using_dict = Some(dict.id());
748 Ok(())
749 }
750
751 /// Add a dictionary that can be selected dynamically by frame dictionary ID.
752 ///
753 /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
754 /// registered (either as owned or shared).
755 pub fn add_dict(&mut self, dict: Dictionary) -> Result<(), FrameDecoderError> {
756 Self::validate_registered_dictionary(&dict)?;
757 let dict_id = dict.id;
758 if self.owned_dicts.contains_key(&dict_id) || self.shared_dict_exists(dict_id) {
759 return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
760 }
761 self.owned_dicts.insert(dict_id, dict);
762 Ok(())
763 }
764
765 /// Parse and add a serialized dictionary blob.
766 pub fn add_dict_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), FrameDecoderError> {
767 let dict = Dictionary::decode_dict(raw_dictionary)?;
768 self.add_dict(dict)
769 }
770
771 /// Add a pre-parsed dictionary handle for reuse across decoders.
772 ///
773 /// This API is available on targets with pointer-width atomics
774 /// (`target_has_atomic = "ptr"`).
775 ///
776 /// Returns [`FrameDecoderError::DictAlreadyRegistered`] if the ID is already
777 /// registered (either as owned or shared).
778 #[cfg(target_has_atomic = "ptr")]
779 pub fn add_dict_handle(&mut self, dict: DictionaryHandle) -> Result<(), FrameDecoderError> {
780 Self::validate_registered_dictionary(dict.as_dict())?;
781 let dict_id = dict.id();
782 if self.owned_dicts.contains_key(&dict_id) || self.shared_dicts.contains_key(&dict_id) {
783 return Err(FrameDecoderError::DictAlreadyRegistered { dict_id });
784 }
785 self.shared_dicts.insert(dict_id, dict);
786 Ok(())
787 }
788
789 pub fn force_dict(&mut self, dict_id: u32) -> Result<(), FrameDecoderError> {
790 use FrameDecoderError as err;
791 let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
792 let owned_dicts = &self.owned_dicts;
793 #[cfg(target_has_atomic = "ptr")]
794 let shared_dicts = &self.shared_dicts;
795
796 let dict = owned_dicts
797 .get(&dict_id)
798 .or_else(|| {
799 #[cfg(target_has_atomic = "ptr")]
800 {
801 shared_dicts.get(&dict_id).map(DictionaryHandle::as_dict)
802 }
803 #[cfg(not(target_has_atomic = "ptr"))]
804 {
805 None
806 }
807 })
808 .ok_or(err::DictNotProvided { dict_id })?;
809 state.decoder_scratch.init_from_dict(dict);
810 state.using_dict = Some(dict_id);
811
812 Ok(())
813 }
814
815 /// Returns how many bytes the frame contains after decompression
816 pub fn content_size(&self) -> u64 {
817 match &self.state {
818 None => 0,
819 Some(s) => s.frame_header.frame_content_size(),
820 }
821 }
822
823 /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame
824 pub fn get_checksum_from_data(&self) -> Option<u32> {
825 let state = self.state.as_ref()?;
826
827 state.check_sum
828 }
829
/// Checksum computed over the bytes decoded so far.
///
/// The value is only meaningful once every decoded byte has been
/// collected/read from the `FrameDecoder`. Returns `None` when no frame has
/// been started.
#[cfg(feature = "hash")]
pub fn get_calculated_checksum(&self) -> Option<u32> {
    self.state
        .as_ref()
        // zstd's Content_Checksum is the low 32 bits of the XXH64 digest.
        .map(|state| state.decoder_scratch.hash_finish() as u32)
}
839
840 /// Counter for how many bytes have been consumed while decoding the frame
841 pub fn bytes_read_from_source(&self) -> u64 {
842 let state = match &self.state {
843 None => return 0,
844 Some(s) => s,
845 };
846 state.bytes_read_counter
847 }
848
849 /// Whether the current frames last block has been decoded yet
850 /// If this returns true you can call the drain* functions to get all content
851 /// (the read() function will drain automatically if this returns true)
852 pub fn is_finished(&self) -> bool {
853 let state = match &self.state {
854 None => return true,
855 Some(s) => s,
856 };
857 if state.frame_header.descriptor.content_checksum_flag() {
858 state.frame_finished && state.check_sum.is_some()
859 } else {
860 state.frame_finished
861 }
862 }
863
864 /// Counter for how many blocks have already been decoded
865 pub fn blocks_decoded(&self) -> usize {
866 let state = match &self.state {
867 None => return 0,
868 Some(s) => s,
869 };
870 state.block_counter
871 }
872
    /// Decodes blocks from `source` into the decoder's internal buffer.
    ///
    /// The decoder must already have been initialised for a frame (see [`FrameDecoder::init`]).
    /// `strat` bounds how much work a single call does, which lets callers control memory use:
    /// - [`BlockDecodingStrategy::All`] keeps decoding until the frame's last block.
    /// - `BlockDecodingStrategy::UptoBlocks(n)` stops once this call has decoded `n` blocks.
    /// - `BlockDecodingStrategy::UptoBytes(n)` stops once this call has grown the decode buffer
    ///   by at least `n` bytes.
    ///
    /// The strategy is checked only after a block has been fully decoded. Every call therefore
    /// decodes at least one block, and `UptoBytes` may overshoot `n` by up to one block.
    ///
    /// When the last block is reached and the frame header sets the content-checksum flag, the
    /// trailing 4-byte little-endian checksum is read from `source` and stored (see
    /// [`FrameDecoder::get_checksum_from_data`]).
    ///
    /// Returns `Ok(true)` once the frame's last block has been decoded, `Ok(false)` otherwise.
    ///
    /// NOTE(review): nothing checks `frame_finished` on entry. Calling this again after the
    /// frame has finished will try to parse a new block header from `source`.
    ///
    /// # Errors
    ///
    /// - [`FrameDecoderError::NotYetInitialized`] if no frame has been initialised.
    /// - [`FrameDecoderError::FailedToReadBlockHeader`] or
    ///   [`FrameDecoderError::FailedToReadBlockBody`] if a block cannot be read or decoded.
    /// - [`FrameDecoderError::FailedToReadChecksum`] if the trailing checksum cannot be read.
    pub fn decode_blocks(
        &mut self,
        mut source: impl Read,
        strat: BlockDecodingStrategy,
    ) -> Result<bool, FrameDecoderError> {
        use FrameDecoderError as err;
        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;

        let mut block_dec = decoding::block_decoder::new();

        // Baselines for the strategy checks at the bottom of the loop. Both limits are
        // relative to what this call decodes, not to the frame as a whole.
        let buffer_size_before = state.decoder_scratch.buffer_len();
        let block_counter_before = state.block_counter;
        loop {
            vprintln!("################");
            vprintln!("Next Block: {}", state.block_counter);
            vprintln!("################");
            let (block_header, block_header_size) = block_dec
                .read_block_header(&mut source)
                .map_err(err::FailedToReadBlockHeader)?;
            state.bytes_read_counter += u64::from(block_header_size);

            vprintln!();
            vprintln!(
                "Found {} block with size: {}, which will be of size: {}",
                block_header.block_type,
                block_header.content_size,
                block_header.decompressed_size
            );

            // Record where this block's output starts so it can be hashed after decoding.
            // Only captured when per-block checksums are enabled.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            let len_before_block: Option<usize> = if self.per_block_checksums_enabled {
                Some(state.decoder_scratch.buffer_len())
            } else {
                None
            };
            let bytes_read_in_block_body = state
                .decoder_scratch
                .decode_block_content(&mut block_dec, &block_header, &mut source)
                .map_err(err::FailedToReadBlockBody)?;
            state.bytes_read_counter += bytes_read_in_block_body;

            // Per-block XXH64 (low 32 bits) of the just-decompressed
            // bytes. Hashed from `last_n_as_slices` so RingBuffer wrap
            // is handled in-place, no extra copy.
            #[cfg(all(feature = "lsm", feature = "hash"))]
            if let Some(len_before_block) = len_before_block {
                let added = state.decoder_scratch.buffer_len() - len_before_block;
                let (s1, s2) = state.decoder_scratch.last_n_as_slices(added);
                let mut h = twox_hash::XxHash64::with_seed(0);
                use core::hash::Hasher;
                h.write(s1);
                h.write(s2);
                self.computed_block_checksums.push(h.finish() as u32);
            }

            state.block_counter += 1;

            vprintln!("Output: {}", state.decoder_scratch.buffer_len());

            if block_header.last_block {
                // Set before the checksum read. If that read fails, `check_sum` stays `None`,
                // so `is_finished()` keeps reporting `false` for checksummed frames.
                state.frame_finished = true;
                if state.frame_header.descriptor.content_checksum_flag() {
                    let mut chksum = [0u8; 4];
                    source
                        .read_exact(&mut chksum)
                        .map_err(err::FailedToReadChecksum)?;
                    state.bytes_read_counter += 4;
                    let chksum = u32::from_le_bytes(chksum);
                    state.check_sum = Some(chksum);
                }
                break;
            }

            // Not the last block: decide whether this call has done enough work.
            match strat {
                BlockDecodingStrategy::All => { /* keep going */ }
                BlockDecodingStrategy::UptoBlocks(n) => {
                    if state.block_counter - block_counter_before >= n {
                        break;
                    }
                }
                BlockDecodingStrategy::UptoBytes(n) => {
                    if state.decoder_scratch.buffer_len() - buffer_size_before >= n {
                        break;
                    }
                }
            }
        }

        Ok(state.frame_finished)
    }
967
968 /// Collect bytes and retain window_size bytes while decoding is still going on.
969 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
970 pub fn collect(&mut self) -> Option<Vec<u8>> {
971 let finished = self.is_finished();
972 let state = self.state.as_mut()?;
973 if finished {
974 Some(state.decoder_scratch.buffer_drain())
975 } else {
976 state.decoder_scratch.buffer_drain_to_window_size()
977 }
978 }
979
980 /// Collect bytes and retain window_size bytes while decoding is still going on.
981 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
982 pub fn collect_to_writer(&mut self, w: impl Write) -> Result<usize, Error> {
983 let finished = self.is_finished();
984 let state = match &mut self.state {
985 None => return Ok(0),
986 Some(s) => s,
987 };
988 if finished {
989 state.decoder_scratch.buffer_drain_to_writer(w)
990 } else {
991 state.decoder_scratch.buffer_drain_to_window_size_writer(w)
992 }
993 }
994
995 /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size
996 /// because window_size bytes need to be retained for decoding.
997 /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes
998 pub fn can_collect(&self) -> usize {
999 let finished = self.is_finished();
1000 let state = match &self.state {
1001 None => return 0,
1002 Some(s) => s,
1003 };
1004 if finished {
1005 state.decoder_scratch.buffer_can_drain()
1006 } else {
1007 state
1008 .decoder_scratch
1009 .buffer_can_drain_to_window_size()
1010 .unwrap_or(0)
1011 }
1012 }
1013
1014 /// Decodes as many blocks as possible from the source slice and reads from the decodebuffer into the target slice
1015 /// The source slice may contain only parts of a frame but must contain at least one full block to make progress
1016 ///
1017 /// By all means use decode_blocks if you have a io.Reader available. This is just for compatibility with other decompressors
1018 /// which try to serve an old-style c api
1019 ///
1020 /// Returns (read, written), if read == 0 then the source did not contain a full block and further calls with the same
1021 /// input will not make any progress!
1022 ///
1023 /// Note that no kind of block can be bigger than 128kb.
1024 /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer
1025 ///
1026 /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just call decoder.read(&mut target)
1027 pub fn decode_from_to(
1028 &mut self,
1029 source: &[u8],
1030 target: &mut [u8],
1031 ) -> Result<(usize, usize), FrameDecoderError> {
1032 use FrameDecoderError as err;
1033 let bytes_read_at_start = match &self.state {
1034 Some(s) => s.bytes_read_counter,
1035 None => 0,
1036 };
1037
1038 if !self.is_finished() || self.state.is_none() {
1039 let mut mt_source = source;
1040
1041 if self.state.is_none() {
1042 self.init(&mut mt_source)?;
1043 }
1044
1045 //pseudo block to scope "state" so we can borrow self again after the block
1046 {
1047 let state = match &mut self.state {
1048 Some(s) => s,
1049 None => panic!("Bug in library"),
1050 };
1051 let mut block_dec = decoding::block_decoder::new();
1052
1053 if state.frame_header.descriptor.content_checksum_flag()
1054 && state.frame_finished
1055 && state.check_sum.is_none()
1056 {
1057 //this block is needed if the checksum were the only 4 bytes that were not included in the last decode_from_to call for a frame
1058 if mt_source.len() >= 4 {
1059 let chksum = mt_source[..4].try_into().expect("optimized away");
1060 state.bytes_read_counter += 4;
1061 let chksum = u32::from_le_bytes(chksum);
1062 state.check_sum = Some(chksum);
1063 }
1064 return Ok((4, 0));
1065 }
1066
1067 loop {
1068 //check if there are enough bytes for the next header
1069 if mt_source.len() < 3 {
1070 break;
1071 }
1072 let (block_header, block_header_size) = block_dec
1073 .read_block_header(&mut mt_source)
1074 .map_err(err::FailedToReadBlockHeader)?;
1075
1076 // check the needed size for the block before updating counters.
1077 // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place
1078 if mt_source.len() < block_header.content_size as usize {
1079 break;
1080 }
1081 state.bytes_read_counter += u64::from(block_header_size);
1082
1083 let bytes_read_in_block_body = state
1084 .decoder_scratch
1085 .decode_block_content(&mut block_dec, &block_header, &mut mt_source)
1086 .map_err(err::FailedToReadBlockBody)?;
1087 state.bytes_read_counter += bytes_read_in_block_body;
1088 state.block_counter += 1;
1089
1090 if block_header.last_block {
1091 state.frame_finished = true;
1092 if state.frame_header.descriptor.content_checksum_flag() {
1093 //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call
1094 if mt_source.len() >= 4 {
1095 let chksum = mt_source[..4].try_into().expect("optimized away");
1096 state.bytes_read_counter += 4;
1097 let chksum = u32::from_le_bytes(chksum);
1098 state.check_sum = Some(chksum);
1099 }
1100 }
1101 break;
1102 }
1103 }
1104 }
1105 }
1106
1107 let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?;
1108 let bytes_read_at_end = match &mut self.state {
1109 Some(s) => s.bytes_read_counter,
1110 None => panic!("Bug in library"),
1111 };
1112 let read_len = bytes_read_at_end - bytes_read_at_start;
1113 Ok((read_len as usize, result_len))
1114 }
1115
1116 /// Decode multiple frames into the output slice.
1117 ///
1118 /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
1119 /// skipped during decode.
1120 ///
1121 /// `output` must be large enough to hold the decompressed data. If you don't know
1122 /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
1123 ///
1124 /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
1125 ///
1126 /// Returns the number of bytes written to `output`.
1127 pub fn decode_all(
1128 &mut self,
1129 input: &[u8],
1130 output: &mut [u8],
1131 ) -> Result<usize, FrameDecoderError> {
1132 #[cfg(not(feature = "lsm"))]
1133 {
1134 self.decode_all_impl(input, output, |this, src| this.init(src))
1135 }
1136 #[cfg(feature = "lsm")]
1137 {
1138 self.decode_all_impl(input, output, |this, src| this.init(src), None)
1139 }
1140 }
1141
1142 /// Decode multiple frames into the output slice, invoking `visitor`
1143 /// for every skippable frame encountered before advancing past it.
1144 ///
1145 /// `input` must contain an exact number of frames. Skippable frames
1146 /// (RFC 8878 §3.1.2 magic numbers `0x184D2A50..=0x184D2A5F`) are
1147 /// allowed and will be both visited AND skipped: the visitor gets
1148 /// `(magic_variant, payload)` where `magic_variant` is the low
1149 /// nibble of the magic (`magic - 0x184D2A50`, range `0..=15`) and
1150 /// `payload` is a borrowed slice of the on-wire payload bytes (the
1151 /// skippable frame's `Frame_Size` field worth of data) into
1152 /// `input` — no allocation.
1153 ///
1154 /// The visitor sees skippable frames in stream order; interleaved
1155 /// regular zstd frames continue to decompress into `output` exactly
1156 /// as `decode_all` does.
1157 ///
1158 /// `output` must be large enough to hold the decompressed data.
1159 /// Returns the number of bytes written to `output`.
1160 ///
1161 /// # Example
1162 ///
1163 /// ```ignore
1164 /// use structured_zstd::decoding::FrameDecoder;
1165 ///
1166 /// let mut decoder = FrameDecoder::new();
1167 /// let mut output = vec![0u8; 1024];
1168 /// let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
1169 /// let n = decoder.decode_all_with_skippable_visitor(
1170 /// input,
1171 /// &mut output,
1172 /// |variant, payload| collected.push((variant, payload.to_vec())),
1173 /// )?;
1174 /// ```
1175 #[cfg(feature = "lsm")]
1176 #[cfg_attr(docsrs, doc(cfg(feature = "lsm")))]
1177 pub fn decode_all_with_skippable_visitor<F>(
1178 &mut self,
1179 input: &[u8],
1180 output: &mut [u8],
1181 mut visitor: F,
1182 ) -> Result<usize, FrameDecoderError>
1183 where
1184 F: FnMut(u8, &[u8]),
1185 {
1186 self.decode_all_impl(
1187 input,
1188 output,
1189 |this, src| this.init(src),
1190 Some(&mut visitor),
1191 )
1192 }
1193
1194 /// Decode multiple frames into the output slice using a pre-parsed dictionary handle.
1195 ///
1196 /// `input` must contain an exact number of frames. Skippable frames are allowed and will be
1197 /// skipped during decode.
1198 ///
1199 /// `output` must be large enough to hold the decompressed data. If you don't know
1200 /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
1201 ///
1202 /// This calls [`FrameDecoder::init_with_dict_handle`], and all bytes currently in the
1203 /// decoder will be lost.
1204 ///
1205 /// # Warning
1206 ///
1207 /// Each decoded frame is initialized with `dict`, even when a frame header
1208 /// omits the optional dictionary ID. Callers must only use this API when
1209 /// they already know the input frames were encoded with the provided
1210 /// dictionary; otherwise decoded output can be silently corrupted.
1211 pub fn decode_all_with_dict_handle(
1212 &mut self,
1213 input: &[u8],
1214 output: &mut [u8],
1215 dict: &DictionaryHandle,
1216 ) -> Result<usize, FrameDecoderError> {
1217 #[cfg(not(feature = "lsm"))]
1218 {
1219 self.decode_all_impl(input, output, |this, src| {
1220 this.init_with_dict_handle(src, dict)
1221 })
1222 }
1223 #[cfg(feature = "lsm")]
1224 {
1225 self.decode_all_impl(
1226 input,
1227 output,
1228 |this, src| this.init_with_dict_handle(src, dict),
1229 None,
1230 )
1231 }
1232 }
1233
    /// Default-feature decode_all_impl: no visitor parameter so the
    /// no-lsm build's call surface and codegen are byte-identical to
    /// the pre-#172 implementation. Compiles only when `lsm` is OFF.
    ///
    /// Shared driver behind the `decode_all*` entry points. It repeatedly calls `init_frame`
    /// on the remaining `input`, then decodes the frame that follows into the still-unused tail
    /// of `output`, until `input` is empty. Frames reported by `init_frame` as `SkipFrame` are
    /// stepped over without producing output.
    ///
    /// Returns the total number of bytes written to `output` across all frames.
    ///
    /// # Errors
    ///
    /// - [`FrameDecoderError::FailedToSkipFrame`] if a skippable frame's length runs past the
    ///   end of `input`.
    /// - [`FrameDecoderError::TargetTooSmall`] if, on the fallback path, decoded bytes remain
    ///   that no longer fit into `output`.
    /// - [`FrameDecoderError::FrameContentSizeMismatch`] if a frame's declared content size
    ///   disagrees with what was decoded.
    /// - Any other error from `init_frame`, block decoding, or draining the decode buffer.
    #[cfg(not(feature = "lsm"))]
    fn decode_all_impl(
        &mut self,
        mut input: &[u8],
        mut output: &mut [u8],
        mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
    ) -> Result<usize, FrameDecoderError> {
        use super::buffer_backend::WILDCOPY_OVERLENGTH;
        let mut total_bytes_written = 0;
        while !input.is_empty() {
            match init_frame(self, &mut input) {
                Ok(_) => {}
                // Skippable frame: `length` is the payload size still ahead in `input`.
                // Step over it without decoding anything.
                Err(FrameDecoderError::ReadFrameHeaderError(
                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
                )) => {
                    input = input
                        .get(length as usize..)
                        .ok_or(FrameDecoderError::FailedToSkipFrame)?;
                    continue;
                }
                Err(e) => return Err(e),
            };
            // Per-frame direct-path dispatch. Now safe to route the
            // public `decode_all` here because
            // `UserSliceBackend::exec_sequence_inline` returns
            // `Result<(), ExecuteSequencesError>` instead of
            // panicking on capacity overflow; the error propagates
            // up as `FrameDecoderError`. Eligibility (FCS > 0, no
            // active dict, remaining `output` slice has WILDCOPY
            // slack) puts the frame on the fast path that bypasses
            // the FlatBuf/Ring -> `read()` drain copy. Ineligible
            // frames (no FCS, active dict, output too small for
            // slack) fall through to the legacy `decode_blocks` +
            // `read` drain loop below.
            let state_ref = self.state.as_ref().expect("init populated state");
            let content_size = state_ref.frame_header.frame_content_size();
            let fcs_declared = state_ref.frame_header.fcs_declared();
            let dict_active = state_ref.using_dict.is_some();
            let needed = content_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
            // Per-block checksums collected inside `run_direct_decode`
            // post-loop (over recorded (start, end) ranges of `output`)
            // so the direct path stays eligible AND keeps the
            // window-size cap (`drop_to_window_size`) between blocks
            // that the spec relies on for `offset <= window_size`
            // validation. Path choice no longer alters checksum
            // semantics.
            let direct_eligible =
                content_size > 0 && !dict_active && (output.len() as u64) >= needed;
            if direct_eligible {
                // The direct path writes straight into `output`; advance past what it wrote.
                let written = self.run_direct_decode(&mut input, output, content_size)?;
                output = &mut output[written..];
                total_bytes_written += written;
                continue;
            }
            let frame_start_total = total_bytes_written;
            // Fallback: decode roughly 1 MiB of output per round, then drain it into `output`.
            // Bytes that are still drainable after `read` mean `output` is full.
            loop {
                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
                let bytes_written = self
                    .read(output)
                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
                output = &mut output[bytes_written..];
                total_bytes_written += bytes_written;
                if self.can_collect() != 0 {
                    return Err(FrameDecoderError::TargetTooSmall);
                }
                if self.is_finished() {
                    break;
                }
            }
            // Per-frame FCS validation on the legacy fallback path.
            // Use `fcs_declared()` (NOT `content_size > 0`) so an
            // empty frame with explicit FCS=0 on the wire still gets
            // validated.
            if fcs_declared {
                let produced = (total_bytes_written - frame_start_total) as u64;
                if produced != content_size {
                    return Err(FrameDecoderError::FrameContentSizeMismatch {
                        declared: content_size,
                        produced,
                    });
                }
            }
        }

        Ok(total_bytes_written)
    }
1323
    /// `lsm`-feature decode_all_impl: adds the optional skippable
    /// visitor parameter consumed by
    /// [`Self::decode_all_with_skippable_visitor`]. Mirrors the no-lsm
    /// variant including the direct-path dispatch + FCS-validation
    /// logic, so the two functions stay in sync. The only
    /// behavioral difference is the SkipFrame arm: it checks the
    /// payload length explicitly, splits `input` with
    /// `split_at(length)` (the no-lsm variant uses a single
    /// `get(length..)`), and invokes the visitor (when `Some`) on the
    /// borrowed payload before advancing past it.
    ///
    /// Returns the total number of bytes written to `output` across all frames.
    ///
    /// # Errors
    ///
    /// - [`FrameDecoderError::FailedToSkipFrame`] if a skippable frame's payload runs past the
    ///   end of `input`. In that case the visitor is not called for that frame.
    /// - [`FrameDecoderError::TargetTooSmall`] if, on the fallback path, decoded bytes remain
    ///   that no longer fit into `output`.
    /// - [`FrameDecoderError::FrameContentSizeMismatch`] if a frame's declared content size
    ///   disagrees with what was decoded.
    /// - Any other error from `init_frame`, block decoding, or draining the decode buffer.
    #[cfg(feature = "lsm")]
    // Presumably needed because of the inline `Option<&mut dyn FnMut(u8, &[u8])>` visitor type.
    #[allow(clippy::type_complexity)]
    fn decode_all_impl(
        &mut self,
        mut input: &[u8],
        mut output: &mut [u8],
        mut init_frame: impl FnMut(&mut Self, &mut &[u8]) -> Result<(), FrameDecoderError>,
        mut skippable_visitor: Option<&mut dyn FnMut(u8, &[u8])>,
    ) -> Result<usize, FrameDecoderError> {
        use super::buffer_backend::WILDCOPY_OVERLENGTH;
        let mut total_bytes_written = 0;
        while !input.is_empty() {
            match init_frame(self, &mut input) {
                Ok(_) => {}
                Err(FrameDecoderError::ReadFrameHeaderError(
                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame {
                        magic_number,
                        length,
                    },
                )) => {
                    let length = length as usize;
                    // Visitor sees the payload slice BEFORE we advance
                    // past it. Borrowed slice — no allocation. The
                    // variant is the low nibble of the magic number
                    // (RFC 8878 §3.1.2). `read_frame_header` only emits
                    // SkipFrame for magic in 0x184D2A50..=0x184D2A5F, so
                    // the subtraction fits in 0..=15.
                    if input.len() < length {
                        return Err(FrameDecoderError::FailedToSkipFrame);
                    }
                    let (payload, rest) = input.split_at(length);
                    if let Some(visitor) = skippable_visitor.as_mut() {
                        let variant = (magic_number - 0x184D2A50) as u8;
                        visitor(variant, payload);
                    }
                    input = rest;
                    continue;
                }
                Err(e) => return Err(e),
            };
            // Per-frame direct-path dispatch. Now safe to route the
            // public `decode_all` here because
            // `UserSliceBackend::exec_sequence_inline` returns
            // `Result<(), ExecuteSequencesError>` instead of
            // panicking on capacity overflow; the error propagates
            // up as `FrameDecoderError`. Eligibility (FCS > 0, no
            // active dict, remaining `output` slice has WILDCOPY
            // slack) puts the frame on the fast path that bypasses
            // the FlatBuf/Ring -> `read()` drain copy. Ineligible
            // frames (no FCS, active dict, output too small for
            // slack) fall through to the legacy `decode_blocks` +
            // `read` drain loop below.
            let state_ref = self.state.as_ref().expect("init populated state");
            let content_size = state_ref.frame_header.frame_content_size();
            let fcs_declared = state_ref.frame_header.fcs_declared();
            let dict_active = state_ref.using_dict.is_some();
            let needed = content_size.saturating_add(WILDCOPY_OVERLENGTH as u64);
            let direct_eligible =
                content_size > 0 && !dict_active && (output.len() as u64) >= needed;
            if direct_eligible {
                // The direct path writes straight into `output`; advance past what it wrote.
                let written = self.run_direct_decode(&mut input, output, content_size)?;
                output = &mut output[written..];
                total_bytes_written += written;
                continue;
            }
            let frame_start_total = total_bytes_written;
            // Fallback: decode roughly 1 MiB of output per round, then drain it into `output`.
            // Bytes that are still drainable after `read` mean `output` is full.
            loop {
                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
                let bytes_written = self
                    .read(output)
                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
                output = &mut output[bytes_written..];
                total_bytes_written += bytes_written;
                if self.can_collect() != 0 {
                    return Err(FrameDecoderError::TargetTooSmall);
                }
                if self.is_finished() {
                    break;
                }
            }
            // Per-frame FCS validation on the legacy fallback path.
            // Use `fcs_declared()` (NOT `content_size > 0`) so an
            // empty frame with explicit FCS=0 on the wire still gets
            // validated.
            if fcs_declared {
                let produced = (total_bytes_written - frame_start_total) as u64;
                if produced != content_size {
                    return Err(FrameDecoderError::FrameContentSizeMismatch {
                        declared: content_size,
                        produced,
                    });
                }
            }
        }

        Ok(total_bytes_written)
    }
1431
1432 /// Decode multiple frames into the output slice using a serialized dictionary.
1433 ///
1434 /// # Warning
1435 ///
1436 /// Each decoded frame is initialized with the parsed dictionary, even when a
1437 /// frame header omits the optional dictionary ID. Callers must only use this
1438 /// API when they already know the input frames were encoded with that
1439 /// dictionary; otherwise decoded output can be silently corrupted.
1440 pub fn decode_all_with_dict_bytes(
1441 &mut self,
1442 input: &[u8],
1443 output: &mut [u8],
1444 raw_dictionary: &[u8],
1445 ) -> Result<usize, FrameDecoderError> {
1446 let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
1447 self.decode_all_with_dict_handle(input, output, &dict)
1448 }
1449
1450 /// Decode multiple frames into the extra capacity of the output vector.
1451 ///
1452 /// `input` must contain an exact number of frames.
1453 ///
1454 /// `output` must have enough extra capacity to hold the decompressed data.
1455 /// This function reserves an additional [`WILDCOPY_OVERLENGTH`]
1456 /// bytes on top of the caller's capacity so the per-frame direct
1457 /// decode path stays eligible — that may grow the vector by up
1458 /// to that fixed amount via `Vec::reserve`. It will NOT grow
1459 /// further to fit the decompressed payload itself; the caller's
1460 /// pre-allocated capacity must already cover the data. If you
1461 /// don't know how large the output will be, use
1462 /// [`FrameDecoder::decode_blocks`] instead.
1463 ///
1464 /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
1465 ///
1466 /// The length of the output vector is updated to include the decompressed data.
1467 /// The length is not changed if an error occurs. The
1468 /// `WILDCOPY_OVERLENGTH` slack is internal — `output.len()` on
1469 /// return is the actual decompressed size, NOT the inflated
1470 /// capacity. Callers who pre-sized the Vec with
1471 /// `Vec::with_capacity(fcs)` see no functional change beyond
1472 /// the small one-time capacity bump.
1473 pub fn decode_all_to_vec(
1474 &mut self,
1475 input: &[u8],
1476 output: &mut Vec<u8>,
1477 ) -> Result<(), FrameDecoderError> {
1478 use super::buffer_backend::WILDCOPY_OVERLENGTH;
1479 let len = output.len();
1480 // Reserve WILDCOPY slack on top of the caller's capacity so
1481 // `decode_all` can land on the direct path even when the
1482 // caller didn't pre-allocate slack themselves.
1483 output.reserve(WILDCOPY_OVERLENGTH);
1484 let cap = output.capacity();
1485 output.resize(cap, 0);
1486 match self.decode_all(input, &mut output[len..]) {
1487 Ok(bytes_written) => {
1488 let new_len = core::cmp::min(len + bytes_written, cap); // Sanitizes `bytes_written`.
1489 output.resize(new_len, 0);
1490 Ok(())
1491 }
1492 Err(e) => {
1493 output.resize(len, 0);
1494 Err(e)
1495 }
1496 }
1497 }
1498
1499 /// Single-frame direct-decode path. Decodes one zstd frame into
1500 /// `output[..content_size]` via a stack-local
1501 /// `DecodeBuffer<UserSliceBackend>`, bypassing the per-block
1502 /// FlatBuf/Ring -> `read()` drain copy.
1503 ///
1504 /// # Preconditions (caller-enforced)
1505 ///
1506 /// - `self.init` (or `init_with_dict_handle`) was called for
1507 /// this frame so `self.state` is populated.
1508 /// - `content_size` matches `self.state.frame_header
1509 /// .frame_content_size()` and is `> 0` (caller already passed
1510 /// the eligibility gate).
1511 /// - `output.len() >= content_size + WILDCOPY_OVERLENGTH`.
1512 /// - No active dictionary
1513 /// (`self.state.using_dict.is_none()`).
1514 ///
1515 /// On return, `input` points at the byte immediately after the
1516 /// frame's checksum (or after the last block, when the frame
1517 /// has `content_checksum_flag = 0`). `self.state.frame_finished`
1518 /// is set so [`Self::is_finished`] reports `true`.
1519 fn run_direct_decode(
1520 &mut self,
1521 input: &mut &[u8],
1522 output: &mut [u8],
1523 content_size: u64,
1524 ) -> Result<usize, FrameDecoderError> {
1525 use super::block_decoder;
1526 use super::decode_buffer::DecodeBuffer;
1527 use super::scratch::DirectScratch;
1528 use super::user_slice_buf::UserSliceBackend;
1529 use crate::io::Read;
1530 use FrameDecoderError as err;
1531
1532 let state = self
1533 .state
1534 .as_mut()
1535 .expect("caller ensures init populated state");
1536
1537 // Borrow persistent fields out of whichever scratch variant
1538 // `init` produced (Flat for single_segment, Ring for
1539 // multi-segment) — both expose the same HUF/FSE/Vec
1540 // fields; only `buffer` differs and we don't use that here.
1541 // Macro-style binding avoids the closure / generic
1542 // gymnastics of returning multiple `&mut` from a match arm.
1543 let (huf, fse, offset_hist, literals_buffer, sequences, block_content_buffer, window_size) =
1544 match &mut state.decoder_scratch {
1545 DecoderScratchKind::Flat(s) => (
1546 &mut s.huf,
1547 &mut s.fse,
1548 &mut s.offset_hist,
1549 &mut s.literals_buffer,
1550 &mut s.sequences,
1551 &mut s.block_content_buffer,
1552 s.buffer.window_size,
1553 ),
1554 DecoderScratchKind::Ring(s) => (
1555 &mut s.huf,
1556 &mut s.fse,
1557 &mut s.offset_hist,
1558 &mut s.literals_buffer,
1559 &mut s.sequences,
1560 &mut s.block_content_buffer,
1561 s.buffer.window_size,
1562 ),
1563 };
1564 let backend = UserSliceBackend::from_slice(output);
1565 let buffer = DecodeBuffer::from_backend(backend, window_size);
1566 let mut direct = DirectScratch {
1567 huf,
1568 fse,
1569 offset_hist,
1570 literals_buffer,
1571 sequences,
1572 block_content_buffer,
1573 buffer,
1574 };
1575
1576 // Block loop. Mirrors `decode_blocks` (without the
1577 // strategy-bounded early exit — we always decode the whole
1578 // frame in one shot for the direct path). Keeps
1579 // `state.bytes_read_counter` / `state.block_counter` in
1580 // sync with `decode_blocks` so post-call accessors
1581 // (`bytes_read_from_source`, `blocks_decoded`) return
1582 // accurate values.
1583 let mut block_dec = block_decoder::new();
1584 // Track total output bytes against the declared
1585 // `frame_content_size` via the buffer's actual write
1586 // counter — `BlockHeader.decompressed_size` is 0 for
1587 // Compressed blocks (the header parser can't know the
1588 // expanded size before decoding the body), so per-header
1589 // tracking would always count 0 for those blocks and
1590 // miscount frames that aren't pure Raw/RLE.
1591 let mut produced: u64 = 0;
1592 // Per-block output ranges captured during the direct-path
1593 // loop. After the loop we re-borrow `output` (post-drop of
1594 // `direct`) and XXH64 each range into
1595 // `self.computed_block_checksums`, so the digests vector
1596 // stays consistent with the legacy `decode_blocks` path
1597 // regardless of which dispatch the frame took.
1598 // `Vec::new()` does not allocate, so this stays free when
1599 // `per_block_checksums_enabled` is false: the `push` and the
1600 // post-loop hashing loop are both gated by the same flag.
1601 #[cfg(all(feature = "lsm", feature = "hash"))]
1602 let mut block_ranges: alloc::vec::Vec<(usize, usize)> = alloc::vec::Vec::new();
1603 loop {
1604 #[cfg(all(feature = "lsm", feature = "hash"))]
1605 let produced_before: Option<usize> = if self.per_block_checksums_enabled {
1606 Some(produced as usize)
1607 } else {
1608 None
1609 };
1610 let (block_header, hsize) = block_dec
1611 .read_block_header(&mut *input)
1612 .map_err(err::FailedToReadBlockHeader)?;
1613 state.bytes_read_counter += u64::from(hsize);
1614 // Pre-flight FCS check ONLY for Raw / RLE blocks where
1615 // `decompressed_size` is the actual block output size.
1616 // For Compressed blocks the header field is 0; the
1617 // post-decode check below catches overflow via the
1618 // backend's actual write counter delta.
1619 let block_upper = u64::from(block_header.decompressed_size);
1620 if block_upper > 0 && produced + block_upper > content_size {
1621 // Frame is corrupt — Raw/RLE block headers claim
1622 // more output than the FCS allows.
1623 return Err(err::FrameContentSizeMismatch {
1624 declared: content_size,
1625 produced: produced + block_upper,
1626 });
1627 }
1628 // Slice-source fast path: consume the block body
1629 // straight from `input` without copying into the
1630 // persistent `block_content_buffer`.
1631 let body_consumed = match block_dec.decode_block_content_from_slice(
1632 &block_header,
1633 &mut direct,
1634 &mut *input,
1635 ) {
1636 Ok(n) => n,
1637 // Defense-in-depth: RLE / Raw block whose declared
1638 // `decompressed_size` slipped past the per-block
1639 // pre-flight above and tripped the backend's
1640 // fallible write surface.
1641 Err(crate::decoding::errors::DecodeBlockContentError::BackendOverflow {
1642 ..
1643 }) => {
1644 // Use saturating_add on the
1645 // `produced + decompressed_size` sum. Each block
1646 // is bounded by 128 KiB (MAX_BLOCK_SIZE), but
1647 // accumulated `produced` can grow toward
1648 // u64::MAX across adversarial frames. Saturating
1649 // avoids a panic on the error path itself.
1650 return Err(err::FrameContentSizeMismatch {
1651 declared: content_size,
1652 produced: produced
1653 .saturating_add(u64::from(block_header.decompressed_size)),
1654 });
1655 }
1656 Err(e) => return Err(err::FailedToReadBlockBody(e)),
1657 };
1658 produced = direct.buffer.buffer_ref().tail() as u64;
1659 // Post-decode FCS overflow check.
1660 if produced > content_size {
1661 return Err(err::FrameContentSizeMismatch {
1662 declared: content_size,
1663 produced,
1664 });
1665 }
1666 state.bytes_read_counter += body_consumed;
1667 state.block_counter += 1;
1668 #[cfg(all(feature = "lsm", feature = "hash"))]
1669 if let Some(produced_before) = produced_before {
1670 block_ranges.push((produced_before, produced as usize));
1671 }
1672 // Cap the visible buffer at window_size between blocks
1673 // so the next block's match-offset validation matches
1674 // the spec's `offset <= window_size` rule.
1675 direct.buffer.drop_to_window_size();
1676 if block_header.last_block {
1677 if state.frame_header.descriptor.content_checksum_flag() {
1678 let mut chksum = [0u8; 4];
1679 input
1680 .read_exact(&mut chksum)
1681 .map_err(err::FailedToReadChecksum)?;
1682 state.bytes_read_counter += 4;
1683 state.check_sum = Some(u32::from_le_bytes(chksum));
1684 }
1685 break;
1686 }
1687 }
1688 // Final sanity: blocks summed to exactly `content_size`.
1689 if produced != content_size {
1690 return Err(err::FrameContentSizeMismatch {
1691 declared: content_size,
1692 produced,
1693 });
1694 }
1695
1696 let written = content_size as usize;
1697 state.frame_finished = true;
1698 // Drop the stack-local DirectScratch (and its DecodeBuffer
1699 // borrow on `output`) so we can re-borrow `output` for the
1700 // hash pass below.
1701 drop(direct);
1702 // Per-block XXH64 (low 32 bits) over the captured ranges.
1703 // Mirrors `decode_blocks`' per-block hashing so the digests
1704 // vector stays identical regardless of which dispatch path
1705 // the frame took. Ranges were recorded inside the loop while
1706 // `direct` held a mutable borrow on `output`; now that the
1707 // borrow is dropped we can read the slices directly.
1708 #[cfg(all(feature = "lsm", feature = "hash"))]
1709 if self.per_block_checksums_enabled {
1710 use core::hash::Hasher;
1711 for (start, end) in &block_ranges {
1712 let mut h = twox_hash::XxHash64::with_seed(0);
1713 h.write(&output[*start..*end]);
1714 self.computed_block_checksums.push(h.finish() as u32);
1715 }
1716 }
1717 #[cfg(feature = "hash")]
1718 {
1719 // Direct path bypasses the per-write hash accounting
1720 // (DecodeBuffer hashes during drain; the direct path
1721 // never drains because the user slice IS the buffer).
1722 // Walk the decoded output once and propagate the
1723 // resulting hasher state into the persistent scratch's
1724 // buffer so `get_calculated_checksum()` returns the
1725 // right value path-independently.
1726 use core::hash::Hasher;
1727 let mut hasher = twox_hash::XxHash64::with_seed(0);
1728 hasher.write(&output[..written]);
1729 match &mut state.decoder_scratch {
1730 DecoderScratchKind::Flat(s) => s.buffer.hash = hasher,
1731 DecoderScratchKind::Ring(s) => s.buffer.hash = hasher,
1732 }
1733 }
1734 Ok(written)
1735 }
1736}
1737
1738/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished
1739/// this will retain window_size bytes, else it will drain it completely
1740impl Read for FrameDecoder {
1741 fn read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
1742 let state = match &mut self.state {
1743 None => return Ok(0),
1744 Some(s) => s,
1745 };
1746 if state.frame_finished {
1747 state.decoder_scratch.buffer_read_all(target)
1748 } else {
1749 state.decoder_scratch.buffer_read(target)
1750 }
1751 }
1752}
1753
1754#[cfg(test)]
1755mod tests {
1756 extern crate std;
1757
1758 use super::{DictionaryHandle, FrameDecoder};
1759 use crate::encoding::{CompressionLevel, FrameCompressor};
1760 use alloc::vec::Vec;
1761
1762 #[test]
1763 fn decode_all_legacy_drain_matches_direct_path_on_single_segment_frame() {
1764 // Roundtrip a small payload through the encoder, then decode
1765 // it via `decode_all` on two output shapes that select
1766 // different internal paths:
1767 // 1. Tight output (no WILDCOPY_OVERLENGTH slack) → legacy
1768 // `decode_blocks` + `read()` drain path.
1769 // 2. Output with WILDCOPY slack → direct
1770 // `run_direct_decode` + `UserSliceBackend` path.
1771 // Both paths must produce identical output bytes — the only
1772 // difference is the internal buffer/drain shape, not the
1773 // decoded semantics. This is the regression gate for the
1774 // direct-decode wiring.
1775 let payload: Vec<u8> = (0..4096u32).map(|i| (i & 0xFF) as u8).collect();
1776 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1777 compressor.set_source(payload.as_slice());
1778 let mut compressed = Vec::new();
1779 compressor.set_drain(&mut compressed);
1780 compressor.compress();
1781
1782 // Baseline: tight output → legacy drain path.
1783 let mut dec_a = FrameDecoder::new();
1784 let mut out_a = alloc::vec![0u8; payload.len()];
1785 let n_a = dec_a
1786 .decode_all(compressed.as_slice(), &mut out_a)
1787 .expect("decode_all (legacy drain) should succeed");
1788 assert_eq!(n_a, payload.len());
1789 assert_eq!(&out_a[..n_a], payload.as_slice());
1790
1791 // Direct: output with WILDCOPY slack → direct path.
1792 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1793 let mut dec_b = FrameDecoder::new();
1794 let mut out_b = alloc::vec![0u8; payload.len() + slack];
1795 let n_b = dec_b
1796 .decode_all(compressed.as_slice(), &mut out_b)
1797 .expect("decode_all (direct path) should succeed");
1798 assert_eq!(
1799 n_b,
1800 payload.len(),
1801 "direct decode produced wrong byte count"
1802 );
1803 assert_eq!(&out_b[..n_b], payload.as_slice());
1804 }
1805
1806 #[test]
1807 fn decode_all_multi_segment_frame_decodes_correctly() {
1808 // Multi-segment frame: payload large enough that the
1809 // encoder's default frame layout has `single_segment_flag =
1810 // false` and `window_size < frame_content_size`. The direct
1811 // path must cap the visible buffer at window_size after each
1812 // block (drop_to_window_size) so match-offset validation
1813 // matches the spec rule `offset <= window_size`, and still
1814 // produce the same bytes as decode_all on the
1815 // FlatBuf/Ring-backed path.
1816 //
1817 // Make the payload structured so multi-segment behavior
1818 // actually kicks in: 2 MiB of repeating + random-ish bytes
1819 // forces window_size lower than content_size at the encoder.
1820 let mut payload: Vec<u8> = Vec::with_capacity(2 * 1024 * 1024);
1821 for i in 0..payload.capacity() {
1822 payload.push((i.wrapping_mul(2_654_435_761) & 0xFF) as u8);
1823 }
1824 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1825 compressor.set_source(payload.as_slice());
1826 let mut compressed = Vec::new();
1827 compressor.set_drain(&mut compressed);
1828 compressor.compress();
1829
1830 // Baseline: decode_all through the FlatBuf+drain path.
1831 let mut dec_a = FrameDecoder::new();
1832 let mut out_a = alloc::vec![0u8; payload.len()];
1833 let n_a = dec_a
1834 .decode_all(compressed.as_slice(), &mut out_a)
1835 .expect("decode_all should succeed");
1836 assert_eq!(n_a, payload.len());
1837 assert_eq!(&out_a[..n_a], payload.as_slice());
1838
1839 // Direct path: must give identical bytes via UserSliceBackend
1840 // + per-block drop_to_window_size.
1841 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1842 let mut dec_b = FrameDecoder::new();
1843 let mut out_b = alloc::vec![0u8; payload.len() + slack];
1844 let n_b = dec_b
1845 .decode_all(compressed.as_slice(), &mut out_b)
1846 .expect("decode_all should succeed on multi-segment frame");
1847 assert_eq!(n_b, payload.len(), "wrong byte count on direct path");
1848 assert_eq!(&out_b[..n_b], payload.as_slice());
1849
1850 // Sanity-check: confirm the encoded frame really IS
1851 // multi-segment. If a future encoder default changes,
1852 // catching the assumption here is better than silently
1853 // testing single_segment on this name.
1854 let mut sanity = FrameDecoder::new();
1855 sanity.init(&mut compressed.as_slice()).unwrap();
1856 assert!(
1857 !sanity
1858 .state
1859 .as_ref()
1860 .unwrap()
1861 .frame_header
1862 .descriptor
1863 .single_segment_flag(),
1864 "test precondition violated: frame is single-segment, rename or resize"
1865 );
1866 }
1867
1868 #[cfg(feature = "hash")]
1869 #[test]
1870 fn decode_all_propagates_checksum_into_persistent_scratch() {
1871 // Direct path on a checksum-flagged frame: the FrameCompressor
1872 // under `feature = "hash"` sets content_checksum_flag, so the
1873 // decoded frame has a recorded checksum. After
1874 // decode_all we must be able to verify it matches via
1875 // the public get_calculated_checksum() accessor — the digest
1876 // is computed by walking output at end of decode and stored
1877 // into the persistent scratch's hasher.
1878 let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
1879 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1880 compressor.set_source(payload.as_slice());
1881 let mut compressed = Vec::new();
1882 compressor.set_drain(&mut compressed);
1883 compressor.compress();
1884
1885 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1886 let mut dec = FrameDecoder::new();
1887 let mut out = alloc::vec![0u8; payload.len() + slack];
1888 let n = dec
1889 .decode_all(compressed.as_slice(), &mut out)
1890 .expect("decode_all with checksum must succeed");
1891 assert_eq!(n, payload.len());
1892 assert_eq!(&out[..n], payload.as_slice());
1893
1894 // Both sides must report the same checksum: the frame header
1895 // carries the stored u32, and get_calculated_checksum reads
1896 // the running digest the direct path just propagated.
1897 let stored = dec.get_checksum_from_data();
1898 let calculated = dec.get_calculated_checksum();
1899 assert!(stored.is_some(), "frame must carry stored checksum");
1900 assert!(
1901 calculated.is_some(),
1902 "direct path must propagate calculated checksum"
1903 );
1904 assert_eq!(
1905 stored, calculated,
1906 "stored vs calculated checksum mismatch on direct path"
1907 );
1908 }
1909
1910 #[test]
1911 fn decode_all_fcs_overflow_via_corrupt_frame_returns_structured_error() {
1912 // Hand-build a corrupt frame that declares
1913 // frame_content_size = 4 but the (last) block carries a
1914 // larger Raw payload. The pre-flight FCS check inside the
1915 // direct path's block loop catches this and returns the
1916 // structured FrameContentSizeMismatch variant — not a
1917 // panic, not a generic TargetTooSmall.
1918 //
1919 // Frame layout (single_segment, FCS=4):
1920 // magic 4 bytes 0xFD2FB528
1921 // FHD 1 byte single_segment=1, no checksum,
1922 // FCS field size = 0 (-> 1-byte FCS)
1923 // FCS 1 byte 0x04
1924 // block_header 3 bytes last=1, type=Raw, block_size=10
1925 // block_payload 10 bytes 0xAA repeated
1926 let mut frame = alloc::vec::Vec::new();
1927 // magic
1928 frame.extend_from_slice(&0xFD2FB528u32.to_le_bytes());
1929 // FHD: single_segment=1, fcs_flag=0 (1-byte FCS), no checksum,
1930 // no dict. Bit layout: FCS(7-6)=0, single_segment(5)=1,
1931 // reserved/uncs(4)=0, content_checksum(2)=0, dict(0-1)=00.
1932 frame.push(0b0010_0000);
1933 // FCS: 1 byte
1934 frame.push(4);
1935 // Block header: cBlockSize=10, type=Raw (0), last=1
1936 // 3-byte LE: bit0=last, bits1-2=type(2 bits), bits3-23=size
1937 let cblock_size: u32 = 10;
1938 let bh: u32 = 1 | (cblock_size << 3); // last=1, type=Raw=0
1939 frame.push((bh & 0xFF) as u8);
1940 frame.push((bh >> 8) as u8);
1941 frame.push((bh >> 16) as u8);
1942 // Payload — 10 bytes that, if decoded, would exceed FCS=4.
1943 frame.extend(core::iter::repeat_n(0xAAu8, 10));
1944
1945 let slack = super::super::buffer_backend::WILDCOPY_OVERLENGTH;
1946 let mut dec = FrameDecoder::new();
1947 let mut out = alloc::vec![0u8; 4 + slack];
1948 let err = dec
1949 .decode_all(&frame, &mut out)
1950 .expect_err("FCS-overflow frame must fail decode");
1951 assert!(
1952 matches!(
1953 err,
1954 super::FrameDecoderError::FrameContentSizeMismatch { .. }
1955 ),
1956 "expected FrameContentSizeMismatch, got {:?}",
1957 err
1958 );
1959 }
1960
1961 #[test]
1962 fn decode_all_falls_back_when_output_too_small_for_wildcopy_slack() {
1963 // Output sized exactly to frame_content_size (no
1964 // WILDCOPY_OVERLENGTH slack) must NOT trigger the direct
1965 // path — the burst's `extend_from_within_unchecked` writes
1966 // past `tail` into the slack region. Direct dispatcher
1967 // recognises this and falls back to the FlatBuf + drain
1968 // path which still produces the right output.
1969 let payload: Vec<u8> = (0..2048u32)
1970 .map(|i| (i.wrapping_mul(31) & 0xFF) as u8)
1971 .collect();
1972 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
1973 compressor.set_source(payload.as_slice());
1974 let mut compressed = Vec::new();
1975 compressor.set_drain(&mut compressed);
1976 compressor.compress();
1977
1978 let mut dec = FrameDecoder::new();
1979 // Exactly payload.len(), no slack — direct path is gated out.
1980 let mut out = alloc::vec![0u8; payload.len()];
1981 let n = dec
1982 .decode_all(compressed.as_slice(), &mut out)
1983 .expect("decode_all should still succeed via fallback");
1984 assert_eq!(n, payload.len());
1985 assert_eq!(&out[..n], payload.as_slice());
1986 }
1987
    #[test]
    fn decode_all_fallback_validates_fcs_against_total_output() {
        // Synthetic single-segment frame: FCS = 20 bytes, but the
        // last-block flag fires after only 4 bytes of raw payload,
        // i.e. the frame UNDER-delivers its declared content size.
        // On the direct path this is caught by the final
        // `produced != content_size` check after the block loop (the
        // per-block `produced > content_size` checks only catch
        // overflow, and 4 < 20). This test instead forces the
        // fallback path (eligible=false because output is sized
        // exactly to FCS, no WILDCOPY slack), which used to silently
        // return Ok(4). With the fix it now surfaces
        // `FrameContentSizeMismatch`, matching the direct path.
        //
        // Frame layout: 4 B magic | 1 B FHD (single_segment=1,
        // FCS_flag=3 → 8-byte FCS) | 8 B FCS=20 | block header
        // (Raw, last, size=4) | 4 raw bytes.
        let mut wire = Vec::new();
        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes()); // magic
        // FHD: FCS_flag=3 (8-byte FCS) <<6 | single_segment=1 <<5.
        wire.push(0b1110_0000);
        wire.extend_from_slice(&20u64.to_le_bytes()); // declared FCS
        // Block header: (size << 3) | (block_type << 1) | last_block.
        // Raw block (block_type=0), last_block=1, size=4 → 0b00100001 = 0x21.
        wire.push(0x21);
        wire.push(0x00);
        wire.push(0x00);
        wire.extend_from_slice(&[1u8, 2, 3, 4]);

        let mut dec = FrameDecoder::new();
        // Size output exactly at declared FCS (no WILDCOPY slack)
        // so the eligibility check gates the direct path out.
        let mut out = alloc::vec![0u8; 20];
        let err = dec
            .decode_all(wire.as_slice(), &mut out)
            .expect_err("fallback must reject corrupt FCS underflow");
        // The error must report the header's FCS and the bytes
        // actually produced by the single 4-byte block.
        match err {
            crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
                declared,
                produced,
            } => {
                assert_eq!(declared, 20);
                assert_eq!(produced, 4);
            }
            other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
        }
    }
2032
    #[test]
    fn decode_all_fallback_treats_explicit_fcs_zero_as_declared() {
        // Synthetic multi-segment frame with FCS_flag=2 (4-byte
        // FCS) explicitly set to 0. The header DECLARES zero
        // content, but the body carries a 5-byte raw last-block.
        // `fcs_declared()` must return true (the field is on the
        // wire) so the fallback's post-decode size check sees the
        // mismatch — even though `frame_content_size == 0`. This
        // is exactly the FCS=0 edge case where the previous
        // `content_size > 0` proxy for "FCS was declared" would have
        // silently accepted the corrupt frame.
        //
        // Frame layout:
        //   4 B magic              — 28 B5 2F FD
        //   1 B FHD                — FCS_flag=2 (bits 7-6), no
        //                            single_segment, content_checksum=0,
        //                            dict_id_flag=0 → 0b1000_0000
        //   1 B window_descriptor  — 0x50: exponent (bits 7-3) = 10,
        //                            mantissa (bits 2-0) = 0
        //                            → window_log 10 + 10 = 20 → 1 MiB
        //   4 B FCS                — 0 LE
        //   3 B block header       — raw, last, size=5 → 0x29 0x00 0x00
        //   5 B raw payload        — anything non-empty
        let mut wire = Vec::new();
        wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
        wire.push(0b1000_0000); // FHD: FCS_flag=2, others 0.
        wire.push(0x50); // window_descriptor: exp=10, mantissa=0.
        wire.extend_from_slice(&0u32.to_le_bytes()); // FCS = 0.
        // Block header (24-bit LE): (size << 3) | (block_type << 1) | last_block
        // = (5 << 3) | (0 << 1) | 1 = 0x29.
        wire.push(0x29);
        wire.push(0x00);
        wire.push(0x00);
        wire.extend_from_slice(&[1u8, 2, 3, 4, 5]);

        let mut dec = FrameDecoder::new();
        // Separate from the proxy mentioned above: direct-path
        // eligibility also requires `content_size > 0`, so with FCS=0
        // declared this frame falls through to the drain loop. Output
        // buffer size doesn't matter for the eligibility check here;
        // give it some room so `read()` can drain the block.
        let mut out = alloc::vec![0u8; 16];
        let err = dec
            .decode_all(wire.as_slice(), &mut out)
            .expect_err("corrupt FCS=0 + 5-byte block must error");
        // The error must report the explicit FCS of 0 and the 5 bytes
        // the raw block actually produced.
        match err {
            crate::decoding::errors::FrameDecoderError::FrameContentSizeMismatch {
                declared,
                produced,
            } => {
                assert_eq!(declared, 0);
                assert_eq!(produced, 5);
            }
            other => panic!("expected FrameContentSizeMismatch, got {other:?}"),
        }
    }
2086
2087 #[test]
2088 fn decode_all_fallback_accepts_honest_explicit_fcs_zero() {
2089 // Companion to the corrupt-FCS=0 test above: an HONEST
2090 // empty frame with FCS_flag=2 (4-byte FCS) explicitly set
2091 // to 0 AND a 0-byte raw last-block. `fcs_declared()`
2092 // returns true and `content_size == 0 == total_written`,
2093 // so the fallback validation accepts the frame instead of
2094 // misreporting a mismatch.
2095 //
2096 // (Single-segment FCS=0 would test a similar invariant
2097 // but trips header-stage validation: `window_size =
2098 // frame_content_size = 0 < MIN_WINDOW_SIZE` fails the
2099 // window-size sanity check before decode runs. Use the
2100 // multi-segment shape where `window_size` comes from
2101 // `window_descriptor` independently of FCS.)
2102 //
2103 // Frame layout:
2104 // 4 B magic
2105 // 1 B FHD — FCS_flag=2, others 0 → 0x80
2106 // 1 B window_descriptor — exp=10 → 1 MiB window
2107 // 4 B FCS — 0 LE
2108 // 3 B block header — raw, last, size=0 → 0x01 0x00 0x00
2109 let mut wire = Vec::new();
2110 wire.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
2111 wire.push(0b1000_0000);
2112 wire.push(0x50);
2113 wire.extend_from_slice(&0u32.to_le_bytes());
2114 // Block header: (0 << 3) | (0 << 1) | 1 = 0x01.
2115 wire.push(0x01);
2116 wire.push(0x00);
2117 wire.push(0x00);
2118
2119 let mut dec = FrameDecoder::new();
2120 let mut out = alloc::vec![0u8; 16];
2121 let n = dec
2122 .decode_all(wire.as_slice(), &mut out)
2123 .expect("honest FCS=0 + empty block must succeed");
2124 assert_eq!(n, 0);
2125 }
2126
2127 #[test]
2128 fn reset_with_dict_handle_applies_dict_when_no_dict_id() {
2129 let payload = b"reset-without-dict-id";
2130 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2131 compressor.set_source(payload.as_slice());
2132 let mut compressed = Vec::new();
2133 compressor.set_drain(&mut compressed);
2134 compressor.compress();
2135
2136 let dict_raw = include_bytes!("../../dict_tests/dictionary");
2137 let handle = DictionaryHandle::decode_dict(dict_raw).expect("dictionary should parse");
2138
2139 let mut decoder = FrameDecoder::new();
2140 decoder
2141 .reset_with_dict_handle(compressed.as_slice(), &handle)
2142 .expect("reset should succeed");
2143 let state = decoder.state.as_ref().expect("state should be initialized");
2144 assert!(state.frame_header.dictionary_id().is_none());
2145 assert_eq!(state.using_dict, Some(handle.id()));
2146 }
2147
    // Header-expectation tests: `expect_dict_id` / `expect_window_descriptor`
    // pin values that `reset` / `reset_with_dict_handle` check against the
    // parsed frame header, failing with `UnexpectedDictId` /
    // `UnexpectedWindowDescriptor` (carrying expected and found values) on a
    // mismatch, as exercised below.
    #[cfg(feature = "lsm")]
    mod expect_validation {
        use super::*;
        use crate::decoding::errors::FrameDecoderError;

        /// Compresses `payload` with default settings into a fresh `Vec`.
        fn compress(payload: &[u8]) -> Vec<u8> {
            let mut compressor = FrameCompressor::new(CompressionLevel::Default);
            compressor.set_source(payload);
            let mut compressed = Vec::new();
            compressor.set_drain(&mut compressed);
            compressor.compress();
            compressed
        }

        /// Like `compress`, but loads `dict_raw` as the compression
        /// dictionary first (panics if it fails to load).
        fn compress_with_dict(payload: &[u8], dict_raw: &[u8]) -> Vec<u8> {
            let mut compressor = FrameCompressor::new(CompressionLevel::Default);
            compressor
                .set_dictionary_from_bytes(dict_raw)
                .expect("dict load");
            compressor.set_source(payload);
            let mut compressed = Vec::new();
            compressor.set_drain(&mut compressed);
            compressor.compress();
            compressed
        }

        #[test]
        fn expect_dict_id_none_default_allows_anything() {
            // No expectation configured: any frame must reset cleanly.
            let compressed = compress(b"hello-no-expect");
            let mut decoder = FrameDecoder::new();
            decoder
                .reset(compressed.as_slice())
                .expect("default None passes");
        }

        #[test]
        fn expect_dict_id_zero_matches_frame_without_dict_id() {
            // Default-encoded frame has no dict_id; pinning Some(0)
            // ("no dictionary expected") must accept it.
            let compressed = compress(b"payload");
            let mut decoder = FrameDecoder::new();
            decoder.expect_dict_id(Some(0));
            decoder
                .reset(compressed.as_slice())
                .expect("Some(0) ~ None");
        }

        #[test]
        fn expect_dict_id_matching_value_passes() {
            let dict_raw = include_bytes!("../../dict_tests/dictionary");
            let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
            let actual_id = handle.id();

            let compressed = compress_with_dict(b"payload-with-dict", dict_raw);

            let mut decoder = FrameDecoder::new();
            decoder.expect_dict_id(Some(actual_id));
            // Decode requires the dict to be registered; using
            // reset_with_dict_handle for that.
            decoder
                .reset_with_dict_handle(compressed.as_slice(), &handle)
                .expect("matching dict_id passes");
        }

        #[test]
        fn expect_dict_id_mismatching_value_fails_before_decode() {
            let dict_raw = include_bytes!("../../dict_tests/dictionary");
            let handle = DictionaryHandle::decode_dict(dict_raw).expect("dict parse");
            let actual_id = handle.id();
            // Any id different from the real one.
            let wrong_id = actual_id.wrapping_add(1);

            let compressed = compress_with_dict(b"payload-with-dict", dict_raw);

            let mut decoder = FrameDecoder::new();
            decoder.expect_dict_id(Some(wrong_id));
            let err = decoder
                .reset_with_dict_handle(compressed.as_slice(), &handle)
                .expect_err("mismatch must fail");
            // The error reports both the pinned and the on-wire id.
            match err {
                FrameDecoderError::UnexpectedDictId { expected, found } => {
                    assert_eq!(expected, Some(wrong_id));
                    assert_eq!(found, Some(actual_id));
                }
                other => panic!("expected UnexpectedDictId, got {other:?}"),
            }
        }

        #[test]
        fn expect_dict_id_nonzero_fails_on_frame_without_dict_id() {
            // Frame has no dict_id; expecting Some(42) (non-zero)
            // must fail with found = None.
            let compressed = compress(b"no-dict-frame");
            let mut decoder = FrameDecoder::new();
            decoder.expect_dict_id(Some(42));
            let err = decoder
                .reset(compressed.as_slice())
                .expect_err("nonzero expectation on dictless frame must fail");
            match err {
                FrameDecoderError::UnexpectedDictId { expected, found } => {
                    assert_eq!(expected, Some(42));
                    assert_eq!(found, None);
                }
                other => panic!("expected UnexpectedDictId, got {other:?}"),
            }
        }

        #[test]
        fn expect_window_descriptor_none_default_allows_anything() {
            // No expectation configured: any frame must reset cleanly.
            let compressed = compress(b"hello-no-wd-expect");
            let mut decoder = FrameDecoder::new();
            decoder
                .reset(compressed.as_slice())
                .expect("default None passes");
        }

        #[test]
        fn expect_window_descriptor_mismatch_fails_before_decode() {
            // Compress a payload large enough to force a
            // multi-segment frame (window_descriptor on wire).
            // Default compression at >256 KiB is expected to produce
            // multi-segment frames with a real window_descriptor byte;
            // the `.expect` on `window_descriptor()` below enforces it.
            let payload = alloc::vec![0xABu8; 512 * 1024];
            let compressed = compress(&payload);

            // Read the actual window_descriptor by parsing the frame
            // header once (reset) without expectations, then pin a
            // wrong value.
            let mut probe_decoder = FrameDecoder::new();
            probe_decoder.reset(compressed.as_slice()).unwrap();
            let probe_state = probe_decoder.state.as_ref().unwrap();
            let actual_wd = probe_state
                .frame_header
                .window_descriptor()
                .expect("multi-segment frame should expose window_descriptor");
            // +0x10 adds 2 to the exponent field (bits 7-3); the result
            // only needs to differ from actual_wd, which wrapping_add
            // on a u8 always guarantees.
            let wrong_wd = actual_wd.wrapping_add(0x10); // bump exponent

            let mut decoder = FrameDecoder::new();
            decoder.expect_window_descriptor(Some(wrong_wd));
            let err = decoder
                .reset(compressed.as_slice())
                .expect_err("wrong window_descriptor must fail");
            match err {
                FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
                    assert_eq!(expected, wrong_wd);
                    assert_eq!(found, Some(actual_wd));
                }
                other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
            }
        }

        /// Build a minimal synthetic single-segment zstd frame that
        /// carries `payload` (at most 255 bytes) as one raw last block.
        /// RFC 8878 §3.1.1.1 layout, hand-rolled because our default
        /// `FrameCompressor` settings don't emit
        /// `single_segment_flag` for tiny inputs.
        ///
        /// Wire bytes (13 total for a 4-byte payload):
        /// ```text
        /// 28 B5 2F FD   magic
        /// 20            FHD: single_segment=1, FCS_flag=0
        /// 04            FCS (single byte, value = payload.len())
        /// 21 00 00      block header: raw, last, size=4
        /// .. .. .. ..   payload bytes
        /// ```
        fn synth_single_segment_frame(payload: &[u8]) -> Vec<u8> {
            assert!(payload.len() <= 255, "1-byte FCS field caps at 255");
            // Already implied by the check above; documents the 21-bit
            // Block_Size field width.
            assert!(payload.len() < (1usize << 21), "block size 21-bit max");
            let mut out = Vec::new();
            // Magic 0xFD2FB528 LE.
            out.extend_from_slice(&0xFD2F_B528u32.to_le_bytes());
            // FHD: single_segment_flag (bit 5) set, everything
            // else zero. With single_segment + FCS_flag=0 the FCS
            // field is 1 byte. No window_descriptor on wire.
            out.push(0b0010_0000);
            // 1-byte FCS = payload length.
            out.push(payload.len() as u8);
            // Block header (3 bytes LE):
            // last_block=1, block_type=0 (Raw), block_size=payload.len().
            // Encoded: (size << 3) | (block_type << 1) | last_block, i.e.
            // last_block in bit 0, block_type in bits 1-2, size in bits 3+.
            let bh: u32 = ((payload.len() as u32) << 3) | 1;
            out.push((bh & 0xFF) as u8);
            out.push(((bh >> 8) & 0xFF) as u8);
            out.push(((bh >> 16) & 0xFF) as u8);
            // Raw payload.
            out.extend_from_slice(payload);
            out
        }

        #[test]
        fn expect_window_descriptor_on_single_segment_frame_fails_with_found_none() {
            // Single-segment frames omit the window_descriptor
            // byte from the wire entirely. Setting an expectation
            // here must surface `found: None` so callers
            // distinguish "wrong descriptor" from "no descriptor
            // on the wire" — never silently pass.
            let compressed = synth_single_segment_frame(b"tiny");

            // First sanity-check: the synthetic frame decodes
            // cleanly without any expectation.
            {
                let mut probe = FrameDecoder::new();
                probe
                    .reset(compressed.as_slice())
                    .expect("synth frame parses");
                let probe_state = probe.state.as_ref().unwrap();
                assert!(
                    probe_state.frame_header.window_descriptor().is_none(),
                    "synth frame must be single-segment"
                );
            }

            // The pinned value is arbitrary: there is no descriptor on
            // the wire for it to match.
            let mut decoder = FrameDecoder::new();
            decoder.expect_window_descriptor(Some(0x40));
            let err = decoder
                .reset(compressed.as_slice())
                .expect_err("single-segment + expectation must fail");
            match err {
                FrameDecoderError::UnexpectedWindowDescriptor { expected, found } => {
                    assert_eq!(expected, 0x40);
                    assert_eq!(found, None);
                }
                other => panic!("expected UnexpectedWindowDescriptor, got {other:?}"),
            }
        }

        #[test]
        fn validation_failure_leaves_decoder_re_resettable() {
            // After UnexpectedDictId on a wrong-expectation reset,
            // clearing the expectation and re-calling reset must
            // succeed on the same source — no lingering failed
            // state.
            let compressed = compress(b"re-resettable");

            let mut decoder = FrameDecoder::new();
            decoder.expect_dict_id(Some(42));
            let err = decoder
                .reset(compressed.as_slice())
                .expect_err("first reset fails");
            assert!(matches!(err, FrameDecoderError::UnexpectedDictId { .. }));

            // Clear expectation and retry on a fresh source.
            decoder.expect_dict_id(None);
            decoder
                .reset(compressed.as_slice())
                .expect("retry after clearing expectation should succeed");
        }
    }
2396
2397 /// Build a skippable frame on the wire: 4-byte LE magic + 4-byte LE
2398 /// length + payload bytes. RFC 8878 §3.1.2 restricts the magic
2399 /// variant to `0..=15`; assert here so accidental misuse of the
2400 /// helper can't smuggle a non-skippable magic past the tests.
2401 #[cfg(feature = "lsm")]
2402 fn build_skippable_frame(variant: u8, payload: &[u8]) -> Vec<u8> {
2403 assert!(
2404 variant <= 15,
2405 "skippable-frame variant {variant} outside RFC 8878 0..=15 range",
2406 );
2407 let mut out = Vec::with_capacity(8 + payload.len());
2408 let magic: u32 = 0x184D2A50 + u32::from(variant);
2409 out.extend_from_slice(&magic.to_le_bytes());
2410 out.extend_from_slice(&u32::try_from(payload.len()).unwrap().to_le_bytes());
2411 out.extend_from_slice(payload);
2412 out
2413 }
2414
2415 #[cfg(feature = "lsm")]
2416 #[test]
2417 fn decode_all_with_skippable_visitor_sees_payloads_in_order() {
2418 // Build a stream: skippable(v0, "alpha") + zstd_frame +
2419 // skippable(v3, "beta") + zstd_frame + skippable(v15, "")
2420 // and verify the visitor is invoked exactly three times with
2421 // the correct (variant, payload) pairs in stream order while
2422 // the zstd frames decode normally.
2423 let payload_a: Vec<u8> = (0..256u16).map(|i| i as u8).collect();
2424 let payload_b: Vec<u8> = (0..256u16).map(|i| (i ^ 0xAA) as u8).collect();
2425
2426 let mut comp_a = Vec::new();
2427 let mut c = FrameCompressor::new(CompressionLevel::Default);
2428 c.set_source(payload_a.as_slice());
2429 c.set_drain(&mut comp_a);
2430 c.compress();
2431
2432 let mut comp_b = Vec::new();
2433 let mut c = FrameCompressor::new(CompressionLevel::Default);
2434 c.set_source(payload_b.as_slice());
2435 c.set_drain(&mut comp_b);
2436 c.compress();
2437
2438 let skip0 = build_skippable_frame(0, b"alpha");
2439 let skip3 = build_skippable_frame(3, b"beta");
2440 let skip15 = build_skippable_frame(15, &[]);
2441
2442 let mut stream = Vec::new();
2443 stream.extend_from_slice(&skip0);
2444 stream.extend_from_slice(&comp_a);
2445 stream.extend_from_slice(&skip3);
2446 stream.extend_from_slice(&comp_b);
2447 stream.extend_from_slice(&skip15);
2448
2449 let mut decoder = FrameDecoder::new();
2450 let mut out = alloc::vec![0u8; payload_a.len() + payload_b.len()];
2451 let mut collected: Vec<(u8, Vec<u8>)> = Vec::new();
2452 let n = decoder
2453 .decode_all_with_skippable_visitor(stream.as_slice(), &mut out, |variant, payload| {
2454 collected.push((variant, payload.to_vec()));
2455 })
2456 .expect("decode_all_with_skippable_visitor should succeed");
2457
2458 // All three skippables visited in stream order.
2459 assert_eq!(collected.len(), 3);
2460 assert_eq!(collected[0], (0u8, b"alpha".to_vec()));
2461 assert_eq!(collected[1], (3u8, b"beta".to_vec()));
2462 assert_eq!(collected[2], (15u8, Vec::<u8>::new()));
2463
2464 // Both zstd frames decoded into `out` back-to-back.
2465 assert_eq!(n, payload_a.len() + payload_b.len());
2466 assert_eq!(&out[..payload_a.len()], payload_a.as_slice());
2467 assert_eq!(&out[payload_a.len()..n], payload_b.as_slice());
2468 }
2469
2470 #[cfg(feature = "lsm")]
2471 #[test]
2472 fn decode_all_silently_skips_when_no_visitor() {
2473 // Regression gate: plain decode_all must still silently skip
2474 // skippable frames (RFC 8878 mandated behavior) with no
2475 // behavioral change after the visitor refactor.
2476 let payload: Vec<u8> = (0..512u16).map(|i| i as u8).collect();
2477 let mut comp = Vec::new();
2478 let mut c = FrameCompressor::new(CompressionLevel::Default);
2479 c.set_source(payload.as_slice());
2480 c.set_drain(&mut comp);
2481 c.compress();
2482
2483 let skip = build_skippable_frame(7, b"ignored sidecar");
2484 let mut stream = Vec::new();
2485 stream.extend_from_slice(&skip);
2486 stream.extend_from_slice(&comp);
2487
2488 let mut decoder = FrameDecoder::new();
2489 let mut out = alloc::vec![0u8; payload.len()];
2490 let n = decoder
2491 .decode_all(stream.as_slice(), &mut out)
2492 .expect("decode_all should succeed on skippable + zstd stream");
2493 assert_eq!(n, payload.len());
2494 assert_eq!(&out[..n], payload.as_slice());
2495 }
2496
2497 #[cfg(feature = "lsm")]
2498 #[test]
2499 fn frame_emit_info_describes_emitted_block_layout() {
2500 // Encode a payload large enough to force >1 block, fetch
2501 // FrameEmitInfo, walk blocks[] and verify each block's
2502 // (offset_in_frame, header_size, body_size) matches the bytes
2503 // actually emitted into the drain buffer.
2504 let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
2505 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2506 compressor.set_source(payload.as_slice());
2507 let mut compressed = Vec::new();
2508 compressor.set_drain(&mut compressed);
2509 compressor.compress();
2510
2511 let info = compressor
2512 .last_frame_emit_info()
2513 .expect("last_frame_emit_info populated after compress")
2514 .clone();
2515 drop(compressor);
2516
2517 // Frame header range starts at 0 and is non-empty.
2518 assert_eq!(info.frame_header_range.start, 0);
2519 assert!(info.frame_header_range.end > 0);
2520 // Total size matches what was written to the drain.
2521 assert_eq!(info.total_size as usize, compressed.len());
2522 // At least one block, and the last entry has last_block=true.
2523 assert!(!info.blocks.is_empty());
2524 assert!(info.blocks.last().unwrap().last_block);
2525 // All non-final blocks have last_block=false.
2526 for b in &info.blocks[..info.blocks.len() - 1] {
2527 assert!(!b.last_block);
2528 }
2529 // Walk and verify each block's header bytes match the
2530 // recorded type / size by re-decoding the 3-byte header.
2531 // Walking arithmetic: offset_in_frame + header_size + body_size
2532 // must land exactly on the next block's offset_in_frame (or,
2533 // for the last block, on the checksum / end of frame).
2534 for (i, b) in info.blocks.iter().enumerate() {
2535 let off = b.offset_in_frame as usize;
2536 assert_eq!(b.header_size, 3);
2537 let mut hdr = [0u8; 4];
2538 hdr[..3].copy_from_slice(&compressed[off..off + 3]);
2539 let raw = u32::from_le_bytes(hdr);
2540 let last = (raw & 1) != 0;
2541 let ty = (raw >> 1) & 0b11;
2542 let sz = raw >> 3;
2543 assert_eq!(last, b.last_block);
2544 assert_eq!(sz, b.block_size_field);
2545 // body_size is the PHYSICAL length on the wire: spec's
2546 // Block_Size for Raw/Compressed, always 1 for RLE.
2547 let expected_physical = match b.block_type {
2548 crate::encoding::frame_emit_info::BlockType::RLE => 1,
2549 _ => sz,
2550 };
2551 assert_eq!(b.body_size, expected_physical);
2552 let expected_ty = match b.block_type {
2553 crate::encoding::frame_emit_info::BlockType::Raw => 0,
2554 crate::encoding::frame_emit_info::BlockType::RLE => 1,
2555 crate::encoding::frame_emit_info::BlockType::Compressed => 2,
2556 crate::encoding::frame_emit_info::BlockType::Reserved => 3,
2557 };
2558 assert_eq!(ty, expected_ty);
2559 // Walking-arithmetic invariant.
2560 let next_off = b.offset_in_frame + b.header_size as u32 + b.body_size;
2561 if let Some(next) = info.blocks.get(i + 1) {
2562 assert_eq!(
2563 next_off, next.offset_in_frame,
2564 "block {i} body_size doesn't reach next block's offset_in_frame",
2565 );
2566 } else if let Some(cs) = info.checksum_range.as_ref() {
2567 assert_eq!(
2568 next_off, cs.start,
2569 "last block body_size doesn't reach checksum_range.start",
2570 );
2571 } else {
2572 assert_eq!(
2573 next_off, info.total_size,
2574 "last block body_size doesn't reach total_size",
2575 );
2576 }
2577 }
2578 // Checksum range present iff `feature = "hash"` is enabled.
2579 assert_eq!(info.checksum_range.is_some(), cfg!(feature = "hash"));
2580 }
2581
2582 #[cfg(all(feature = "lsm", feature = "hash"))]
2583 #[test]
2584 fn per_block_checksum_round_trip() {
2585 // Encode with per-block checksums enabled. Decode with
2586 // per-block verification. Both sides emit exactly 1
2587 // checksum per physical block written to / read from the
2588 // wire (encoder hashes per emission site, including each
2589 // post-split partition; decoder hashes each decoded block).
2590 // Cardinality and element-wise contents must match
2591 // round-trip.
2592 let payload: Vec<u8> = (0..200_000u32).map(|i| (i & 0xFF) as u8).collect();
2593 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
2594 compressor.set_source(payload.as_slice());
2595 compressor.enable_per_block_checksums();
2596 let mut compressed = Vec::new();
2597 compressor.set_drain(&mut compressed);
2598 compressor.compress();
2599
2600 let encoder_checksums = compressor
2601 .last_frame_block_checksums()
2602 .expect("checksums populated after enable + compress")
2603 .to_vec();
2604 drop(compressor);
2605 assert!(!encoder_checksums.is_empty());
2606
2607 // Decode side: enable verification, decode, compare.
2608 let mut decoder = FrameDecoder::new();
2609 decoder.enable_per_block_checksums();
2610 let mut output = alloc::vec![0u8; payload.len()];
2611 let n = decoder
2612 .decode_all(compressed.as_slice(), &mut output)
2613 .expect("decode_all should succeed");
2614 assert_eq!(n, payload.len());
2615 assert_eq!(&output[..n], payload.as_slice());
2616
2617 let decoder_checksums = decoder.computed_block_checksums();
2618 assert_eq!(decoder_checksums, encoder_checksums.as_slice());
2619 }
2620}