structured_zstd/decoding/frame_decoder.rs
//! [`FrameDecoder`] is the main low-level struct users interact with to decode zstd frames.
//!
//! Zstandard compressed data is made of one or more frames. Each frame is independent and can be
//! decompressed independently of other frames. This module contains structures
//! and utilities that can be used to decode a frame.
6
7use super::frame;
8use crate::decoding;
9use crate::decoding::dictionary::Dictionary;
10use crate::decoding::errors::FrameDecoderError;
11use crate::decoding::scratch::DecoderScratch;
12use crate::io::{Error, Read, Write};
13use alloc::collections::BTreeMap;
14use alloc::vec::Vec;
15use core::convert::TryInto;
16
17use crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE;
18
19/// Low level Zstandard decoder that can be used to decompress frames with fine control over when and how many bytes are decoded.
20///
21/// This decoder is able to decode frames only partially and gives control
22/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once).
23/// It reads bytes as needed from a provided source and can be read from to collect partial results.
24///
25/// If you want to just read the whole frame with an `io::Read` without having to deal with manually calling [FrameDecoder::decode_blocks]
26/// you can use the provided [crate::decoding::StreamingDecoder] wich wraps this FrameDecoder.
27///
28/// Workflow is as follows:
29/// ```
30/// use structured_zstd::decoding::BlockDecodingStrategy;
31///
32/// # #[cfg(feature = "std")]
33/// use std::io::{Read, Write};
34///
35/// // no_std environments can use the crate's own Read traits
36/// # #[cfg(not(feature = "std"))]
37/// use structured_zstd::io::{Read, Write};
38///
39/// fn decode_this(mut file: impl Read) {
40/// //Create a new decoder
41/// let mut frame_dec = structured_zstd::decoding::FrameDecoder::new();
42/// let mut result = Vec::new();
43///
44/// // Use reset or init to make the decoder ready to decode the frame from the io::Read
45/// frame_dec.reset(&mut file).unwrap();
46///
47/// // Loop until the frame has been decoded completely
48/// while !frame_dec.is_finished() {
49/// // decode (roughly) batch_size many bytes
50/// frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap();
51///
52/// // read from the decoder to collect bytes from the internal buffer
53/// let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap();
54///
55/// // then do something with it
56/// do_something(&result[0..bytes_read]);
57/// }
58///
59/// // handle the last chunk of data
60/// while frame_dec.can_collect() > 0 {
61/// let x = frame_dec.read(result.as_mut_slice()).unwrap();
62///
63/// do_something(&result[0..x]);
64/// }
65/// }
66///
67/// fn do_something(data: &[u8]) {
68/// # #[cfg(feature = "std")]
69/// std::io::stdout().write_all(data).unwrap();
70/// }
71/// ```
72pub struct FrameDecoder {
73 state: Option<FrameDecoderState>,
74 dicts: BTreeMap<u32, Dictionary>,
75}
76
77struct FrameDecoderState {
78 pub frame_header: frame::FrameHeader,
79 decoder_scratch: DecoderScratch,
80 frame_finished: bool,
81 block_counter: usize,
82 bytes_read_counter: u64,
83 check_sum: Option<u32>,
84 using_dict: Option<u32>,
85}
86
/// Controls how much work a single call to `FrameDecoder::decode_blocks` performs.
///
/// The limit is checked after each decoded block, so every strategy decodes at
/// least one block per call, and decoding always stops at the last block of the frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockDecodingStrategy {
    /// Keep decoding until the last block of the frame has been decoded.
    All,
    /// Stop once at least this many blocks have been decoded during the call.
    UptoBlocks(usize),
    /// Stop once the decode buffer has grown by at least this many bytes during the call.
    UptoBytes(usize),
}
92
93impl FrameDecoderState {
94 /// Read the frame header from `source` and create a new decoder state.
95 ///
96 /// Pre-allocates the decode buffer to `window_size` so the first block
97 /// does not trigger incremental growth from zero capacity.
98 pub fn new(source: impl Read) -> Result<FrameDecoderState, FrameDecoderError> {
99 let (frame, header_size) = frame::read_frame_header(source)?;
100 let window_size = frame.window_size()?;
101
102 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
103 return Err(FrameDecoderError::WindowSizeTooBig {
104 requested: window_size,
105 });
106 }
107
108 let mut decoder_scratch = DecoderScratch::new(window_size as usize);
109 decoder_scratch.buffer.reserve(window_size as usize);
110 Ok(FrameDecoderState {
111 frame_header: frame,
112 frame_finished: false,
113 block_counter: 0,
114 decoder_scratch,
115 bytes_read_counter: u64::from(header_size),
116 check_sum: None,
117 using_dict: None,
118 })
119 }
120
121 /// Reset this state for a new frame read from `source`, reusing existing allocations.
122 ///
123 /// `DecodeBuffer::reset` reserves `window_size` internally, so no
124 /// additional frame-level reservation is needed here. Further buffer
125 /// growth during decoding is performed on demand by the active block path.
126 pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
127 let (frame_header, header_size) = frame::read_frame_header(source)?;
128 let window_size = frame_header.window_size()?;
129
130 if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
131 return Err(FrameDecoderError::WindowSizeTooBig {
132 requested: window_size,
133 });
134 }
135
136 self.frame_header = frame_header;
137 self.frame_finished = false;
138 self.block_counter = 0;
139 self.decoder_scratch.reset(window_size as usize);
140 self.bytes_read_counter = u64::from(header_size);
141 self.check_sum = None;
142 self.using_dict = None;
143 Ok(())
144 }
145}
146
147impl Default for FrameDecoder {
148 fn default() -> Self {
149 Self::new()
150 }
151}
152
153impl FrameDecoder {
154 /// This will create a new decoder without allocating anything yet.
155 /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used
156 /// else they just reset these buffers with not further allocations
157 pub fn new() -> FrameDecoder {
158 FrameDecoder {
159 state: None,
160 dicts: BTreeMap::new(),
161 }
162 }
163
164 /// init() will allocate all needed buffers if it is the first time this decoder is used
165 /// else they just reset these buffers with not further allocations
166 ///
167 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
168 ///
169 /// equivalent to reset()
170 pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
171 self.reset(source)
172 }
173
174 /// reset() will allocate all needed buffers if it is the first time this decoder is used
175 /// else they just reset these buffers with not further allocations
176 ///
177 /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
178 ///
179 /// equivalent to init()
180 pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
181 use FrameDecoderError as err;
182 let state = match &mut self.state {
183 Some(s) => {
184 s.reset(source)?;
185 s
186 }
187 None => {
188 self.state = Some(FrameDecoderState::new(source)?);
189 self.state.as_mut().unwrap()
190 }
191 };
192 if let Some(dict_id) = state.frame_header.dictionary_id() {
193 let dict = self
194 .dicts
195 .get(&dict_id)
196 .ok_or(err::DictNotProvided { dict_id })?;
197 state.decoder_scratch.init_from_dict(dict);
198 state.using_dict = Some(dict_id);
199 }
200 Ok(())
201 }
202
203 /// Add a dict to the FrameDecoder that can be used when needed. The FrameDecoder uses the appropriate one dynamically
204 pub fn add_dict(&mut self, dict: Dictionary) -> Result<(), FrameDecoderError> {
205 self.dicts.insert(dict.id, dict);
206 Ok(())
207 }
208
209 pub fn force_dict(&mut self, dict_id: u32) -> Result<(), FrameDecoderError> {
210 use FrameDecoderError as err;
211 let Some(state) = self.state.as_mut() else {
212 return Err(err::NotYetInitialized);
213 };
214
215 let dict = self
216 .dicts
217 .get(&dict_id)
218 .ok_or(err::DictNotProvided { dict_id })?;
219 state.decoder_scratch.init_from_dict(dict);
220 state.using_dict = Some(dict_id);
221
222 Ok(())
223 }
224
225 /// Returns how many bytes the frame contains after decompression
226 pub fn content_size(&self) -> u64 {
227 match &self.state {
228 None => 0,
229 Some(s) => s.frame_header.frame_content_size(),
230 }
231 }
232
233 /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame
234 pub fn get_checksum_from_data(&self) -> Option<u32> {
235 let state = match &self.state {
236 None => return None,
237 Some(s) => s,
238 };
239
240 state.check_sum
241 }
242
243 /// Returns the checksum that was calculated while decoding.
244 /// Only a sensible value after all decoded bytes have been collected/read from the FrameDecoder
245 #[cfg(feature = "hash")]
246 pub fn get_calculated_checksum(&self) -> Option<u32> {
247 use core::hash::Hasher;
248
249 let state = match &self.state {
250 None => return None,
251 Some(s) => s,
252 };
253 let cksum_64bit = state.decoder_scratch.buffer.hash.finish();
254 //truncate to lower 32bit because reasons...
255 Some(cksum_64bit as u32)
256 }
257
258 /// Counter for how many bytes have been consumed while decoding the frame
259 pub fn bytes_read_from_source(&self) -> u64 {
260 let state = match &self.state {
261 None => return 0,
262 Some(s) => s,
263 };
264 state.bytes_read_counter
265 }
266
267 /// Whether the current frames last block has been decoded yet
268 /// If this returns true you can call the drain* functions to get all content
269 /// (the read() function will drain automatically if this returns true)
270 pub fn is_finished(&self) -> bool {
271 let state = match &self.state {
272 None => return true,
273 Some(s) => s,
274 };
275 if state.frame_header.descriptor.content_checksum_flag() {
276 state.frame_finished && state.check_sum.is_some()
277 } else {
278 state.frame_finished
279 }
280 }
281
282 /// Counter for how many blocks have already been decoded
283 pub fn blocks_decoded(&self) -> usize {
284 let state = match &self.state {
285 None => return 0,
286 Some(s) => s,
287 };
288 state.block_counter
289 }
290
291 /// Decodes blocks from a reader. It requires that the framedecoder has been initialized first.
292 /// The Strategy influences how many blocks will be decoded before the function returns
293 /// This is important if you want to manage memory consumption carefully. If you don't care
294 /// about that you can just choose the strategy "All" and have all blocks of the frame decoded into the buffer
295 pub fn decode_blocks(
296 &mut self,
297 mut source: impl Read,
298 strat: BlockDecodingStrategy,
299 ) -> Result<bool, FrameDecoderError> {
300 use FrameDecoderError as err;
301 let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
302
303 let mut block_dec = decoding::block_decoder::new();
304
305 let buffer_size_before = state.decoder_scratch.buffer.len();
306 let block_counter_before = state.block_counter;
307 loop {
308 vprintln!("################");
309 vprintln!("Next Block: {}", state.block_counter);
310 vprintln!("################");
311 let (block_header, block_header_size) = block_dec
312 .read_block_header(&mut source)
313 .map_err(err::FailedToReadBlockHeader)?;
314 state.bytes_read_counter += u64::from(block_header_size);
315
316 vprintln!();
317 vprintln!(
318 "Found {} block with size: {}, which will be of size: {}",
319 block_header.block_type,
320 block_header.content_size,
321 block_header.decompressed_size
322 );
323
324 let bytes_read_in_block_body = block_dec
325 .decode_block_content(&block_header, &mut state.decoder_scratch, &mut source)
326 .map_err(err::FailedToReadBlockBody)?;
327 state.bytes_read_counter += bytes_read_in_block_body;
328
329 state.block_counter += 1;
330
331 vprintln!("Output: {}", state.decoder_scratch.buffer.len());
332
333 if block_header.last_block {
334 state.frame_finished = true;
335 if state.frame_header.descriptor.content_checksum_flag() {
336 let mut chksum = [0u8; 4];
337 source
338 .read_exact(&mut chksum)
339 .map_err(err::FailedToReadChecksum)?;
340 state.bytes_read_counter += 4;
341 let chksum = u32::from_le_bytes(chksum);
342 state.check_sum = Some(chksum);
343 }
344 break;
345 }
346
347 match strat {
348 BlockDecodingStrategy::All => { /* keep going */ }
349 BlockDecodingStrategy::UptoBlocks(n) => {
350 if state.block_counter - block_counter_before >= n {
351 break;
352 }
353 }
354 BlockDecodingStrategy::UptoBytes(n) => {
355 if state.decoder_scratch.buffer.len() - buffer_size_before >= n {
356 break;
357 }
358 }
359 }
360 }
361
362 Ok(state.frame_finished)
363 }
364
365 /// Collect bytes and retain window_size bytes while decoding is still going on.
366 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
367 pub fn collect(&mut self) -> Option<Vec<u8>> {
368 let finished = self.is_finished();
369 let state = self.state.as_mut()?;
370 if finished {
371 Some(state.decoder_scratch.buffer.drain())
372 } else {
373 state.decoder_scratch.buffer.drain_to_window_size()
374 }
375 }
376
377 /// Collect bytes and retain window_size bytes while decoding is still going on.
378 /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
379 pub fn collect_to_writer(&mut self, w: impl Write) -> Result<usize, Error> {
380 let finished = self.is_finished();
381 let state = match &mut self.state {
382 None => return Ok(0),
383 Some(s) => s,
384 };
385 if finished {
386 state.decoder_scratch.buffer.drain_to_writer(w)
387 } else {
388 state.decoder_scratch.buffer.drain_to_window_size_writer(w)
389 }
390 }
391
392 /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size
393 /// because window_size bytes need to be retained for decoding.
394 /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes
395 pub fn can_collect(&self) -> usize {
396 let finished = self.is_finished();
397 let state = match &self.state {
398 None => return 0,
399 Some(s) => s,
400 };
401 if finished {
402 state.decoder_scratch.buffer.can_drain()
403 } else {
404 state
405 .decoder_scratch
406 .buffer
407 .can_drain_to_window_size()
408 .unwrap_or(0)
409 }
410 }
411
412 /// Decodes as many blocks as possible from the source slice and reads from the decodebuffer into the target slice
413 /// The source slice may contain only parts of a frame but must contain at least one full block to make progress
414 ///
415 /// By all means use decode_blocks if you have a io.Reader available. This is just for compatibility with other decompressors
416 /// which try to serve an old-style c api
417 ///
418 /// Returns (read, written), if read == 0 then the source did not contain a full block and further calls with the same
419 /// input will not make any progress!
420 ///
421 /// Note that no kind of block can be bigger than 128kb.
422 /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer
423 ///
424 /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just call decoder.read(&mut target)
425 pub fn decode_from_to(
426 &mut self,
427 source: &[u8],
428 target: &mut [u8],
429 ) -> Result<(usize, usize), FrameDecoderError> {
430 use FrameDecoderError as err;
431 let bytes_read_at_start = match &self.state {
432 Some(s) => s.bytes_read_counter,
433 None => 0,
434 };
435
436 if !self.is_finished() || self.state.is_none() {
437 let mut mt_source = source;
438
439 if self.state.is_none() {
440 self.init(&mut mt_source)?;
441 }
442
443 //pseudo block to scope "state" so we can borrow self again after the block
444 {
445 let state = match &mut self.state {
446 Some(s) => s,
447 None => panic!("Bug in library"),
448 };
449 let mut block_dec = decoding::block_decoder::new();
450
451 if state.frame_header.descriptor.content_checksum_flag()
452 && state.frame_finished
453 && state.check_sum.is_none()
454 {
455 //this block is needed if the checksum were the only 4 bytes that were not included in the last decode_from_to call for a frame
456 if mt_source.len() >= 4 {
457 let chksum = mt_source[..4].try_into().expect("optimized away");
458 state.bytes_read_counter += 4;
459 let chksum = u32::from_le_bytes(chksum);
460 state.check_sum = Some(chksum);
461 }
462 return Ok((4, 0));
463 }
464
465 loop {
466 //check if there are enough bytes for the next header
467 if mt_source.len() < 3 {
468 break;
469 }
470 let (block_header, block_header_size) = block_dec
471 .read_block_header(&mut mt_source)
472 .map_err(err::FailedToReadBlockHeader)?;
473
474 // check the needed size for the block before updating counters.
475 // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place
476 if mt_source.len() < block_header.content_size as usize {
477 break;
478 }
479 state.bytes_read_counter += u64::from(block_header_size);
480
481 let bytes_read_in_block_body = block_dec
482 .decode_block_content(
483 &block_header,
484 &mut state.decoder_scratch,
485 &mut mt_source,
486 )
487 .map_err(err::FailedToReadBlockBody)?;
488 state.bytes_read_counter += bytes_read_in_block_body;
489 state.block_counter += 1;
490
491 if block_header.last_block {
492 state.frame_finished = true;
493 if state.frame_header.descriptor.content_checksum_flag() {
494 //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call
495 if mt_source.len() >= 4 {
496 let chksum = mt_source[..4].try_into().expect("optimized away");
497 state.bytes_read_counter += 4;
498 let chksum = u32::from_le_bytes(chksum);
499 state.check_sum = Some(chksum);
500 }
501 }
502 break;
503 }
504 }
505 }
506 }
507
508 let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?;
509 let bytes_read_at_end = match &mut self.state {
510 Some(s) => s.bytes_read_counter,
511 None => panic!("Bug in library"),
512 };
513 let read_len = bytes_read_at_end - bytes_read_at_start;
514 Ok((read_len as usize, result_len))
515 }
516
517 /// Decode multiple frames into the output slice.
518 ///
519 /// `input` must contain an exact number of frames.
520 ///
521 /// `output` must be large enough to hold the decompressed data. If you don't know
522 /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
523 ///
524 /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
525 ///
526 /// Returns the number of bytes written to `output`.
527 pub fn decode_all(
528 &mut self,
529 mut input: &[u8],
530 mut output: &mut [u8],
531 ) -> Result<usize, FrameDecoderError> {
532 let mut total_bytes_written = 0;
533 while !input.is_empty() {
534 match self.init(&mut input) {
535 Ok(_) => {}
536 Err(FrameDecoderError::ReadFrameHeaderError(
537 crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
538 )) => {
539 input = input
540 .get(length as usize..)
541 .ok_or(FrameDecoderError::FailedToSkipFrame)?;
542 continue;
543 }
544 Err(e) => return Err(e),
545 };
546 loop {
547 self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
548 let bytes_written = self
549 .read(output)
550 .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
551 output = &mut output[bytes_written..];
552 total_bytes_written += bytes_written;
553 if self.can_collect() != 0 {
554 return Err(FrameDecoderError::TargetTooSmall);
555 }
556 if self.is_finished() {
557 break;
558 }
559 }
560 }
561
562 Ok(total_bytes_written)
563 }
564
565 /// Decode multiple frames into the extra capacity of the output vector.
566 ///
567 /// `input` must contain an exact number of frames.
568 ///
569 /// `output` must have enough extra capacity to hold the decompressed data.
570 /// This function will not reallocate or grow the vector. If you don't know
571 /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
572 ///
573 /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
574 ///
575 /// The length of the output vector is updated to include the decompressed data.
576 /// The length is not changed if an error occurs.
577 pub fn decode_all_to_vec(
578 &mut self,
579 input: &[u8],
580 output: &mut Vec<u8>,
581 ) -> Result<(), FrameDecoderError> {
582 let len = output.len();
583 let cap = output.capacity();
584 output.resize(cap, 0);
585 match self.decode_all(input, &mut output[len..]) {
586 Ok(bytes_written) => {
587 let new_len = core::cmp::min(len + bytes_written, cap); // Sanitizes `bytes_written`.
588 output.resize(new_len, 0);
589 Ok(())
590 }
591 Err(e) => {
592 output.resize(len, 0);
593 Err(e)
594 }
595 }
596 }
597}
598
599/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished
600/// this will retain window_size bytes, else it will drain it completely
601impl Read for FrameDecoder {
602 fn read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
603 let state = match &mut self.state {
604 None => return Ok(0),
605 Some(s) => s,
606 };
607 if state.frame_finished {
608 state.decoder_scratch.buffer.read_all(target)
609 } else {
610 state.decoder_scratch.buffer.read(target)
611 }
612 }
613}