gix_features/zlib/stream/deflate/
mod.rs

1use crate::zlib::Status;
2use std::ffi::c_int;
3
/// The size in bytes of the buffer that receives compressed output before it is passed on to the inner writer.
const BUF_SIZE: usize = 32 * 1024;
5
/// A utility to zlib compress anything that is written via its [Write][std::io::Write] implementation.
///
/// Compressed bytes are staged in an internal buffer of `BUF_SIZE` bytes and forwarded to the
/// wrapped writer as soon as the compressor produced them.
///
/// Be sure to call `flush()` when done to finalize the deflate stream.
pub struct Write<W> {
    /// The deflate state turning written bytes into compressed output.
    compressor: Compress,
    /// The writer that receives the compressed bytes.
    inner: W,
    /// Scratch space the compressor writes into before its content is passed on to `inner`.
    buf: [u8; BUF_SIZE],
}
14
15impl<W> Clone for Write<W>
16where
17    W: Clone,
18{
19    fn clone(&self) -> Self {
20        Write {
21            compressor: impls::new_compress(),
22            inner: self.inner.clone(),
23            buf: self.buf,
24        }
25    }
26}
27
/// Hold all state needed for compressing data.
///
/// This is a thin wrapper around a `libz_rs_sys::z_stream`, which is set up for deflating in
/// [`Compress::new()`] and handed to `deflateEnd` once the value is dropped.
pub struct Compress(libz_rs_sys::z_stream);

