Skip to main content

async_compression_issue_150_workaround/codec/bzip2/
decoder.rs

1use crate::{codec::Decode, util::PartialBuffer};
2use std::fmt;
3use std::io::{Error, ErrorKind, Result};
4
5use bzip2::{Decompress, Status};
6
7pub struct BzDecoder {
8    decompress: Decompress,
9}
10
11impl fmt::Debug for BzDecoder {
12    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13        write!(
14            f,
15            "BzDecoder {{total_in: {}, total_out: {}}}",
16            self.decompress.total_in(),
17            self.decompress.total_out()
18        )
19    }
20}
21
22impl BzDecoder {
23    pub fn new() -> Self {
24        Self {
25            decompress: Decompress::new(false),
26        }
27    }
28
29    fn decode(
30        &mut self,
31        input: &mut PartialBuffer<impl AsRef<[u8]>>,
32        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
33    ) -> Result<Status> {
34        let prior_in = self.decompress.total_in();
35        let prior_out = self.decompress.total_out();
36
37        let status = self
38            .decompress
39            .decompress(input.unwritten(), output.unwritten_mut())
40            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
41
42        input.advance((self.decompress.total_in() - prior_in) as usize);
43        output.advance((self.decompress.total_out() - prior_out) as usize);
44
45        Ok(status)
46    }
47}
48
49impl Decode for BzDecoder {
50    fn reinit(&mut self) -> Result<()> {
51        self.decompress = Decompress::new(false);
52        Ok(())
53    }
54
55    fn decode(
56        &mut self,
57        input: &mut PartialBuffer<impl AsRef<[u8]>>,
58        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
59    ) -> Result<bool> {
60        match self.decode(input, output)? {
61            // Decompression went fine, nothing much to report.
62            Status::Ok => Ok(false),
63
64            // The Flush action on a compression went ok.
65            Status::FlushOk => unreachable!(),
66
67            // THe Run action on compression went ok.
68            Status::RunOk => unreachable!(),
69
70            // The Finish action on compression went ok.
71            Status::FinishOk => unreachable!(),
72
73            // The stream's end has been met, meaning that no more data can be input.
74            Status::StreamEnd => Ok(true),
75
76            // There was insufficient memory in the input or output buffer to complete
77            // the request, but otherwise everything went normally.
78            Status::MemNeeded => Err(Error::new(ErrorKind::Other, "out of memory")),
79        }
80    }
81
82    fn flush(
83        &mut self,
84        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
85    ) -> Result<bool> {
86        self.decode(&mut PartialBuffer::new(&[][..]), output)?;
87
88        loop {
89            let old_len = output.written().len();
90            self.decode(&mut PartialBuffer::new(&[][..]), output)?;
91            if output.written().len() == old_len {
92                break;
93            }
94        }
95
96        Ok(!output.unwritten().is_empty())
97    }
98
99    fn finish(
100        &mut self,
101        _output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
102    ) -> Result<bool> {
103        Ok(true)
104    }
105}