Skip to main content

compression/
compression_stream.rs

1use crate::{ffi, util, Algorithm, CompressionError, Result};
2use std::ffi::c_void;
3use std::ptr::NonNull;
4
/// Size in bytes (32 KiB) of the scratch buffer handed to each
/// `compression_stream_process` call.
const OUTPUT_CHUNK_LEN: usize = 32 << 10;
6
/// Direction of a compression stream; mirrors the `compression_stream_operation` values.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamOperation {
    /// Compress data; corresponds to `COMPRESSION_STREAM_ENCODE`.
    Encode,
    /// Decompress data; corresponds to `COMPRESSION_STREAM_DECODE`.
    Decode,
}

impl StreamOperation {
    /// Raw integer handed to the FFI layer: `0` for encode, `1` for decode.
    const fn as_raw(self) -> i32 {
        match self {
            StreamOperation::Decode => 1,
            StreamOperation::Encode => 0,
        }
    }

    /// Label attached to errors raised while creating a stream.
    const fn init_name(self) -> &'static str {
        match self {
            StreamOperation::Decode => "compression_stream_init(decode)",
            StreamOperation::Encode => "compression_stream_init(encode)",
        }
    }

    /// Label attached to errors raised while processing data through a stream.
    const fn operation_name(self) -> &'static str {
        match self {
            StreamOperation::Decode => "compression_stream_process(decode)",
            StreamOperation::Encode => "compression_stream_process(encode)",
        }
    }
}
38
39/// Wraps a `compression_stream` handle.
40#[derive(Debug)]
41pub struct CompressionStream {
42    handle: NonNull<c_void>,
43    algorithm: Algorithm,
44    operation: StreamOperation,
45    finished: bool,
46}
47
48impl CompressionStream {
49    /// Wraps `compression_stream_create`.
50    pub fn new(operation: StreamOperation, algorithm: Algorithm) -> Result<Self> {
51        let handle = unsafe {
52            ffi::compression_stream::compression_rs_compression_stream_create(
53                operation.as_raw(),
54                algorithm.as_raw(),
55            )
56        };
57        Ok(Self {
58            handle: util::nonnull_handle(handle, operation.init_name())?,
59            algorithm,
60            operation,
61            finished: false,
62        })
63    }
64
65    fn as_ptr(&self) -> *mut c_void {
66        self.handle.as_ptr()
67    }
68
69    /// Wraps `compression_stream_process`.
70    pub fn process(&mut self, mut input: &[u8], finalize: bool) -> Result<Vec<u8>> {
71        if self.finished {
72            return Err(CompressionError::StreamFinished {
73                operation: self.operation.operation_name(),
74                algorithm: self.algorithm,
75            });
76        }
77
78        let flags = if finalize {
79            ffi::compression_stream::COMPRESSION_STREAM_FINALIZE
80        } else {
81            0
82        };
83        let mut output = Vec::new();
84
85        loop {
86            let mut buffer = vec![0_u8; OUTPUT_CHUNK_LEN];
87            let mut src_remaining = input.len();
88            let mut dst_remaining = buffer.len();
89            let status = unsafe {
90                ffi::compression_stream::compression_rs_compression_stream_process(
91                    self.as_ptr(),
92                    input.as_ptr(),
93                    input.len(),
94                    buffer.as_mut_ptr(),
95                    buffer.len(),
96                    flags,
97                    &mut src_remaining,
98                    &mut dst_remaining,
99                )
100            };
101
102            let consumed = input.len() - src_remaining;
103            let produced = buffer.len() - dst_remaining;
104            output.extend_from_slice(&buffer[..produced]);
105            input = &input[consumed..];
106
107            match status {
108                ffi::compression_stream::COMPRESSION_STATUS_END => {
109                    self.finished = true;
110                    return Ok(output);
111                }
112                ffi::compression_stream::COMPRESSION_STATUS_OK => {
113                    let input_consumed = src_remaining == 0;
114                    let destination_exhausted = dst_remaining == 0;
115
116                    if !finalize && input_consumed {
117                        return Ok(output);
118                    }
119
120                    if destination_exhausted || !input_consumed {
121                        continue;
122                    }
123
124                    if produced == 0 {
125                        return Err(CompressionError::StreamStalled {
126                            operation: self.operation.operation_name(),
127                            algorithm: self.algorithm,
128                        });
129                    }
130                }
131                other => {
132                    return Err(CompressionError::StreamProcessFailed {
133                        operation: self.operation.operation_name(),
134                        algorithm: self.algorithm,
135                        status: other,
136                    });
137                }
138            }
139        }
140    }
141
142    /// Wraps `compression_stream_release`.
143    pub fn finish(&mut self) -> Result<Vec<u8>> {
144        if self.finished {
145            Ok(Vec::new())
146        } else {
147            self.process(&[], true)
148        }
149    }
150}
151
152impl Drop for CompressionStream {
153    fn drop(&mut self) {
154        unsafe {
155            ffi::compression_stream::compression_rs_compression_stream_release(self.as_ptr());
156        }
157    }
158}
159
160/// Wraps `compression_stream_process` for encoding.
161#[derive(Debug)]
162pub struct Encoder {
163    inner: CompressionStream,
164}
165
166impl Encoder {
167    /// Wraps `new` convenience around `compression_stream_process`.
168    pub fn new(algorithm: Algorithm) -> Result<Self> {
169        Ok(Self {
170            inner: CompressionStream::new(StreamOperation::Encode, algorithm)?,
171        })
172    }
173
174    /// Wraps `process` convenience around `compression_stream_process`.
175    pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
176        self.inner.process(input, false)
177    }
178
179    /// Wraps `finish` convenience around `compression_stream_process`.
180    pub fn finish(&mut self) -> Result<Vec<u8>> {
181        self.inner.finish()
182    }
183}
184
185/// Wraps `compression_stream_process` for decoding.
186#[derive(Debug)]
187pub struct Decoder {
188    inner: CompressionStream,
189}
190
191impl Decoder {
192    /// Wraps `new` convenience around `compression_stream_process`.
193    pub fn new(algorithm: Algorithm) -> Result<Self> {
194        Ok(Self {
195            inner: CompressionStream::new(StreamOperation::Decode, algorithm)?,
196        })
197    }
198
199    /// Wraps `process` convenience around `compression_stream_process`.
200    pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
201        self.inner.process(input, false)
202    }
203
204    /// Wraps `finish` convenience around `compression_stream_process`.
205    pub fn finish(&mut self) -> Result<Vec<u8>> {
206        self.inner.finish()
207    }
208}