rpdfium_codec/
streaming.rs1use crate::error::DecodeError;
13
14pub trait StreamingDecoder {
20 fn feed(&mut self, input: &[u8]) -> Result<(), DecodeError>;
22
23 fn finish(&mut self) -> Result<(), DecodeError>;
25
26 fn output_available(&self) -> usize;
28
29 fn read_output(&mut self, buf: &mut [u8]) -> usize;
32}
33
/// Identifies which streaming decoder [`create_streaming_decoder`] should build.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamingDecoderType {
    /// Selects [`FlateStreamingDecoder`].
    Flate,
    /// Selects [`DctStreamingDecoder`].
    Dct,
}
42
43pub fn create_streaming_decoder(filter: StreamingDecoderType) -> Box<dyn StreamingDecoder> {
45 match filter {
46 StreamingDecoderType::Flate => Box::new(FlateStreamingDecoder::new()),
47 StreamingDecoderType::Dct => Box::new(DctStreamingDecoder::new()),
48 }
49}
50
51pub struct FlateStreamingDecoder {
59 decompressor: flate2::Decompress,
60 output_buffer: Vec<u8>,
61 finished: bool,
62}
63
64impl Default for FlateStreamingDecoder {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl FlateStreamingDecoder {
71 pub fn new() -> Self {
73 Self {
74 decompressor: flate2::Decompress::new(true), output_buffer: Vec::new(),
76 finished: false,
77 }
78 }
79}
80
81impl StreamingDecoder for FlateStreamingDecoder {
82 fn feed(&mut self, input: &[u8]) -> Result<(), DecodeError> {
83 if self.finished {
84 return Err(DecodeError::InvalidInput("decoder already finished".into()));
85 }
86 let mut buf = [0u8; 32 * 1024];
87 let mut offset = 0;
88 while offset < input.len() {
89 let before_in = self.decompressor.total_in();
90 let before_out = self.decompressor.total_out();
91 let status = self
92 .decompressor
93 .decompress(&input[offset..], &mut buf, flate2::FlushDecompress::None)
94 .map_err(|e| DecodeError::Flate(e.to_string()))?;
95 let consumed = (self.decompressor.total_in() - before_in) as usize;
96 let produced = (self.decompressor.total_out() - before_out) as usize;
97 self.output_buffer.extend_from_slice(&buf[..produced]);
98 offset += consumed;
99 if matches!(status, flate2::Status::StreamEnd) {
100 self.finished = true;
101 break;
102 }
103 if consumed == 0 && produced == 0 {
104 break;
105 }
106 }
107 Ok(())
108 }
109
110 fn finish(&mut self) -> Result<(), DecodeError> {
111 if !self.finished {
112 let mut buf = [0u8; 4096];
113 let before_out = self.decompressor.total_out();
114 let _ = self
115 .decompressor
116 .decompress(&[], &mut buf, flate2::FlushDecompress::Finish);
117 let produced = (self.decompressor.total_out() - before_out) as usize;
118 self.output_buffer.extend_from_slice(&buf[..produced]);
119 self.finished = true;
120 }
121 Ok(())
122 }
123
124 fn output_available(&self) -> usize {
125 self.output_buffer.len()
126 }
127
128 fn read_output(&mut self, buf: &mut [u8]) -> usize {
129 let len = buf.len().min(self.output_buffer.len());
130 buf[..len].copy_from_slice(&self.output_buffer[..len]);
131 self.output_buffer.drain(..len);
132 len
133 }
134}
135
/// Decoder that collects encoded input across `feed` calls and exposes the
/// decoded bytes through an output buffer.
#[derive(Debug)]
pub struct DctStreamingDecoder {
    /// Encoded bytes gathered so far.
    accumulated: Vec<u8>,
    /// Decoded bytes that have not been read yet.
    output_buffer: Vec<u8>,
    /// Whether decoding has completed; no more input is accepted once set.
    finished: bool,
}
149
150impl Default for DctStreamingDecoder {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156impl DctStreamingDecoder {
157 pub fn new() -> Self {
159 Self {
160 accumulated: Vec::new(),
161 output_buffer: Vec::new(),
162 finished: false,
163 }
164 }
165}
166
167impl StreamingDecoder for DctStreamingDecoder {
168 fn feed(&mut self, input: &[u8]) -> Result<(), DecodeError> {
169 if self.finished {
170 return Err(DecodeError::InvalidInput("decoder already finished".into()));
171 }
172 self.accumulated.extend_from_slice(input);
173 Ok(())
174 }
175
176 fn finish(&mut self) -> Result<(), DecodeError> {
177 if !self.finished {
178 self.output_buffer = crate::jpeg::decode(&self.accumulated)?;
179 self.accumulated = Vec::new();
180 self.finished = true;
181 }
182 Ok(())
183 }
184
185 fn output_available(&self) -> usize {
186 self.output_buffer.len()
187 }
188
189 fn read_output(&mut self, buf: &mut [u8]) -> usize {
190 let len = buf.len().min(self.output_buffer.len());
191 buf[..len].copy_from_slice(&self.output_buffer[..len]);
192 self.output_buffer.drain(..len);
193 len
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::Compression;
    use flate2::write::ZlibEncoder;
    use std::io::Write;

    /// Compresses `data` into a zlib stream at the default compression level.
    fn zlib_compress(data: &[u8]) -> Vec<u8> {
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        encoder.finish().unwrap()
    }

    /// Feeding the stream in three pieces yields the original data.
    #[test]
    fn test_flate_streaming_multi_chunk() {
        let original = b"Hello, World!";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        let chunk_size = compressed.len().div_ceil(3);
        for chunk in compressed.chunks(chunk_size) {
            decoder.feed(chunk).unwrap();
        }
        decoder.finish().unwrap();

        let mut output = vec![0u8; 256];
        let n = decoder.read_output(&mut output);
        assert_eq!(&output[..n], original);
        assert_eq!(decoder.output_available(), 0);
    }

    /// Feeding the stream one byte at a time yields the original data.
    #[test]
    fn test_flate_streaming_single_byte() {
        let original = b"Streaming byte by byte test";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        for &byte in &compressed {
            decoder.feed(&[byte]).unwrap();
        }
        decoder.finish().unwrap();

        let mut output = vec![0u8; 256];
        let n = decoder.read_output(&mut output);
        assert_eq!(&output[..n], original);
    }

    /// The streaming decoder agrees with the batch decoder in `crate::flate`.
    #[test]
    fn test_flate_streaming_matches_batch() {
        let original: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect();
        let compressed = zlib_compress(&original);

        let batch_result = crate::flate::decode(&compressed, None, None, None, None).unwrap();

        let mut decoder = FlateStreamingDecoder::new();
        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let mut streaming_result = vec![0u8; decoder.output_available()];
        let n = decoder.read_output(&mut streaming_result);
        streaming_result.truncate(n);

        assert_eq!(streaming_result, batch_result);
        assert_eq!(streaming_result, original);
    }

    /// Input that is only the start of a JPEG is accepted by `feed` but
    /// rejected when `finish` tries to decode it.
    #[test]
    fn test_dct_streaming_accumulate() {
        let mut decoder = DctStreamingDecoder::new();
        decoder.feed(&[0xFF, 0xD8]).unwrap();
        decoder.feed(&[0xFF, 0xE0]).unwrap();
        let result = decoder.finish();
        assert!(result.is_err());
    }

    /// `output_available` matches the amount `read_output` hands back.
    #[test]
    fn test_output_available_accuracy() {
        let original = b"Test output availability tracking";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let available = decoder.output_available();
        assert_eq!(available, original.len());

        let mut output = vec![0u8; available];
        let n = decoder.read_output(&mut output);
        assert_eq!(n, available);
        assert_eq!(decoder.output_available(), 0);
        assert_eq!(&output, original);
    }

    /// Output can be drained through a buffer smaller than the data.
    #[test]
    fn test_read_output_small_buffer() {
        let original = b"Drain me in small reads";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let mut collected = Vec::new();
        let mut buf = [0u8; 4];
        loop {
            let n = decoder.read_output(&mut buf);
            if n == 0 {
                break;
            }
            collected.extend_from_slice(&buf[..n]);
        }
        assert_eq!(collected, original);
        assert_eq!(decoder.output_available(), 0);
    }

    /// A finished flate decoder refuses further input.
    #[test]
    fn test_feed_after_finish_errors() {
        let compressed = zlib_compress(b"data");
        let mut decoder = FlateStreamingDecoder::new();
        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let result = decoder.feed(b"more data");
        assert!(result.is_err());
    }

    /// A finished DCT decoder refuses further input.
    #[test]
    fn test_dct_feed_after_finish_errors() {
        // `finish` only marks the decoder finished after a successful decode,
        // so set the flag directly instead of requiring a valid JPEG fixture.
        let mut decoder = DctStreamingDecoder::new();
        decoder.finished = true;

        let result = decoder.feed(b"more");
        assert!(result.is_err());
    }

    /// The factory returns a working flate decoder.
    #[test]
    fn test_create_streaming_decoder_flate() {
        let mut decoder = create_streaming_decoder(StreamingDecoderType::Flate);
        let compressed = zlib_compress(b"factory test");
        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();
        let mut out = vec![0u8; 64];
        let n = decoder.read_output(&mut out);
        assert_eq!(&out[..n], b"factory test");
    }

    /// The factory returns a DCT decoder, which rejects an incomplete JPEG.
    #[test]
    fn test_create_streaming_decoder_dct() {
        let mut decoder = create_streaming_decoder(StreamingDecoderType::Dct);
        decoder.feed(&[0xFF, 0xD8]).unwrap();
        assert!(decoder.finish().is_err());
    }
}