use crate::error::Result;
use std::collections::VecDeque;
use std::io::{BufReader, Read, Seek};
// Submodules that make up the streaming API.
pub mod chunk_processor;
pub mod incremental_parser;
pub mod page_streamer;
pub mod text_streamer;
// Re-export selected items from each submodule so callers can import them
// from this module directly instead of naming the submodule.
pub use chunk_processor::{
process_in_chunks, ChunkOptions, ChunkProcessor, ChunkType, ContentChunk,
};
pub use incremental_parser::{process_incrementally, IncrementalParser, ParseEvent};
pub use page_streamer::{PageStreamer, StreamingPage};
pub use text_streamer::{stream_text, TextChunk, TextStreamOptions, TextStreamer};
/// Tuning knobs for streaming document processing.
///
/// Start from one of the presets ([`Default::default`],
/// [`StreamingOptions::minimal_memory`], [`StreamingOptions::fast_processing`])
/// and override individual values with the chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct StreamingOptions {
    /// Capacity, in bytes, of the buffered reader wrapped around the input.
    pub buffer_size: usize,
    /// Maximum number of pages held in the page cache.
    pub page_cache_size: usize,
    /// Upper bound for a single content stream (presumably in bytes; not
    /// consumed anywhere in this file — confirm against its users).
    pub max_content_stream_size: usize,
    /// Toggle for progressive hints (not consumed anywhere in this file; its
    /// exact effect is defined by whichever component reads it).
    pub progressive_hints: bool,
    /// Threshold for tracked memory use; exceeding it triggers cache eviction
    /// (presumably in bytes).
    pub memory_limit: usize,
}

impl Default for StreamingOptions {
    /// Balanced preset: 256 KiB buffer, 3 cached pages, 10 MiB content
    /// stream bound, progressive hints on, 100 MiB memory limit.
    fn default() -> Self {
        Self {
            buffer_size: 256 * Self::KIB,
            page_cache_size: 3,
            max_content_stream_size: 10 * Self::MIB,
            progressive_hints: true,
            memory_limit: 100 * Self::MIB,
        }
    }
}

impl StreamingOptions {
    /// One kibibyte, used to spell out the preset sizes.
    const KIB: usize = 1024;
    /// One mebibyte, used to spell out the preset sizes.
    const MIB: usize = 1024 * 1024;

    /// Small-footprint preset: 64 KiB buffer, 1 cached page, 1 MiB content
    /// stream bound, progressive hints off, 10 MiB memory limit.
    pub fn minimal_memory() -> Self {
        Self {
            buffer_size: 64 * Self::KIB,
            page_cache_size: 1,
            max_content_stream_size: Self::MIB,
            progressive_hints: false,
            memory_limit: 10 * Self::MIB,
        }
    }

    /// Throughput-oriented preset: 1 MiB buffer, 10 cached pages, 50 MiB
    /// content stream bound, progressive hints on, 500 MiB memory limit.
    pub fn fast_processing() -> Self {
        Self {
            buffer_size: Self::MIB,
            page_cache_size: 10,
            max_content_stream_size: 50 * Self::MIB,
            progressive_hints: true,
            memory_limit: 500 * Self::MIB,
        }
    }

    /// Returns the options with `buffer_size` replaced by `size`.
    pub fn with_buffer_size(self, size: usize) -> Self {
        Self {
            buffer_size: size,
            ..self
        }
    }

    /// Returns the options with `page_cache_size` replaced by `size`.
    pub fn with_page_cache_size(self, size: usize) -> Self {
        Self {
            page_cache_size: size,
            ..self
        }
    }

    /// Returns the options with `memory_limit` replaced by `limit`.
    pub fn with_memory_limit(self, limit: usize) -> Self {
        Self {
            memory_limit: limit,
            ..self
        }
    }
}
/// Page-by-page view over an underlying reader, with a bounded cache of
/// produced pages and a tracked memory counter.
pub struct StreamingDocument<R: Read + Seek> {
    /// Buffered wrapper around the caller's reader.
    // Stored at construction but not read by any method in this file, hence
    // the lint exemption.
    #[allow(dead_code)]
    reader: BufReader<R>,
    /// Settings supplied at construction (cache size, memory limit, ...).
    options: StreamingOptions,
    /// Copies of produced pages, oldest at the front.
    page_cache: VecDeque<StreamingPage>,
    /// Zero-based number of the next page to hand out.
    current_page: u32,
    /// Total page count when known; `None` makes page iteration fall back to
    /// a fixed cap.
    total_pages: Option<u32>,
    /// Running memory figure compared against `options.memory_limit` and
    /// reported by `memory_usage()`.
    memory_used: usize,
}
impl<R: Read + Seek> StreamingDocument<R> {
    /// Number of pages handed out when the total page count is unknown.
    const UNKNOWN_TOTAL_PAGE_CAP: u32 = 10;
    /// Amount subtracted from the memory counter for each evicted page.
    const EVICTED_PAGE_ESTIMATE: usize = 1024;

