Skip to main content

gix_features/zlib/stream/deflate/
mod.rs

1use crate::zlib::Status;
2use zlib_rs::DeflateError;
3
/// Size in bytes (32 KiB) of the scratch buffer that receives compressed output inside [`Write`].
const BUF_SIZE: usize = 8 * 4096;
5
6/// A utility to zlib compress anything that is written via its [Write][std::io::Write] implementation.
7///
8/// Be sure to call `flush()` when done to finalize the deflate stream.
9pub struct Write<W> {
10    compressor: Compress,
11    inner: W,
12    buf: [u8; BUF_SIZE],
13}
14
15impl<W> Clone for Write<W>
16where
17    W: Clone,
18{
19    fn clone(&self) -> Self {
20        Write {
21            compressor: impls::new_compress(),
22            inner: self.inner.clone(),
23            buf: self.buf,
24        }
25    }
26}
27
28/// Hold all state needed for compressing data.
29pub struct Compress(zlib_rs::Deflate);
30
31impl Default for Compress {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl Compress {
38    /// The number of bytes that were read from the input.
39    pub fn total_in(&self) -> u64 {
40        self.0.total_in()
41    }
42
43    /// The number of compressed bytes that were written to the output.
44    pub fn total_out(&self) -> u64 {
45        self.0.total_out()
46    }
47
48    /// Create a new instance - this allocates so should be done with care.
49    pub fn new() -> Self {
50        let config = zlib_rs::DeflateConfig::best_speed();
51        let header = true;
52        let inner = zlib_rs::Deflate::new(config.level, header, config.window_bits as u8);
53        Self(inner)
54    }
55
56    /// Prepare the instance for a new stream.
57    pub fn reset(&mut self) {
58        self.0.reset();
59    }
60
61    /// Compress `input` and write compressed bytes to `output`, with `flush` controlling additional characteristics.
62    pub fn compress(&mut self, input: &[u8], output: &mut [u8], flush: FlushCompress) -> Result<Status, CompressError> {
63        let flush = match flush {
64            FlushCompress::None => zlib_rs::DeflateFlush::NoFlush,
65            FlushCompress::Partial => zlib_rs::DeflateFlush::PartialFlush,
66            FlushCompress::Sync => zlib_rs::DeflateFlush::SyncFlush,
67            FlushCompress::Full => zlib_rs::DeflateFlush::FullFlush,
68            FlushCompress::Finish => zlib_rs::DeflateFlush::Finish,
69        };
70        let status = self.0.compress(input, output, flush)?;
71        match status {
72            zlib_rs::Status::Ok => Ok(Status::Ok),
73            zlib_rs::Status::BufError => Ok(Status::BufError),
74            zlib_rs::Status::StreamEnd => Ok(Status::StreamEnd),
75        }
76    }
77}
78
79/// The error produced by [`Compress::compress()`].
80#[derive(Debug, thiserror::Error)]
81#[allow(missing_docs)]
82pub enum CompressError {
83    #[error("stream error")]
84    StreamError,
85    #[error("The input is not a valid deflate stream.")]
86    DataError,
87    #[error("Not enough memory")]
88    InsufficientMemory,
89}
90
91impl From<zlib_rs::DeflateError> for CompressError {
92    fn from(value: zlib_rs::DeflateError) -> Self {
93        match value {
94            DeflateError::StreamError => CompressError::StreamError,
95            DeflateError::DataError => CompressError::DataError,
96            DeflateError::MemError => CompressError::InsufficientMemory,
97        }
98    }
99}
100
/// The flush modes that can be requested when compressing in-memory data.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[non_exhaustive]
#[allow(clippy::unnecessary_cast)]
pub enum FlushCompress {
    /// The typical mode for ordinary compression calls: the underlying stream
    /// decides by itself how much data to accumulate before producing output,
    /// in order to maximize compression.
    None = 0,

    /// Flush all pending output to the output buffer without aligning the
    /// output to a byte boundary.
    ///
    /// All input provided so far becomes available to the decompressor (as with
    /// [`FlushCompress::Sync`]). The current deflate block is completed and
    /// followed by an empty fixed codes block that is 10 bits long, and enough
    /// bytes are output for the decompressor to finish the block before the
    /// empty fixed code block.
    Partial = 1,

    /// Flush all pending output to the output buffer and align the output on a
    /// byte boundary, so that the decompressor can get all input data available
    /// so far.
    ///
    /// Flushing may degrade compression for some compression algorithms, so
    /// use it only when necessary. The current deflate block is completed and
    /// followed by an empty stored block.
    Sync = 2,

    /// Flush all output as with [`FlushCompress::Sync`] and additionally reset
    /// the compression state, so that decompression can restart from this point
    /// if previous compressed data has been damaged or if random access is desired.
    ///
    /// Using this mode too often can seriously degrade compression.
    Full = 3,

    /// Process pending input and flush pending output.
    ///
    /// The return value may indicate that the stream is not yet done and that
    /// more data has yet to be processed.
    Finish = 4,
}
144
145mod impls {
146    use std::io;
147
148    use crate::zlib::stream::deflate::{self, Compress, FlushCompress};
149    use crate::zlib::Status;
150
151    pub(crate) fn new_compress() -> Compress {
152        Compress::new()
153    }
154
155    impl<W> deflate::Write<W>
156    where
157        W: io::Write,
158    {
159        /// Create a new instance writing compressed bytes to `inner`.
160        pub fn new(inner: W) -> deflate::Write<W> {
161            deflate::Write {
162                compressor: new_compress(),
163                inner,
164                buf: [0; deflate::BUF_SIZE],
165            }
166        }
167
168        /// Reset the compressor, starting a new compression stream.
169        ///
170        /// That way multiple streams can be written to the same inner writer.
171        pub fn reset(&mut self) {
172            self.compressor.reset();
173        }
174
175        /// Consume `self` and return the inner writer.
176        pub fn into_inner(self) -> W {
177            self.inner
178        }
179
180        fn write_inner(&mut self, mut buf: &[u8], flush: FlushCompress) -> io::Result<usize> {
181            let total_in_when_start = self.compressor.total_in();
182            loop {
183                let last_total_in = self.compressor.total_in();
184                let last_total_out = self.compressor.total_out();
185
186                let status = self
187                    .compressor
188                    .compress(buf, &mut self.buf, flush)
189                    .map_err(io::Error::other)?;
190
191                let written = self.compressor.total_out() - last_total_out;
192                if written > 0 {
193                    self.inner.write_all(&self.buf[..written as usize])?;
194                }
195
196                match status {
197                    Status::StreamEnd => return Ok((self.compressor.total_in() - total_in_when_start) as usize),
198                    Status::Ok | Status::BufError => {
199                        let consumed = self.compressor.total_in() - last_total_in;
200                        buf = &buf[consumed as usize..];
201
202                        // output buffer still makes progress
203                        if self.compressor.total_out() > last_total_out {
204                            continue;
205                        }
206                        // input still makes progress
207                        if self.compressor.total_in() > last_total_in {
208                            continue;
209                        }
210                        // input also makes no progress anymore, need more so leave with what we have
211                        return Ok((self.compressor.total_in() - total_in_when_start) as usize);
212                    }
213                }
214            }
215        }
216    }
217
218    impl<W: io::Write> io::Write for deflate::Write<W> {
219        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
220            self.write_inner(buf, FlushCompress::None)
221        }
222
223        fn flush(&mut self) -> io::Result<()> {
224            self.write_inner(&[], FlushCompress::Finish).map(|_| ())
225        }
226    }
227}
228
229#[cfg(test)]
230mod tests;