Skip to main content

structured_zstd/decoding/
streaming_decoder.rs

1//! The [StreamingDecoder] wraps a [FrameDecoder] and provides a Read impl that decodes data when necessary
2
3use core::borrow::BorrowMut;
4
5use crate::decoding::errors::FrameDecoderError;
6use crate::decoding::{BlockDecodingStrategy, DictionaryHandle, FrameDecoder};
7#[cfg(not(feature = "std"))]
8use crate::io::ErrorKind;
9use crate::io::{Error, Read};
10
11/// High level Zstandard frame decoder that can be used to decompress a given Zstandard frame.
12///
13/// This decoder implements `io::Read`, so you can interact with it by calling
14/// `io::Read::read_to_end` / `io::Read::read_exact` or passing this to another library / module as a source for the decoded content
15///
16/// If you need more control over how decompression takes place, you can use
17/// the lower level [FrameDecoder], which allows for greater control over how
18/// decompression takes place but the implementor must call
19/// [FrameDecoder::decode_blocks] repeatedly to decode the entire frame.
20///
21/// ## Caveat
22/// [StreamingDecoder] expects the underlying stream to only contain a single frame,
23/// yet the specification states that a single archive may contain multiple frames.
24///
25/// To decode all the frames in a finite stream, the calling code needs to recreate
26/// the instance of the decoder and handle
27/// [crate::decoding::errors::ReadFrameHeaderError::SkipFrame]
28/// errors by skipping forward the `length` amount of bytes, see <https://github.com/KillingSpark/zstd-rs/issues/57>
29///
30/// ```no_run
31/// // `read_to_end` is not implemented by the no_std implementation.
32/// #[cfg(feature = "std")]
33/// {
34///     use std::fs::File;
35///     use std::io::Read;
36///     use structured_zstd::decoding::StreamingDecoder;
37///
38///     // Read a Zstandard archive from the filesystem then decompress it into a vec.
39///     let mut f: File = todo!("Read a .zstd archive from somewhere");
40///     let mut decoder = StreamingDecoder::new(f).unwrap();
41///     let mut result = Vec::new();
42///     Read::read_to_end(&mut decoder, &mut result).unwrap();
43/// }
44/// ```
45pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
46    pub decoder: DEC,
47    source: READ,
48}
49
50impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
51    pub fn new_with_decoder(
52        mut source: READ,
53        mut decoder: DEC,
54    ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
55        decoder.borrow_mut().init(&mut source)?;
56        Ok(StreamingDecoder { decoder, source })
57    }
58}
59
60impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
61    pub fn new(
62        mut source: READ,
63    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
64        let mut decoder = FrameDecoder::new();
65        decoder.init(&mut source)?;
66        Ok(StreamingDecoder { decoder, source })
67    }
68
69    /// Create a streaming decoder using a pre-parsed dictionary handle.
70    ///
71    /// # Warning
72    ///
73    /// This constructor initializes the underlying [`FrameDecoder`] with
74    /// `dict`, even if a frame header omits the optional dictionary ID.
75    /// Callers must only use it when they already know the stream was encoded
76    /// with this dictionary; otherwise decoded output can be silently
77    /// corrupted.
78    pub fn new_with_dictionary_handle(
79        mut source: READ,
80        dict: &DictionaryHandle,
81    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
82        let mut decoder = FrameDecoder::new();
83        decoder.init_with_dict_handle(&mut source, dict)?;
84        Ok(StreamingDecoder { decoder, source })
85    }
86
87    /// Create a streaming decoder using a serialized dictionary blob.
88    ///
89    /// # Warning
90    ///
91    /// This API forwards to [`StreamingDecoder::new_with_dictionary_handle`]
92    /// and therefore applies the decoded dictionary to frames whose headers may
93    /// omit the optional dictionary ID. Only use it when the stream is known to
94    /// be encoded with that dictionary.
95    pub fn new_with_dictionary_bytes(
96        source: READ,
97        raw_dictionary: &[u8],
98    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
99        let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
100        Self::new_with_dictionary_handle(source, &dict)
101    }
102}
103
104impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
105    /// Gets a reference to the underlying reader.
106    pub fn get_ref(&self) -> &READ {
107        &self.source
108    }
109
110    /// Gets a mutable reference to the underlying reader.
111    ///
112    /// It is inadvisable to directly read from the underlying reader.
113    pub fn get_mut(&mut self) -> &mut READ {
114        &mut self.source
115    }
116
117    /// Destructures this object into the inner reader.
118    pub fn into_inner(self) -> READ
119    where
120        READ: Sized,
121    {
122        self.source
123    }
124
125    /// Destructures this object into both the inner reader and [FrameDecoder].
126    pub fn into_parts(self) -> (READ, DEC)
127    where
128        READ: Sized,
129    {
130        (self.source, self.decoder)
131    }
132
133    /// Destructures this object into the inner [FrameDecoder].
134    pub fn into_frame_decoder(self) -> DEC {
135        self.decoder
136    }
137}
138
139impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
140    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
141        let decoder = self.decoder.borrow_mut();
142        if decoder.is_finished() && decoder.can_collect() == 0 {
143            //No more bytes can ever be decoded
144            return Ok(0);
145        }
146
147        // need to loop. The UpToBytes strategy doesn't take any effort to actually reach that limit.
148        // The first few calls can result in just filling the decode buffer but these bytes can not be collected.
149        // So we need to call this until we can actually collect enough bytes
150
151        // TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function
152        while decoder.can_collect() < buf.len() && !decoder.is_finished() {
153            //More bytes can be decoded
154            let additional_bytes_needed = buf.len() - decoder.can_collect();
155            match decoder.decode_blocks(
156                &mut self.source,
157                BlockDecodingStrategy::UptoBytes(additional_bytes_needed),
158            ) {
159                Ok(_) => { /*Nothing to do*/ }
160                Err(e) => {
161                    let err;
162                    #[cfg(feature = "std")]
163                    {
164                        err = Error::other(e);
165                    }
166                    #[cfg(not(feature = "std"))]
167                    {
168                        err = Error::new(ErrorKind::Other, alloc::boxed::Box::new(e));
169                    }
170                    return Err(err);
171                }
172            }
173        }
174
175        decoder.read(buf)
176    }
177}