Skip to main content

rpdfium_codec/
streaming.rs

1// Derived from PDFium's codec streaming patterns
2// Original: Copyright 2014 The PDFium Authors
3// Licensed under BSD-3-Clause / Apache-2.0
4// See pdfium-upstream/LICENSE for the original license.
5
6//! Streaming (incremental) decode API for large image decompression.
7//!
8//! The [`StreamingDecoder`] trait allows feeding input data in chunks
9//! and reading decompressed output incrementally, which is useful for
10//! large inline images and progressive rendering.
11
12use crate::error::DecodeError;
13
14/// A streaming (incremental) decoder for PDF stream data.
15///
16/// Callers feed input chunks via [`feed()`](StreamingDecoder::feed),
17/// then read available output via [`read_output()`](StreamingDecoder::read_output).
18/// Call [`finish()`](StreamingDecoder::finish) after all input is provided.
19pub trait StreamingDecoder {
20    /// Feed a chunk of compressed input data.
21    fn feed(&mut self, input: &[u8]) -> Result<(), DecodeError>;
22
23    /// Signal that all input has been provided and perform final flush.
24    fn finish(&mut self) -> Result<(), DecodeError>;
25
26    /// Returns the number of decompressed bytes available to read.
27    fn output_available(&self) -> usize;
28
29    /// Read decompressed output into the provided buffer.
30    /// Returns the number of bytes actually written to `buf`.
31    fn read_output(&mut self, buf: &mut [u8]) -> usize;
32}
33
/// Filters for which a streaming decoder can be created.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamingDecoderType {
    /// FlateDecode (zlib/deflate): output is produced incrementally as input is fed.
    Flate,
    /// DCTDecode (JPEG): input is buffered and decoded when `finish()` is called.
    Dct,
}
42
43/// Create a streaming decoder for the given filter type.
44pub fn create_streaming_decoder(filter: StreamingDecoderType) -> Box<dyn StreamingDecoder> {
45    match filter {
46        StreamingDecoderType::Flate => Box::new(FlateStreamingDecoder::new()),
47        StreamingDecoderType::Dct => Box::new(DctStreamingDecoder::new()),
48    }
49}
50
51// ---------------------------------------------------------------------------
52// FlateStreamingDecoder — true incremental zlib decompression
53// ---------------------------------------------------------------------------
54
55/// Streaming decoder for FlateDecode (zlib/deflate).
56///
57/// Decompresses data incrementally as chunks are fed in via [`feed()`](StreamingDecoder::feed).
58pub struct FlateStreamingDecoder {
59    decompressor: flate2::Decompress,
60    output_buffer: Vec<u8>,
61    finished: bool,
62}
63
64impl Default for FlateStreamingDecoder {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl FlateStreamingDecoder {
71    /// Create a new streaming Flate decoder.
72    pub fn new() -> Self {
73        Self {
74            decompressor: flate2::Decompress::new(true), // zlib header
75            output_buffer: Vec::new(),
76            finished: false,
77        }
78    }
79}
80
81impl StreamingDecoder for FlateStreamingDecoder {
82    fn feed(&mut self, input: &[u8]) -> Result<(), DecodeError> {
83        if self.finished {
84            return Err(DecodeError::InvalidInput("decoder already finished".into()));
85        }
86        let mut buf = [0u8; 32 * 1024];
87        let mut offset = 0;
88        while offset < input.len() {
89            let before_in = self.decompressor.total_in();
90            let before_out = self.decompressor.total_out();
91            let status = self
92                .decompressor
93                .decompress(&input[offset..], &mut buf, flate2::FlushDecompress::None)
94                .map_err(|e| DecodeError::Flate(e.to_string()))?;
95            let consumed = (self.decompressor.total_in() - before_in) as usize;
96            let produced = (self.decompressor.total_out() - before_out) as usize;
97            self.output_buffer.extend_from_slice(&buf[..produced]);
98            offset += consumed;
99            if matches!(status, flate2::Status::StreamEnd) {
100                self.finished = true;
101                break;
102            }
103            if consumed == 0 && produced == 0 {
104                break;
105            }
106        }
107        Ok(())
108    }
109
110    fn finish(&mut self) -> Result<(), DecodeError> {
111        if !self.finished {
112            let mut buf = [0u8; 4096];
113            let before_out = self.decompressor.total_out();
114            let _ = self
115                .decompressor
116                .decompress(&[], &mut buf, flate2::FlushDecompress::Finish);
117            let produced = (self.decompressor.total_out() - before_out) as usize;
118            self.output_buffer.extend_from_slice(&buf[..produced]);
119            self.finished = true;
120        }
121        Ok(())
122    }
123
124    fn output_available(&self) -> usize {
125        self.output_buffer.len()
126    }
127
128    fn read_output(&mut self, buf: &mut [u8]) -> usize {
129        let len = buf.len().min(self.output_buffer.len());
130        buf[..len].copy_from_slice(&self.output_buffer[..len]);
131        self.output_buffer.drain(..len);
132        len
133    }
134}
135
136// ---------------------------------------------------------------------------
137// DctStreamingDecoder — accumulates input, decodes on finish()
138// ---------------------------------------------------------------------------
139
/// Buffering decoder for DCTDecode (JPEG) streams.
///
/// Input chunks are collected as they arrive; the JPEG decode itself runs
/// when [`finish()`](StreamingDecoder::finish) is called.
pub struct DctStreamingDecoder {
    /// Decoded bytes not yet handed out by `read_output()`.
    output_buffer: Vec<u8>,
    /// Encoded bytes collected so far.
    accumulated: Vec<u8>,
    /// Set once `finish()` has completed successfully; `feed()` is rejected afterwards.
    finished: bool,
}
149
150impl Default for DctStreamingDecoder {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
156impl DctStreamingDecoder {
157    /// Create a new streaming DCT decoder.
158    pub fn new() -> Self {
159        Self {
160            accumulated: Vec::new(),
161            output_buffer: Vec::new(),
162            finished: false,
163        }
164    }
165}
166
167impl StreamingDecoder for DctStreamingDecoder {
168    fn feed(&mut self, input: &[u8]) -> Result<(), DecodeError> {
169        if self.finished {
170            return Err(DecodeError::InvalidInput("decoder already finished".into()));
171        }
172        self.accumulated.extend_from_slice(input);
173        Ok(())
174    }
175
176    fn finish(&mut self) -> Result<(), DecodeError> {
177        if !self.finished {
178            self.output_buffer = crate::jpeg::decode(&self.accumulated)?;
179            self.accumulated = Vec::new();
180            self.finished = true;
181        }
182        Ok(())
183    }
184
185    fn output_available(&self) -> usize {
186        self.output_buffer.len()
187    }
188
189    fn read_output(&mut self, buf: &mut [u8]) -> usize {
190        let len = buf.len().min(self.output_buffer.len());
191        buf[..len].copy_from_slice(&self.output_buffer[..len]);
192        self.output_buffer.drain(..len);
193        len
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::Compression;
    use flate2::write::ZlibEncoder;
    use std::io::Write;

    /// Compress `data` into a zlib stream at the default compression level.
    fn zlib_compress(data: &[u8]) -> Vec<u8> {
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).unwrap();
        encoder.finish().unwrap()
    }

    #[test]
    fn test_flate_streaming_multi_chunk() {
        let original = b"Hello, World!";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        // Feed in 3 chunks
        let chunk_size = compressed.len().div_ceil(3);
        for chunk in compressed.chunks(chunk_size) {
            decoder.feed(chunk).unwrap();
        }
        decoder.finish().unwrap();

        let mut output = vec![0u8; 256];
        let n = decoder.read_output(&mut output);
        assert_eq!(&output[..n], original);
        assert_eq!(decoder.output_available(), 0);
    }

    #[test]
    fn test_flate_streaming_single_byte() {
        let original = b"Streaming byte by byte test";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        // Feed one byte at a time
        for &byte in &compressed {
            decoder.feed(&[byte]).unwrap();
        }
        decoder.finish().unwrap();

        let mut output = vec![0u8; 256];
        let n = decoder.read_output(&mut output);
        assert_eq!(&output[..n], original);
    }

    #[test]
    fn test_flate_streaming_matches_batch() {
        let original: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect();
        let compressed = zlib_compress(&original);

        // Batch decode
        let batch_result = crate::flate::decode(&compressed, None, None, None, None).unwrap();

        // Streaming decode
        let mut decoder = FlateStreamingDecoder::new();
        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let mut streaming_result = vec![0u8; decoder.output_available()];
        let n = decoder.read_output(&mut streaming_result);
        streaming_result.truncate(n);

        assert_eq!(streaming_result, batch_result);
        assert_eq!(streaming_result, original);
    }

    #[test]
    fn test_dct_streaming_accumulate() {
        let mut decoder = DctStreamingDecoder::new();
        // Feed a truncated (and therefore invalid) JPEG in 2 chunks.
        decoder.feed(&[0xFF, 0xD8]).unwrap();
        decoder.feed(&[0xFF, 0xE0]).unwrap();
        // Both chunks must have been buffered, in order.
        assert_eq!(decoder.accumulated, [0xFFu8, 0xD8, 0xFF, 0xE0]);
        // finish() should fail because the data is not a valid JPEG.
        let result = decoder.finish();
        assert!(result.is_err());
    }

    #[test]
    fn test_output_available_accuracy() {
        let original = b"Test output availability tracking";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let available = decoder.output_available();
        assert_eq!(available, original.len());

        let mut output = vec![0u8; available];
        let n = decoder.read_output(&mut output);
        assert_eq!(n, available);
        assert_eq!(decoder.output_available(), 0);
        assert_eq!(&output, original);
    }

    #[test]
    fn test_read_output_small_buffer() {
        let original = b"Drain me in small reads";
        let compressed = zlib_compress(original);
        let mut decoder = FlateStreamingDecoder::new();

        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let mut collected = Vec::new();
        let mut buf = [0u8; 4];
        loop {
            let n = decoder.read_output(&mut buf);
            if n == 0 {
                break;
            }
            collected.extend_from_slice(&buf[..n]);
        }
        assert_eq!(collected, original);
        assert_eq!(decoder.output_available(), 0);
    }

    #[test]
    fn test_feed_after_finish_errors() {
        let compressed = zlib_compress(b"data");
        let mut decoder = FlateStreamingDecoder::new();
        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();

        let result = decoder.feed(b"more data");
        assert!(result.is_err());
    }

    #[test]
    fn test_dct_feed_after_finish_errors() {
        // finish() only marks the decoder finished when the JPEG decode
        // succeeds, so set the flag directly rather than embedding a valid
        // JPEG fixture.
        let mut decoder = DctStreamingDecoder::new();
        decoder.finished = true;
        let result = decoder.feed(b"more");
        assert!(result.is_err());
    }

    #[test]
    fn test_create_streaming_decoder_flate() {
        let mut decoder = create_streaming_decoder(StreamingDecoderType::Flate);
        let compressed = zlib_compress(b"factory test");
        decoder.feed(&compressed).unwrap();
        decoder.finish().unwrap();
        let mut out = vec![0u8; 64];
        let n = decoder.read_output(&mut out);
        assert_eq!(&out[..n], b"factory test");
    }

    #[test]
    fn test_create_streaming_decoder_dct() {
        let mut decoder = create_streaming_decoder(StreamingDecoderType::Dct);
        decoder.feed(&[0xFF, 0xD8]).unwrap();
        // Invalid JPEG, finish should error
        assert!(decoder.finish().is_err());
    }
}