gix_features/zlib/stream/deflate/mod.rs

1use crate::zlib::Status;
2use zlib_rs::DeflateError;
3
/// Size in bytes of the scratch buffer that receives compressed output before it is
/// handed to the inner writer (32 KiB).
const BUF_SIZE: usize = 32 * 1024;
5
6/// A utility to zlib compress anything that is written via its [Write][std::io::Write] implementation.
7///
8/// Be sure to call `flush()` when done to finalize the deflate stream.
9pub struct Write<W> {
10    compressor: Compress,
11    inner: W,
12    buf: [u8; BUF_SIZE],
13}
14
15impl<W> Clone for Write<W>
16where
17    W: Clone,
18{
19    fn clone(&self) -> Self {
20        Write {
21            compressor: impls::new_compress(),
22            inner: self.inner.clone(),
23            buf: self.buf,
24        }
25    }
26}
27
28/// Hold all state needed for compressing data.
29pub struct Compress(zlib_rs::Deflate);
30
31impl Default for Compress {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl Compress {
38    /// The number of bytes that were read from the input.
39    pub fn total_in(&self) -> u64 {
40        self.0.total_in()
41    }
42
43    /// The number of compressed bytes that were written to the output.
44    pub fn total_out(&self) -> u64 {
45        self.0.total_out()
46    }
47
48    /// Create a new instance - this allocates so should be done with care.
49    pub fn new() -> Self {
50        let inner = zlib_rs::Deflate::new(zlib_rs::c_api::Z_BEST_SPEED, true, zlib_rs::MAX_WBITS as u8);
51        Self(inner)
52    }
53
54    /// Prepare the instance for a new stream.
55    pub fn reset(&mut self) {
56        self.0.reset();
57    }
58
59    /// Compress `input` and write compressed bytes to `output`, with `flush` controlling additional characteristics.
60    pub fn compress(&mut self, input: &[u8], output: &mut [u8], flush: FlushCompress) -> Result<Status, CompressError> {
61        let flush = match flush {
62            FlushCompress::None => zlib_rs::DeflateFlush::NoFlush,
63            FlushCompress::Partial => zlib_rs::DeflateFlush::PartialFlush,
64            FlushCompress::Sync => zlib_rs::DeflateFlush::SyncFlush,
65            FlushCompress::Full => zlib_rs::DeflateFlush::FullFlush,
66            FlushCompress::Finish => zlib_rs::DeflateFlush::Finish,
67        };
68        let status = self.0.compress(input, output, flush)?;
69        match status {
70            zlib_rs::Status::Ok => Ok(Status::Ok),
71            zlib_rs::Status::BufError => Ok(Status::BufError),
72            zlib_rs::Status::StreamEnd => Ok(Status::StreamEnd),
73        }
74    }
75}
76
77/// The error produced by [`Compress::compress()`].
78#[derive(Debug, thiserror::Error)]
79#[allow(missing_docs)]
80pub enum CompressError {
81    #[error("stream error")]
82    StreamError,
83    #[error("The input is not a valid deflate stream.")]
84    DataError,
85    #[error("Not enough memory")]
86    InsufficientMemory,
87}
88
89impl From<zlib_rs::DeflateError> for CompressError {
90    fn from(value: zlib_rs::DeflateError) -> Self {
91        match value {
92            DeflateError::StreamError => CompressError::StreamError,
93            DeflateError::DataError => CompressError::DataError,
94            DeflateError::MemError => CompressError::InsufficientMemory,
95        }
96    }
97}
98
/// Values which indicate the form of flushing to be used when compressing
/// in-memory data.
///
/// The discriminants are numerically equal to zlib's `Z_NO_FLUSH` (0),
/// `Z_PARTIAL_FLUSH` (1), `Z_SYNC_FLUSH` (2), `Z_FULL_FLUSH` (3) and `Z_FINISH` (4).
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[non_exhaustive]
pub enum FlushCompress {
    /// A typical parameter for passing to compression/decompression functions,
    /// this lets the underlying stream decide how much data to accumulate
    /// before producing output in order to maximize compression.
    None = 0,

    /// All pending output is flushed to the output buffer, but the output is
    /// not aligned to a byte boundary.
    ///
    /// All input data so far will be available to the decompressor (as with
    /// [`FlushCompress::Sync`]). This completes the current deflate block and
    /// follows it with an empty fixed codes block that is 10 bits long, and it
    /// assures that enough bytes are output in order for the decompressor to
    /// finish the block before the empty fixed code block.
    Partial = 1,

    /// All pending output is flushed to the output buffer and the output is
    /// aligned on a byte boundary so that the decompressor can get all input
    /// data available so far.
    ///
    /// Flushing may degrade compression for some compression algorithms and so
    /// it should only be used when necessary. This will complete the current
    /// deflate block and follow it with an empty stored block.
    Sync = 2,

    /// All output is flushed as with [`FlushCompress::Sync`] and the compression
    /// state is reset so decompression can restart from this point if previous
    /// compressed data has been damaged or if random access is desired.
    ///
    /// Using this option too often can seriously degrade compression.
    Full = 3,

    /// Pending input is processed and pending output is flushed.
    ///
    /// The return value may indicate that the stream is not yet done and more
    /// data has yet to be processed.
    Finish = 4,
}
142
143mod impls {
144    use std::io;
145
146    use crate::zlib::stream::deflate::{self, Compress, FlushCompress};
147    use crate::zlib::Status;
148
149    pub(crate) fn new_compress() -> Compress {
150        Compress::new()
151    }
152
153    impl<W> deflate::Write<W>
154    where
155        W: io::Write,
156    {
157        /// Create a new instance writing compressed bytes to `inner`.
158        pub fn new(inner: W) -> deflate::Write<W> {
159            deflate::Write {
160                compressor: new_compress(),
161                inner,
162                buf: [0; deflate::BUF_SIZE],
163            }
164        }
165
166        /// Reset the compressor, starting a new compression stream.
167        ///
168        /// That way multiple streams can be written to the same inner writer.
169        pub fn reset(&mut self) {
170            self.compressor.reset();
171        }
172
173        /// Consume `self` and return the inner writer.
174        pub fn into_inner(self) -> W {
175            self.inner
176        }
177
178        fn write_inner(&mut self, mut buf: &[u8], flush: FlushCompress) -> io::Result<usize> {
179            let total_in_when_start = self.compressor.total_in();
180            loop {
181                let last_total_in = self.compressor.total_in();
182                let last_total_out = self.compressor.total_out();
183
184                let status = self
185                    .compressor
186                    .compress(buf, &mut self.buf, flush)
187                    .map_err(io::Error::other)?;
188
189                let written = self.compressor.total_out() - last_total_out;
190                if written > 0 {
191                    self.inner.write_all(&self.buf[..written as usize])?;
192                }
193
194                match status {
195                    Status::StreamEnd => return Ok((self.compressor.total_in() - total_in_when_start) as usize),
196                    Status::Ok | Status::BufError => {
197                        let consumed = self.compressor.total_in() - last_total_in;
198                        buf = &buf[consumed as usize..];
199
200                        // output buffer still makes progress
201                        if self.compressor.total_out() > last_total_out {
202                            continue;
203                        }
204                        // input still makes progress
205                        if self.compressor.total_in() > last_total_in {
206                            continue;
207                        }
208                        // input also makes no progress anymore, need more so leave with what we have
209                        return Ok((self.compressor.total_in() - total_in_when_start) as usize);
210                    }
211                }
212            }
213        }
214    }
215
216    impl<W: io::Write> io::Write for deflate::Write<W> {
217        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
218            self.write_inner(buf, FlushCompress::None)
219        }
220
221        fn flush(&mut self) -> io::Result<()> {
222            self.write_inner(&[], FlushCompress::Finish).map(|_| ())
223        }
224    }
225}
226
// Unit tests live in a separate `tests` module file next to this one; compiled only for test builds.
#[cfg(test)]
mod tests;