    /// Creates a document over `reader`.
    ///
    /// The reader is wrapped in a [`BufReader`] whose capacity is
    /// `options.buffer_size`. The document starts at page 0 with an empty
    /// cache, an unknown page count and a memory counter of zero.
    ///
    /// # Errors
    ///
    /// The current body always returns `Ok`; the `Result` is part of the
    /// established signature.
    pub fn new(reader: R, options: StreamingOptions) -> Result<Self> {
        Ok(Self {
            // `buffer_size` is read before `options` is moved into the struct.
            reader: BufReader::with_capacity(options.buffer_size, reader),
            options,
            page_cache: VecDeque::new(),
            current_page: 0,
            total_pages: None,
            memory_used: 0,
        })
    }

    /// Produces the next page, or `Ok(None)` when iteration is finished.
    ///
    /// Iteration ends once the current page number reaches `total_pages`, or
    /// a fixed cap of 10 pages when the total is unknown. Before producing a
    /// page, cached pages are evicted if the memory counter exceeds
    /// `options.memory_limit`. The produced page is copied into the cache
    /// only while the cache holds fewer than `options.page_cache_size`
    /// entries.
    ///
    /// The page itself is synthesized from fixed values (595.0 x 842.0,
    /// presumably A4 in points; zero content offset and length) rather than
    /// read from the underlying reader.
    ///
    /// # Errors
    ///
    /// The current body has no failing path; the `Result` is part of the
    /// established signature.
    pub fn next_page(&mut self) -> Result<Option<StreamingPage>> {
        let page_limit = self.total_pages.unwrap_or(Self::UNKNOWN_TOTAL_PAGE_CAP);
        if self.current_page >= page_limit {
            return Ok(None);
        }

        // NOTE(review): nothing in this impl ever increases `memory_used`, so
        // from here this branch cannot fire and `memory_usage()` stays at 0.
        // Left as-is because changing it would alter what `memory_usage()`
        // reports to callers.
        if self.memory_used > self.options.memory_limit {
            self.evict_pages();
        }

        let page = StreamingPage {
            number: self.current_page,
            width: 595.0,
            height: 842.0,
            content_offset: 0,
            content_length: 0,
        };
        self.current_page += 1;

        // Cache only while there is room; a full cache is left untouched.
        if self.page_cache.len() < self.options.page_cache_size {
            self.page_cache.push_back(page.clone());
        }
        Ok(Some(page))
    }

    /// Feeds every remaining page to `callback`, in order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error from either `next_page` or
    /// `callback`.
    pub fn process_pages<F>(&mut self, mut callback: F) -> Result<()>
    where
        F: FnMut(&StreamingPage) -> Result<()>,
    {
        while let Some(page) = self.next_page()? {
            callback(&page)?;
        }
        Ok(())
    }

    /// Returns the current value of the memory counter.
    pub fn memory_usage(&self) -> usize {
        self.memory_used
    }

    /// Drops every cached page and resets the memory counter to zero.
    pub fn clear_cache(&mut self) {
        self.page_cache.clear();
        self.memory_used = 0;
    }

