Skip to main content

async_compression_issue_150_workaround/codec/flate/
encoder.rs

1use crate::{codec::Encode, util::PartialBuffer};
2use std::io::{Error, ErrorKind, Result};
3
4use flate2::{Compress, Compression, FlushCompress, Status};
5
6#[derive(Debug)]
7pub struct FlateEncoder {
8    compress: Compress,
9    flushed: bool,
10}
11
12impl FlateEncoder {
13    pub(crate) fn new(level: Compression, zlib_header: bool) -> Self {
14        Self {
15            compress: Compress::new(level, zlib_header),
16            flushed: true,
17        }
18    }
19
20    fn encode(
21        &mut self,
22        input: &mut PartialBuffer<impl AsRef<[u8]>>,
23        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
24        flush: FlushCompress,
25    ) -> Result<Status> {
26        let prior_in = self.compress.total_in();
27        let prior_out = self.compress.total_out();
28
29        let status = self
30            .compress
31            .compress(input.unwritten(), output.unwritten_mut(), flush)?;
32
33        input.advance((self.compress.total_in() - prior_in) as usize);
34        output.advance((self.compress.total_out() - prior_out) as usize);
35
36        Ok(status)
37    }
38}
39
40impl Encode for FlateEncoder {
41    fn encode(
42        &mut self,
43        input: &mut PartialBuffer<impl AsRef<[u8]>>,
44        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
45    ) -> Result<()> {
46        self.flushed = false;
47        match self.encode(input, output, FlushCompress::None)? {
48            Status::Ok => Ok(()),
49            Status::StreamEnd => unreachable!(),
50            Status::BufError => Err(Error::new(ErrorKind::Other, "unexpected BufError")),
51        }
52    }
53
54    fn flush(
55        &mut self,
56        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
57    ) -> Result<bool> {
58        // We need to keep track of whether we've already flushed otherwise we'll just keep writing
59        // out sync blocks continuously and probably never complete flushing.
60        if self.flushed {
61            return Ok(true);
62        }
63
64        self.encode(
65            &mut PartialBuffer::new(&[][..]),
66            output,
67            FlushCompress::Sync,
68        )?;
69
70        loop {
71            let old_len = output.written().len();
72            self.encode(
73                &mut PartialBuffer::new(&[][..]),
74                output,
75                FlushCompress::None,
76            )?;
77            if output.written().len() == old_len {
78                break;
79            }
80        }
81
82        self.flushed = true;
83        Ok(!output.unwritten().is_empty())
84    }
85
86    fn finish(
87        &mut self,
88        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
89    ) -> Result<bool> {
90        self.flushed = false;
91        match self.encode(
92            &mut PartialBuffer::new(&[][..]),
93            output,
94            FlushCompress::Finish,
95        )? {
96            Status::Ok => Ok(false),
97            Status::StreamEnd => Ok(true),
98            Status::BufError => Err(Error::new(ErrorKind::Other, "unexpected BufError")),
99        }
100    }
101}