Skip to main content

zrip_encode/
streaming.rs

1#![forbid(unsafe_code)]
2
3use std::io::{self, Write};
4
5use crate::block_encoder::{self, BlockEncodeWorkspace};
6use crate::dfast;
7use crate::fast;
8use crate::strategy::{self, LevelParams, Strategy};
9use zrip_core::error::CompressError;
10use zrip_core::frame::{MAX_BLOCK_SIZE, ZSTD_MAGIC};
11use zrip_core::xxhash::Xxh64State;
12
13/// Streaming zstd compressor implementing [`Write`].
14///
15/// Buffers input until a full block (128 KiB) is ready, then compresses
16/// and writes it to the underlying writer. Call [`finish`](Self::finish)
17/// to flush the final block, write the content checksum, and recover the
18/// writer.
19///
20/// ```
21/// use std::io::Write;
22///
23/// let mut encoder = zrip::FrameEncoder::new(Vec::new(), 1).unwrap();
24/// encoder.write_all(b"streaming data").unwrap();
25/// let compressed = encoder.finish().unwrap();
26/// assert!(!compressed.is_empty());
27/// ```
28pub struct FrameEncoder<W: Write> {
29    inner: W,
30    params: LevelParams,
31    buffer: Vec<u8>,
32    rep_offsets: [u32; 3],
33    hasher: Xxh64State,
34    header_written: bool,
35    finished: bool,
36    workspace: BlockEncodeWorkspace,
37}
38
39impl<W: Write> FrameEncoder<W> {
40    /// Creates a new streaming encoder at the given level (-7..=4).
41    pub fn new(writer: W, level: i32) -> Result<Self, CompressError> {
42        let params = strategy::level_params(level).ok_or(CompressError::InvalidLevel(level))?;
43        Ok(Self {
44            inner: writer,
45            params,
46            buffer: Vec::new(),
47            rep_offsets: [1, 4, 8],
48            hasher: Xxh64State::new(0),
49            header_written: false,
50            finished: false,
51            workspace: BlockEncodeWorkspace::new(),
52        })
53    }
54
55    /// Flushes remaining data, writes the content checksum, and returns the inner writer.
56    pub fn finish(mut self) -> Result<W, io::Error> {
57        if self.finished {
58            return Ok(self.inner);
59        }
60        self.finished = true;
61
62        if !self.header_written {
63            self.write_header()?;
64        }
65
66        self.flush_block(true)?;
67
68        let hash = self.hasher.finish();
69        let checksum = (hash & 0xFFFFFFFF) as u32;
70        self.inner.write_all(&checksum.to_le_bytes())?;
71
72        Ok(self.inner)
73    }
74
75    fn write_header(&mut self) -> io::Result<()> {
76        self.header_written = true;
77
78        self.inner.write_all(&ZSTD_MAGIC.to_le_bytes())?;
79
80        let window_log = self.params.window_log;
81        let descriptor = 0x04u8;
82        self.inner.write_all(&[descriptor])?;
83
84        let mantissa = 0u8;
85        let exponent = (window_log - 10) as u8;
86        let window_descriptor = (exponent << 3) | mantissa;
87        self.inner.write_all(&[window_descriptor])?;
88
89        Ok(())
90    }
91
92    fn flush_block(&mut self, last: bool) -> io::Result<()> {
93        if self.buffer.is_empty() && last {
94            let mut block_out = Vec::new();
95            block_encoder::encode_raw_block(&[], true, &mut block_out);
96            self.inner.write_all(&block_out)?;
97            return Ok(());
98        }
99
100        if self.buffer.is_empty() {
101            return Ok(());
102        }
103
104        let chunk = core::mem::take(&mut self.buffer);
105
106        let mut block_out = Vec::with_capacity(chunk.len() + 32);
107        if crate::block_looks_incompressible(&chunk) {
108            block_encoder::encode_raw_block(&chunk, last, &mut block_out);
109        } else {
110            let sequences = match self.params.strategy {
111                Strategy::Fast => fast::compress_fast(&chunk, &self.params, &self.rep_offsets),
112                Strategy::DFast => dfast::compress_dfast(&chunk, &self.params, &self.rep_offsets),
113            };
114            if self.params.force_raw_literals {
115                block_encoder::encode_compressed_block_raw(
116                    &chunk,
117                    &sequences,
118                    &mut self.rep_offsets,
119                    last,
120                    &mut block_out,
121                    &mut self.workspace,
122                );
123            } else {
124                block_encoder::encode_compressed_block(
125                    &chunk,
126                    &sequences,
127                    &mut self.rep_offsets,
128                    last,
129                    &mut block_out,
130                    &mut self.workspace,
131                );
132            }
133        }
134
135        self.inner.write_all(&block_out)?;
136        Ok(())
137    }
138}
139
140impl<W: Write> Write for FrameEncoder<W> {
141    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
142        if self.finished {
143            return Err(io::Error::new(
144                io::ErrorKind::Other,
145                "encoder already finished",
146            ));
147        }
148
149        if !self.header_written {
150            self.write_header()?;
151        }
152
153        self.hasher.update(buf);
154
155        let mut consumed = 0;
156        while consumed < buf.len() {
157            let space = MAX_BLOCK_SIZE - self.buffer.len();
158            let n = space.min(buf.len() - consumed);
159            self.buffer.extend_from_slice(&buf[consumed..consumed + n]);
160            consumed += n;
161
162            if self.buffer.len() >= MAX_BLOCK_SIZE {
163                self.flush_block(false)?;
164            }
165        }
166
167        Ok(consumed)
168    }
169
170    fn flush(&mut self) -> io::Result<()> {
171        if !self.buffer.is_empty() {
172            self.flush_block(false)?;
173        }
174        self.inner.flush()
175    }
176}