    /// Evicts cached pages, oldest first, until the memory counter is within
    /// `options.memory_limit` or the cache is empty.
    fn evict_pages(&mut self) {
        while self.memory_used > self.options.memory_limit {
            if self.page_cache.pop_front().is_none() {
                break;
            }
            // Saturating so the counter cannot underflow.
            self.memory_used = self
                .memory_used
                .saturating_sub(Self::EVICTED_PAGE_ESTIMATE);
        }
    }
}
/// Counters describing a streaming run.
///
/// The derived [`Default`] starts every counter at zero. Nothing in this file
/// updates the fields; they are public so that whichever component owns a
/// value can maintain it.
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
    /// Bytes processed so far.
    pub bytes_processed: usize,
    /// Pages processed so far.
    pub pages_processed: u32,
    /// Objects parsed so far.
    pub objects_parsed: u32,
    /// Memory currently in use.
    pub memory_used: usize,
    /// Highest memory figure recorded.
    pub peak_memory: usize,
    /// Number of cache evictions.
    pub cache_evictions: u32,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Minimal input shared by the document tests: a lone PDF header line.
    const PDF_HEADER: &[u8] = b"%PDF-1.7\n";

    /// Opens a document over [`PDF_HEADER`], panicking if construction fails.
    fn open_document(options: StreamingOptions) -> StreamingDocument<Cursor<&'static [u8]>> {
        StreamingDocument::new(Cursor::new(PDF_HEADER), options).unwrap()
    }

    #[test]
    fn test_streaming_options_default() {
        let StreamingOptions {
            buffer_size,
            page_cache_size,
            progressive_hints,
            ..
        } = StreamingOptions::default();
        assert_eq!(buffer_size, 256 * 1024);
        assert_eq!(page_cache_size, 3);
        assert!(progressive_hints);
    }

    #[test]
    fn test_streaming_options_minimal() {
        let StreamingOptions {
            buffer_size,
            page_cache_size,
            progressive_hints,
            memory_limit,
            ..
        } = StreamingOptions::minimal_memory();
        assert_eq!(buffer_size, 64 * 1024);
        assert_eq!(page_cache_size, 1);
        assert!(!progressive_hints);
        assert_eq!(memory_limit, 10 * 1024 * 1024);
    }

    #[test]
    fn test_streaming_options_fast() {
        let StreamingOptions {
            buffer_size,
            page_cache_size,
            progressive_hints,
            ..
        } = StreamingOptions::fast_processing();
        assert_eq!(buffer_size, 1024 * 1024);
        assert_eq!(page_cache_size, 10);
        assert!(progressive_hints);
    }

    #[test]
    fn test_streaming_options_builder() {
        let tuned = StreamingOptions::default()
            .with_buffer_size(512 * 1024)
            .with_page_cache_size(5)
            .with_memory_limit(50 * 1024 * 1024);
        assert_eq!(tuned.buffer_size, 512 * 1024);
        assert_eq!(tuned.page_cache_size, 5);
        assert_eq!(tuned.memory_limit, 50 * 1024 * 1024);
    }

    #[test]
    fn test_streaming_document_creation() {
        let created = StreamingDocument::new(Cursor::new(PDF_HEADER), StreamingOptions::default());
        assert!(created.is_ok());
    }

    #[test]
    fn test_next_page() {
        let mut doc = open_document(StreamingOptions::default());
        let first = doc.next_page().unwrap();
        assert!(first.is_some());
        let first = first.unwrap();
        assert_eq!(first.number(), 0);
        assert_eq!(first.width(), 595.0);
        assert_eq!(first.height(), 842.0);
    }

    #[test]
    fn test_process_pages() {
        let mut doc = open_document(StreamingOptions::default());
        let mut seen = 0;
        doc.process_pages(|page| {
            seen += 1;
            assert!(page.number() < 1000);
            Ok(())
        })
        .unwrap();
        assert!(seen > 0);
    }

    #[test]
    fn test_memory_management() {
        let tight = StreamingOptions::default().with_memory_limit(1024);
        let mut doc = open_document(tight);
        // Results are deliberately ignored; only the cache state matters here.
        (0..5).for_each(|_| {
            let _ = doc.next_page();
        });
        assert!(doc.page_cache.len() <= 3);

        doc.clear_cache();
        assert_eq!(doc.page_cache.len(), 0);
        assert_eq!(doc.memory_usage(), 0);
    }

    #[test]
    fn test_streaming_stats() {
        let StreamingStats {
            bytes_processed,
            pages_processed,
            objects_parsed,
            memory_used,
            peak_memory,
            cache_evictions,
        } = StreamingStats::default();
        assert_eq!(bytes_processed, 0);
        assert_eq!(pages_processed, 0);
        assert_eq!(objects_parsed, 0);
        assert_eq!(memory_used, 0);
        assert_eq!(peak_memory, 0);
        assert_eq!(cache_evictions, 0);
    }
}