oxidize_pdf/memory/
stream_processor.rs

1//! Stream processing for memory-efficient PDF operations
2//!
3//! Processes PDF content incrementally without loading entire documents
4//! into memory, ideal for large files or memory-constrained environments.
5
6use crate::error::{PdfError, Result};
7use crate::parser::content::{ContentOperation, ContentParser};
8use crate::parser::PdfObject;
9use std::io::{BufRead, BufReader, Read, Seek, Write};
10
/// Options controlling memory use during streaming operations.
///
/// The `Default` implementation uses a 64 KiB read buffer and a
/// 10 MiB per-stream size cap, with nothing skipped.
#[derive(Debug, Clone)]
pub struct StreamingOptions {
    /// Buffer size for reading, in bytes.
    pub buffer_size: usize,
    /// Maximum content stream size, in bytes, to process at once.
    pub max_stream_size: usize,
    /// Whether to skip processing images.
    // NOTE(review): stored but not consumed anywhere in this file — confirm callers.
    pub skip_images: bool,
    /// Whether to skip processing fonts.
    // NOTE(review): stored but not consumed anywhere in this file — confirm callers.
    pub skip_fonts: bool,
}
23
24impl Default for StreamingOptions {
25    fn default() -> Self {
26        Self {
27            buffer_size: 64 * 1024,            // 64KB
28            max_stream_size: 10 * 1024 * 1024, // 10MB
29            skip_images: false,
30            skip_fonts: false,
31        }
32    }
33}
34
/// Stream processor for incremental PDF processing.
///
/// Wraps a seekable reader in a [`BufReader`] sized from
/// [`StreamingOptions::buffer_size`] and drives callback-based
/// processing without materializing the whole document.
pub struct StreamProcessor<R: Read + Seek> {
    // Buffered view over the underlying PDF source.
    reader: BufReader<R>,
    // Only `buffer_size` is consumed (at construction); the rest of the
    // options are retained for future use, hence the dead_code allowance.
    #[allow(dead_code)]
    options: StreamingOptions,
}
41
42impl<R: Read + Seek> StreamProcessor<R> {
43    /// Create a new stream processor
44    pub fn new(reader: R, options: StreamingOptions) -> Self {
45        let buf_reader = BufReader::with_capacity(options.buffer_size, reader);
46        Self {
47            reader: buf_reader,
48            options,
49        }
50    }
51
52    /// Process a PDF incrementally with a callback
53    pub fn process_with<F>(&mut self, mut callback: F) -> Result<()>
54    where
55        F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
56    {
57        // Start processing
58        callback(ProcessingEvent::Start)?;
59
60        // Process header
61        self.process_header(&mut callback)?;
62
63        // Process objects incrementally
64        self.process_objects(&mut callback)?;
65
66        // End processing
67        callback(ProcessingEvent::End)?;
68
69        Ok(())
70    }
71
72    /// Process pages incrementally
73    pub fn process_pages<F>(&mut self, mut page_callback: F) -> Result<()>
74    where
75        F: FnMut(u32, PageData) -> Result<ProcessingAction>,
76    {
77        let mut page_index = 0;
78
79        self.process_with(|event| match event {
80            ProcessingEvent::Page(data) => {
81                let action = page_callback(page_index, data)?;
82                page_index += 1;
83                Ok(action)
84            }
85            _ => Ok(ProcessingAction::Continue),
86        })
87    }
88
89    /// Extract text incrementally
90    pub fn extract_text_streaming<W: Write>(&mut self, output: &mut W) -> Result<()> {
91        self.process_pages(|_index, page_data| {
92            if let Some(text) = page_data.text {
93                output.write_all(text.as_bytes())?;
94                output.write_all(b"\n")?;
95            }
96            Ok(ProcessingAction::Continue)
97        })
98    }
99
100    fn process_header<F>(&mut self, callback: &mut F) -> Result<()>
101    where
102        F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
103    {
104        let mut header = String::new();
105        self.reader.read_line(&mut header)?;
106
107        if !header.starts_with("%PDF-") {
108            return Err(PdfError::InvalidHeader);
109        }
110
111        let version = header.trim_start_matches("%PDF-").trim();
112        callback(ProcessingEvent::Header {
113            version: version.to_string(),
114        })?;
115
116        Ok(())
117    }
118
119    fn process_objects<F>(&mut self, callback: &mut F) -> Result<()>
120    where
121        F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
122    {
123        // In a real implementation, this would parse objects incrementally
124        // For now, we'll simulate streaming behavior
125
126        // Process some mock pages
127        for i in 0..3 {
128            let page_data = PageData {
129                number: i,
130                width: 595.0,
131                height: 842.0,
132                text: Some(format!("Page {} content", i + 1)),
133                operations: vec![],
134            };
135
136            match callback(ProcessingEvent::Page(page_data))? {
137                ProcessingAction::Continue => {}
138                ProcessingAction::Skip => continue,
139                ProcessingAction::Stop => break,
140            }
141        }
142
143        Ok(())
144    }
145}
146
/// Events during stream processing.
///
/// Delivered to the `process_with` callback in document order.
#[derive(Debug)]
pub enum ProcessingEvent {
    /// Processing started (emitted once, before anything is read).
    Start,
    /// PDF header found; `version` is the text after `%PDF-`, e.g. "1.7".
    Header { version: String },
    /// Object encountered; `id` is the (object number, generation) pair.
    Object { id: (u32, u16), object: PdfObject },
    /// Page encountered.
    Page(PageData),
    /// Resource encountered.
    Resource {
        name: String,
        resource_type: ResourceType,
    },
    /// Processing ended (emitted once, after all objects).
    End,
}
166
/// Page data delivered while streaming.
#[derive(Debug)]
pub struct PageData {
    /// Page number (0-indexed).
    pub number: u32,
    /// Page width in points.
    pub width: f32,
    /// Page height in points.
    pub height: f32,
    /// Extracted text, if any was produced for this page.
    pub text: Option<String>,
    /// Content operations (if requested); may be empty.
    pub operations: Vec<ContentOperation>,
}
181
/// Kind of resource reported by [`ProcessingEvent::Resource`].
#[derive(Debug, Clone)]
pub enum ResourceType {
    /// Font resource.
    Font,
    /// Image resource.
    Image,
    /// Color space resource.
    ColorSpace,
    /// Pattern resource.
    Pattern,
    /// Generic external object (XObject).
    XObject,
}
191
/// Action to take after processing an event.
///
/// All variants are unit-like, so the enum is cheap to copy; it
/// derives `Clone`/`Copy`/`Eq` in addition to `PartialEq` so values
/// can be compared and reused without moves.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessingAction {
    /// Continue processing.
    Continue,
    /// Skip this item and move on to the next.
    Skip,
    /// Stop processing entirely.
    Stop,
}
202
/// Stream-based content processor for individual content streams.
///
/// Owns a reusable byte buffer (pre-sized to `buffer_size`) so that
/// repeated `process_stream` calls do not reallocate.
pub struct ContentStreamProcessor {
    // Scratch buffer holding one content stream at a time.
    buffer: Vec<u8>,
    options: StreamingOptions,
}
208
209impl ContentStreamProcessor {
210    /// Create a new content stream processor
211    pub fn new(options: StreamingOptions) -> Self {
212        Self {
213            buffer: Vec::with_capacity(options.buffer_size),
214            options,
215        }
216    }
217
218    /// Process a content stream incrementally
219    pub fn process_stream<R: Read, F>(&mut self, mut reader: R, mut callback: F) -> Result<()>
220    where
221        F: FnMut(&ContentOperation) -> Result<ProcessingAction>,
222    {
223        self.buffer.clear();
224        reader.read_to_end(&mut self.buffer)?;
225
226        if self.buffer.len() > self.options.max_stream_size {
227            return Err(PdfError::ContentStreamTooLarge(self.buffer.len()));
228        }
229
230        let operations =
231            ContentParser::parse(&self.buffer).map_err(|e| PdfError::ParseError(e.to_string()))?;
232
233        for op in operations {
234            match callback(&op)? {
235                ProcessingAction::Continue => {}
236                ProcessingAction::Skip => continue,
237                ProcessingAction::Stop => break,
238            }
239        }
240
241        Ok(())
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Defaults must be 64 KiB buffer, 10 MiB cap, nothing skipped.
    #[test]
    fn test_streaming_options_default() {
        let options = StreamingOptions::default();
        assert_eq!(options.buffer_size, 64 * 1024);
        assert_eq!(options.max_stream_size, 10 * 1024 * 1024);
        assert!(!options.skip_images);
        assert!(!options.skip_fonts);
    }

    // Construction alone should succeed without consuming the reader.
    #[test]
    fn test_stream_processor_creation() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();
        let _processor = StreamProcessor::new(cursor, options);
    }

    // Full event sequence: Start and End always fire, and the header
    // version is parsed out of the "%PDF-1.7" line.
    #[test]
    fn test_processing_events() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();
        let mut processor = StreamProcessor::new(cursor, options);

        let mut events = Vec::new();

        processor
            .process_with(|event| {
                match &event {
                    ProcessingEvent::Start => events.push("start"),
                    ProcessingEvent::Header { version } => {
                        assert_eq!(version, "1.7");
                        events.push("header");
                    }
                    ProcessingEvent::Page(_) => events.push("page"),
                    ProcessingEvent::End => events.push("end"),
                    _ => {}
                }
                Ok(ProcessingAction::Continue)
            })
            .unwrap();

        assert!(events.contains(&"start"));
        assert!(events.contains(&"header"));
        assert!(events.contains(&"end"));
    }

    // The mock page walker emits A4-sized pages (595 x 842 points)
    // with sequential zero-based indices.
    #[test]
    fn test_process_pages() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();
        let mut processor = StreamProcessor::new(cursor, options);

        let mut page_count = 0;

        processor
            .process_pages(|index, page| {
                assert_eq!(index, page_count);
                assert_eq!(page.width, 595.0);
                assert_eq!(page.height, 842.0);
                page_count += 1;
                Ok(ProcessingAction::Continue)
            })
            .unwrap();

        assert!(page_count > 0);
    }

    // Mock pages carry "Page N content" text, so the streamed output
    // must contain "Page".
    #[test]
    fn test_extract_text_streaming() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();
        let mut processor = StreamProcessor::new(cursor, options);

        let mut output = Vec::new();
        processor.extract_text_streaming(&mut output).unwrap();

        let text = String::from_utf8(output).unwrap();
        assert!(text.contains("Page"));
    }

    // Equality smoke test for the derived PartialEq.
    #[test]
    fn test_processing_action() {
        assert_eq!(ProcessingAction::Continue, ProcessingAction::Continue);
        assert_eq!(ProcessingAction::Skip, ProcessingAction::Skip);
        assert_eq!(ProcessingAction::Stop, ProcessingAction::Stop);
        assert_ne!(ProcessingAction::Continue, ProcessingAction::Stop);
    }

    // Parses a minimal text-showing content stream; asserts assume the
    // parser emits BeginText first and EndText fifth for this input.
    #[test]
    fn test_content_stream_processor() {
        let options = StreamingOptions::default();
        let mut processor = ContentStreamProcessor::new(options);

        // Test with simple content
        let content = b"BT /F1 12 Tf 100 700 Td (Hello) Tj ET";
        let cursor = Cursor::new(content);

        let mut op_count = 0;
        processor
            .process_stream(cursor, |op| {
                op_count += 1;
                match op {
                    ContentOperation::BeginText => assert_eq!(op_count, 1),
                    ContentOperation::EndText => assert_eq!(op_count, 5),
                    _ => {}
                }
                Ok(ProcessingAction::Continue)
            })
            .unwrap();

        assert!(op_count > 0);
    }

    // Returning Stop from the page callback must halt the walk: the
    // mock emits three pages but only two may be observed.
    #[test]
    fn test_stop_processing() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();
        let mut processor = StreamProcessor::new(cursor, options);

        let mut page_count = 0;

        processor
            .process_pages(|_index, _page| {
                page_count += 1;
                if page_count >= 2 {
                    Ok(ProcessingAction::Stop)
                } else {
                    Ok(ProcessingAction::Continue)
                }
            })
            .unwrap();

        assert_eq!(page_count, 2);
    }
}