use crate::error::{PdfError, Result};
use crate::parser::content::{ContentOperation, ContentParser};
use crate::parser::PdfObject;
use std::io::{BufRead, BufReader, Read, Seek, Write};
#[derive(Debug, Clone)]
pub struct StreamingOptions {
pub buffer_size: usize,
pub max_stream_size: usize,
pub skip_images: bool,
pub skip_fonts: bool,
}
impl Default for StreamingOptions {
fn default() -> Self {
Self {
buffer_size: 64 * 1024, max_stream_size: 10 * 1024 * 1024, skip_images: false,
skip_fonts: false,
}
}
}
pub struct StreamProcessor<R: Read + Seek> {
reader: BufReader<R>,
#[allow(dead_code)]
options: StreamingOptions,
}
impl<R: Read + Seek> StreamProcessor<R> {
pub fn new(reader: R, options: StreamingOptions) -> Self {
let buf_reader = BufReader::with_capacity(options.buffer_size, reader);
Self {
reader: buf_reader,
options,
}
}
pub fn process_with<F>(&mut self, mut callback: F) -> Result<()>
where
F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
{
callback(ProcessingEvent::Start)?;
self.process_header(&mut callback)?;
self.process_objects(&mut callback)?;
callback(ProcessingEvent::End)?;
Ok(())
}
pub fn process_pages<F>(&mut self, mut page_callback: F) -> Result<()>
where
F: FnMut(u32, PageData) -> Result<ProcessingAction>,
{
let mut page_index = 0;
self.process_with(|event| match event {
ProcessingEvent::Page(data) => {
let action = page_callback(page_index, data)?;
page_index += 1;
Ok(action)
}
_ => Ok(ProcessingAction::Continue),
})
}
pub fn extract_text_streaming<W: Write>(&mut self, output: &mut W) -> Result<()> {
self.process_pages(|_index, page_data| {
if let Some(text) = page_data.text {
output.write_all(text.as_bytes())?;
output.write_all(b"\n")?;
}
Ok(ProcessingAction::Continue)
})
}
fn process_header<F>(&mut self, callback: &mut F) -> Result<()>
where
F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
{
let mut header = String::new();
self.reader.read_line(&mut header)?;
if !header.starts_with("%PDF-") {
return Err(PdfError::InvalidHeader);
}
let version = header.trim_start_matches("%PDF-").trim();
callback(ProcessingEvent::Header {
version: version.to_string(),
})?;
Ok(())
}
fn process_objects<F>(&mut self, callback: &mut F) -> Result<()>
where
F: FnMut(ProcessingEvent) -> Result<ProcessingAction>,
{
for i in 0..3 {
let page_data = PageData {
number: i,
width: 595.0,
height: 842.0,
text: Some(format!("Page {} content", i + 1)),
operations: vec![],
};
match callback(ProcessingEvent::Page(page_data))? {
ProcessingAction::Continue => {}
ProcessingAction::Skip => continue,
ProcessingAction::Stop => break,
}
}
Ok(())
}
}
#[derive(Debug)]
pub enum ProcessingEvent {
Start,
Header { version: String },
Object { id: (u32, u16), object: PdfObject },
Page(PageData),
Resource {
name: String,
resource_type: ResourceType,
},
End,
}
#[derive(Debug)]
pub struct PageData {
pub number: u32,
pub width: f32,
pub height: f32,
pub text: Option<String>,
pub operations: Vec<ContentOperation>,
}
#[derive(Debug, Clone)]
pub enum ResourceType {
Font,
Image,
ColorSpace,
Pattern,
XObject,
}
#[derive(Debug, PartialEq)]
pub enum ProcessingAction {
Continue,
Skip,
Stop,
}
pub struct ContentStreamProcessor {
buffer: Vec<u8>,
options: StreamingOptions,
}
impl ContentStreamProcessor {
pub fn new(options: StreamingOptions) -> Self {
Self {
buffer: Vec::with_capacity(options.buffer_size),
options,
}
}
pub fn process_stream<R: Read, F>(&mut self, mut reader: R, mut callback: F) -> Result<()>
where
F: FnMut(&ContentOperation) -> Result<ProcessingAction>,
{
self.buffer.clear();
reader.read_to_end(&mut self.buffer)?;
if self.buffer.len() > self.options.max_stream_size {
return Err(PdfError::ContentStreamTooLarge(self.buffer.len()));
}
let operations =
ContentParser::parse(&self.buffer).map_err(|e| PdfError::ParseError(e.to_string()))?;
for op in operations {
match callback(&op)? {
ProcessingAction::Continue => {}
ProcessingAction::Skip => continue,
ProcessingAction::Stop => break,
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_streaming_options_default() {
let options = StreamingOptions::default();
assert_eq!(options.buffer_size, 64 * 1024);
assert_eq!(options.max_stream_size, 10 * 1024 * 1024);
assert!(!options.skip_images);
assert!(!options.skip_fonts);
}
#[test]
fn test_stream_processor_creation() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let _processor = StreamProcessor::new(cursor, options);
}
#[test]
fn test_processing_events() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut events = Vec::new();
processor
.process_with(|event| {
match &event {
ProcessingEvent::Start => events.push("start"),
ProcessingEvent::Header { version } => {
assert_eq!(version, "1.7");
events.push("header");
}
ProcessingEvent::Page(_) => events.push("page"),
ProcessingEvent::End => events.push("end"),
_ => {}
}
Ok(ProcessingAction::Continue)
})
.unwrap();
assert!(events.contains(&"start"));
assert!(events.contains(&"header"));
assert!(events.contains(&"end"));
}
#[test]
fn test_process_pages() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut page_count = 0;
processor
.process_pages(|index, page| {
assert_eq!(index, page_count);
assert_eq!(page.width, 595.0);
assert_eq!(page.height, 842.0);
page_count += 1;
Ok(ProcessingAction::Continue)
})
.unwrap();
assert!(page_count > 0);
}
#[test]
fn test_extract_text_streaming() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut output = Vec::new();
processor.extract_text_streaming(&mut output).unwrap();
let text = String::from_utf8(output).unwrap();
assert!(text.contains("Page"));
}
#[test]
fn test_processing_action() {
assert_eq!(ProcessingAction::Continue, ProcessingAction::Continue);
assert_eq!(ProcessingAction::Skip, ProcessingAction::Skip);
assert_eq!(ProcessingAction::Stop, ProcessingAction::Stop);
assert_ne!(ProcessingAction::Continue, ProcessingAction::Stop);
}
#[test]
fn test_content_stream_processor() {
let options = StreamingOptions::default();
let mut processor = ContentStreamProcessor::new(options);
let content = b"BT /F1 12 Tf 100 700 Td (Hello) Tj ET";
let cursor = Cursor::new(content);
let mut op_count = 0;
processor
.process_stream(cursor, |op| {
op_count += 1;
match op {
ContentOperation::BeginText => assert_eq!(op_count, 1),
ContentOperation::EndText => assert_eq!(op_count, 5),
_ => {}
}
Ok(ProcessingAction::Continue)
})
.unwrap();
assert!(op_count > 0);
}
#[test]
fn test_stop_processing() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut page_count = 0;
processor
.process_pages(|_index, _page| {
page_count += 1;
if page_count >= 2 {
Ok(ProcessingAction::Stop)
} else {
Ok(ProcessingAction::Continue)
}
})
.unwrap();
assert_eq!(page_count, 2);
}
#[test]
fn test_streaming_options_custom() {
let options = StreamingOptions {
buffer_size: 1024,
max_stream_size: 2048,
skip_images: true,
skip_fonts: true,
};
assert_eq!(options.buffer_size, 1024);
assert_eq!(options.max_stream_size, 2048);
assert!(options.skip_images);
assert!(options.skip_fonts);
}
#[test]
fn test_streaming_options_debug_clone() {
let options = StreamingOptions {
buffer_size: 512,
max_stream_size: 1024,
skip_images: false,
skip_fonts: true,
};
let debug_str = format!("{options:?}");
assert!(debug_str.contains("StreamingOptions"));
assert!(debug_str.contains("512"));
assert!(debug_str.contains("1024"));
let cloned = options;
assert_eq!(cloned.buffer_size, 512);
assert_eq!(cloned.max_stream_size, 1024);
assert!(!cloned.skip_images);
assert!(cloned.skip_fonts);
}
#[test]
fn test_processing_event_debug() {
let events = vec![
ProcessingEvent::Start,
ProcessingEvent::Header {
version: "1.7".to_string(),
},
ProcessingEvent::Object {
id: (1, 0),
object: PdfObject::Null,
},
ProcessingEvent::Page(PageData {
number: 0,
width: 595.0,
height: 842.0,
text: Some("test".to_string()),
operations: vec![],
}),
ProcessingEvent::Resource {
name: "Font1".to_string(),
resource_type: ResourceType::Font,
},
ProcessingEvent::End,
];
for event in events {
let debug_str = format!("{event:?}");
assert!(!debug_str.is_empty());
}
}
#[test]
fn test_page_data_debug() {
let page_data = PageData {
number: 5,
width: 612.0,
height: 792.0,
text: Some("Page content".to_string()),
operations: vec![ContentOperation::BeginText],
};
let debug_str = format!("{page_data:?}");
assert!(debug_str.contains("PageData"));
assert!(debug_str.contains("5"));
assert!(debug_str.contains("612.0"));
assert!(debug_str.contains("Page content"));
}
#[test]
fn test_resource_type_debug_clone() {
let resource_types = vec![
ResourceType::Font,
ResourceType::Image,
ResourceType::ColorSpace,
ResourceType::Pattern,
ResourceType::XObject,
];
for resource_type in resource_types {
let debug_str = format!("{resource_type:?}");
assert!(!debug_str.is_empty());
let cloned = resource_type.clone();
let cloned_debug = format!("{cloned:?}");
assert_eq!(debug_str, cloned_debug);
}
}
#[test]
fn test_processing_action_debug_partial_eq() {
let action = ProcessingAction::Continue;
let debug_str = format!("{action:?}");
assert!(debug_str.contains("Continue"));
assert_eq!(ProcessingAction::Continue, ProcessingAction::Continue);
assert_eq!(ProcessingAction::Skip, ProcessingAction::Skip);
assert_eq!(ProcessingAction::Stop, ProcessingAction::Stop);
assert_ne!(ProcessingAction::Continue, ProcessingAction::Skip);
assert_ne!(ProcessingAction::Skip, ProcessingAction::Stop);
assert_ne!(ProcessingAction::Stop, ProcessingAction::Continue);
}
#[test]
fn test_stream_processor_invalid_header() {
let data = b"Not a PDF\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let result = processor.process_with(|_event| Ok(ProcessingAction::Continue));
assert!(result.is_err());
match result {
Err(PdfError::InvalidHeader) => {}
_ => panic!("Expected InvalidHeader error"),
}
}
#[test]
fn test_stream_processor_header_parsing() {
let data = b"%PDF-2.0\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut header_version = String::new();
processor
.process_with(|event| {
if let ProcessingEvent::Header { version } = event {
header_version = version;
}
Ok(ProcessingAction::Continue)
})
.unwrap();
assert_eq!(header_version, "2.0");
}
#[test]
fn test_skip_processing_action() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut page_count = 0;
let mut skipped_count = 0;
processor
.process_pages(|index, _page| {
if index % 2 == 0 {
page_count += 1;
Ok(ProcessingAction::Continue)
} else {
skipped_count += 1;
Ok(ProcessingAction::Skip)
}
})
.unwrap();
assert!(page_count > 0);
assert!(skipped_count > 0);
}
#[test]
fn test_extract_text_streaming_with_output() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut output = Vec::new();
processor.extract_text_streaming(&mut output).unwrap();
let text = String::from_utf8(output).unwrap();
assert!(text.contains("Page 1 content"));
assert!(text.contains("Page 2 content"));
assert!(text.contains("Page 3 content"));
assert!(text.contains('\n'));
}
#[test]
fn test_content_stream_processor_creation() {
let options = StreamingOptions {
buffer_size: 2048,
max_stream_size: 4096,
skip_images: true,
skip_fonts: false,
};
let processor = ContentStreamProcessor::new(options.clone());
assert_eq!(processor.buffer.capacity(), options.buffer_size);
assert_eq!(processor.options.buffer_size, 2048);
assert_eq!(processor.options.max_stream_size, 4096);
assert!(processor.options.skip_images);
assert!(!processor.options.skip_fonts);
}
#[test]
fn test_content_stream_processor_empty_stream() {
let options = StreamingOptions::default();
let mut processor = ContentStreamProcessor::new(options);
let content = b"";
let cursor = Cursor::new(content);
let mut op_count = 0;
processor
.process_stream(cursor, |_op| {
op_count += 1;
Ok(ProcessingAction::Continue)
})
.unwrap();
assert_eq!(op_count, 0);
}
#[test]
fn test_content_stream_processor_large_stream_error() {
let options = StreamingOptions {
buffer_size: 1024,
max_stream_size: 10, skip_images: false,
skip_fonts: false,
};
let mut processor = ContentStreamProcessor::new(options);
let content = b"BT /F1 12 Tf 100 700 Td (This is a long content stream) Tj ET";
let cursor = Cursor::new(content);
let result = processor.process_stream(cursor, |_op| Ok(ProcessingAction::Continue));
assert!(result.is_err());
match result {
Err(PdfError::ContentStreamTooLarge(size)) => {
assert_eq!(size, content.len());
}
_ => panic!("Expected ContentStreamTooLarge error"),
}
}
#[test]
fn test_content_stream_processor_skip_action() {
let options = StreamingOptions::default();
let mut processor = ContentStreamProcessor::new(options);
let content = b"BT /F1 12 Tf 100 700 Td (Hello) Tj 50 0 Td (World) Tj ET";
let cursor = Cursor::new(content);
let mut processed_count = 0;
let mut skipped_count = 0;
processor
.process_stream(cursor, |op| match op {
ContentOperation::ShowText(_) => {
skipped_count += 1;
Ok(ProcessingAction::Skip)
}
_ => {
processed_count += 1;
Ok(ProcessingAction::Continue)
}
})
.unwrap();
assert!(processed_count > 0);
assert!(skipped_count > 0);
}
#[test]
fn test_content_stream_processor_stop_action() {
let options = StreamingOptions::default();
let mut processor = ContentStreamProcessor::new(options);
let content = b"BT /F1 12 Tf 100 700 Td (Hello) Tj 50 0 Td (World) Tj ET";
let cursor = Cursor::new(content);
let mut op_count = 0;
processor
.process_stream(cursor, |_op| {
op_count += 1;
if op_count >= 3 {
Ok(ProcessingAction::Stop)
} else {
Ok(ProcessingAction::Continue)
}
})
.unwrap();
assert_eq!(op_count, 3);
}
#[test]
fn test_content_stream_processor_invalid_content() {
let options = StreamingOptions::default();
let mut processor = ContentStreamProcessor::new(options);
let content = b"Invalid PDF content that cannot be parsed";
let cursor = Cursor::new(content);
let result = processor.process_stream(cursor, |_op| Ok(ProcessingAction::Continue));
match result {
Ok(_) => {} Err(PdfError::ParseError(_)) => {} _ => panic!("Unexpected error type"),
}
}
#[test]
fn test_content_stream_processor_callback_error() {
let options = StreamingOptions::default();
let mut processor = ContentStreamProcessor::new(options);
let content = b"BT /F1 12 Tf ET";
let cursor = Cursor::new(content);
let result = processor.process_stream(cursor, |_op| {
Err(PdfError::ParseError("Test error".to_string()))
});
assert!(result.is_err());
match result {
Err(PdfError::ParseError(msg)) => {
assert_eq!(msg, "Test error");
}
_ => panic!("Expected ParseError"),
}
}
#[test]
fn test_stream_processor_with_custom_buffer_size() {
let options = StreamingOptions {
buffer_size: 128,
max_stream_size: 1024,
skip_images: false,
skip_fonts: false,
};
let data = b"%PDF-1.4\n";
let cursor = Cursor::new(data);
let mut processor = StreamProcessor::new(cursor, options);
let mut header_found = false;
processor
.process_with(|event| {
if let ProcessingEvent::Header { version } = event {
assert_eq!(version, "1.4");
header_found = true;
}
Ok(ProcessingAction::Continue)
})
.unwrap();
assert!(header_found);
}
#[test]
fn test_processing_with_all_event_types() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut event_types = Vec::new();
processor
.process_with(|event| {
match event {
ProcessingEvent::Start => event_types.push("start"),
ProcessingEvent::Header { .. } => event_types.push("header"),
ProcessingEvent::Object { .. } => event_types.push("object"),
ProcessingEvent::Page(_) => event_types.push("page"),
ProcessingEvent::Resource { .. } => event_types.push("resource"),
ProcessingEvent::End => event_types.push("end"),
}
Ok(ProcessingAction::Continue)
})
.unwrap();
assert!(event_types.contains(&"start"));
assert!(event_types.contains(&"header"));
assert!(event_types.contains(&"page"));
assert!(event_types.contains(&"end"));
}
#[test]
fn test_page_data_with_operations() {
let page_data = PageData {
number: 0,
width: 595.0,
height: 842.0,
text: Some("Test page".to_string()),
operations: vec![ContentOperation::BeginText, ContentOperation::EndText],
};
assert_eq!(page_data.number, 0);
assert_eq!(page_data.width, 595.0);
assert_eq!(page_data.height, 842.0);
assert_eq!(page_data.text, Some("Test page".to_string()));
assert_eq!(page_data.operations.len(), 2);
}
#[test]
fn test_page_data_without_text() {
let page_data = PageData {
number: 1,
width: 612.0,
height: 792.0,
text: None,
operations: vec![],
};
assert_eq!(page_data.number, 1);
assert_eq!(page_data.text, None);
assert!(page_data.operations.is_empty());
}
#[test]
fn test_extract_text_streaming_no_text() {
let data = b"%PDF-1.7\n";
let cursor = Cursor::new(data);
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(cursor, options);
let mut pages_processed = 0;
processor
.process_pages(|_index, page| {
pages_processed += 1;
assert!(page.text.is_some()); Ok(ProcessingAction::Continue)
})
.unwrap();
assert!(pages_processed > 0);
}
#[test]
fn test_stream_processor_io_error() {
use std::io::Error;
struct ErrorReader;
impl Read for ErrorReader {
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
Err(Error::other("IO Error"))
}
}
impl Seek for ErrorReader {
fn seek(&mut self, _pos: std::io::SeekFrom) -> std::io::Result<u64> {
Ok(0)
}
}
let options = StreamingOptions::default();
let mut processor = StreamProcessor::new(ErrorReader, options);
let result = processor.process_with(|_event| Ok(ProcessingAction::Continue));
assert!(result.is_err());
}
#[test]
fn test_content_stream_processor_buffer_reuse() {
let options = StreamingOptions::default();
let mut processor = ContentStreamProcessor::new(options);
let content1 = b"BT (First) Tj ET";
let cursor1 = Cursor::new(content1);
let mut first_ops = Vec::new();
processor
.process_stream(cursor1, |op| {
first_ops.push(format!("{op:?}"));
Ok(ProcessingAction::Continue)
})
.unwrap();
let content2 = b"BT (Second) Tj ET";
let cursor2 = Cursor::new(content2);
let mut second_ops = Vec::new();
processor
.process_stream(cursor2, |op| {
second_ops.push(format!("{op:?}"));
Ok(ProcessingAction::Continue)
})
.unwrap();
assert!(!first_ops.is_empty());
assert!(!second_ops.is_empty());
assert_ne!(first_ops, second_ops);
}
}