compression/
compression_stream.rs1use crate::{ffi, util, Algorithm, CompressionError, Result};
2use std::ffi::c_void;
3use std::ptr::NonNull;
4
/// Size, in bytes, of the scratch buffer handed to the native stream on each
/// processing call (32 KiB).
const OUTPUT_CHUNK_LEN: usize = 32 * 1024;
6
/// Direction in which a [`CompressionStream`] processes data.
///
/// The explicit discriminants are the raw codes returned by `as_raw`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamOperation {
    /// Encode direction (the one `Encoder` selects).
    Encode = 0,
    /// Decode direction (the one `Decoder` selects).
    Decode = 1,
}

impl StreamOperation {
    /// Raw integer code for this operation, as passed to the native
    /// stream-creation shim: `0` for encode, `1` for decode.
    const fn as_raw(self) -> i32 {
        // Lossless: the discriminants are pinned to 0 and 1 on the enum itself.
        self as i32
    }

    /// Label used to identify stream creation for this direction when the
    /// returned handle is validated.
    const fn init_name(self) -> &'static str {
        match self {
            StreamOperation::Decode => "compression_stream_init(decode)",
            StreamOperation::Encode => "compression_stream_init(encode)",
        }
    }

    /// Label used in the `operation` field of errors raised while processing
    /// data in this direction.
    const fn operation_name(self) -> &'static str {
        match self {
            StreamOperation::Decode => "compression_stream_process(decode)",
            StreamOperation::Encode => "compression_stream_process(encode)",
        }
    }
}
38
39#[derive(Debug)]
41pub struct CompressionStream {
42 handle: NonNull<c_void>,
43 algorithm: Algorithm,
44 operation: StreamOperation,
45 finished: bool,
46}
47
48impl CompressionStream {
49 pub fn new(operation: StreamOperation, algorithm: Algorithm) -> Result<Self> {
51 let handle = unsafe {
52 ffi::compression_stream::compression_rs_compression_stream_create(
53 operation.as_raw(),
54 algorithm.as_raw(),
55 )
56 };
57 Ok(Self {
58 handle: util::nonnull_handle(handle, operation.init_name())?,
59 algorithm,
60 operation,
61 finished: false,
62 })
63 }
64
65 fn as_ptr(&self) -> *mut c_void {
66 self.handle.as_ptr()
67 }
68
69 pub fn process(&mut self, mut input: &[u8], finalize: bool) -> Result<Vec<u8>> {
71 if self.finished {
72 return Err(CompressionError::StreamFinished {
73 operation: self.operation.operation_name(),
74 algorithm: self.algorithm,
75 });
76 }
77
78 let flags = if finalize {
79 ffi::compression_stream::COMPRESSION_STREAM_FINALIZE
80 } else {
81 0
82 };
83 let mut output = Vec::new();
84
85 loop {
86 let mut buffer = vec![0_u8; OUTPUT_CHUNK_LEN];
87 let mut src_remaining = input.len();
88 let mut dst_remaining = buffer.len();
89 let status = unsafe {
90 ffi::compression_stream::compression_rs_compression_stream_process(
91 self.as_ptr(),
92 input.as_ptr(),
93 input.len(),
94 buffer.as_mut_ptr(),
95 buffer.len(),
96 flags,
97 &mut src_remaining,
98 &mut dst_remaining,
99 )
100 };
101
102 let consumed = input.len() - src_remaining;
103 let produced = buffer.len() - dst_remaining;
104 output.extend_from_slice(&buffer[..produced]);
105 input = &input[consumed..];
106
107 match status {
108 ffi::compression_stream::COMPRESSION_STATUS_END => {
109 self.finished = true;
110 return Ok(output);
111 }
112 ffi::compression_stream::COMPRESSION_STATUS_OK => {
113 let input_consumed = src_remaining == 0;
114 let destination_exhausted = dst_remaining == 0;
115
116 if !finalize && input_consumed {
117 return Ok(output);
118 }
119
120 if destination_exhausted || !input_consumed {
121 continue;
122 }
123
124 if produced == 0 {
125 return Err(CompressionError::StreamStalled {
126 operation: self.operation.operation_name(),
127 algorithm: self.algorithm,
128 });
129 }
130 }
131 other => {
132 return Err(CompressionError::StreamProcessFailed {
133 operation: self.operation.operation_name(),
134 algorithm: self.algorithm,
135 status: other,
136 });
137 }
138 }
139 }
140 }
141
142 pub fn finish(&mut self) -> Result<Vec<u8>> {
144 if self.finished {
145 Ok(Vec::new())
146 } else {
147 self.process(&[], true)
148 }
149 }
150}
151
152impl Drop for CompressionStream {
153 fn drop(&mut self) {
154 unsafe {
155 ffi::compression_stream::compression_rs_compression_stream_release(self.as_ptr());
156 }
157 }
158}
159
160#[derive(Debug)]
162pub struct Encoder {
163 inner: CompressionStream,
164}
165
166impl Encoder {
167 pub fn new(algorithm: Algorithm) -> Result<Self> {
169 Ok(Self {
170 inner: CompressionStream::new(StreamOperation::Encode, algorithm)?,
171 })
172 }
173
174 pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
176 self.inner.process(input, false)
177 }
178
179 pub fn finish(&mut self) -> Result<Vec<u8>> {
181 self.inner.finish()
182 }
183}
184
185#[derive(Debug)]
187pub struct Decoder {
188 inner: CompressionStream,
189}
190
191impl Decoder {
192 pub fn new(algorithm: Algorithm) -> Result<Self> {
194 Ok(Self {
195 inner: CompressionStream::new(StreamOperation::Decode, algorithm)?,
196 })
197 }
198
199 pub fn process(&mut self, input: &[u8]) -> Result<Vec<u8>> {
201 self.inner.process(input, false)
202 }
203
204 pub fn finish(&mut self) -> Result<Vec<u8>> {
206 self.inner.finish()
207 }
208}