compression_codecs/flate/
encoder.rs

1use crate::{flate::params::FlateEncoderParams, EncodeV2};
2use compression_core::util::{PartialBuffer, WriteBuffer};
3use flate2::{Compress, FlushCompress, Status};
4use std::io;
5
6#[derive(Debug)]
7pub struct FlateEncoder {
8    compress: Compress,
9    flushed: bool,
10}
11
12impl FlateEncoder {
13    pub fn new(level: FlateEncoderParams, zlib_header: bool) -> Self {
14        let level = flate2::Compression::from(level);
15        Self {
16            compress: Compress::new(level, zlib_header),
17            flushed: true,
18        }
19    }
20
21    pub fn get_ref(&self) -> &Compress {
22        &self.compress
23    }
24
25    fn encode(
26        &mut self,
27        input: &mut PartialBuffer<&[u8]>,
28        output: &mut WriteBuffer<'_>,
29        flush: FlushCompress,
30    ) -> io::Result<Status> {
31        let prior_in = self.compress.total_in();
32        let prior_out = self.compress.total_out();
33
34        let result =
35            self.compress
36                .compress(input.unwritten(), output.initialize_unwritten(), flush);
37
38        input.advance((self.compress.total_in() - prior_in) as usize);
39        output.advance((self.compress.total_out() - prior_out) as usize);
40
41        Ok(result?)
42    }
43}
44
45impl EncodeV2 for FlateEncoder {
46    fn encode(
47        &mut self,
48        input: &mut PartialBuffer<&[u8]>,
49        output: &mut WriteBuffer<'_>,
50    ) -> io::Result<()> {
51        self.flushed = false;
52        match self.encode(input, output, FlushCompress::None)? {
53            Status::Ok => Ok(()),
54            Status::StreamEnd => unreachable!(),
55            Status::BufError => Err(io::Error::other("unexpected BufError")),
56        }
57    }
58
59    fn flush(&mut self, output: &mut WriteBuffer<'_>) -> io::Result<bool> {
60        // We need to keep track of whether we've already flushed otherwise we'll just keep writing
61        // out sync blocks continuously and probably never complete flushing.
62        if self.flushed {
63            return Ok(true);
64        }
65
66        self.encode(
67            &mut PartialBuffer::new(&[][..]),
68            output,
69            FlushCompress::Sync,
70        )?;
71
72        loop {
73            let old_len = output.written_len();
74            self.encode(
75                &mut PartialBuffer::new(&[][..]),
76                output,
77                FlushCompress::None,
78            )?;
79            if output.written_len() == old_len {
80                break;
81            }
82        }
83
84        let internal_flushed = !output.has_no_spare_space();
85        self.flushed = internal_flushed;
86        Ok(internal_flushed)
87    }
88
89    fn finish(&mut self, output: &mut WriteBuffer<'_>) -> io::Result<bool> {
90        self.flushed = false;
91        match self.encode(
92            &mut PartialBuffer::new(&[][..]),
93            output,
94            FlushCompress::Finish,
95        )? {
96            Status::Ok => Ok(false),
97            Status::StreamEnd => Ok(true),
98            Status::BufError => Err(io::Error::other("unexpected BufError")),
99        }
100    }
101}