zstd_framed/reader.rs
1use std::io::BufRead as _;
2
3use crate::{buffer::Buffer, decoder::ZstdFramedDecoder, table::ZstdSeekTable};
4
/// A reader that decompresses a zstd stream from an underlying reader.
///
/// The underlying reader `R` should implement the following traits:
///
/// - [`std::io::BufRead`] (required for [`std::io::Read`] and [`std::io::BufRead`] impls)
/// - (Optional) [`std::io::Seek`] (required for [`std::io::Seek`] impl)
///
/// For async support, see [`crate::AsyncZstdReader`].
///
/// ## Construction
///
/// Create a builder using either [`ZstdReader::builder`] (recommended) or
/// [`ZstdReader::builder_buffered`] (to use a custom buffer).
/// See [`ZstdReaderBuilder`] for build options. Call
/// [`ZstdReaderBuilder::build`] to build the [`ZstdReader`] instance.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let compressed_file: &[u8] = &[];
/// let reader = zstd_framed::ZstdReader::builder(compressed_file)
///     // .with_seek_table(table) // Provide a seek table if available
///     .build()?;
/// # Ok(())
/// # }
/// ```
///
/// ## Buffering
///
/// The decompressed zstd output is always buffered internally. Since the
/// reader must also implement [`std::io::BufRead`], the compressed input
/// must also be buffered.
///
/// [`ZstdReader::builder`] will wrap any reader implementing [`std::io::Read`]
/// with a recommended buffer size for the input stream. For more control
/// over how the input gets buffered, you can instead use
/// [`ZstdReader::builder_buffered`].
///
/// ## Seeking
///
/// [`ZstdReader`] implements [`std::io::Seek`] as long as the underlying
/// reader implements [`std::io::Seek`]. **By default, seeking within the
/// stream will linearly decompress until reaching the target!**
///
/// Seeking can do a lot better when the underlying stream is broken up
/// into multiple frames, such as a stream that uses the [zstd seekable format].
/// You can create such a stream using [`ZstdWriterBuilder::with_seek_table`](crate::writer::ZstdWriterBuilder::with_seek_table).
///
/// There are two situations where seeking can take advantage of a seek
/// table:
///
/// 1. When a seek table is provided up-front using [`ZstdReaderBuilder::with_seek_table`].
///    See [`crate::table::read_seek_table`] for reading a seek table
///    from a reader.
/// 2. When rewinding to a previously-decompressed frame. Frame offsets are
///    automatically recorded during decompression.
///
/// Even if a seek table is used, seeking will still need to rewind to
/// the start of a frame, then decompress until reaching the target offset.
///
/// [zstd seekable format]: https://github.com/facebook/zstd/tree/51eb7daf39c8e8a7c338ba214a9d4e2a6a086826/contrib/seekable_format
pub struct ZstdReader<'dict, R> {
    /// Underlying reader that supplies the compressed zstd bytes.
    reader: R,
    /// Decoder that turns compressed input into decompressed output and
    /// plans which frame to rewind to when seeking.
    decoder: ZstdFramedDecoder<'dict>,
    /// Holds decompressed bytes that have been decoded but not yet
    /// consumed by the caller.
    buffer: crate::buffer::FixedBuffer<Vec<u8>>,
    /// Current position within the decompressed stream; advanced on every
    /// `consume` and reset to a frame's start offset when seeking.
    current_pos: u64,
}
71
72impl<R> ZstdReader<'_, std::io::BufReader<R>> {
73 /// Create a new zstd reader that decompresses the zstd stream from
74 /// the underlying reader. The provided reader will be wrapped with
75 /// an appropriately-sized buffer.
76 pub fn builder(reader: R) -> ZstdReaderBuilder<std::io::BufReader<R>>
77 where
78 R: std::io::Read,
79 {
80 ZstdReaderBuilder::new(reader)
81 }
82
83 /// Create a new zstd reader that decompresses the zstd stream from
84 /// the underlying reader. The underlying reader must implement
85 /// [`std::io::BufRead`], and its buffer will be used directly. When in
86 /// doubt, use [`ZstdReader::builder`], which uses an appropriate
87 /// buffer size for decompressing a zstd stream.
88 pub fn builder_buffered(reader: R) -> ZstdReaderBuilder<R> {
89 ZstdReaderBuilder::with_buffered(reader)
90 }
91}
92
93impl<R> ZstdReader<'_, R> {
94 /// Jump forward, decoding and consuming `length` decompressed bytes
95 /// from the zstd stream.
96 fn jump_forward(&mut self, mut length: u64) -> std::io::Result<()>
97 where
98 R: std::io::BufRead,
99 {
100 while length > 0 {
101 // Decode some data from the underyling reader
102 let decoded = self.fill_buf()?;
103
104 // Return an error if we don't have any more data to decode
105 if decoded.is_empty() {
106 return Err(std::io::Error::new(
107 std::io::ErrorKind::UnexpectedEof,
108 "reached eof while trying to decode to offset",
109 ));
110 }
111
112 // Consume the data (up to the remaining length we should jump)
113 let decoded_len = u64::try_from(decoded.len()).map_err(std::io::Error::other)?;
114 let consumed_len = decoded_len.min(length);
115 let consumed_len_usize =
116 usize::try_from(consumed_len).map_err(std::io::Error::other)?;
117
118 self.consume(consumed_len_usize);
119
120 // Keep iterating until we've consumed enough to reach our target
121 length -= consumed_len;
122 }
123
124 Ok(())
125 }
126
127 /// Decode and consume the entire zstd stream until reaching the end.
128 /// Stops when reaching EOF (i.e. the underlying reader had no more data)
129 fn jump_to_end(&mut self) -> std::io::Result<()>
130 where
131 R: std::io::BufRead,
132 {
133 loop {
134 // Decode some data from the underlying reader
135 let decoded = self.fill_buf()?;
136
137 // If we didn't get any more data, we're done
138 if decoded.is_empty() {
139 break;
140 }
141
142 // Consume all the decoded data
143 let decoded_len = decoded.len();
144 self.consume(decoded_len);
145 }
146
147 Ok(())
148 }
149}
150
151impl<R> std::io::Read for ZstdReader<'_, R>
152where
153 R: std::io::BufRead,
154{
155 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
156 if buf.is_empty() {
157 return Ok(0);
158 }
159
160 // Decode some data from the underlying reader
161 let filled = self.fill_buf()?;
162
163 // Get some of the decoded data, capped to `buf`'s length
164 let consumable = filled.len().min(buf.len());
165
166 // Copy the decoded data to `buf`
167 buf[..consumable].copy_from_slice(&filled[..consumable]);
168
169 // Consume the copied data
170 self.consume(consumable);
171
172 Ok(consumable)
173 }
174}
175
176impl<R> std::io::BufRead for ZstdReader<'_, R>
177where
178 R: std::io::BufRead,
179{
180 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
181 loop {
182 // Check if our buffer contais any data we can return
183 if !self.buffer.uncommitted().is_empty() {
184 // If it does, we're done
185 break;
186 }
187
188 // Get some data from the underlying reader
189 let decodable = self.reader.fill_buf()?;
190 if decodable.is_empty() {
191 // If the underlying reader doesn't have any more data,
192 // then we're done
193 break;
194 }
195
196 // Decode the data, and write it to `self.buffer`
197 let consumed = self.decoder.decode(decodable, &mut self.buffer)?;
198
199 // Tell the underlying reader that we read the subset of
200 // data we decoded
201 self.reader.consume(consumed);
202 }
203
204 // Return all the data we have in `self.buffer`
205 Ok(self.buffer.uncommitted())
206 }
207
208 fn consume(&mut self, amt: usize) {
209 // Tell the buffer that we've committed the data that was consumed
210 self.buffer.commit(amt);
211
212 // Advance the reader's position
213 let amt_u64 = u64::try_from(amt).unwrap();
214 self.current_pos += amt_u64;
215 }
216}
217
218impl<R> std::io::Seek for ZstdReader<'_, R>
219where
220 R: std::io::BufRead + std::io::Seek,
221{
222 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
223 // Get the target position relative to the stream start
224 let target_pos = match pos {
225 std::io::SeekFrom::Start(offset) => {
226 // Position is already relative to the start
227 offset
228 }
229 std::io::SeekFrom::End(offset) => {
230 // Seek to the last position we know about in the stream.
231 let seek = self.decoder.prepare_seek_to_last_known_pos();
232
233 if let Some(frame) = seek.seek_to_frame_start {
234 // We need to seek to the start of a frame
235
236 // Seek the underlying reader
237 self.reader
238 .seek(std::io::SeekFrom::Start(frame.compressed_pos))?;
239
240 // Update the decoder based on what frame we're now at
241 self.decoder.seeked_to_frame(frame)?;
242
243 // Update our internal position to align with the start of the frame
244 self.current_pos = frame.decompressed_pos;
245
246 // Clear the buffer
247 self.buffer.clear();
248 }
249
250 // Jump to the end of the stream
251 self.jump_to_end()?;
252
253 // The current position is now at the end, so
254 // add the end offset
255 self.current_pos
256 .checked_add_signed(offset)
257 .ok_or_else(|| std::io::Error::other("invalid seek offset"))?
258 }
259 std::io::SeekFrom::Current(offset) => {
260 // Add the offset to the current position
261 self.current_pos
262 .checked_add_signed(offset)
263 .ok_or_else(|| std::io::Error::other("invalid seek offset"))?
264 }
265 };
266
267 // Consume any leftover data in `self.buffer`. This ensures that
268 // the current position is in line with the underlying decoder
269 self.consume(self.buffer.uncommitted().len());
270
271 // Determine what we need to do to reach the target position
272 let seek = self.decoder.prepare_seek_to_decompressed_pos(target_pos);
273
274 if let Some(frame) = seek.seek_to_frame_start {
275 // We need to seek to the start of a frame
276
277 // Seek the underlying reader
278 self.reader
279 .seek(std::io::SeekFrom::Start(frame.compressed_pos))?;
280
281 // Update the decoder based on what frame we're now at
282 self.decoder.seeked_to_frame(frame)?;
283
284 // Update our internal position to align with the start of the frame
285 self.current_pos = frame.decompressed_pos;
286 }
287
288 // Seek the remaining distance (if any) to reach the target position
289 self.jump_forward(seek.decompress_len)?;
290
291 assert_eq!(self.current_pos, target_pos);
292 Ok(self.current_pos)
293 }
294}
295
/// A builder that builds a [`ZstdReader`] from the provided reader.
pub struct ZstdReaderBuilder<R> {
    /// The (buffered) reader that the built [`ZstdReader`] will read
    /// compressed data from.
    reader: R,
    /// Seek table handed to the decoder at build time; starts out empty
    /// unless replaced via `with_seek_table`.
    table: ZstdSeekTable,
}
301
302impl<R> ZstdReaderBuilder<std::io::BufReader<R>> {
303 fn new(reader: R) -> Self
304 where
305 R: std::io::Read,
306 {
307 let reader = std::io::BufReader::with_capacity(zstd::zstd_safe::DCtx::in_size(), reader);
308 ZstdReaderBuilder::with_buffered(reader)
309 }
310}
311
312impl<R> ZstdReaderBuilder<R> {
313 fn with_buffered(reader: R) -> Self {
314 ZstdReaderBuilder {
315 reader,
316 table: ZstdSeekTable::empty(),
317 }
318 }
319
320 /// Use the given seek table when seeking the resulting reader. This can
321 /// greatly speed up seek operations when using a zstd stream that
322 /// uses the [zstd seekable format].
323 ///
324 /// See [`crate::table::read_seek_table`] for reading a seek table.
325 ///
326 /// [zstd seekable format]: https://github.com/facebook/zstd/tree/51eb7daf39c8e8a7c338ba214a9d4e2a6a086826/contrib/seekable_format
327 pub fn with_seek_table(mut self, table: ZstdSeekTable) -> Self {
328 self.table = table;
329 self
330 }
331
332 /// Build the reader.
333 pub fn build(self) -> std::io::Result<ZstdReader<'static, R>> {
334 let zstd_decoder = zstd::stream::raw::Decoder::new()?;
335 let buffer = crate::buffer::FixedBuffer::new(vec![0; zstd::zstd_safe::DCtx::out_size()]);
336 let decoder = ZstdFramedDecoder::new(zstd_decoder, self.table);
337
338 Ok(ZstdReader {
339 reader: self.reader,
340 decoder,
341 buffer,
342 current_pos: 0,
343 })
344 }
345}