Skip to main content

structured_zstd/decoding/
streaming_decoder.rs

1//! The [StreamingDecoder] wraps a [FrameDecoder] and provides a Read impl that decodes data when necessary
2
3use core::borrow::BorrowMut;
4
5use crate::common::MAX_BLOCK_SIZE;
6use crate::decoding::errors::FrameDecoderError;
7use crate::decoding::{BlockDecodingStrategy, DictionaryHandle, FrameDecoder};
8#[cfg(not(feature = "std"))]
9use crate::io::ErrorKind;
10use crate::io::{Error, Read};
11
12/// High level Zstandard frame decoder that can be used to decompress a given Zstandard frame.
13///
14/// This decoder implements `io::Read`, so you can interact with it by calling
15/// `io::Read::read_to_end` / `io::Read::read_exact` or passing this to another library / module as a source for the decoded content
16///
17/// If you need more control over how decompression takes place, you can use
18/// the lower level [FrameDecoder], which allows for greater control over how
19/// decompression takes place but the implementor must call
20/// [FrameDecoder::decode_blocks] repeatedly to decode the entire frame.
21///
22/// ## Caveat
23/// Plain `read` / `read_exact` operate on the single frame this decoder was
24/// initialised with: they do not advance into following frames. `read_to_end`,
25/// by contrast, is specialised to consume a finite source to EOF, decoding
26/// concatenated frames and skipping skippable frames along the way.
27///
28/// To recover the bytes that follow one frame WITHOUT consuming the rest of the
29/// source, recreate the decoder manually and handle
30/// [crate::decoding::errors::ReadFrameHeaderError::SkipFrame]
31/// errors by skipping forward the `length` amount of bytes, see <https://github.com/KillingSpark/zstd-rs/issues/57>
32///
33/// ```no_run
34/// // `File` is std-only; `read_to_end` itself is available under no_std too.
35/// #[cfg(feature = "std")]
36/// {
37///     use std::fs::File;
38///     use std::io::Read;
39///     use structured_zstd::decoding::StreamingDecoder;
40///
41///     // Read a Zstandard archive from the filesystem then decompress it into a vec.
42///     let mut f: File = todo!("Read a .zstd archive from somewhere");
43///     let mut decoder = StreamingDecoder::new(f).unwrap();
44///     let mut result = Vec::new();
45///     Read::read_to_end(&mut decoder, &mut result).unwrap();
46/// }
47/// ```
48pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
49    pub decoder: DEC,
50    source: READ,
51    /// Dictionary the decoder was constructed with, if any. Retained so the
52    /// `read_to_end` paths can re-initialise FOLLOWING concatenated frames with
53    /// the same forced dictionary (a plain re-init resolves dictionaries by
54    /// frame id only and would lose a forced dict for frames omitting the id).
55    /// Cheap to hold: `DictionaryHandle` is an `Arc`/`Rc` handle.
56    dict: Option<DictionaryHandle>,
57}
58
59impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
60    pub fn new_with_decoder(
61        mut source: READ,
62        mut decoder: DEC,
63    ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
64        decoder.borrow_mut().init(&mut source)?;
65        Ok(StreamingDecoder {
66            decoder,
67            source,
68            dict: None,
69        })
70    }
71}
72
73impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
74    pub fn new(
75        mut source: READ,
76    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
77        let mut decoder = FrameDecoder::new();
78        decoder.init(&mut source)?;
79        Ok(StreamingDecoder {
80            decoder,
81            source,
82            dict: None,
83        })
84    }
85
86    /// Create a streaming decoder using a pre-parsed dictionary handle.
87    ///
88    /// # Warning
89    ///
90    /// This constructor initializes the underlying [`FrameDecoder`] with
91    /// `dict`, even if a frame header omits the optional dictionary ID.
92    /// Callers must only use it when they already know the stream was encoded
93    /// with this dictionary; otherwise decoded output can be silently
94    /// corrupted.
95    pub fn new_with_dictionary_handle(
96        mut source: READ,
97        dict: &DictionaryHandle,
98    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
99        let mut decoder = FrameDecoder::new();
100        decoder.init_with_dict_handle(&mut source, dict)?;
101        Ok(StreamingDecoder {
102            decoder,
103            source,
104            dict: Some(dict.clone()),
105        })
106    }
107
108    /// Create a streaming decoder using a serialized dictionary blob.
109    ///
110    /// # Warning
111    ///
112    /// This API forwards to [`StreamingDecoder::new_with_dictionary_handle`]
113    /// and therefore applies the decoded dictionary to frames whose headers may
114    /// omit the optional dictionary ID. Only use it when the stream is known to
115    /// be encoded with that dictionary.
116    pub fn new_with_dictionary_bytes(
117        source: READ,
118        raw_dictionary: &[u8],
119    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
120        let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
121        Self::new_with_dictionary_handle(source, &dict)
122    }
123}
124
125impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
126    /// Gets a reference to the underlying reader.
127    pub fn get_ref(&self) -> &READ {
128        &self.source
129    }
130
131    /// Gets a mutable reference to the underlying reader.
132    ///
133    /// It is inadvisable to directly read from the underlying reader.
134    pub fn get_mut(&mut self) -> &mut READ {
135        &mut self.source
136    }
137
138    /// Destructures this object into the inner reader.
139    pub fn into_inner(self) -> READ
140    where
141        READ: Sized,
142    {
143        self.source
144    }
145
146    /// Destructures this object into both the inner reader and [FrameDecoder].
147    pub fn into_parts(self) -> (READ, DEC)
148    where
149        READ: Sized,
150    {
151        (self.source, self.decoder)
152    }
153
154    /// Destructures this object into the inner [FrameDecoder].
155    pub fn into_frame_decoder(self) -> DEC {
156        self.decoder
157    }
158}
159
160impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
161    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
162        let decoder = self.decoder.borrow_mut();
163        if decoder.is_finished() && decoder.can_collect() == 0 {
164            // Frame fully decoded and fully drained: the running XXH64 digest
165            // is final, so a `Verify`-mode decoder validates the content
166            // checksum at this finish point. No-op in other modes.
167            #[cfg(feature = "hash")]
168            if let Err(e) = decoder.verify_content_checksum() {
169                #[cfg(feature = "std")]
170                return Err(Error::other(e));
171                #[cfg(not(feature = "std"))]
172                return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
173            }
174            //No more bytes can ever be decoded
175            return Ok(0);
176        }
177
178        // Interleave bounded decode with draining so the decode window
179        // (`RingBuffer`) stays near `window_size` instead of accumulating the
180        // whole request before a single end-of-call drain. `read_to_end` hands
181        // ever-larger buffers; decoding `buf.len()` worth into the ring up
182        // front grew it far past the window (repeated `reserve_amortized`
183        // alloc+copy). Decode at most one block worth per step, then drain
184        // what is now collectable into `buf`, mirroring upstream zstd's
185        // window-bounded flush loop.
186        let mut written = 0;
187        while written < buf.len() {
188            // Drain whatever is collectable now (retaining `window_size` until
189            // the frame finishes). Reclaims the ring promptly so the next
190            // decode step reuses the same capacity.
191            written += decoder.read(&mut buf[written..])?;
192            if written == buf.len() || decoder.is_finished() {
193                break;
194            }
195            // Decode one bounded chunk. `UptoBytes` may overshoot a little but
196            // is capped to one block, so the ring's live region stays within
197            // `window_size + MAX_BLOCK_SIZE`.
198            let step = (buf.len() - written).min(MAX_BLOCK_SIZE as usize);
199            if let Err(e) =
200                decoder.decode_blocks(&mut self.source, BlockDecodingStrategy::UptoBytes(step))
201            {
202                #[cfg(feature = "std")]
203                {
204                    return Err(Error::other(e));
205                }
206                #[cfg(not(feature = "std"))]
207                {
208                    return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
209                }
210            }
211        }
212
213        // The loop can finish AND fully drain a frame within this same call
214        // (decode last block, then drain it into `buf`). Validate here too when
215        // the frame is finished and nothing is left to collect, but ONLY when
216        // this call wrote no bytes: the `Read` contract forbids returning `Err`
217        // after bytes were delivered, so when `written > 0` the verify is
218        // deferred to the next call, where the top early-return runs it and
219        // returns `Err` on the zero-byte path. Idempotent with that top check.
220        #[cfg(feature = "hash")]
221        if written == 0
222            && decoder.is_finished()
223            && decoder.can_collect() == 0
224            && let Err(e) = decoder.verify_content_checksum()
225        {
226            #[cfg(feature = "std")]
227            return Err(Error::other(e));
228            #[cfg(not(feature = "std"))]
229            return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
230        }
231
232        Ok(written)
233    }
234
235    /// Decode-in-place fast path for whole-frame consumption. Instead of the
236    /// generic `read` loop (decode block -> `RingBuffer` -> copy into the
237    /// caller buffer), buffer the (compressed, hence small) source and decode
238    /// STRAIGHT into `output`'s spare capacity via the single-copy direct path,
239    /// pre-sized from the frame's declared content size. Only taken when the
240    /// decoder is at a frame boundary (nothing partially decoded / undrained);
241    /// otherwise it falls back to the generic grow-and-`read` loop so a caller
242    /// that mixed `read` with `read_to_end` still gets correct output.
243    ///
244    /// Per the `Read::read_to_end` contract this consumes the source to EOF: if
245    /// the stream holds several concatenated frames they are ALL decoded (and
246    /// skippable frames skipped). To recover bytes that follow a single frame,
247    /// use `read` plus the
248    /// [`SkipFrame`](crate::decoding::errors::ReadFrameHeaderError::SkipFrame)
249    /// recreate-the-decoder pattern instead.
250    #[cfg(feature = "std")]
251    fn read_to_end(&mut self, output: &mut alloc::vec::Vec<u8>) -> Result<usize, Error> {
252        let start_total = output.len();
253        // `new()` already read the frame header, so the fast path applies when
254        // the decoder sits at the start of that frame with nothing decoded yet.
255        let at_start = {
256            let d = self.decoder.borrow_mut();
257            d.is_at_frame_start() && d.can_collect() == 0
258        };
259        // Clone the (cheap Arc/Rc) dict handle out so the `decoder` borrow below
260        // does not conflict with borrowing `self.dict`.
261        let dict = self.dict.clone();
262        if at_start {
263            let mut compressed = alloc::vec::Vec::new();
264            self.source.read_to_end(&mut compressed)?;
265            self.decoder
266                .borrow_mut()
267                .decode_current_frame_to_vec(&compressed, output, dict.as_ref())
268                .map_err(Error::other)?;
269            return Ok(output.len() - start_total);
270        }
271        // Mid-frame fallback: drain the partially-read CURRENT frame through the
272        // generic path, then decode any FOLLOWING concatenated frames so
273        // read_to_end still consumes the source to true EOF.
274        loop {
275            let start = output.len();
276            output.resize(start + MAX_BLOCK_SIZE as usize, 0);
277            // On error, drop the just-grown (zeroed) tail before propagating so
278            // the caller never observes bytes that were never decoded.
279            let n = match self.read(&mut output[start..]) {
280                Ok(n) => n,
281                Err(e) => {
282                    output.truncate(start);
283                    return Err(e);
284                }
285            };
286            output.truncate(start + n);
287            if n == 0 {
288                break;
289            }
290        }
291        // Current frame fully drained; `source` is positioned at the next frame.
292        let mut rest = alloc::vec::Vec::new();
293        self.source.read_to_end(&mut rest)?;
294        if !rest.is_empty() {
295            let mut input = rest.as_slice();
296            self.decoder
297                .borrow_mut()
298                .decode_concatenated_frames_to_vec(&mut input, output, dict.as_ref())
299                .map_err(Error::other)?;
300        }
301        Ok(output.len() - start_total)
302    }
303
304    /// no_std counterpart of the decode-in-place `read_to_end` fast path above
305    /// (the no_std `Read::read_to_end` returns `()` instead of the byte count).
306    #[cfg(not(feature = "std"))]
307    fn read_to_end(&mut self, output: &mut alloc::vec::Vec<u8>) -> Result<(), Error> {
308        let at_start = {
309            let d = self.decoder.borrow_mut();
310            d.is_at_frame_start() && d.can_collect() == 0
311        };
312        // Cheap Arc/Rc clone so the `decoder` borrow does not conflict with
313        // borrowing `self.dict`.
314        let dict = self.dict.clone();
315        if at_start {
316            let mut compressed = alloc::vec::Vec::new();
317            self.source.read_to_end(&mut compressed)?;
318            self.decoder
319                .borrow_mut()
320                .decode_current_frame_to_vec(&compressed, output, dict.as_ref())
321                .map_err(|e| Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)))?;
322            return Ok(());
323        }
324        // Mid-frame fallback: drain the partial CURRENT frame, then decode the
325        // FOLLOWING concatenated frames so the source is consumed to true EOF.
326        loop {
327            let start = output.len();
328            output.resize(start + MAX_BLOCK_SIZE as usize, 0);
329            // On error, drop the just-grown (zeroed) tail before propagating so
330            // the caller never observes bytes that were never decoded.
331            let n = match self.read(&mut output[start..]) {
332                Ok(n) => n,
333                Err(e) => {
334                    output.truncate(start);
335                    return Err(e);
336                }
337            };
338            output.truncate(start + n);
339            if n == 0 {
340                break;
341            }
342        }
343        let mut rest = alloc::vec::Vec::new();
344        self.source.read_to_end(&mut rest)?;
345        if !rest.is_empty() {
346            let mut input = rest.as_slice();
347            self.decoder
348                .borrow_mut()
349                .decode_concatenated_frames_to_vec(&mut input, output, dict.as_ref())
350                .map_err(|e| Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)))?;
351        }
352        Ok(())
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::StreamingDecoder;
    use crate::io::Read;

    /// Compresses `payload` into a single frame with `FrameCompressor` at the
    /// default level (which declares the frame content size).
    #[cfg(feature = "std")]
    fn compress_default_level(payload: &[u8]) -> alloc::vec::Vec<u8> {
        use crate::encoding::{CompressionLevel, FrameCompressor};

        let mut frame = alloc::vec::Vec::new();
        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
        compressor.set_source(payload);
        compressor.set_drain(&mut frame);
        compressor.compress();
        frame
    }

    /// `Read::read` must not return `Err` once it has written bytes into the
    /// caller's buffer (the trait mandates that an error implies no bytes were
    /// read). When one `read` call both drains the final bytes of a
    /// `Verify`-mode frame AND finishes it, a checksum mismatch is deferred:
    /// the bytes come back as `Ok(n)` and the error surfaces on the next,
    /// zero-byte call, where returning `Err` violates no contract.
    #[cfg(feature = "hash")]
    #[test]
    fn read_delivering_bytes_defers_checksum_error_to_next_call() {
        use crate::decoding::ContentChecksum;
        use crate::encoding::{CompressionLevel, FrameCompressor};
        use crate::io::ErrorKind;
        use alloc::vec;
        use alloc::vec::Vec;

        let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();

        let mut frame = Vec::new();
        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
        // The checksum is what is under test, and the encoder leaves it off by
        // default (upstream library parity), so enable it explicitly.
        compressor.set_content_checksum(true);
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();

        // Flip the final byte, which belongs to the 4-byte content checksum:
        // the body still decodes correctly but the stored digest is now wrong.
        *frame.last_mut().expect("compressed frame is never empty") ^= 0xFF;

        let mut decoder = StreamingDecoder::new(frame.as_slice()).unwrap();
        decoder
            .decoder
            .set_content_checksum(ContentChecksum::Verify);

        // One buffer large enough for the whole frame: this single call both
        // finishes the frame and writes every payload byte, so the mismatch
        // must NOT abort it (that would drop the delivered bytes).
        let mut buf = vec![0u8; payload.len() + 4096];
        let delivered = decoder
            .read(&mut buf)
            .expect("a read that delivered bytes must not return the checksum Err");
        assert_eq!(delivered, payload.len());
        assert_eq!(&buf[..delivered], payload.as_slice());

        // The deferred mismatch surfaces on the terminating zero-byte read.
        let err = decoder
            .read(&mut buf)
            .expect_err("deferred checksum mismatch must surface on the terminating read");
        assert_eq!(err.kind(), ErrorKind::Other);
    }

    /// A fresh `read_to_end` must take the single-copy decode-in-place path
    /// (FCS-declared frame decoded straight into the output `Vec`, no ring
    /// drain) AND reproduce the payload byte-for-byte.
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_decode_in_place_matches_and_takes_direct_path() {
        use alloc::vec::Vec;

        let payload: Vec<u8> = (0..20_000u32)
            .map(|i| (i.wrapping_mul(2654435761) >> 24) as u8)
            .collect();
        let frame = compress_default_level(&payload);

        let mut decoder = StreamingDecoder::new(frame.as_slice()).unwrap();
        let mut out = Vec::new();
        let produced = decoder.read_to_end(&mut out).unwrap();
        assert_eq!(produced, payload.len());
        assert_eq!(out, payload);
        // FrameCompressor declares FCS, so a fresh decoder takes the direct
        // (decode-in-place) route rather than the ring drain.
        assert_eq!(decoder.decoder.direct_frames(), 1);
    }

    /// `read_to_end` after a partial `read` must still yield the full payload.
    /// The decoder is mid-frame, so the fast path is skipped and the generic
    /// grow-and-drain fallback runs (no direct frame).
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_after_partial_read_is_complete() {
        use alloc::vec;
        use alloc::vec::Vec;

        let payload: Vec<u8> = (0..20_000u32).map(|i| (i & 0xFF) as u8).collect();
        let frame = compress_default_level(&payload);

        let mut decoder = StreamingDecoder::new(frame.as_slice()).unwrap();
        let mut head = vec![0u8; 4096];
        let got = decoder.read(&mut head).unwrap();
        assert!(got > 0 && got <= head.len());

        let mut out = head[..got].to_vec();
        decoder.read_to_end(&mut out).unwrap();
        assert_eq!(out, payload);
        // Entering mid-frame forces the fallback path, never the direct route.
        assert_eq!(decoder.decoder.direct_frames(), 0);
    }

    /// `read_to_end` reads the WHOLE source to EOF: a stream of concatenated
    /// frames must decode every frame, not just the first. (The fast path
    /// buffers the whole source, so dropping the trailing frame would lose
    /// data.)
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_decodes_all_concatenated_frames() {
        use crate::encoding::{CompressionLevel, compress_slice_to_vec};
        use alloc::vec::Vec;

        let first: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
        let second: Vec<u8> = (0..3000u32)
            .map(|i| ((i.wrapping_mul(7)) & 0xFF) as u8)
            .collect();
        let mut stream = compress_slice_to_vec(&first, CompressionLevel::Level(3));
        stream.extend(compress_slice_to_vec(&second, CompressionLevel::Level(3)));

        let mut decoder = StreamingDecoder::new(stream.as_slice()).unwrap();
        let mut out = Vec::new();
        decoder.read_to_end(&mut out).unwrap();

        let expected = [first, second].concat();
        assert_eq!(out, expected);
        // Both FCS-declared frames took the direct path.
        assert_eq!(decoder.decoder.direct_frames(), 2);
    }

    /// `read_to_end` after a partial `read` must STILL consume the source to
    /// EOF across concatenated frames rather than stopping at the current
    /// frame's end. The partial read forces the mid-frame fallback, which must
    /// finish frame 1 and then advance through frame 2.
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_after_partial_read_decodes_all_concatenated_frames() {
        use crate::encoding::{CompressionLevel, compress_slice_to_vec};
        use alloc::vec;
        use alloc::vec::Vec;

        let first: Vec<u8> = (0..6000u32).map(|i| (i & 0xFF) as u8).collect();
        let second: Vec<u8> = (0..4000u32)
            .map(|i| ((i.wrapping_mul(11)) & 0xFF) as u8)
            .collect();
        let mut stream = compress_slice_to_vec(&first, CompressionLevel::Level(3));
        stream.extend(compress_slice_to_vec(&second, CompressionLevel::Level(3)));

        let mut decoder = StreamingDecoder::new(stream.as_slice()).unwrap();
        // Partial read of frame 1 leaves the decoder mid-frame, so read_to_end
        // takes the fallback.
        let mut head = vec![0u8; 2048];
        let got = decoder.read(&mut head).unwrap();
        assert!(got > 0 && got <= head.len());

        let mut out = head[..got].to_vec();
        decoder.read_to_end(&mut out).unwrap();

        let expected = [first, second].concat();
        assert_eq!(
            out, expected,
            "fallback path must decode frame 2 too, not stop at frame 1 EOF"
        );
    }

    /// `read_to_end` on a stream of concatenated DICTIONARY frames must decode
    /// every frame WITH the dictionary the decoder was constructed with. The
    /// fast-path concatenated loop re-initialises following frames, and a plain
    /// re-init resolves dictionaries by frame id only — losing the forced
    /// dictionary for frames that omit (or can't resolve) the id.
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_concatenated_dict_frames_decode_with_dictionary() {
        use crate::encoding::{CompressionLevel, FrameCompressor};
        use alloc::vec::Vec;

        let dict_raw = include_bytes!("../../dict_tests/dictionary");
        let compress_with_dict = |payload: &[u8]| -> Vec<u8> {
            let mut frame = Vec::new();
            let mut compressor = FrameCompressor::new(CompressionLevel::Default);
            compressor
                .set_dictionary_from_bytes(dict_raw)
                .expect("dict load");
            compressor.set_source(payload);
            compressor.set_drain(&mut frame);
            compressor.compress();
            frame
        };

        let first = b"first dictionary-compressed frame payload".to_vec();
        let second = b"second dictionary-compressed frame payload".to_vec();
        let mut stream = compress_with_dict(&first);
        stream.extend(compress_with_dict(&second));

        let mut decoder =
            StreamingDecoder::new_with_dictionary_bytes(stream.as_slice(), dict_raw).unwrap();
        let mut out = Vec::new();
        decoder
            .read_to_end(&mut out)
            .expect("both dict frames must decode with the forced dictionary");

        let expected = [first, second].concat();
        assert_eq!(out, expected);
    }

    /// A direct-path decode error must NOT leave non-decoded bytes in `output`.
    /// The fast path resizes `output` to the declared content size before
    /// decoding; if decode fails, the enlarged (zeroed) tail must be truncated
    /// away so callers never observe bytes that were never decoded.
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_truncates_output_on_direct_decode_error() {
        use alloc::vec::Vec;

        let payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
        let mut frame = compress_default_level(&payload);
        // Cut off block bytes while keeping the FCS-bearing header intact: the
        // header parses, but the direct-path block decode hits a premature end
        // after `output` was already resized.
        frame.truncate(frame.len() - 40);

        let mut decoder = StreamingDecoder::new(frame.as_slice()).unwrap();
        let mut out = b"SENTINEL".to_vec();
        let outcome = decoder.read_to_end(&mut out);
        assert!(outcome.is_err(), "truncated block must fail the decode");
        assert_eq!(
            out, b"SENTINEL",
            "failed direct decode must not append non-decoded bytes to output"
        );
    }

    /// The mid-frame fallback grows `output` by `MAX_BLOCK_SIZE` before each
    /// `self.read`. When that read errors (truncated current frame), the grown
    /// zero-filled tail must be truncated away before the error propagates, so
    /// the caller never observes `MAX_BLOCK_SIZE` worth of bytes that were never
    /// decoded.
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_truncates_output_on_midframe_fallback_error() {
        use crate::encoding::{CompressionLevel, CompressionParameters, FrameCompressor};
        use alloc::vec;
        use alloc::vec::Vec;

        // Incompressible payload with a window (128 KiB) SMALLER than the input,
        // so the frame holds several blocks and bytes become collectable while
        // the frame is still mid-decode. Without a sub-input window the decoder
        // retains the whole input until the frame finishes, and a partial read
        // could only ever finish or error, never leave a truncated remainder for
        // the fallback to trip on.
        let payload: Vec<u8> = (0..320_000u32)
            .map(|i| (i.wrapping_mul(2654435761) >> 24) as u8)
            .collect();
        let params = CompressionParameters::builder(CompressionLevel::Default)
            .window_log(17)
            .build()
            .expect("window_log within bounds");

        let mut frame = Vec::new();
        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
        compressor.set_parameters(&params);
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
        // Cut off the tail so the final block decode fails partway through.
        frame.truncate(frame.len() - 40);

        let mut decoder = StreamingDecoder::new(frame.as_slice()).unwrap();
        // A partial `read` first leaves the decoder mid-frame, so `read_to_end`
        // takes the grow-and-drain fallback (not the decode-in-place fast path).
        let mut head = vec![0u8; 4096];
        let got = decoder.read(&mut head).unwrap();
        assert!(got > 0);

        let mut out = head[..got].to_vec();
        let outcome = decoder.read_to_end(&mut out);
        assert!(
            outcome.is_err(),
            "truncated current frame must fail the decode"
        );
        assert!(
            out.len() <= payload.len(),
            "failed fallback read must not leave a zero-filled tail (len {} > payload {})",
            out.len(),
            payload.len()
        );
        assert_eq!(
            out.as_slice(),
            &payload[..out.len()],
            "decoded prefix must match the payload, with no appended non-decoded bytes"
        );
    }

    /// An empty (`Frame_Content_Size = 0`) frame decodes to nothing through the
    /// `read_to_end` fast path: the declared-size validation accepts the valid
    /// case (produced == 0) instead of erroring.
    #[cfg(feature = "std")]
    #[test]
    fn read_to_end_empty_frame_decodes_to_empty() {
        use crate::encoding::{CompressionLevel, compress_slice_to_vec};
        use alloc::vec::Vec;

        let frame = compress_slice_to_vec(&[], CompressionLevel::Level(3));
        let mut decoder = StreamingDecoder::new(frame.as_slice()).unwrap();
        let mut out = Vec::new();
        decoder.read_to_end(&mut out).unwrap();
        assert!(out.is_empty());
    }
}