Skip to main content

structured_zstd/decoding/
frame_decoder.rs

//! [`FrameDecoder`] is the main low-level struct users interact with to decode zstd frames.
//!
//! Zstandard compressed data is made of one or more frames. Each frame is independent and can be
//! decompressed independently of other frames. This module contains structures
//! and utilities that can be used to decode a frame.
6
7use super::frame;
8use crate::decoding;
9use crate::decoding::dictionary::Dictionary;
10use crate::decoding::errors::FrameDecoderError;
11use crate::decoding::scratch::DecoderScratch;
12use crate::io::{Error, Read, Write};
13use alloc::collections::BTreeMap;
14use alloc::vec::Vec;
15use core::convert::TryInto;
16
17use crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE;
18
19/// Low level Zstandard decoder that can be used to decompress frames with fine control over when and how many bytes are decoded.
20///
21/// This decoder is able to decode frames only partially and gives control
22/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once).
23/// It reads bytes as needed from a provided source and can be read from to collect partial results.
24///
25/// If you want to just read the whole frame with an `io::Read` without having to deal with manually calling [FrameDecoder::decode_blocks]
26/// you can use the provided [crate::decoding::StreamingDecoder] wich wraps this FrameDecoder.
27///
28/// Workflow is as follows:
29/// ```
30/// use structured_zstd::decoding::BlockDecodingStrategy;
31///
32/// # #[cfg(feature = "std")]
33/// use std::io::{Read, Write};
34///
35/// // no_std environments can use the crate's own Read traits
36/// # #[cfg(not(feature = "std"))]
37/// use structured_zstd::io::{Read, Write};
38///
39/// fn decode_this(mut file: impl Read) {
40///     //Create a new decoder
41///     let mut frame_dec = structured_zstd::decoding::FrameDecoder::new();
42///     let mut result = Vec::new();
43///
44///     // Use reset or init to make the decoder ready to decode the frame from the io::Read
45///     frame_dec.reset(&mut file).unwrap();
46///
47///     // Loop until the frame has been decoded completely
48///     while !frame_dec.is_finished() {
49///         // decode (roughly) batch_size many bytes
50///         frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap();
51///
52///         // read from the decoder to collect bytes from the internal buffer
53///         let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap();
54///
55///         // then do something with it
56///         do_something(&result[0..bytes_read]);
57///     }
58///
59///     // handle the last chunk of data
60///     while frame_dec.can_collect() > 0 {
61///         let x = frame_dec.read(result.as_mut_slice()).unwrap();
62///
63///         do_something(&result[0..x]);
64///     }
65/// }
66///
67/// fn do_something(data: &[u8]) {
68/// # #[cfg(feature = "std")]
69///     std::io::stdout().write_all(data).unwrap();
70/// }
71/// ```
72pub struct FrameDecoder {
73    state: Option<FrameDecoderState>,
74    dicts: BTreeMap<u32, Dictionary>,
75}
76
/// Decoding state of the frame currently being processed.
///
/// Created by the first `FrameDecoder::init`/`reset` and reset in place for later frames,
/// so its allocations are reused.
struct FrameDecoderState {
    /// Header of the frame currently being decoded.
    pub frame_header: frame::FrameHeader,
    /// Scratch state shared by all blocks of the frame, including the decode buffer (`buffer`).
    decoder_scratch: DecoderScratch,
    /// Set once the block flagged as `last_block` has been decoded.
    frame_finished: bool,
    /// Number of blocks decoded so far in this frame.
    block_counter: usize,
    /// Bytes consumed from the source for this frame: header, block headers/bodies and checksum.
    bytes_read_counter: u64,
    /// Content checksum stored at the end of the frame, once it has been read (if the frame has one).
    check_sum: Option<u32>,
    /// ID of the dictionary loaded into `decoder_scratch` for this frame, if any.
    using_dict: Option<u32>,
}
86
/// Controls how many blocks [`FrameDecoder::decode_blocks`] decodes before it returns.
///
/// Decoding always stops once the last block of the frame has been decoded. The limits are
/// checked only after a block has been decoded, so a limit of 0 still lets one block through
/// and the byte limit can be overshot by up to one block's output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockDecodingStrategy {
    /// Decode every remaining block of the frame.
    All,
    /// Stop after this many blocks have been decoded in the current call.
    UptoBlocks(usize),
    /// Stop once at least this many decompressed bytes were added to the decode buffer in the current call.
    UptoBytes(usize),
}
92
93impl FrameDecoderState {
94    /// Read the frame header from `source` and create a new decoder state.
95    ///
96    /// Pre-allocates the decode buffer to `window_size` so the first block
97    /// does not trigger incremental growth from zero capacity.
98    pub fn new(source: impl Read) -> Result<FrameDecoderState, FrameDecoderError> {
99        let (frame, header_size) = frame::read_frame_header(source)?;
100        let window_size = frame.window_size()?;
101
102        if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
103            return Err(FrameDecoderError::WindowSizeTooBig {
104                requested: window_size,
105            });
106        }
107
108        let mut decoder_scratch = DecoderScratch::new(window_size as usize);
109        decoder_scratch.buffer.reserve(window_size as usize);
110        Ok(FrameDecoderState {
111            frame_header: frame,
112            frame_finished: false,
113            block_counter: 0,
114            decoder_scratch,
115            bytes_read_counter: u64::from(header_size),
116            check_sum: None,
117            using_dict: None,
118        })
119    }
120
121    /// Reset this state for a new frame read from `source`, reusing existing allocations.
122    ///
123    /// `DecodeBuffer::reset` reserves `window_size` internally, so no
124    /// additional frame-level reservation is needed here. Further buffer
125    /// growth during decoding is performed on demand by the active block path.
126    pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
127        let (frame_header, header_size) = frame::read_frame_header(source)?;
128        let window_size = frame_header.window_size()?;
129
130        if window_size > MAXIMUM_ALLOWED_WINDOW_SIZE {
131            return Err(FrameDecoderError::WindowSizeTooBig {
132                requested: window_size,
133            });
134        }
135
136        self.frame_header = frame_header;
137        self.frame_finished = false;
138        self.block_counter = 0;
139        self.decoder_scratch.reset(window_size as usize);
140        self.bytes_read_counter = u64::from(header_size);
141        self.check_sum = None;
142        self.using_dict = None;
143        Ok(())
144    }
145}
146
147impl Default for FrameDecoder {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
153impl FrameDecoder {
154    /// This will create a new decoder without allocating anything yet.
155    /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used
156    /// else they just reset these buffers with not further allocations
157    pub fn new() -> FrameDecoder {
158        FrameDecoder {
159            state: None,
160            dicts: BTreeMap::new(),
161        }
162    }
163
164    /// init() will allocate all needed buffers if it is the first time this decoder is used
165    /// else they just reset these buffers with not further allocations
166    ///
167    /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
168    ///
169    /// equivalent to reset()
170    pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
171        self.reset(source)
172    }
173
174    /// reset() will allocate all needed buffers if it is the first time this decoder is used
175    /// else they just reset these buffers with not further allocations
176    ///
177    /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer()
178    ///
179    /// equivalent to init()
180    pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> {
181        use FrameDecoderError as err;
182        let state = match &mut self.state {
183            Some(s) => {
184                s.reset(source)?;
185                s
186            }
187            None => {
188                self.state = Some(FrameDecoderState::new(source)?);
189                self.state.as_mut().unwrap()
190            }
191        };
192        if let Some(dict_id) = state.frame_header.dictionary_id() {
193            let dict = self
194                .dicts
195                .get(&dict_id)
196                .ok_or(err::DictNotProvided { dict_id })?;
197            state.decoder_scratch.init_from_dict(dict);
198            state.using_dict = Some(dict_id);
199        }
200        Ok(())
201    }
202
203    /// Add a dict to the FrameDecoder that can be used when needed. The FrameDecoder uses the appropriate one dynamically
204    pub fn add_dict(&mut self, dict: Dictionary) -> Result<(), FrameDecoderError> {
205        self.dicts.insert(dict.id, dict);
206        Ok(())
207    }
208
209    pub fn force_dict(&mut self, dict_id: u32) -> Result<(), FrameDecoderError> {
210        use FrameDecoderError as err;
211        let Some(state) = self.state.as_mut() else {
212            return Err(err::NotYetInitialized);
213        };
214
215        let dict = self
216            .dicts
217            .get(&dict_id)
218            .ok_or(err::DictNotProvided { dict_id })?;
219        state.decoder_scratch.init_from_dict(dict);
220        state.using_dict = Some(dict_id);
221
222        Ok(())
223    }
224
225    /// Returns how many bytes the frame contains after decompression
226    pub fn content_size(&self) -> u64 {
227        match &self.state {
228            None => 0,
229            Some(s) => s.frame_header.frame_content_size(),
230        }
231    }
232
233    /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame
234    pub fn get_checksum_from_data(&self) -> Option<u32> {
235        let state = match &self.state {
236            None => return None,
237            Some(s) => s,
238        };
239
240        state.check_sum
241    }
242
243    /// Returns the checksum that was calculated while decoding.
244    /// Only a sensible value after all decoded bytes have been collected/read from the FrameDecoder
245    #[cfg(feature = "hash")]
246    pub fn get_calculated_checksum(&self) -> Option<u32> {
247        use core::hash::Hasher;
248
249        let state = match &self.state {
250            None => return None,
251            Some(s) => s,
252        };
253        let cksum_64bit = state.decoder_scratch.buffer.hash.finish();
254        //truncate to lower 32bit because reasons...
255        Some(cksum_64bit as u32)
256    }
257
258    /// Counter for how many bytes have been consumed while decoding the frame
259    pub fn bytes_read_from_source(&self) -> u64 {
260        let state = match &self.state {
261            None => return 0,
262            Some(s) => s,
263        };
264        state.bytes_read_counter
265    }
266
267    /// Whether the current frames last block has been decoded yet
268    /// If this returns true you can call the drain* functions to get all content
269    /// (the read() function will drain automatically if this returns true)
270    pub fn is_finished(&self) -> bool {
271        let state = match &self.state {
272            None => return true,
273            Some(s) => s,
274        };
275        if state.frame_header.descriptor.content_checksum_flag() {
276            state.frame_finished && state.check_sum.is_some()
277        } else {
278            state.frame_finished
279        }
280    }
281
282    /// Counter for how many blocks have already been decoded
283    pub fn blocks_decoded(&self) -> usize {
284        let state = match &self.state {
285            None => return 0,
286            Some(s) => s,
287        };
288        state.block_counter
289    }
290
291    /// Decodes blocks from a reader. It requires that the framedecoder has been initialized first.
292    /// The Strategy influences how many blocks will be decoded before the function returns
293    /// This is important if you want to manage memory consumption carefully. If you don't care
294    /// about that you can just choose the strategy "All" and have all blocks of the frame decoded into the buffer
295    pub fn decode_blocks(
296        &mut self,
297        mut source: impl Read,
298        strat: BlockDecodingStrategy,
299    ) -> Result<bool, FrameDecoderError> {
300        use FrameDecoderError as err;
301        let state = self.state.as_mut().ok_or(err::NotYetInitialized)?;
302
303        let mut block_dec = decoding::block_decoder::new();
304
305        let buffer_size_before = state.decoder_scratch.buffer.len();
306        let block_counter_before = state.block_counter;
307        loop {
308            vprintln!("################");
309            vprintln!("Next Block: {}", state.block_counter);
310            vprintln!("################");
311            let (block_header, block_header_size) = block_dec
312                .read_block_header(&mut source)
313                .map_err(err::FailedToReadBlockHeader)?;
314            state.bytes_read_counter += u64::from(block_header_size);
315
316            vprintln!();
317            vprintln!(
318                "Found {} block with size: {}, which will be of size: {}",
319                block_header.block_type,
320                block_header.content_size,
321                block_header.decompressed_size
322            );
323
324            let bytes_read_in_block_body = block_dec
325                .decode_block_content(&block_header, &mut state.decoder_scratch, &mut source)
326                .map_err(err::FailedToReadBlockBody)?;
327            state.bytes_read_counter += bytes_read_in_block_body;
328
329            state.block_counter += 1;
330
331            vprintln!("Output: {}", state.decoder_scratch.buffer.len());
332
333            if block_header.last_block {
334                state.frame_finished = true;
335                if state.frame_header.descriptor.content_checksum_flag() {
336                    let mut chksum = [0u8; 4];
337                    source
338                        .read_exact(&mut chksum)
339                        .map_err(err::FailedToReadChecksum)?;
340                    state.bytes_read_counter += 4;
341                    let chksum = u32::from_le_bytes(chksum);
342                    state.check_sum = Some(chksum);
343                }
344                break;
345            }
346
347            match strat {
348                BlockDecodingStrategy::All => { /* keep going */ }
349                BlockDecodingStrategy::UptoBlocks(n) => {
350                    if state.block_counter - block_counter_before >= n {
351                        break;
352                    }
353                }
354                BlockDecodingStrategy::UptoBytes(n) => {
355                    if state.decoder_scratch.buffer.len() - buffer_size_before >= n {
356                        break;
357                    }
358                }
359            }
360        }
361
362        Ok(state.frame_finished)
363    }
364
365    /// Collect bytes and retain window_size bytes while decoding is still going on.
366    /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
367    pub fn collect(&mut self) -> Option<Vec<u8>> {
368        let finished = self.is_finished();
369        let state = self.state.as_mut()?;
370        if finished {
371            Some(state.decoder_scratch.buffer.drain())
372        } else {
373            state.decoder_scratch.buffer.drain_to_window_size()
374        }
375    }
376
377    /// Collect bytes and retain window_size bytes while decoding is still going on.
378    /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes
379    pub fn collect_to_writer(&mut self, w: impl Write) -> Result<usize, Error> {
380        let finished = self.is_finished();
381        let state = match &mut self.state {
382            None => return Ok(0),
383            Some(s) => s,
384        };
385        if finished {
386            state.decoder_scratch.buffer.drain_to_writer(w)
387        } else {
388            state.decoder_scratch.buffer.drain_to_window_size_writer(w)
389        }
390    }
391
392    /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size
393    /// because window_size bytes need to be retained for decoding.
394    /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes
395    pub fn can_collect(&self) -> usize {
396        let finished = self.is_finished();
397        let state = match &self.state {
398            None => return 0,
399            Some(s) => s,
400        };
401        if finished {
402            state.decoder_scratch.buffer.can_drain()
403        } else {
404            state
405                .decoder_scratch
406                .buffer
407                .can_drain_to_window_size()
408                .unwrap_or(0)
409        }
410    }
411
412    /// Decodes as many blocks as possible from the source slice and reads from the decodebuffer into the target slice
413    /// The source slice may contain only parts of a frame but must contain at least one full block to make progress
414    ///
415    /// By all means use decode_blocks if you have a io.Reader available. This is just for compatibility with other decompressors
416    /// which try to serve an old-style c api
417    ///
418    /// Returns (read, written), if read == 0 then the source did not contain a full block and further calls with the same
419    /// input will not make any progress!
420    ///
421    /// Note that no kind of block can be bigger than 128kb.
422    /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer
423    ///
424    /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just call decoder.read(&mut target)
425    pub fn decode_from_to(
426        &mut self,
427        source: &[u8],
428        target: &mut [u8],
429    ) -> Result<(usize, usize), FrameDecoderError> {
430        use FrameDecoderError as err;
431        let bytes_read_at_start = match &self.state {
432            Some(s) => s.bytes_read_counter,
433            None => 0,
434        };
435
436        if !self.is_finished() || self.state.is_none() {
437            let mut mt_source = source;
438
439            if self.state.is_none() {
440                self.init(&mut mt_source)?;
441            }
442
443            //pseudo block to scope "state" so we can borrow self again after the block
444            {
445                let state = match &mut self.state {
446                    Some(s) => s,
447                    None => panic!("Bug in library"),
448                };
449                let mut block_dec = decoding::block_decoder::new();
450
451                if state.frame_header.descriptor.content_checksum_flag()
452                    && state.frame_finished
453                    && state.check_sum.is_none()
454                {
455                    //this block is needed if the checksum were the only 4 bytes that were not included in the last decode_from_to call for a frame
456                    if mt_source.len() >= 4 {
457                        let chksum = mt_source[..4].try_into().expect("optimized away");
458                        state.bytes_read_counter += 4;
459                        let chksum = u32::from_le_bytes(chksum);
460                        state.check_sum = Some(chksum);
461                    }
462                    return Ok((4, 0));
463                }
464
465                loop {
466                    //check if there are enough bytes for the next header
467                    if mt_source.len() < 3 {
468                        break;
469                    }
470                    let (block_header, block_header_size) = block_dec
471                        .read_block_header(&mut mt_source)
472                        .map_err(err::FailedToReadBlockHeader)?;
473
474                    // check the needed size for the block before updating counters.
475                    // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place
476                    if mt_source.len() < block_header.content_size as usize {
477                        break;
478                    }
479                    state.bytes_read_counter += u64::from(block_header_size);
480
481                    let bytes_read_in_block_body = block_dec
482                        .decode_block_content(
483                            &block_header,
484                            &mut state.decoder_scratch,
485                            &mut mt_source,
486                        )
487                        .map_err(err::FailedToReadBlockBody)?;
488                    state.bytes_read_counter += bytes_read_in_block_body;
489                    state.block_counter += 1;
490
491                    if block_header.last_block {
492                        state.frame_finished = true;
493                        if state.frame_header.descriptor.content_checksum_flag() {
494                            //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call
495                            if mt_source.len() >= 4 {
496                                let chksum = mt_source[..4].try_into().expect("optimized away");
497                                state.bytes_read_counter += 4;
498                                let chksum = u32::from_le_bytes(chksum);
499                                state.check_sum = Some(chksum);
500                            }
501                        }
502                        break;
503                    }
504                }
505            }
506        }
507
508        let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?;
509        let bytes_read_at_end = match &mut self.state {
510            Some(s) => s.bytes_read_counter,
511            None => panic!("Bug in library"),
512        };
513        let read_len = bytes_read_at_end - bytes_read_at_start;
514        Ok((read_len as usize, result_len))
515    }
516
517    /// Decode multiple frames into the output slice.
518    ///
519    /// `input` must contain an exact number of frames.
520    ///
521    /// `output` must be large enough to hold the decompressed data. If you don't know
522    /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
523    ///
524    /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
525    ///
526    /// Returns the number of bytes written to `output`.
527    pub fn decode_all(
528        &mut self,
529        mut input: &[u8],
530        mut output: &mut [u8],
531    ) -> Result<usize, FrameDecoderError> {
532        let mut total_bytes_written = 0;
533        while !input.is_empty() {
534            match self.init(&mut input) {
535                Ok(_) => {}
536                Err(FrameDecoderError::ReadFrameHeaderError(
537                    crate::decoding::errors::ReadFrameHeaderError::SkipFrame { length, .. },
538                )) => {
539                    input = input
540                        .get(length as usize..)
541                        .ok_or(FrameDecoderError::FailedToSkipFrame)?;
542                    continue;
543                }
544                Err(e) => return Err(e),
545            };
546            loop {
547                self.decode_blocks(&mut input, BlockDecodingStrategy::UptoBytes(1024 * 1024))?;
548                let bytes_written = self
549                    .read(output)
550                    .map_err(FrameDecoderError::FailedToDrainDecodebuffer)?;
551                output = &mut output[bytes_written..];
552                total_bytes_written += bytes_written;
553                if self.can_collect() != 0 {
554                    return Err(FrameDecoderError::TargetTooSmall);
555                }
556                if self.is_finished() {
557                    break;
558                }
559            }
560        }
561
562        Ok(total_bytes_written)
563    }
564
565    /// Decode multiple frames into the extra capacity of the output vector.
566    ///
567    /// `input` must contain an exact number of frames.
568    ///
569    /// `output` must have enough extra capacity to hold the decompressed data.
570    /// This function will not reallocate or grow the vector. If you don't know
571    /// how large the output will be, use [`FrameDecoder::decode_blocks`] instead.
572    ///
573    /// This calls [`FrameDecoder::init`], and all bytes currently in the decoder will be lost.
574    ///
575    /// The length of the output vector is updated to include the decompressed data.
576    /// The length is not changed if an error occurs.
577    pub fn decode_all_to_vec(
578        &mut self,
579        input: &[u8],
580        output: &mut Vec<u8>,
581    ) -> Result<(), FrameDecoderError> {
582        let len = output.len();
583        let cap = output.capacity();
584        output.resize(cap, 0);
585        match self.decode_all(input, &mut output[len..]) {
586            Ok(bytes_written) => {
587                let new_len = core::cmp::min(len + bytes_written, cap); // Sanitizes `bytes_written`.
588                output.resize(new_len, 0);
589                Ok(())
590            }
591            Err(e) => {
592                output.resize(len, 0);
593                Err(e)
594            }
595        }
596    }
597}
598
599/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished
600/// this will retain window_size bytes, else it will drain it completely
601impl Read for FrameDecoder {
602    fn read(&mut self, target: &mut [u8]) -> Result<usize, Error> {
603        let state = match &mut self.state {
604            None => return Ok(0),
605            Some(s) => s,
606        };
607        if state.frame_finished {
608            state.decoder_scratch.buffer.read_all(target)
609        } else {
610            state.decoder_scratch.buffer.read(target)
611        }
612    }
613}