Skip to main content

structured_zstd/decoding/
streaming_decoder.rs

1//! The [StreamingDecoder] wraps a [FrameDecoder] and provides a Read impl that decodes data when necessary
2
3use core::borrow::BorrowMut;
4
5use crate::common::MAX_BLOCK_SIZE;
6use crate::decoding::errors::FrameDecoderError;
7use crate::decoding::{BlockDecodingStrategy, DictionaryHandle, FrameDecoder};
8#[cfg(not(feature = "std"))]
9use crate::io::ErrorKind;
10use crate::io::{Error, Read};
11
12/// High level Zstandard frame decoder that can be used to decompress a given Zstandard frame.
13///
14/// This decoder implements `io::Read`, so you can interact with it by calling
15/// `io::Read::read_to_end` / `io::Read::read_exact` or passing this to another library / module as a source for the decoded content
16///
17/// If you need more control over how decompression takes place, you can use
18/// the lower level [FrameDecoder], which allows for greater control over how
19/// decompression takes place but the implementor must call
20/// [FrameDecoder::decode_blocks] repeatedly to decode the entire frame.
21///
22/// ## Caveat
23/// [StreamingDecoder] expects the underlying stream to only contain a single frame,
24/// yet the specification states that a single archive may contain multiple frames.
25///
26/// To decode all the frames in a finite stream, the calling code needs to recreate
27/// the instance of the decoder and handle
28/// [crate::decoding::errors::ReadFrameHeaderError::SkipFrame]
29/// errors by skipping forward the `length` amount of bytes, see <https://github.com/KillingSpark/zstd-rs/issues/57>
30///
31/// ```no_run
32/// // `read_to_end` is not implemented by the no_std implementation.
33/// #[cfg(feature = "std")]
34/// {
35///     use std::fs::File;
36///     use std::io::Read;
37///     use structured_zstd::decoding::StreamingDecoder;
38///
39///     // Read a Zstandard archive from the filesystem then decompress it into a vec.
40///     let mut f: File = todo!("Read a .zstd archive from somewhere");
41///     let mut decoder = StreamingDecoder::new(f).unwrap();
42///     let mut result = Vec::new();
43///     Read::read_to_end(&mut decoder, &mut result).unwrap();
44/// }
45/// ```
46pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
47    pub decoder: DEC,
48    source: READ,
49}
50
51impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
52    pub fn new_with_decoder(
53        mut source: READ,
54        mut decoder: DEC,
55    ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
56        decoder.borrow_mut().init(&mut source)?;
57        Ok(StreamingDecoder { decoder, source })
58    }
59}
60
61impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
62    pub fn new(
63        mut source: READ,
64    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
65        let mut decoder = FrameDecoder::new();
66        decoder.init(&mut source)?;
67        Ok(StreamingDecoder { decoder, source })
68    }
69
70    /// Create a streaming decoder using a pre-parsed dictionary handle.
71    ///
72    /// # Warning
73    ///
74    /// This constructor initializes the underlying [`FrameDecoder`] with
75    /// `dict`, even if a frame header omits the optional dictionary ID.
76    /// Callers must only use it when they already know the stream was encoded
77    /// with this dictionary; otherwise decoded output can be silently
78    /// corrupted.
79    pub fn new_with_dictionary_handle(
80        mut source: READ,
81        dict: &DictionaryHandle,
82    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
83        let mut decoder = FrameDecoder::new();
84        decoder.init_with_dict_handle(&mut source, dict)?;
85        Ok(StreamingDecoder { decoder, source })
86    }
87
88    /// Create a streaming decoder using a serialized dictionary blob.
89    ///
90    /// # Warning
91    ///
92    /// This API forwards to [`StreamingDecoder::new_with_dictionary_handle`]
93    /// and therefore applies the decoded dictionary to frames whose headers may
94    /// omit the optional dictionary ID. Only use it when the stream is known to
95    /// be encoded with that dictionary.
96    pub fn new_with_dictionary_bytes(
97        source: READ,
98        raw_dictionary: &[u8],
99    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
100        let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
101        Self::new_with_dictionary_handle(source, &dict)
102    }
103}
104
105impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
106    /// Gets a reference to the underlying reader.
107    pub fn get_ref(&self) -> &READ {
108        &self.source
109    }
110
111    /// Gets a mutable reference to the underlying reader.
112    ///
113    /// It is inadvisable to directly read from the underlying reader.
114    pub fn get_mut(&mut self) -> &mut READ {
115        &mut self.source
116    }
117
118    /// Destructures this object into the inner reader.
119    pub fn into_inner(self) -> READ
120    where
121        READ: Sized,
122    {
123        self.source
124    }
125
126    /// Destructures this object into both the inner reader and [FrameDecoder].
127    pub fn into_parts(self) -> (READ, DEC)
128    where
129        READ: Sized,
130    {
131        (self.source, self.decoder)
132    }
133
134    /// Destructures this object into the inner [FrameDecoder].
135    pub fn into_frame_decoder(self) -> DEC {
136        self.decoder
137    }
138}
139
140impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
141    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
142        let decoder = self.decoder.borrow_mut();
143        if decoder.is_finished() && decoder.can_collect() == 0 {
144            // Frame fully decoded and fully drained: the running XXH64 digest
145            // is final, so a `Verify`-mode decoder validates the content
146            // checksum at this finish point. No-op in other modes.
147            #[cfg(feature = "hash")]
148            if let Err(e) = decoder.verify_content_checksum() {
149                #[cfg(feature = "std")]
150                return Err(Error::other(e));
151                #[cfg(not(feature = "std"))]
152                return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
153            }
154            //No more bytes can ever be decoded
155            return Ok(0);
156        }
157
158        // Interleave bounded decode with draining so the decode window
159        // (`RingBuffer`) stays near `window_size` instead of accumulating the
160        // whole request before a single end-of-call drain. `read_to_end` hands
161        // ever-larger buffers; decoding `buf.len()` worth into the ring up
162        // front grew it far past the window (repeated `reserve_amortized`
163        // alloc+copy). Decode at most one block worth per step, then drain
164        // what is now collectable into `buf`, mirroring the donor's
165        // window-bounded flush loop.
166        let mut written = 0;
167        while written < buf.len() {
168            // Drain whatever is collectable now (retaining `window_size` until
169            // the frame finishes). Reclaims the ring promptly so the next
170            // decode step reuses the same capacity.
171            written += decoder.read(&mut buf[written..])?;
172            if written == buf.len() || decoder.is_finished() {
173                break;
174            }
175            // Decode one bounded chunk. `UptoBytes` may overshoot a little but
176            // is capped to one block, so the ring's live region stays within
177            // `window_size + MAX_BLOCK_SIZE`.
178            let step = (buf.len() - written).min(MAX_BLOCK_SIZE as usize);
179            if let Err(e) =
180                decoder.decode_blocks(&mut self.source, BlockDecodingStrategy::UptoBytes(step))
181            {
182                #[cfg(feature = "std")]
183                {
184                    return Err(Error::other(e));
185                }
186                #[cfg(not(feature = "std"))]
187                {
188                    return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
189                }
190            }
191        }
192
193        // The loop can finish AND fully drain a frame within this same call
194        // (decode last block, then drain it into `buf`). Validate here too when
195        // the frame is finished and nothing is left to collect, but ONLY when
196        // this call wrote no bytes: the `Read` contract forbids returning `Err`
197        // after bytes were delivered, so when `written > 0` the verify is
198        // deferred to the next call, where the top early-return runs it and
199        // returns `Err` on the zero-byte path. Idempotent with that top check.
200        #[cfg(feature = "hash")]
201        if written == 0
202            && decoder.is_finished()
203            && decoder.can_collect() == 0
204            && let Err(e) = decoder.verify_content_checksum()
205        {
206            #[cfg(feature = "std")]
207            return Err(Error::other(e));
208            #[cfg(not(feature = "std"))]
209            return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
210        }
211
212        Ok(written)
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::StreamingDecoder;
    use crate::io::Read;

    /// A single `read` that both drains the last bytes of a `Verify`-mode frame
    /// and finishes it must hand those bytes back as `Ok(n)` even when the
    /// stored content checksum is wrong. `Read::read` may not return `Err` once
    /// it has written into the caller's buffer. The mismatch is expected on the
    /// following zero-byte call instead.
    #[cfg(feature = "hash")]
    #[test]
    fn read_delivering_bytes_defers_checksum_error_to_next_call() {
        use crate::decoding::ContentChecksum;
        use crate::encoding::{CompressionLevel, FrameCompressor};
        use crate::io::ErrorKind;
        use alloc::vec;
        use alloc::vec::Vec;

        // 8 KiB of a repeating 0..=255 byte ramp.
        let expected: Vec<u8> = (0..=u8::MAX).cycle().take(8192).collect();

        // The encoder leaves the content checksum off by default (upstream
        // parity), so switch it on: it is what this test exercises.
        let mut frame = Vec::new();
        let mut compressor = FrameCompressor::new(CompressionLevel::Default);
        compressor.set_content_checksum(true);
        compressor.set_source(expected.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();

        // Flip the final byte of the trailing 4-byte checksum. Block data is
        // untouched, so decoding still yields `expected`, but the digest check
        // must now fail.
        let checksum_tail = frame.len() - 1;
        frame[checksum_tail] ^= 0xFF;

        let mut streaming = StreamingDecoder::new(frame.as_slice()).unwrap();
        streaming
            .decoder
            .set_content_checksum(ContentChecksum::Verify);

        // Oversized buffer: one call finishes the frame and delivers every byte.
        let mut out = vec![0u8; expected.len() + 4096];
        let delivered = streaming
            .read(&mut out)
            .expect("a read that delivered bytes must not return the checksum Err");
        assert_eq!(delivered, expected.len());
        assert_eq!(&out[..delivered], expected.as_slice());

        // Next call has nothing left to deliver, so the mismatch surfaces here.
        let failure = streaming
            .read(&mut out)
            .expect_err("deferred checksum mismatch must surface on the terminating read");
        assert_eq!(failure.kind(), ErrorKind::Other);
    }
}