Skip to main content

compression/
compression_stream.rs

1use crate::{ffi, util, Algorithm, CompressionError, Result};
2use std::ffi::c_void;
3use std::ptr::NonNull;
4
5const OUTPUT_CHUNK_LEN: usize = 32 * 1024;
6
7#[derive(Clone, Copy, Debug, Eq, PartialEq)]
8pub enum StreamOperation {
9    Encode,
10    Decode,
11}
12
13impl StreamOperation {
14    const fn operation_name(self) -> &'static str {
15        match self {
16            Self::Encode => "compression_stream_process(encode)",
17            Self::Decode => "compression_stream_process(decode)",
18        }
19    }
20
21    const fn init_name(self) -> &'static str {
22        match self {
23            Self::Encode => "compression_stream_init(encode)",
24            Self::Decode => "compression_stream_init(decode)",
25        }
26    }
27
28    const fn as_raw(self) -> i32 {
29        match self {
30            Self::Encode => 0,
31            Self::Decode => 1,
32        }
33    }
34}
35
36#[derive(Debug)]
37pub struct CompressionStream {
38    handle: NonNull<c_void>,
39    algorithm: Algorithm,
40    operation: StreamOperation,
41    finished: bool,
42}
43
44impl CompressionStream {
45    pub fn new(operation: StreamOperation, algorithm: Algorithm) -> Result<Self> {
46        let handle = unsafe {
47            ffi::compression_stream::compression_rs_compression_stream_create(
48                operation.as_raw(),
49                algorithm.as_raw(),
50            )
51        };
52        Ok(Self {
53            handle: util::nonnull_handle(handle, operation.init_name())?,
54            algorithm,
55            operation,
56            finished: false,
57        })
58    }
59
60    fn as_ptr(&self) -> *mut c_void {
61        self.handle.as_ptr()
62    }
63
64    pub fn process(&mut self, mut input: &[u8], finalize: bool) -> Result<Vec<u8>> {
65        if self.finished {
66            return Err(CompressionError::StreamFinished {
67                operation: self.operation.operation_name(),
68                algorithm: self.algorithm,
69            });
70        }
71
72        let flags = if finalize {
73            ffi::compression_stream::COMPRESSION_STREAM_FINALIZE
74        } else {
75            0
76        };
77        let mut output = Vec::new();
78
79        loop {
80            let mut buffer = vec![0_u8; OUTPUT_CHUNK_LEN];
81            let mut src_remaining = input.len();
82            let mut dst_remaining = buffer.len();
83            let status = unsafe {
84                ffi::compression_stream::compression_rs_compression_stream_process(
85                    self.as_ptr(),
86                    input.as_ptr(),
87                    input.len(),
88                    buffer.as_mut_ptr(),
89                    buffer.len(),
90                    flags,
91                    &mut src_remaining,
92                    &mut dst_remaining,
93                )
94            };
95
96            let consumed = input.len() - src_remaining;
97            let produced = buffer.len() - dst_remaining;
98            output.extend_from_slice(&buffer[..produced]);
99            input = &input[consumed..];
100
101            match status {
102                ffi::compression_stream::COMPRESSION_STATUS_END => {
103                    self.finished = true;
104                    return Ok(output);
105                }
106                ffi::compression_stream::COMPRESSION_STATUS_OK => {
107                    let input_consumed = src_remaining == 0;
108                    let destination_exhausted = dst_remaining == 0;
109
110                    if !finalize && input_consumed {
111                        return Ok(output);
112                    }
113
114                    if destination_exhausted || !input_consumed {
115                        continue;
116                    }
117
118                    if produced == 0 {
119                        return Err(CompressionError::StreamStalled {
120                            operation: self.operation.operation_name(),
121                            algorithm: self.algorithm,
122                        });
123                    }
124                }
125                other => {
126                    return Err(CompressionError::StreamProcessFailed {
127                        operation: self.operation.operation_name(),
128                        algorithm: self.algorithm,
129                        status: other,
130                    });
131                }
132            }
133        }
134    }
135
136    pub fn finish(&mut self) -> Result<Vec<u8>> {
137        if self.finished {
138            Ok(Vec::new())
139        } else {
140            self.process(&[], true)
141        }
142    }
143}
144
145impl Drop for CompressionStream {
146    fn drop(&mut self) {
147        unsafe {
148            ffi::compression_stream::compression_rs_compression_stream_release(self.as_ptr());
149        }
150    }
151}
152
153#[derive(Debug)]
154pub struct Encoder {
155    inner: CompressionStream,
156}
157
158impl Encoder {
159    pub fn new(algorithm: Algorithm) -> Result<Self> {
160        Ok(Self {
161            inner: CompressionStream::new(StreamOperation::Encode, algorithm)?,
162        })
163    }
164
165    pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
166        self.inner.process(input, false)
167    }
168
169    pub fn finish(&mut self) -> Result<Vec<u8>> {
170        self.inner.finish()
171    }
172}
173
174#[derive(Debug)]
175pub struct Decoder {
176    inner: CompressionStream,
177}
178
179impl Decoder {
180    pub fn new(algorithm: Algorithm) -> Result<Self> {
181        Ok(Self {
182            inner: CompressionStream::new(StreamOperation::Decode, algorithm)?,
183        })
184    }
185
186    pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
187        self.inner.process(input, false)
188    }
189
190    pub fn finish(&mut self) -> Result<Vec<u8>> {
191        self.inner.finish()
192    }
193}