compression_codecs/flate/
encoder.rs

1use crate::{flate::params::FlateEncoderParams, Encode};
2use compression_core::util::PartialBuffer;
3use flate2::{Compress, FlushCompress, Status};
4use std::io;
5
/// Streaming encoder state built around a `flate2` [`Compress`] instance.
#[derive(Debug)]
pub struct FlateEncoder {
    /// Underlying `flate2` compressor that performs the actual encoding.
    compress: Compress,
    /// Flush bookkeeping: starts out `true`, is reset to `false` whenever data is
    /// encoded or the stream is finished, and is updated by `flush` so that repeated
    /// flush calls do not keep emitting sync blocks.
    flushed: bool,
}
11
12impl FlateEncoder {
13    pub fn new(level: FlateEncoderParams, zlib_header: bool) -> Self {
14        let level = flate2::Compression::from(level);
15        Self {
16            compress: Compress::new(level, zlib_header),
17            flushed: true,
18        }
19    }
20
21    pub fn get_ref(&self) -> &Compress {
22        &self.compress
23    }
24
25    fn encode(
26        &mut self,
27        input: &mut PartialBuffer<impl AsRef<[u8]>>,
28        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
29        flush: FlushCompress,
30    ) -> io::Result<Status> {
31        let prior_in = self.compress.total_in();
32        let prior_out = self.compress.total_out();
33
34        let status = self
35            .compress
36            .compress(input.unwritten(), output.unwritten_mut(), flush)?;
37
38        input.advance((self.compress.total_in() - prior_in) as usize);
39        output.advance((self.compress.total_out() - prior_out) as usize);
40
41        Ok(status)
42    }
43}
44
45impl Encode for FlateEncoder {
46    fn encode(
47        &mut self,
48        input: &mut PartialBuffer<impl AsRef<[u8]>>,
49        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
50    ) -> io::Result<()> {
51        self.flushed = false;
52        match self.encode(input, output, FlushCompress::None)? {
53            Status::Ok => Ok(()),
54            Status::StreamEnd => unreachable!(),
55            Status::BufError => Err(io::Error::other("unexpected BufError")),
56        }
57    }
58
59    fn flush(
60        &mut self,
61        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
62    ) -> io::Result<bool> {
63        // We need to keep track of whether we've already flushed otherwise we'll just keep writing
64        // out sync blocks continuously and probably never complete flushing.
65        if self.flushed {
66            return Ok(true);
67        }
68
69        self.encode(
70            &mut PartialBuffer::new(&[][..]),
71            output,
72            FlushCompress::Sync,
73        )?;
74
75        loop {
76            let old_len = output.written().len();
77            self.encode(
78                &mut PartialBuffer::new(&[][..]),
79                output,
80                FlushCompress::None,
81            )?;
82            if output.written().len() == old_len {
83                break;
84            }
85        }
86
87        let internal_flushed = !output.unwritten().is_empty();
88        self.flushed = internal_flushed;
89        Ok(internal_flushed)
90    }
91
92    fn finish(
93        &mut self,
94        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
95    ) -> io::Result<bool> {
96        self.flushed = false;
97        match self.encode(
98            &mut PartialBuffer::new(&[][..]),
99            output,
100            FlushCompress::Finish,
101        )? {
102            Status::Ok => Ok(false),
103            Status::StreamEnd => Ok(true),
104            Status::BufError => Err(io::Error::other("unexpected BufError")),
105        }
106    }
107}