// NOTE(review): the stream stores raw pointers (`next_in`/`next_out`), which presumably is why
// `Send` and `Sync` have to be implemented by hand - confirm. Within this file the stream is only
// mutated through `&mut self`, while the `&self` methods merely read the `total_in`/`total_out` counters.
unsafe impl Sync for Compress {}
unsafe impl Send for Compress {}
33
34impl Default for Compress {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl Compress {
41    /// The number of bytes that were read from the input.
42    pub fn total_in(&self) -> u64 {
43        self.0.total_in as _
44    }
45
46    /// The number of compressed bytes that were written to the output.
47    pub fn total_out(&self) -> u64 {
48        self.0.total_out as _
49    }
50
51    /// Create a new instance - this allocates so should be done with care.
52    pub fn new() -> Self {
53        let mut this = libz_rs_sys::z_stream::default();
54
55        unsafe {
56            libz_rs_sys::deflateInit_(
57                &mut this,
58                libz_rs_sys::Z_BEST_SPEED,
59                libz_rs_sys::zlibVersion(),
60                core::mem::size_of::<libz_rs_sys::z_stream>() as core::ffi::c_int,
61            );
62        }
63
64        Self(this)
65    }
66
67    /// Prepare the instance for a new stream.
68    pub fn reset(&mut self) {
69        unsafe { libz_rs_sys::deflateReset(&mut self.0) };
70    }
71
72    /// Compress `input` and write compressed bytes to `output`, with `flush` controlling additional characteristics.
73    pub fn compress(&mut self, input: &[u8], output: &mut [u8], flush: FlushCompress) -> Result<Status, CompressError> {
74        self.0.avail_in = input.len() as _;
75        self.0.avail_out = output.len() as _;
76
77        self.0.next_in = input.as_ptr();
78        self.0.next_out = output.as_mut_ptr();
79
80        match unsafe { libz_rs_sys::deflate(&mut self.0, flush as _) } {
81            libz_rs_sys::Z_OK => Ok(Status::Ok),
82            libz_rs_sys::Z_BUF_ERROR => Ok(Status::BufError),
83            libz_rs_sys::Z_STREAM_END => Ok(Status::StreamEnd),
84
85            libz_rs_sys::Z_STREAM_ERROR => Err(CompressError::StreamError),
86            libz_rs_sys::Z_MEM_ERROR => Err(CompressError::InsufficientMemory),
87            err => Err(CompressError::Unknown { err }),
88        }
89    }
90}
91
92impl Drop for Compress {
93    fn drop(&mut self) {
94        unsafe { libz_rs_sys::deflateEnd(&mut self.0) };
95    }
96}
97
98/// The error produced by [`Compress::compress()`].
99#[derive(Debug, thiserror::Error)]
100#[error("{msg}")]
101#[allow(missing_docs)]
102pub enum CompressError {
103    #[error("stream error")]
104    StreamError,
105    #[error("Not enough memory")]
106    InsufficientMemory,
107    #[error("An unknown error occurred: {err}")]
108    Unknown { err: c_int },
109}
110
/// Values which indicate the form of flushing to be used when compressing
/// in-memory data.
///
/// Each variant carries the value of the `libz_rs_sys` flush constant of the same meaning.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[non_exhaustive]
#[allow(clippy::unnecessary_cast)]
pub enum FlushCompress {
    /// A typical parameter for passing to compression/decompression functions,
    /// this leaves it to the underlying stream to decide how much data to
    /// accumulate before producing output in order to maximize compression.
    None = libz_rs_sys::Z_NO_FLUSH as isize,

    /// All pending output is flushed to the output buffer, but the output is
    /// not aligned to a byte boundary.
    ///
    /// All input data so far will be available to the decompressor (as with
    /// [`FlushCompress::Sync`]). This completes the current deflate block and follows it
    /// with an empty fixed codes block that is 10 bits long, and it assures
    /// that enough bytes are output in order for the decompressor to finish the
    /// block before the empty fixed code block.
    Partial = libz_rs_sys::Z_PARTIAL_FLUSH as isize,

    /// All pending output is flushed to the output buffer and the output is
    /// aligned on a byte boundary so that the decompressor can get all input
    /// data available so far.
    ///
    /// Flushing may degrade compression for some compression algorithms and so
    /// it should only be used when necessary. This will complete the current
    /// deflate block and follow it with an empty stored block.
    Sync = libz_rs_sys::Z_SYNC_FLUSH as isize,

    /// All output is flushed as with [`FlushCompress::Sync`] and the compression state is
    /// reset so decompression can restart from this point if previous
    /// compressed data has been damaged or if random access is desired.
    ///
    /// Using this option too often can seriously degrade compression.
    Full = libz_rs_sys::Z_FULL_FLUSH as isize,

    /// Pending input is processed and pending output is flushed.
    ///
    /// The return value may indicate that the stream is not yet done and more
    /// data has yet to be processed.
    Finish = libz_rs_sys::Z_FINISH as isize,
}
154
155mod impls {
156    use std::io;
157
158    use crate::zlib::stream::deflate::{self, Compress, FlushCompress};
159    use crate::zlib::Status;
160
161    pub(crate) fn new_compress() -> Compress {
162        Compress::new()
163    }
164
165    impl<W> deflate::Write<W>
166    where
167        W: io::Write,
168    {
169        /// Create a new instance writing compressed bytes to `inner`.
170        pub fn new(inner: W) -> deflate::Write<W> {
171            deflate::Write {
172                compressor: new_compress(),
173                inner,
174                buf: [0; deflate::BUF_SIZE],
175            }
176        }
177
178        /// Reset the compressor, starting a new compression stream.
179        ///
180        /// That way multiple streams can be written to the same inner writer.
181        pub fn reset(&mut self) {
182            self.compressor.reset();
183        }
184
185        /// Consume `self` and return the inner writer.
186        pub fn into_inner(self) -> W {
187            self.inner
188        }
189
190        fn write_inner(&mut self, mut buf: &[u8], flush: FlushCompress) -> io::Result<usize> {
191            let total_in_when_start = self.compressor.total_in();
192            loop {
193                let last_total_in = self.compressor.total_in();
194                let last_total_out = self.compressor.total_out();
195
196                let status = self
197                    .compressor
198                    .compress(buf, &mut self.buf, flush)
199                    .map_err(io::Error::other)?;
200
201                let written = self.compressor.total_out() - last_total_out;
202                if written > 0 {
203                    self.inner.write_all(&self.buf[..written as usize])?;
204                }
205
206                match status {
207                    Status::StreamEnd => return Ok((self.compressor.total_in() - total_in_when_start) as usize),
208                    Status::Ok | Status::BufError => {
209                        let consumed = self.compressor.total_in() - last_total_in;
210                        buf = &buf[consumed as usize..];
211
212                        // output buffer still makes progress
213                        if self.compressor.total_out() > last_total_out {
214                            continue;
215                        }
216                        // input still makes progress
217                        if self.compressor.total_in() > last_total_in {
218                            continue;
219                        }
220                        // input also makes no progress anymore, need more so leave with what we have
221                        return Ok((self.compressor.total_in() - total_in_when_start) as usize);
222                    }
223                }
224            }
225        }
226    }
227
228    impl<W: io::Write> io::Write for deflate::Write<W> {
229        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
230            self.write_inner(buf, FlushCompress::None)
231        }
232
233        fn flush(&mut self) -> io::Result<()> {
234            self.write_inner(&[], FlushCompress::Finish).map(|_| ())
235        }
236    }
237}
238
239#[cfg(test)]
240mod tests;