zstd_framed/
reader.rs

1use std::io::BufRead as _;
2
3use crate::{buffer::Buffer, decoder::ZstdFramedDecoder, table::ZstdSeekTable};
4
/// A reader that decompresses a zstd stream from an underlying reader.
///
/// The underlying reader `R` should implement the following traits:
///
/// - [`std::io::BufRead`] (required for [`std::io::Read`] and [`std::io::BufRead`] impls)
/// - (Optional) [`std::io::Seek`] (required for [`std::io::Seek`] impl)
///
/// For async support, see [`crate::AsyncZstdReader`].
///
/// ## Construction
///
/// Create a builder using either [`ZstdReader::builder`] (recommended) or
/// [`ZstdReader::builder_buffered`] (to use a custom buffer).
/// See [`ZstdReaderBuilder`] for build options. Call
/// [`ZstdReaderBuilder::build`] to build the [`ZstdReader`] instance.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let compressed_file: &[u8] = &[];
/// let reader = zstd_framed::ZstdReader::builder(compressed_file)
///     // .with_seek_table(table) // Provide a seek table if available
///     .build()?;
/// # Ok(())
/// # }
/// ```
///
/// ## Buffering
///
/// The decompressed zstd output is always buffered internally. Since the
/// reader must also implement [`std::io::BufRead`], the compressed input
/// must also be buffered.
///
/// [`ZstdReader::builder`] will wrap any reader implementing [`std::io::Read`]
/// with a recommended buffer size for the input stream. For more control
/// over how the input gets buffered, you can instead use
/// [`ZstdReader::builder_buffered`].
///
/// ## Seeking
///
/// [`ZstdReader`] implements [`std::io::Seek`] as long as the underlying
/// reader implements [`std::io::Seek`]. **By default, seeking within the
/// stream will linearly decompress until reaching the target!**
///
/// Seeking can do a lot better when the underlying stream is broken up
/// into multiple frames, such as a stream that uses the [zstd seekable format].
/// You can create such a stream using [`ZstdWriterBuilder::with_seek_table`](crate::writer::ZstdWriterBuilder::with_seek_table).
///
/// There are two situations where seeking can take advantage of a seek
/// table:
///
/// 1. When a seek table is provided up-front using [`ZstdReaderBuilder::with_seek_table`].
///    See [`crate::table::read_seek_table`] for reading a seek table
///    from a reader.
/// 2. When rewinding to a previously-decompressed frame. Frame offsets are
///    automatically recorded during decompression.
///
/// Even if a seek table is used, seeking will still need to rewind to
/// the start of a frame, then decompress until reaching the target offset.
///
/// [zstd seekable format]: https://github.com/facebook/zstd/tree/51eb7daf39c8e8a7c338ba214a9d4e2a6a086826/contrib/seekable_format
pub struct ZstdReader<'dict, R> {
    /// The underlying reader that supplies the compressed zstd bytes.
    reader: R,
    /// Decoder that turns compressed input into decompressed output and
    /// plans seeks (which frame to rewind to, and how far to decode).
    decoder: ZstdFramedDecoder<'dict>,
    /// Holds decoded output; its "uncommitted" portion is the data that
    /// has been decoded but not yet consumed by the caller.
    buffer: crate::buffer::FixedBuffer<Vec<u8>>,
    /// Number of decompressed bytes consumed so far, i.e. the current
    /// offset within the decompressed stream.
    current_pos: u64,
}
71
72impl<R> ZstdReader<'_, std::io::BufReader<R>> {
73    /// Create a new zstd reader that decompresses the zstd stream from
74    /// the underlying reader. The provided reader will be wrapped with
75    /// an appropriately-sized buffer.
76    pub fn builder(reader: R) -> ZstdReaderBuilder<std::io::BufReader<R>>
77    where
78        R: std::io::Read,
79    {
80        ZstdReaderBuilder::new(reader)
81    }
82
83    /// Create a new zstd reader that decompresses the zstd stream from
84    /// the underlying reader. The underlying reader must implement
85    /// [`std::io::BufRead`], and its buffer will be used directly. When in
86    /// doubt, use [`ZstdReader::builder`], which uses an appropriate
87    /// buffer size for decompressing a zstd stream.
88    pub fn builder_buffered(reader: R) -> ZstdReaderBuilder<R> {
89        ZstdReaderBuilder::with_buffered(reader)
90    }
91}
92
93impl<R> ZstdReader<'_, R> {
94    /// Jump forward, decoding and consuming `length` decompressed bytes
95    /// from the zstd stream.
96    fn jump_forward(&mut self, mut length: u64) -> std::io::Result<()>
97    where
98        R: std::io::BufRead,
99    {
100        while length > 0 {
101            // Decode some data from the underyling reader
102            let decoded = self.fill_buf()?;
103
104            // Return an error if we don't have any more data to decode
105            if decoded.is_empty() {
106                return Err(std::io::Error::new(
107                    std::io::ErrorKind::UnexpectedEof,
108                    "reached eof while trying to decode to offset",
109                ));
110            }
111
112            // Consume the data (up to the remaining length we should jump)
113            let decoded_len = u64::try_from(decoded.len()).map_err(std::io::Error::other)?;
114            let consumed_len = decoded_len.min(length);
115            let consumed_len_usize =
116                usize::try_from(consumed_len).map_err(std::io::Error::other)?;
117
118            self.consume(consumed_len_usize);
119
120            // Keep iterating until we've consumed enough to reach our target
121            length -= consumed_len;
122        }
123
124        Ok(())
125    }
126
127    /// Decode and consume the entire zstd stream until reaching the end.
128    /// Stops when reaching EOF (i.e. the underlying reader had no more data)
129    fn jump_to_end(&mut self) -> std::io::Result<()>
130    where
131        R: std::io::BufRead,
132    {
133        loop {
134            // Decode some data from the underlying reader
135            let decoded = self.fill_buf()?;
136
137            // If we didn't get any more data, we're done
138            if decoded.is_empty() {
139                break;
140            }
141
142            // Consume all the decoded data
143            let decoded_len = decoded.len();
144            self.consume(decoded_len);
145        }
146
147        Ok(())
148    }
149}
150
151impl<R> std::io::Read for ZstdReader<'_, R>
152where
153    R: std::io::BufRead,
154{
155    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
156        if buf.is_empty() {
157            return Ok(0);
158        }
159
160        // Decode some data from the underlying reader
161        let filled = self.fill_buf()?;
162
163        // Get some of the decoded data, capped to `buf`'s length
164        let consumable = filled.len().min(buf.len());
165
166        // Copy the decoded data to `buf`
167        buf[..consumable].copy_from_slice(&filled[..consumable]);
168
169        // Consume the copied data
170        self.consume(consumable);
171
172        Ok(consumable)
173    }
174}
175
176impl<R> std::io::BufRead for ZstdReader<'_, R>
177where
178    R: std::io::BufRead,
179{
180    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
181        loop {
182            // Check if our buffer contais any data we can return
183            if !self.buffer.uncommitted().is_empty() {
184                // If it does, we're done
185                break;
186            }
187
188            // Get some data from the underlying reader
189            let decodable = self.reader.fill_buf()?;
190            if decodable.is_empty() {
191                // If the underlying reader doesn't have any more data,
192                // then we're done
193                break;
194            }
195
196            // Decode the data, and write it to `self.buffer`
197            let consumed = self.decoder.decode(decodable, &mut self.buffer)?;
198
199            // Tell the underlying reader that we read the subset of
200            // data we decoded
201            self.reader.consume(consumed);
202        }
203
204        // Return all the data we have in `self.buffer`
205        Ok(self.buffer.uncommitted())
206    }
207
208    fn consume(&mut self, amt: usize) {
209        // Tell the buffer that we've committed the data that was consumed
210        self.buffer.commit(amt);
211
212        // Advance the reader's position
213        let amt_u64 = u64::try_from(amt).unwrap();
214        self.current_pos += amt_u64;
215    }
216}
217
218impl<R> std::io::Seek for ZstdReader<'_, R>
219where
220    R: std::io::BufRead + std::io::Seek,
221{
222    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
223        // Get the target position relative to the stream start
224        let target_pos = match pos {
225            std::io::SeekFrom::Start(offset) => {
226                // Position is already relative to the start
227                offset
228            }
229            std::io::SeekFrom::End(offset) => {
230                // Seek to the last position we know about in the stream.
231                let seek = self.decoder.prepare_seek_to_last_known_pos();
232
233                if let Some(frame) = seek.seek_to_frame_start {
234                    // We need to seek to the start of a frame
235
236                    // Seek the underlying reader
237                    self.reader
238                        .seek(std::io::SeekFrom::Start(frame.compressed_pos))?;
239
240                    // Update the decoder based on what frame we're now at
241                    self.decoder.seeked_to_frame(frame)?;
242
243                    // Update our internal position to align with the start of the frame
244                    self.current_pos = frame.decompressed_pos;
245
246                    // Clear the buffer
247                    self.buffer.clear();
248                }
249
250                // Jump to the end of the stream
251                self.jump_to_end()?;
252
253                // The current position is now at the end, so
254                // add the end offset
255                self.current_pos
256                    .checked_add_signed(offset)
257                    .ok_or_else(|| std::io::Error::other("invalid seek offset"))?
258            }
259            std::io::SeekFrom::Current(offset) => {
260                // Add the offset to the current position
261                self.current_pos
262                    .checked_add_signed(offset)
263                    .ok_or_else(|| std::io::Error::other("invalid seek offset"))?
264            }
265        };
266
267        // Consume any leftover data in `self.buffer`. This ensures that
268        // the current position is in line with the underlying decoder
269        self.consume(self.buffer.uncommitted().len());
270
271        // Determine what we need to do to reach the target position
272        let seek = self.decoder.prepare_seek_to_decompressed_pos(target_pos);
273
274        if let Some(frame) = seek.seek_to_frame_start {
275            // We need to seek to the start of a frame
276
277            // Seek the underlying reader
278            self.reader
279                .seek(std::io::SeekFrom::Start(frame.compressed_pos))?;
280
281            // Update the decoder based on what frame we're now at
282            self.decoder.seeked_to_frame(frame)?;
283
284            // Update our internal position to align with the start of the frame
285            self.current_pos = frame.decompressed_pos;
286        }
287
288        // Seek the remaining distance (if any) to reach the target position
289        self.jump_forward(seek.decompress_len)?;
290
291        assert_eq!(self.current_pos, target_pos);
292        Ok(self.current_pos)
293    }
294}
295
/// A builder that builds a [`ZstdReader`] from the provided reader.
pub struct ZstdReaderBuilder<R> {
    /// The reader that the built [`ZstdReader`] will decompress from.
    reader: R,
    /// Seek table handed to the decoder when building; starts out empty
    /// and can be replaced with [`ZstdReaderBuilder::with_seek_table`].
    table: ZstdSeekTable,
}
301
302impl<R> ZstdReaderBuilder<std::io::BufReader<R>> {
303    fn new(reader: R) -> Self
304    where
305        R: std::io::Read,
306    {
307        let reader = std::io::BufReader::with_capacity(zstd::zstd_safe::DCtx::in_size(), reader);
308        ZstdReaderBuilder::with_buffered(reader)
309    }
310}
311
312impl<R> ZstdReaderBuilder<R> {
313    fn with_buffered(reader: R) -> Self {
314        ZstdReaderBuilder {
315            reader,
316            table: ZstdSeekTable::empty(),
317        }
318    }
319
320    /// Use the given seek table when seeking the resulting reader. This can
321    /// greatly speed up seek operations when using a zstd stream that
322    /// uses the [zstd seekable format].
323    ///
324    /// See [`crate::table::read_seek_table`] for reading a seek table.
325    ///
326    /// [zstd seekable format]: https://github.com/facebook/zstd/tree/51eb7daf39c8e8a7c338ba214a9d4e2a6a086826/contrib/seekable_format
327    pub fn with_seek_table(mut self, table: ZstdSeekTable) -> Self {
328        self.table = table;
329        self
330    }
331
332    /// Build the reader.
333    pub fn build(self) -> std::io::Result<ZstdReader<'static, R>> {
334        let zstd_decoder = zstd::stream::raw::Decoder::new()?;
335        let buffer = crate::buffer::FixedBuffer::new(vec![0; zstd::zstd_safe::DCtx::out_size()]);
336        let decoder = ZstdFramedDecoder::new(zstd_decoder, self.table);
337
338        Ok(ZstdReader {
339            reader: self.reader,
340            decoder,
341            buffer,
342            current_pos: 0,
343        })
344    }
345}