Skip to main content

compression/
stream.rs

1use crate::{error::Result, ffi, Algorithm, CompressionError};
2
/// Size in bytes (32 KiB) of the scratch buffer handed to each
/// `ffi::compression_stream_process` call as its destination.
const OUTPUT_CHUNK_LEN: usize = 32 * 1024;
4
/// Direction a stream operates in.
///
/// Selects both the FFI operation passed at initialization and the
/// operation label reported in errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamKind {
    /// The stream is initialized with `ffi::COMPRESSION_STREAM_ENCODE`.
    Encode,
    /// The stream is initialized with `ffi::COMPRESSION_STREAM_DECODE`.
    Decode,
}
10
11impl StreamKind {
12    fn operation_name(self) -> &'static str {
13        match self {
14            Self::Encode => "compression_stream_process(encode)",
15            Self::Decode => "compression_stream_process(decode)",
16        }
17    }
18
19    fn init_name(self) -> &'static str {
20        match self {
21            Self::Encode => "compression_stream_init(encode)",
22            Self::Decode => "compression_stream_init(decode)",
23        }
24    }
25
26    fn to_ffi(self) -> ffi::compression_stream_operation {
27        match self {
28            Self::Encode => ffi::COMPRESSION_STREAM_ENCODE,
29            Self::Decode => ffi::COMPRESSION_STREAM_DECODE,
30        }
31    }
32}
33
34#[derive(Debug)]
35struct CompressionStream {
36    raw: ffi::compression_stream,
37    algorithm: Algorithm,
38    kind: StreamKind,
39    finished: bool,
40}
41
42impl CompressionStream {
43    fn new(kind: StreamKind, algorithm: Algorithm) -> Result<Self> {
44        let mut raw = ffi::compression_stream::default();
45        let status =
46            unsafe { ffi::compression_stream_init(&mut raw, kind.to_ffi(), algorithm.as_ffi()) };
47        if status != ffi::COMPRESSION_STATUS_OK {
48            return Err(CompressionError::StreamInitFailed {
49                operation: kind.init_name(),
50                algorithm,
51            });
52        }
53
54        Ok(Self {
55            raw,
56            algorithm,
57            kind,
58            finished: false,
59        })
60    }
61
62    fn process(&mut self, input: &[u8], finalize: bool) -> Result<Vec<u8>> {
63        if self.finished {
64            return Err(CompressionError::StreamFinished {
65                operation: self.kind.operation_name(),
66                algorithm: self.algorithm,
67            });
68        }
69
70        self.raw.src_ptr = input.as_ptr();
71        self.raw.src_size = input.len();
72        let flags = if finalize {
73            ffi::COMPRESSION_STREAM_FINALIZE
74        } else {
75            0
76        };
77
78        let mut output = Vec::new();
79
80        loop {
81            let mut buffer = vec![0_u8; OUTPUT_CHUNK_LEN];
82            self.raw.dst_ptr = buffer.as_mut_ptr();
83            self.raw.dst_size = buffer.len();
84
85            let status = unsafe { ffi::compression_stream_process(&mut self.raw, flags) };
86            let produced = buffer.len() - self.raw.dst_size;
87            output.extend_from_slice(&buffer[..produced]);
88
89            match status {
90                ffi::COMPRESSION_STATUS_END => {
91                    self.finished = true;
92                    return Ok(output);
93                }
94                ffi::COMPRESSION_STATUS_OK => {
95                    let input_consumed = self.raw.src_size == 0;
96                    let destination_exhausted = self.raw.dst_size == 0;
97
98                    if !finalize && input_consumed {
99                        return Ok(output);
100                    }
101
102                    if destination_exhausted || !input_consumed {
103                        continue;
104                    }
105
106                    if produced == 0 {
107                        return Err(CompressionError::StreamStalled {
108                            operation: self.kind.operation_name(),
109                            algorithm: self.algorithm,
110                        });
111                    }
112                }
113                _ => {
114                    return Err(CompressionError::StreamProcessFailed {
115                        operation: self.kind.operation_name(),
116                        algorithm: self.algorithm,
117                    });
118                }
119            }
120        }
121    }
122}
123
124impl Drop for CompressionStream {
125    fn drop(&mut self) {
126        if !self.raw.state.is_null() {
127            let _ = unsafe { ffi::compression_stream_destroy(&mut self.raw) };
128            self.raw.state = std::ptr::null_mut();
129        }
130    }
131}
132
133#[derive(Debug)]
134pub struct Encoder {
135    inner: CompressionStream,
136}
137
138impl Encoder {
139    pub fn new(algorithm: Algorithm) -> Result<Self> {
140        Ok(Self {
141            inner: CompressionStream::new(StreamKind::Encode, algorithm)?,
142        })
143    }
144
145    pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
146        self.inner.process(input, false)
147    }
148
149    pub fn finish(&mut self) -> Result<Vec<u8>> {
150        self.inner.process(&[], true)
151    }
152}
153
154#[derive(Debug)]
155pub struct Decoder {
156    inner: CompressionStream,
157}
158
159impl Decoder {
160    pub fn new(algorithm: Algorithm) -> Result<Self> {
161        Ok(Self {
162            inner: CompressionStream::new(StreamKind::Decode, algorithm)?,
163        })
164    }
165
166    pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
167        self.inner.process(input, false)
168    }
169
170    pub fn finish(&mut self) -> Result<Vec<u8>> {
171        self.inner.process(&[], true)
172    }
173}