git_features/zlib/stream/deflate/
mod.rs

1use flate2::Compress;
2
3const BUF_SIZE: usize = 4096 * 8;
4
5/// A utility to zlib compress anything that is written via its [Write][std::io::Write] implementation.
6///
7/// Be sure to call `flush()` when done to finalize the deflate stream.
8pub struct Write<W> {
9    compressor: Compress,
10    inner: W,
11    buf: [u8; BUF_SIZE],
12}
13
14mod impls {
15    use std::io;
16
17    use flate2::{Compress, Compression, FlushCompress, Status};
18
19    use crate::zlib::stream::deflate;
20
21    impl<W> deflate::Write<W>
22    where
23        W: io::Write,
24    {
25        /// Create a new instance writing compressed bytes to `inner`.
26        pub fn new(inner: W) -> deflate::Write<W> {
27            deflate::Write {
28                compressor: Compress::new(Compression::fast(), true),
29                inner,
30                buf: [0; deflate::BUF_SIZE],
31            }
32        }
33
34        /// Reset the compressor, starting a new compression stream.
35        ///
36        /// That way multiple streams can be written to the same inner writer.
37        pub fn reset(&mut self) {
38            self.compressor.reset();
39        }
40
41        /// Consume `self` and return the inner writer.
42        pub fn into_inner(self) -> W {
43            self.inner
44        }
45
46        fn write_inner(&mut self, mut buf: &[u8], flush: FlushCompress) -> io::Result<usize> {
47            let total_in_when_start = self.compressor.total_in();
48            loop {
49                let last_total_in = self.compressor.total_in();
50                let last_total_out = self.compressor.total_out();
51
52                let status = self
53                    .compressor
54                    .compress(buf, &mut self.buf, flush)
55                    .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
56
57                let written = self.compressor.total_out() - last_total_out;
58                if written > 0 {
59                    self.inner.write_all(&self.buf[..written as usize])?;
60                }
61
62                match status {
63                    Status::StreamEnd => return Ok((self.compressor.total_in() - total_in_when_start) as usize),
64                    Status::Ok | Status::BufError => {
65                        let consumed = self.compressor.total_in() - last_total_in;
66                        buf = &buf[consumed as usize..];
67
68                        // output buffer still makes progress
69                        if self.compressor.total_out() > last_total_out {
70                            continue;
71                        }
72                        // input still makes progress
73                        if self.compressor.total_in() > last_total_in {
74                            continue;
75                        }
76                        // input also makes no progress anymore, need more so leave with what we have
77                        return Ok((self.compressor.total_in() - total_in_when_start) as usize);
78                    }
79                }
80            }
81        }
82    }
83
84    impl<W: io::Write> io::Write for deflate::Write<W> {
85        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
86            self.write_inner(buf, FlushCompress::None)
87        }
88
89        fn flush(&mut self) -> io::Result<()> {
90            self.write_inner(&[], FlushCompress::Finish).map(|_| ())
91        }
92    }
93}
94
95#[cfg(test)]
96mod tests;