1use crate::error::Result;
43use std::collections::VecDeque;
44use std::io::{BufReader, Read, Seek};
45
46pub mod chunk_processor;
47pub mod incremental_parser;
48pub mod page_streamer;
49pub mod text_streamer;
50
51pub use chunk_processor::{
53 process_in_chunks, ChunkOptions, ChunkProcessor, ChunkType, ContentChunk,
54};
55pub use incremental_parser::{process_incrementally, IncrementalParser, ParseEvent};
56pub use page_streamer::{PageStreamer, StreamingPage};
57pub use text_streamer::{stream_text, TextChunk, TextStreamOptions, TextStreamer};
58
/// Tuning knobs for [`StreamingDocument`].
///
/// Start from `StreamingOptions::default()` or one of the presets and adjust
/// individual values with the `with_*` builder methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamingOptions {
    /// Capacity, in bytes, of the `BufReader` wrapped around the source.
    pub buffer_size: usize,
    /// Maximum number of pages held in the document's page cache.
    pub page_cache_size: usize,
    /// Upper bound on the size of a page content stream.
    // NOTE(review): not consulted anywhere in this file; presumably bytes — confirm.
    pub max_content_stream_size: usize,
    /// Whether progressive-loading hints should be used.
    // NOTE(review): not read by any code in this file — confirm semantics in callers.
    pub progressive_hints: bool,
    /// Budget compared against the document's tracked memory usage; cached
    /// pages are evicted when the tracked usage exceeds it.
    pub memory_limit: usize,
}
73
74impl Default for StreamingOptions {
75 fn default() -> Self {
76 Self {
77 buffer_size: 256 * 1024, page_cache_size: 3, max_content_stream_size: 10 * 1024 * 1024, progressive_hints: true,
81 memory_limit: 100 * 1024 * 1024, }
83 }
84}
85
86impl StreamingOptions {
87 pub fn minimal_memory() -> Self {
89 Self {
90 buffer_size: 64 * 1024,
91 page_cache_size: 1,
92 max_content_stream_size: 1024 * 1024,
93 progressive_hints: false,
94 memory_limit: 10 * 1024 * 1024,
95 }
96 }
97
98 pub fn fast_processing() -> Self {
100 Self {
101 buffer_size: 1024 * 1024,
102 page_cache_size: 10,
103 max_content_stream_size: 50 * 1024 * 1024,
104 progressive_hints: true,
105 memory_limit: 500 * 1024 * 1024,
106 }
107 }
108
109 pub fn with_buffer_size(mut self, size: usize) -> Self {
110 self.buffer_size = size;
111 self
112 }
113
114 pub fn with_page_cache_size(mut self, size: usize) -> Self {
115 self.page_cache_size = size;
116 self
117 }
118
119 pub fn with_memory_limit(mut self, limit: usize) -> Self {
120 self.memory_limit = limit;
121 self
122 }
123}
124
125pub struct StreamingDocument<R: Read + Seek> {
127 #[allow(dead_code)]
128 reader: BufReader<R>,
129 options: StreamingOptions,
130 page_cache: VecDeque<StreamingPage>,
131 current_page: u32,
132 total_pages: Option<u32>,
133 memory_used: usize,
134}
135
136impl<R: Read + Seek> StreamingDocument<R> {
137 pub fn new(reader: R, options: StreamingOptions) -> Result<Self> {
139 let buf_reader = BufReader::with_capacity(options.buffer_size, reader);
140
141 Ok(Self {
142 reader: buf_reader,
143 options,
144 page_cache: VecDeque::new(),
145 current_page: 0,
146 total_pages: None,
147 memory_used: 0,
148 })
149 }
150
151 pub fn next_page(&mut self) -> Result<Option<StreamingPage>> {
153 if let Some(total) = self.total_pages {
155 if self.current_page >= total {
156 return Ok(None);
157 }
158 } else {
159 if self.current_page >= 10 {
161 return Ok(None);
162 }
163 }
164
165 if self.memory_used > self.options.memory_limit {
167 self.evict_pages();
168 }
169
170 let page = StreamingPage {
173 number: self.current_page,
174 width: 595.0,
175 height: 842.0,
176 content_offset: 0,
177 content_length: 0,
178 };
179
180 self.current_page += 1;
181
182 if self.page_cache.len() < self.options.page_cache_size {
184 self.page_cache.push_back(page.clone());
185 }
186
187 Ok(Some(page))
188 }
189
190 pub fn process_pages<F>(&mut self, mut callback: F) -> Result<()>
192 where
193 F: FnMut(&StreamingPage) -> Result<()>,
194 {
195 while let Some(page) = self.next_page()? {
196 callback(&page)?;
197 }
198 Ok(())
199 }
200
201 pub fn memory_usage(&self) -> usize {
203 self.memory_used
204 }
205
206 pub fn clear_cache(&mut self) {
208 self.page_cache.clear();
209 self.memory_used = 0;
210 }
211
212 fn evict_pages(&mut self) {
213 while self.memory_used > self.options.memory_limit && !self.page_cache.is_empty() {
215 if self.page_cache.pop_front().is_some() {
216 self.memory_used = self.memory_used.saturating_sub(1024);
218 }
219 }
220 }
221}
222
/// Counters describing a streaming run.
///
/// NOTE(review): nothing in this file populates these fields. The per-field
/// descriptions below are inferred from the names — confirm against the code
/// that fills them.
#[derive(Clone, Debug, Default)]
pub struct StreamingStats {
    /// Presumably the number of input bytes consumed.
    pub bytes_processed: usize,
    /// Presumably the number of pages handled so far.
    pub pages_processed: u32,
    /// Presumably the number of objects parsed so far.
    pub objects_parsed: u32,
    /// Presumably the current memory usage.
    pub memory_used: usize,
    /// Presumably the highest memory usage observed.
    pub peak_memory: usize,
    /// Presumably the number of cache evictions performed.
    pub cache_evictions: u32,
}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Minimal PDF header used as the source for every document test.
    const PDF_HEADER: &[u8] = b"%PDF-1.7\n";

    /// Opens a streaming document over the in-memory PDF header.
    fn open(options: StreamingOptions) -> StreamingDocument<Cursor<&'static [u8]>> {
        StreamingDocument::new(Cursor::new(PDF_HEADER), options)
            .expect("constructing over an in-memory cursor should succeed")
    }

    #[test]
    fn test_streaming_options_default() {
        let options = StreamingOptions::default();
        assert_eq!(options.buffer_size, 256 * 1024);
        assert_eq!(options.page_cache_size, 3);
        assert_eq!(options.max_content_stream_size, 10 * 1024 * 1024);
        assert!(options.progressive_hints);
        assert_eq!(options.memory_limit, 100 * 1024 * 1024);
    }

    #[test]
    fn test_streaming_options_minimal() {
        let options = StreamingOptions::minimal_memory();
        assert_eq!(options.buffer_size, 64 * 1024);
        assert_eq!(options.page_cache_size, 1);
        assert_eq!(options.max_content_stream_size, 1024 * 1024);
        assert!(!options.progressive_hints);
        assert_eq!(options.memory_limit, 10 * 1024 * 1024);
    }

    #[test]
    fn test_streaming_options_fast() {
        let options = StreamingOptions::fast_processing();
        assert_eq!(options.buffer_size, 1024 * 1024);
        assert_eq!(options.page_cache_size, 10);
        assert_eq!(options.max_content_stream_size, 50 * 1024 * 1024);
        assert!(options.progressive_hints);
        assert_eq!(options.memory_limit, 500 * 1024 * 1024);
    }

    #[test]
    fn test_streaming_options_builder() {
        let options = StreamingOptions::default()
            .with_buffer_size(512 * 1024)
            .with_page_cache_size(5)
            .with_memory_limit(50 * 1024 * 1024);

        assert_eq!(options.buffer_size, 512 * 1024);
        assert_eq!(options.page_cache_size, 5);
        assert_eq!(options.memory_limit, 50 * 1024 * 1024);
        // Fields not touched by the builder keep their defaults.
        assert_eq!(options.max_content_stream_size, 10 * 1024 * 1024);
        assert!(options.progressive_hints);
    }

    #[test]
    fn test_streaming_document_creation() {
        let doc = StreamingDocument::new(Cursor::new(PDF_HEADER), StreamingOptions::default());
        assert!(doc.is_ok());
    }

    #[test]
    fn test_next_page() {
        let mut doc = open(StreamingOptions::default());

        let page = doc
            .next_page()
            .unwrap()
            .expect("a fresh document yields a first page");
        assert_eq!(page.number(), 0);
        assert_eq!(page.width(), 595.0);
        assert_eq!(page.height(), 842.0);

        let second = doc
            .next_page()
            .unwrap()
            .expect("a second page follows the first");
        assert_eq!(second.number(), 1);
    }

    #[test]
    fn test_next_page_stays_exhausted() {
        let mut doc = open(StreamingOptions::default());
        while doc.next_page().unwrap().is_some() {}
        assert!(doc.next_page().unwrap().is_none());
    }

    #[test]
    fn test_process_pages() {
        let mut doc = open(StreamingOptions::default());
        let mut expected = 0;

        doc.process_pages(|page| {
            // Pages must arrive in order, starting at zero, with no gaps.
            assert_eq!(page.number(), expected);
            expected += 1;
            Ok(())
        })
        .unwrap();

        assert!(expected > 0);
        assert!(doc.next_page().unwrap().is_none());
    }

    #[test]
    fn test_memory_management() {
        let limit = 1024;
        let options = StreamingOptions::default().with_memory_limit(limit);
        let mut doc = open(options);

        for _ in 0..5 {
            doc.next_page().unwrap();
        }

        assert!(doc.page_cache.len() <= 3);
        assert!(doc.memory_usage() <= limit);

        doc.clear_cache();
        assert_eq!(doc.page_cache.len(), 0);
        assert_eq!(doc.memory_usage(), 0);
    }

    #[test]
    fn test_zero_capacity_cache_stores_nothing() {
        let mut doc = open(StreamingOptions::default().with_page_cache_size(0));

        for _ in 0..5 {
            doc.next_page().unwrap();
        }

        assert!(doc.page_cache.is_empty());
        assert_eq!(doc.memory_usage(), 0);
    }

    #[test]
    fn test_streaming_stats() {
        let stats = StreamingStats::default();
        assert_eq!(stats.bytes_processed, 0);
        assert_eq!(stats.pages_processed, 0);
        assert_eq!(stats.objects_parsed, 0);
        assert_eq!(stats.memory_used, 0);
        assert_eq!(stats.peak_memory, 0);
        assert_eq!(stats.cache_evictions, 0);
    }
}