compression/
compression_stream.rs1use crate::{ffi, util, Algorithm, CompressionError, Result};
2use std::ffi::c_void;
3use std::ptr::NonNull;
4
5const OUTPUT_CHUNK_LEN: usize = 32 * 1024;
6
7#[derive(Clone, Copy, Debug, Eq, PartialEq)]
8pub enum StreamOperation {
9 Encode,
10 Decode,
11}
12
13impl StreamOperation {
14 const fn operation_name(self) -> &'static str {
15 match self {
16 Self::Encode => "compression_stream_process(encode)",
17 Self::Decode => "compression_stream_process(decode)",
18 }
19 }
20
21 const fn init_name(self) -> &'static str {
22 match self {
23 Self::Encode => "compression_stream_init(encode)",
24 Self::Decode => "compression_stream_init(decode)",
25 }
26 }
27
28 const fn as_raw(self) -> i32 {
29 match self {
30 Self::Encode => 0,
31 Self::Decode => 1,
32 }
33 }
34}
35
36#[derive(Debug)]
37pub struct CompressionStream {
38 handle: NonNull<c_void>,
39 algorithm: Algorithm,
40 operation: StreamOperation,
41 finished: bool,
42}
43
44impl CompressionStream {
45 pub fn new(operation: StreamOperation, algorithm: Algorithm) -> Result<Self> {
46 let handle = unsafe {
47 ffi::compression_stream::compression_rs_compression_stream_create(
48 operation.as_raw(),
49 algorithm.as_raw(),
50 )
51 };
52 Ok(Self {
53 handle: util::nonnull_handle(handle, operation.init_name())?,
54 algorithm,
55 operation,
56 finished: false,
57 })
58 }
59
60 fn as_ptr(&self) -> *mut c_void {
61 self.handle.as_ptr()
62 }
63
64 pub fn process(&mut self, mut input: &[u8], finalize: bool) -> Result<Vec<u8>> {
65 if self.finished {
66 return Err(CompressionError::StreamFinished {
67 operation: self.operation.operation_name(),
68 algorithm: self.algorithm,
69 });
70 }
71
72 let flags = if finalize {
73 ffi::compression_stream::COMPRESSION_STREAM_FINALIZE
74 } else {
75 0
76 };
77 let mut output = Vec::new();
78
79 loop {
80 let mut buffer = vec![0_u8; OUTPUT_CHUNK_LEN];
81 let mut src_remaining = input.len();
82 let mut dst_remaining = buffer.len();
83 let status = unsafe {
84 ffi::compression_stream::compression_rs_compression_stream_process(
85 self.as_ptr(),
86 input.as_ptr(),
87 input.len(),
88 buffer.as_mut_ptr(),
89 buffer.len(),
90 flags,
91 &mut src_remaining,
92 &mut dst_remaining,
93 )
94 };
95
96 let consumed = input.len() - src_remaining;
97 let produced = buffer.len() - dst_remaining;
98 output.extend_from_slice(&buffer[..produced]);
99 input = &input[consumed..];
100
101 match status {
102 ffi::compression_stream::COMPRESSION_STATUS_END => {
103 self.finished = true;
104 return Ok(output);
105 }
106 ffi::compression_stream::COMPRESSION_STATUS_OK => {
107 let input_consumed = src_remaining == 0;
108 let destination_exhausted = dst_remaining == 0;
109
110 if !finalize && input_consumed {
111 return Ok(output);
112 }
113
114 if destination_exhausted || !input_consumed {
115 continue;
116 }
117
118 if produced == 0 {
119 return Err(CompressionError::StreamStalled {
120 operation: self.operation.operation_name(),
121 algorithm: self.algorithm,
122 });
123 }
124 }
125 other => {
126 return Err(CompressionError::StreamProcessFailed {
127 operation: self.operation.operation_name(),
128 algorithm: self.algorithm,
129 status: other,
130 });
131 }
132 }
133 }
134 }
135
136 pub fn finish(&mut self) -> Result<Vec<u8>> {
137 if self.finished {
138 Ok(Vec::new())
139 } else {
140 self.process(&[], true)
141 }
142 }
143}
144
145impl Drop for CompressionStream {
146 fn drop(&mut self) {
147 unsafe {
148 ffi::compression_stream::compression_rs_compression_stream_release(self.as_ptr());
149 }
150 }
151}
152
153#[derive(Debug)]
154pub struct Encoder {
155 inner: CompressionStream,
156}
157
158impl Encoder {
159 pub fn new(algorithm: Algorithm) -> Result<Self> {
160 Ok(Self {
161 inner: CompressionStream::new(StreamOperation::Encode, algorithm)?,
162 })
163 }
164
165 pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
166 self.inner.process(input, false)
167 }
168
169 pub fn finish(&mut self) -> Result<Vec<u8>> {
170 self.inner.finish()
171 }
172}
173
174#[derive(Debug)]
175pub struct Decoder {
176 inner: CompressionStream,
177}
178
179impl Decoder {
180 pub fn new(algorithm: Algorithm) -> Result<Self> {
181 Ok(Self {
182 inner: CompressionStream::new(StreamOperation::Decode, algorithm)?,
183 })
184 }
185
186 pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
187 self.inner.process(input, false)
188 }
189
190 pub fn finish(&mut self) -> Result<Vec<u8>> {
191 self.inner.finish()
192 }
193}