1use crate::error::{PdfError, Result};
7use crate::parser::content::{ContentOperation, ContentParser};
8use crate::parser::PdfObject;
9use std::io::{BufRead, BufReader, Read, Seek, Write};
10
11#[derive(Debug, Clone)]
13pub struct StreamingOptions {
14 pub buffer_size: usize,
16 pub max_stream_size: usize,
18 pub skip_images: bool,
20 pub skip_fonts: bool,
22}
23
24impl Default for StreamingOptions {
25 fn default() -> Self {
26 Self {
27 buffer_size: 64 * 1024, max_stream_size: 10 * 1024 * 1024, skip_images: false,
30 skip_fonts: false,
31 }
32 }
33}
34
35pub struct StreamProcessor<R: Read + Seek> {
37 reader: BufReader<R>,
38 #[allow(dead_code)]
39 options: StreamingOptions,
40}
41
42impl<R: Read + Seek> StreamProcessor<R> {
43 pub fn new(reader: R, options: StreamingOptions) -> Self {
45 let buf_reader = BufReader::with_capacity(options.buffer_size, reader);
46 Self {
47 reader: buf_reader,
48 options,
49 }
50 }
51
52 pub fn process_with<F>(&mut self, mut callback: F) -> Result<()>
54 where
55 F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
56 {
57 callback(ProcessingEvent::Start)?;
59
60 self.process_header(&mut callback)?;
62
63 self.process_objects(&mut callback)?;
65
66 callback(ProcessingEvent::End)?;
68
69 Ok(())
70 }
71
72 pub fn process_pages<F>(&mut self, mut page_callback: F) -> Result<()>
74 where
75 F: FnMut(u32, PageData) -> Result<ProcessingAction>,
76 {
77 let mut page_index = 0;
78
79 self.process_with(|event| match event {
80 ProcessingEvent::Page(data) => {
81 let action = page_callback(page_index, data)?;
82 page_index += 1;
83 Ok(action)
84 }
85 _ => Ok(ProcessingAction::Continue),
86 })
87 }
88
89 pub fn extract_text_streaming<W: Write>(&mut self, output: &mut W) -> Result<()> {
91 self.process_pages(|_index, page_data| {
92 if let Some(text) = page_data.text {
93 output.write_all(text.as_bytes())?;
94 output.write_all(b"\n")?;
95 }
96 Ok(ProcessingAction::Continue)
97 })
98 }
99
100 fn process_header<F>(&mut self, callback: &mut F) -> Result<()>
101 where
102 F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
103 {
104 let mut header = String::new();
105 self.reader.read_line(&mut header)?;
106
107 if !header.starts_with("%PDF-") {
108 return Err(PdfError::InvalidHeader);
109 }
110
111 let version = header.trim_start_matches("%PDF-").trim();
112 callback(ProcessingEvent::Header {
113 version: version.to_string(),
114 })?;
115
116 Ok(())
117 }
118
119 fn process_objects<F>(&mut self, callback: &mut F) -> Result<()>
120 where
121 F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
122 {
123 for i in 0..3 {
128 let page_data = PageData {
129 number: i,
130 width: 595.0,
131 height: 842.0,
132 text: Some(format!("Page {} content", i + 1)),
133 operations: vec![],
134 };
135
136 match callback(ProcessingEvent::Page(page_data))? {
137 ProcessingAction::Continue => {}
138 ProcessingAction::Skip => continue,
139 ProcessingAction::Stop => break,
140 }
141 }
142
143 Ok(())
144 }
145}
146
147#[derive(Debug)]
149pub enum ProcessingEvent {
150 Start,
152 Header { version: String },
154 Object { id: (u32, u16), object: PdfObject },
156 Page(PageData),
158 Resource {
160 name: String,
161 resource_type: ResourceType,
162 },
163 End,
165}
166
167#[derive(Debug)]
169pub struct PageData {
170 pub number: u32,
172 pub width: f32,
174 pub height: f32,
176 pub text: Option<String>,
178 pub operations: Vec<ContentOperation>,
180}
181
182#[derive(Debug, Clone)]
184pub enum ResourceType {
185 Font,
186 Image,
187 ColorSpace,
188 Pattern,
189 XObject,
190}
191
192#[derive(Debug, PartialEq)]
194pub enum ProcessingAction {
195 Continue,
197 Skip,
199 Stop,
201}
202
203pub struct ContentStreamProcessor {
205 buffer: Vec<u8>,
206 options: StreamingOptions,
207}
208
209impl ContentStreamProcessor {
210 pub fn new(options: StreamingOptions) -> Self {
212 Self {
213 buffer: Vec::with_capacity(options.buffer_size),
214 options,
215 }
216 }
217
218 pub fn process_stream<R: Read, F>(&mut self, mut reader: R, mut callback: F) -> Result<()>
220 where
221 F: FnMut(&ContentOperation) -> Result<ProcessingAction>,
222 {
223 self.buffer.clear();
224 reader.read_to_end(&mut self.buffer)?;
225
226 if self.buffer.len() > self.options.max_stream_size {
227 return Err(PdfError::ContentStreamTooLarge(self.buffer.len()));
228 }
229
230 let operations =
231 ContentParser::parse(&self.buffer).map_err(|e| PdfError::ParseError(e.to_string()))?;
232
233 for op in operations {
234 match callback(&op)? {
235 ProcessingAction::Continue => {}
236 ProcessingAction::Skip => continue,
237 ProcessingAction::Stop => break,
238 }
239 }
240
241 Ok(())
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248 use std::io::Cursor;
249
250 #[test]
251 fn test_streaming_options_default() {
252 let options = StreamingOptions::default();
253 assert_eq!(options.buffer_size, 64 * 1024);
254 assert_eq!(options.max_stream_size, 10 * 1024 * 1024);
255 assert!(!options.skip_images);
256 assert!(!options.skip_fonts);
257 }
258
259 #[test]
260 fn test_stream_processor_creation() {
261 let data = b"%PDF-1.7\n";
262 let cursor = Cursor::new(data);
263 let options = StreamingOptions::default();
264 let _processor = StreamProcessor::new(cursor, options);
265 }
266
267 #[test]
268 fn test_processing_events() {
269 let data = b"%PDF-1.7\n";
270 let cursor = Cursor::new(data);
271 let options = StreamingOptions::default();
272 let mut processor = StreamProcessor::new(cursor, options);
273
274 let mut events = Vec::new();
275
276 processor
277 .process_with(|event| {
278 match &event {
279 ProcessingEvent::Start => events.push("start"),
280 ProcessingEvent::Header { version } => {
281 assert_eq!(version, "1.7");
282 events.push("header");
283 }
284 ProcessingEvent::Page(_) => events.push("page"),
285 ProcessingEvent::End => events.push("end"),
286 _ => {}
287 }
288 Ok(ProcessingAction::Continue)
289 })
290 .unwrap();
291
292 assert!(events.contains(&"start"));
293 assert!(events.contains(&"header"));
294 assert!(events.contains(&"end"));
295 }
296
297 #[test]
298 fn test_process_pages() {
299 let data = b"%PDF-1.7\n";
300 let cursor = Cursor::new(data);
301 let options = StreamingOptions::default();
302 let mut processor = StreamProcessor::new(cursor, options);
303
304 let mut page_count = 0;
305
306 processor
307 .process_pages(|index, page| {
308 assert_eq!(index, page_count);
309 assert_eq!(page.width, 595.0);
310 assert_eq!(page.height, 842.0);
311 page_count += 1;
312 Ok(ProcessingAction::Continue)
313 })
314 .unwrap();
315
316 assert!(page_count > 0);
317 }
318
319 #[test]
320 fn test_extract_text_streaming() {
321 let data = b"%PDF-1.7\n";
322 let cursor = Cursor::new(data);
323 let options = StreamingOptions::default();
324 let mut processor = StreamProcessor::new(cursor, options);
325
326 let mut output = Vec::new();
327 processor.extract_text_streaming(&mut output).unwrap();
328
329 let text = String::from_utf8(output).unwrap();
330 assert!(text.contains("Page"));
331 }
332
333 #[test]
334 fn test_processing_action() {
335 assert_eq!(ProcessingAction::Continue, ProcessingAction::Continue);
336 assert_eq!(ProcessingAction::Skip, ProcessingAction::Skip);
337 assert_eq!(ProcessingAction::Stop, ProcessingAction::Stop);
338 assert_ne!(ProcessingAction::Continue, ProcessingAction::Stop);
339 }
340
341 #[test]
342 fn test_content_stream_processor() {
343 let options = StreamingOptions::default();
344 let mut processor = ContentStreamProcessor::new(options);
345
346 let content = b"BT /F1 12 Tf 100 700 Td (Hello) Tj ET";
348 let cursor = Cursor::new(content);
349
350 let mut op_count = 0;
351 processor
352 .process_stream(cursor, |op| {
353 op_count += 1;
354 match op {
355 ContentOperation::BeginText => assert_eq!(op_count, 1),
356 ContentOperation::EndText => assert_eq!(op_count, 5),
357 _ => {}
358 }
359 Ok(ProcessingAction::Continue)
360 })
361 .unwrap();
362
363 assert!(op_count > 0);
364 }
365
366 #[test]
367 fn test_stop_processing() {
368 let data = b"%PDF-1.7\n";
369 let cursor = Cursor::new(data);
370 let options = StreamingOptions::default();
371 let mut processor = StreamProcessor::new(cursor, options);
372
373 let mut page_count = 0;
374
375 processor
376 .process_pages(|_index, _page| {
377 page_count += 1;
378 if page_count >= 2 {
379 Ok(ProcessingAction::Stop)
380 } else {
381 Ok(ProcessingAction::Continue)
382 }
383 })
384 .unwrap();
385
386 assert_eq!(page_count, 2);
387 }
388}