1use crate::{error::Result, ffi, Algorithm, CompressionError};
2
/// Size in bytes (32 KiB) of the scratch buffer offered to the stream as its
/// destination on each processing call.
const OUTPUT_CHUNK_LEN: usize = 32 << 10;
4
/// Direction in which a compression stream transforms the bytes it is given.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamKind {
    /// The stream encodes (compresses) its input.
    Encode,
    /// The stream decodes (decompresses) its input.
    Decode,
}
10
11impl StreamKind {
12 fn operation_name(self) -> &'static str {
13 match self {
14 Self::Encode => "compression_stream_process(encode)",
15 Self::Decode => "compression_stream_process(decode)",
16 }
17 }
18
19 fn init_name(self) -> &'static str {
20 match self {
21 Self::Encode => "compression_stream_init(encode)",
22 Self::Decode => "compression_stream_init(decode)",
23 }
24 }
25
26 fn to_ffi(self) -> ffi::compression_stream_operation {
27 match self {
28 Self::Encode => ffi::COMPRESSION_STREAM_ENCODE,
29 Self::Decode => ffi::COMPRESSION_STREAM_DECODE,
30 }
31 }
32}
33
34#[derive(Debug)]
35struct CompressionStream {
36 raw: ffi::compression_stream,
37 algorithm: Algorithm,
38 kind: StreamKind,
39 finished: bool,
40}
41
42impl CompressionStream {
43 fn new(kind: StreamKind, algorithm: Algorithm) -> Result<Self> {
44 let mut raw = ffi::compression_stream::default();
45 let status =
46 unsafe { ffi::compression_stream_init(&mut raw, kind.to_ffi(), algorithm.as_ffi()) };
47 if status != ffi::COMPRESSION_STATUS_OK {
48 return Err(CompressionError::StreamInitFailed {
49 operation: kind.init_name(),
50 algorithm,
51 });
52 }
53
54 Ok(Self {
55 raw,
56 algorithm,
57 kind,
58 finished: false,
59 })
60 }
61
62 fn process(&mut self, input: &[u8], finalize: bool) -> Result<Vec<u8>> {
63 if self.finished {
64 return Err(CompressionError::StreamFinished {
65 operation: self.kind.operation_name(),
66 algorithm: self.algorithm,
67 });
68 }
69
70 self.raw.src_ptr = input.as_ptr();
71 self.raw.src_size = input.len();
72 let flags = if finalize {
73 ffi::COMPRESSION_STREAM_FINALIZE
74 } else {
75 0
76 };
77
78 let mut output = Vec::new();
79
80 loop {
81 let mut buffer = vec![0_u8; OUTPUT_CHUNK_LEN];
82 self.raw.dst_ptr = buffer.as_mut_ptr();
83 self.raw.dst_size = buffer.len();
84
85 let status = unsafe { ffi::compression_stream_process(&mut self.raw, flags) };
86 let produced = buffer.len() - self.raw.dst_size;
87 output.extend_from_slice(&buffer[..produced]);
88
89 match status {
90 ffi::COMPRESSION_STATUS_END => {
91 self.finished = true;
92 return Ok(output);
93 }
94 ffi::COMPRESSION_STATUS_OK => {
95 let input_consumed = self.raw.src_size == 0;
96 let destination_exhausted = self.raw.dst_size == 0;
97
98 if !finalize && input_consumed {
99 return Ok(output);
100 }
101
102 if destination_exhausted || !input_consumed {
103 continue;
104 }
105
106 if produced == 0 {
107 return Err(CompressionError::StreamStalled {
108 operation: self.kind.operation_name(),
109 algorithm: self.algorithm,
110 });
111 }
112 }
113 _ => {
114 return Err(CompressionError::StreamProcessFailed {
115 operation: self.kind.operation_name(),
116 algorithm: self.algorithm,
117 });
118 }
119 }
120 }
121 }
122}
123
124impl Drop for CompressionStream {
125 fn drop(&mut self) {
126 if !self.raw.state.is_null() {
127 let _ = unsafe { ffi::compression_stream_destroy(&mut self.raw) };
128 self.raw.state = std::ptr::null_mut();
129 }
130 }
131}
132
133#[derive(Debug)]
134pub struct Encoder {
135 inner: CompressionStream,
136}
137
138impl Encoder {
139 pub fn new(algorithm: Algorithm) -> Result<Self> {
140 Ok(Self {
141 inner: CompressionStream::new(StreamKind::Encode, algorithm)?,
142 })
143 }
144
145 pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
146 self.inner.process(input, false)
147 }
148
149 pub fn finish(&mut self) -> Result<Vec<u8>> {
150 self.inner.process(&[], true)
151 }
152}
153
154#[derive(Debug)]
155pub struct Decoder {
156 inner: CompressionStream,
157}
158
159impl Decoder {
160 pub fn new(algorithm: Algorithm) -> Result<Self> {
161 Ok(Self {
162 inner: CompressionStream::new(StreamKind::Decode, algorithm)?,
163 })
164 }
165
166 pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
167 self.inner.process(input, false)
168 }
169
170 pub fn finish(&mut self) -> Result<Vec<u8>> {
171 self.inner.process(&[], true)
172 }
173}