oxidize_pdf/streaming/
mod.rs

//! Streaming support for incremental PDF processing
//!
//! This module provides advanced streaming capabilities for processing PDFs
//! without loading the entire document into memory. It is designed for very
//! large PDFs and memory-constrained environments.
//!
//! # Features
//!
//! - **Incremental Parsing**: Parse PDF objects as they're needed
//! - **Page Streaming**: Process pages one at a time
//! - **Content Stream Processing**: Handle content streams in chunks
//! - **Progressive Text Extraction**: Extract text as it's encountered
//! - **Memory Bounds**: Configurable memory limits for buffering
//! - **Async Support**: Future-ready for async I/O operations
//!
//! # Example
//!
//! ```rust,no_run
//! use oxidize_pdf::streaming::{StreamingDocument, StreamingOptions};
//! use std::fs::File;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let file = File::open("large_document.pdf")?;
//! let options = StreamingOptions::default()
//!     .with_buffer_size(1024 * 1024) // 1MB buffer
//!     .with_page_cache_size(5);      // Keep 5 pages in memory
//!
//! let mut doc = StreamingDocument::new(file, options)?;
//!
//! // Process pages incrementally
//! while let Some(page) = doc.next_page()? {
//!     println!("Processing page {}", page.number());
//!
//!     // Extract text incrementally
//!     let text = page.extract_text_streaming()?;
//!     println!("Text: {}", text);
//! }
//! # Ok(())
//! # }
//! ```
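//!
//! For tighter memory control, the preset constructors and the `process_pages`
//! callback can be used instead. A minimal sketch using only types defined in
//! this module:
//!
//! ```rust,no_run
//! use oxidize_pdf::streaming::{StreamingDocument, StreamingOptions};
//! use std::fs::File;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let file = File::open("large_document.pdf")?;
//! let mut doc = StreamingDocument::new(file, StreamingOptions::minimal_memory())?;
//! doc.process_pages(|page| {
//!     println!("Page {}: {}x{} points", page.number(), page.width(), page.height());
//!     Ok(())
//! })?;
//! # Ok(())
//! # }
//! ```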

use crate::error::Result;
use std::collections::VecDeque;
use std::io::{BufReader, Read, Seek};

pub mod chunk_processor;
pub mod incremental_parser;
pub mod page_streamer;
pub mod text_streamer;

// Re-export main types
pub use chunk_processor::{
    process_in_chunks, ChunkOptions, ChunkProcessor, ChunkType, ContentChunk,
};
pub use incremental_parser::{process_incrementally, IncrementalParser, ParseEvent};
pub use page_streamer::{PageStreamer, StreamingPage};
pub use text_streamer::{stream_text, TextChunk, TextStreamOptions, TextStreamer};

/// Options for streaming operations
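///
/// The `with_*` builder methods below can be chained onto any of the preset
/// constructors. A small sketch (field values taken from this module):
///
/// ```rust
/// use oxidize_pdf::streaming::StreamingOptions;
///
/// let options = StreamingOptions::minimal_memory()
///     .with_buffer_size(128 * 1024)          // raise the read buffer to 128KB
///     .with_memory_limit(32 * 1024 * 1024);  // allow up to 32MB of buffering
/// assert_eq!(options.page_cache_size, 1);    // preset value is left untouched
/// ```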
#[derive(Debug, Clone)]
pub struct StreamingOptions {
    /// Buffer size for reading
    pub buffer_size: usize,
    /// Maximum number of pages to keep in cache
    pub page_cache_size: usize,
    /// Maximum size of a single content stream
    pub max_content_stream_size: usize,
    /// Enable progressive rendering hints
    pub progressive_hints: bool,
    /// Memory limit for buffers (bytes)
    pub memory_limit: usize,
}

impl Default for StreamingOptions {
    fn default() -> Self {
        Self {
            buffer_size: 256 * 1024,                   // 256KB
            page_cache_size: 3,                        // Keep 3 pages
            max_content_stream_size: 10 * 1024 * 1024, // 10MB
            progressive_hints: true,
            memory_limit: 100 * 1024 * 1024, // 100MB
        }
    }
}

impl StreamingOptions {
    /// Create options optimized for minimal memory usage
    pub fn minimal_memory() -> Self {
        Self {
            buffer_size: 64 * 1024,
            page_cache_size: 1,
            max_content_stream_size: 1024 * 1024,
            progressive_hints: false,
            memory_limit: 10 * 1024 * 1024,
        }
    }

    /// Create options optimized for speed
    pub fn fast_processing() -> Self {
        Self {
            buffer_size: 1024 * 1024,
            page_cache_size: 10,
            max_content_stream_size: 50 * 1024 * 1024,
            progressive_hints: true,
            memory_limit: 500 * 1024 * 1024,
        }
    }

    /// Set the buffer size (in bytes) used for reading
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Set the maximum number of pages kept in the cache
    pub fn with_page_cache_size(mut self, size: usize) -> Self {
        self.page_cache_size = size;
        self
    }

    /// Set the memory limit (in bytes) for buffering
    pub fn with_memory_limit(mut self, limit: usize) -> Self {
        self.memory_limit = limit;
        self
    }
}

/// A PDF document that supports streaming operations
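///
/// The reader only needs to implement `Read + Seek`, so in-memory buffers work
/// as well as files. A sketch mirroring the unit tests below:
///
/// ```rust,no_run
/// use oxidize_pdf::streaming::{StreamingDocument, StreamingOptions};
/// use std::io::Cursor;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let data = b"%PDF-1.7\n".to_vec();
/// let mut doc = StreamingDocument::new(Cursor::new(data), StreamingOptions::default())?;
/// while let Some(page) = doc.next_page()? {
///     // Pages are yielded one at a time; older pages may be evicted from the cache.
///     let _ = page.number();
/// }
/// # Ok(())
/// # }
/// ```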
pub struct StreamingDocument<R: Read + Seek> {
    #[allow(dead_code)]
    reader: BufReader<R>,
    options: StreamingOptions,
    page_cache: VecDeque<StreamingPage>,
    current_page: u32,
    total_pages: Option<u32>,
    memory_used: usize,
}

impl<R: Read + Seek> StreamingDocument<R> {
    /// Create a new streaming document
    pub fn new(reader: R, options: StreamingOptions) -> Result<Self> {
        let buf_reader = BufReader::with_capacity(options.buffer_size, reader);

        Ok(Self {
            reader: buf_reader,
            options,
            page_cache: VecDeque::new(),
            current_page: 0,
            total_pages: None,
            memory_used: 0,
        })
    }

    /// Get the next page for processing
    pub fn next_page(&mut self) -> Result<Option<StreamingPage>> {
        // Check if we've processed all pages
        if let Some(total) = self.total_pages {
            if self.current_page >= total {
                return Ok(None);
            }
        } else {
            // For demo/test purposes, limit to 10 pages when total is unknown
            if self.current_page >= 10 {
                return Ok(None);
            }
        }

        // Check memory limit
        if self.memory_used > self.options.memory_limit {
            self.evict_pages();
        }

        // In a real implementation, this would parse the next page.
        // For now, return a mock A4-sized page (595 x 842 points).
        let page = StreamingPage {
            number: self.current_page,
            width: 595.0,
            height: 842.0,
            content_offset: 0,
            content_length: 0,
        };

        self.current_page += 1;

        // Cache the page if there's room
        if self.page_cache.len() < self.options.page_cache_size {
            self.page_cache.push_back(page.clone());
        }

        Ok(Some(page))
    }

    /// Process all pages with a callback
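    ///
    /// A short sketch collecting page dimensions (the file path is hypothetical):
    ///
    /// ```rust,no_run
    /// # use oxidize_pdf::streaming::{StreamingDocument, StreamingOptions};
    /// # use std::fs::File;
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut doc = StreamingDocument::new(File::open("doc.pdf")?, StreamingOptions::default())?;
    /// let mut sizes = Vec::new();
    /// doc.process_pages(|page| {
    ///     sizes.push((page.width(), page.height()));
    ///     Ok(())
    /// })?;
    /// # Ok(())
    /// # }
    /// ```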
    pub fn process_pages<F>(&mut self, mut callback: F) -> Result<()>
    where
        F: FnMut(&StreamingPage) -> Result<()>,
    {
        while let Some(page) = self.next_page()? {
            callback(&page)?;
        }
        Ok(())
    }

    /// Get current memory usage
    pub fn memory_usage(&self) -> usize {
        self.memory_used
    }

    /// Clear page cache to free memory
    pub fn clear_cache(&mut self) {
        self.page_cache.clear();
        self.memory_used = 0;
    }

    fn evict_pages(&mut self) {
        // Evict oldest pages until we're under the memory limit
        while self.memory_used > self.options.memory_limit && !self.page_cache.is_empty() {
            if self.page_cache.pop_front().is_some() {
                // In a real implementation, memory_used would shrink by the
                // evicted page's actual size; use a fixed placeholder for now
                self.memory_used = self.memory_used.saturating_sub(1024);
            }
        }
    }
}

/// Statistics for streaming operations
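///
/// `StreamingDocument` in this module does not update these counters itself;
/// callers can maintain them while processing. A hypothetical usage sketch:
///
/// ```rust
/// use oxidize_pdf::streaming::StreamingStats;
///
/// let mut stats = StreamingStats::default();
/// stats.pages_processed += 1;
/// stats.bytes_processed += 4096;
/// stats.peak_memory = stats.peak_memory.max(stats.memory_used);
/// ```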
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
    /// Total bytes processed
    pub bytes_processed: usize,
    /// Number of pages processed
    pub pages_processed: u32,
    /// Number of objects parsed
    pub objects_parsed: u32,
    /// Current memory usage
    pub memory_used: usize,
    /// Peak memory usage
    pub peak_memory: usize,
    /// Number of cache evictions
    pub cache_evictions: u32,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_streaming_options_default() {
        let options = StreamingOptions::default();
        assert_eq!(options.buffer_size, 256 * 1024);
        assert_eq!(options.page_cache_size, 3);
        assert!(options.progressive_hints);
    }

    #[test]
    fn test_streaming_options_minimal() {
        let options = StreamingOptions::minimal_memory();
        assert_eq!(options.buffer_size, 64 * 1024);
        assert_eq!(options.page_cache_size, 1);
        assert!(!options.progressive_hints);
        assert_eq!(options.memory_limit, 10 * 1024 * 1024);
    }

    #[test]
    fn test_streaming_options_fast() {
        let options = StreamingOptions::fast_processing();
        assert_eq!(options.buffer_size, 1024 * 1024);
        assert_eq!(options.page_cache_size, 10);
        assert!(options.progressive_hints);
    }

    #[test]
    fn test_streaming_options_builder() {
        let options = StreamingOptions::default()
            .with_buffer_size(512 * 1024)
            .with_page_cache_size(5)
            .with_memory_limit(50 * 1024 * 1024);

        assert_eq!(options.buffer_size, 512 * 1024);
        assert_eq!(options.page_cache_size, 5);
        assert_eq!(options.memory_limit, 50 * 1024 * 1024);
    }

    #[test]
    fn test_streaming_document_creation() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();

        let doc = StreamingDocument::new(cursor, options);
        assert!(doc.is_ok());
    }

    #[test]
    fn test_next_page() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();

        let mut doc = StreamingDocument::new(cursor, options).unwrap();

        // Should get at least one page
        let page = doc.next_page().unwrap();
        assert!(page.is_some());

        let page = page.unwrap();
        assert_eq!(page.number(), 0);
        assert_eq!(page.width(), 595.0);
        assert_eq!(page.height(), 842.0);
    }

    #[test]
    fn test_process_pages() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default();

        let mut doc = StreamingDocument::new(cursor, options).unwrap();
        let mut page_count = 0;

        doc.process_pages(|page| {
            page_count += 1;
            assert!(page.number() < 1000); // Sanity check with higher limit
            Ok(())
        })
        .unwrap();

        assert!(page_count > 0);
    }

    #[test]
    fn test_memory_management() {
        let data = b"%PDF-1.7\n";
        let cursor = Cursor::new(data);
        let options = StreamingOptions::default().with_memory_limit(1024); // Very small limit

        let mut doc = StreamingDocument::new(cursor, options).unwrap();

        // Process multiple pages
        for _ in 0..5 {
            let _ = doc.next_page();
        }

        // Cache should be limited
        assert!(doc.page_cache.len() <= 3);

        // Clear cache
        doc.clear_cache();
        assert_eq!(doc.page_cache.len(), 0);
        assert_eq!(doc.memory_usage(), 0);
    }

    #[test]
    fn test_streaming_stats() {
        let stats = StreamingStats::default();
        assert_eq!(stats.bytes_processed, 0);
        assert_eq!(stats.pages_processed, 0);
        assert_eq!(stats.objects_parsed, 0);
        assert_eq!(stats.memory_used, 0);
        assert_eq!(stats.peak_memory, 0);
        assert_eq!(stats.cache_evictions, 0);
    }
}
361}