use crate::error::Result;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::path::Path;
use std::time::Instant;
#[derive(Debug, Clone)]
pub struct StreamingOptions {
pub buffer_size: usize,
pub compression: bool,
pub auto_flush: bool,
pub compression_level: u32,
pub xref_streaming: bool,
pub progress_callbacks: bool,
pub write_strategy: WriteStrategy,
}
impl Default for StreamingOptions {
fn default() -> Self {
Self {
buffer_size: 256 * 1024, compression: true,
auto_flush: true,
compression_level: 6, xref_streaming: true,
progress_callbacks: false,
write_strategy: WriteStrategy::Buffered,
}
}
}
impl StreamingOptions {
pub fn minimal_memory() -> Self {
Self {
buffer_size: 8 * 1024, compression: true,
auto_flush: true,
compression_level: 9, xref_streaming: true,
progress_callbacks: false,
write_strategy: WriteStrategy::Direct,
}
}
pub fn max_speed() -> Self {
Self {
buffer_size: 2 * 1024 * 1024, compression: false, auto_flush: false, compression_level: 1,
xref_streaming: false, progress_callbacks: false,
write_strategy: WriteStrategy::Buffered,
}
}
pub fn with_buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
pub fn with_compression(mut self, enabled: bool) -> Self {
self.compression = enabled;
self
}
pub fn with_auto_flush(mut self, enabled: bool) -> Self {
self.auto_flush = enabled;
self
}
pub fn with_compression_level(mut self, level: u32) -> Self {
self.compression_level = level.clamp(1, 9);
self
}
pub fn with_write_strategy(mut self, strategy: WriteStrategy) -> Self {
self.write_strategy = strategy;
self
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WriteStrategy {
Buffered,
Direct,
MemoryMapped,
}
pub struct StreamingPdfWriter {
writer: BufWriter<File>,
options: StreamingOptions,
current_position: u64,
object_positions: HashMap<u32, u64>,
current_object_id: u32,
buffer_used: usize,
stats: StreamingStats,
start_time: Instant,
pages_written: u32,
}
impl StreamingPdfWriter {
pub fn create<P: AsRef<Path>>(path: P, options: StreamingOptions) -> Result<Self> {
let file = File::create(path)?;
let writer = BufWriter::with_capacity(options.buffer_size, file);
let mut instance = Self {
writer,
options,
current_position: 0,
object_positions: HashMap::new(),
current_object_id: 1,
buffer_used: 0,
stats: StreamingStats::default(),
start_time: Instant::now(),
pages_written: 0,
};
instance.write_header()?;
Ok(instance)
}
fn write_header(&mut self) -> Result<()> {
let header = b"%PDF-1.7\n%\xE2\xE3\xCF\xD3\n";
self.write_raw(header)?;
Ok(())
}
pub fn write_page_streaming(&mut self, page_content: &StreamingPageContent) -> Result<()> {
let start = Instant::now();
let _page_obj_id = self.start_object()?;
writeln!(self.writer, "<<")?;
writeln!(self.writer, " /Type /Page")?;
writeln!(self.writer, " /Parent 2 0 R")?;
writeln!(
self.writer,
" /MediaBox [0 0 {} {}]",
page_content.width, page_content.height
)?;
if !page_content.content_streams.is_empty() {
writeln!(self.writer, " /Contents [")?;
for (i, stream) in page_content.content_streams.iter().enumerate() {
let stream_obj_id = self.write_content_stream(stream)?;
if i == page_content.content_streams.len() - 1 {
writeln!(self.writer, " {} 0 R", stream_obj_id)?;
} else {
writeln!(self.writer, " {} 0 R", stream_obj_id)?;
}
}
writeln!(self.writer, " ]")?;
}
if !page_content.resources.fonts.is_empty() || !page_content.resources.images.is_empty() {
writeln!(self.writer, " /Resources <<")?;
if !page_content.resources.fonts.is_empty() {
writeln!(self.writer, " /Font <<")?;
for (name, font_id) in &page_content.resources.fonts {
writeln!(self.writer, " /{} {} 0 R", name, font_id)?;
}
writeln!(self.writer, " >>")?;
}
if !page_content.resources.images.is_empty() {
writeln!(self.writer, " /XObject <<")?;
for (name, image_id) in &page_content.resources.images {
writeln!(self.writer, " /{} {} 0 R", name, image_id)?;
}
writeln!(self.writer, " >>")?;
}
writeln!(self.writer, " >>")?;
}
writeln!(self.writer, ">>")?;
self.end_object()?;
self.pages_written += 1;
self.stats.pages_written += 1;
self.stats.total_write_time += start.elapsed();
if self.options.auto_flush && self.should_flush() {
self.flush()?;
}
Ok(())
}
fn write_content_stream(&mut self, content: &ContentStream) -> Result<u32> {
let obj_id = self.start_object()?;
let data = if self.options.compression {
self.compress_data(&content.data)?
} else {
content.data.clone()
};
writeln!(self.writer, "<<")?;
writeln!(self.writer, " /Length {}", data.len())?;
if self.options.compression {
writeln!(self.writer, " /Filter /FlateDecode")?;
}
writeln!(self.writer, ">>")?;
writeln!(self.writer, "stream")?;
self.writer.write_all(&data)?;
writeln!(self.writer, "\nendstream")?;
self.end_object()?;
Ok(obj_id)
}
fn start_object(&mut self) -> Result<u32> {
let obj_id = self.current_object_id;
self.current_object_id += 1;
self.object_positions.insert(obj_id, self.current_position);
writeln!(self.writer, "{} 0 obj", obj_id)?;
self.stats.objects_written += 1;
Ok(obj_id)
}
fn end_object(&mut self) -> Result<()> {
writeln!(self.writer, "endobj")?;
Ok(())
}
fn write_raw(&mut self, data: &[u8]) -> Result<()> {
self.writer.write_all(data)?;
self.current_position += data.len() as u64;
self.buffer_used += data.len();
Ok(())
}
fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
use flate2::write::ZlibEncoder;
use flate2::Compression;
use std::io::Write;
let mut encoder =
ZlibEncoder::new(Vec::new(), Compression::new(self.options.compression_level));
encoder.write_all(data)?;
let compressed = encoder.finish()?;
Ok(compressed)
}
fn should_flush(&self) -> bool {
self.buffer_used >= self.options.buffer_size
}
pub fn flush(&mut self) -> Result<()> {
let start = Instant::now();
self.writer.flush()?;
self.current_position = self.writer.seek(SeekFrom::Current(0))?;
self.buffer_used = 0;
self.stats.flushes += 1;
self.stats.total_flush_time += start.elapsed();
Ok(())
}
pub fn finalize(&mut self) -> Result<()> {
let start = Instant::now();
self.flush()?;
let pages_obj_id = self.start_object()?;
writeln!(self.writer, "<<")?;
writeln!(self.writer, " /Type /Pages")?;
writeln!(self.writer, " /Count {}", self.pages_written)?;
writeln!(self.writer, " /Kids [")?;
for i in 0..self.pages_written {
writeln!(self.writer, " {} 0 R", 3 + i * 2)?; }
writeln!(self.writer, " ]")?;
writeln!(self.writer, ">>")?;
self.end_object()?;
let catalog_obj_id = self.start_object()?;
writeln!(self.writer, "<<")?;
writeln!(self.writer, " /Type /Catalog")?;
writeln!(self.writer, " /Pages {} 0 R", pages_obj_id)?;
writeln!(self.writer, ">>")?;
self.end_object()?;
let xref_position = self.current_position;
writeln!(self.writer, "xref")?;
writeln!(self.writer, "0 {}", self.current_object_id)?;
writeln!(self.writer, "0000000000 65535 f ")?;
for obj_id in 1..self.current_object_id {
if let Some(position) = self.object_positions.get(&obj_id) {
writeln!(self.writer, "{:010} 00000 n ", position)?;
}
}
writeln!(self.writer, "trailer")?;
writeln!(self.writer, "<<")?;
writeln!(self.writer, " /Size {}", self.current_object_id)?;
writeln!(self.writer, " /Root {} 0 R", catalog_obj_id)?;
writeln!(self.writer, ">>")?;
writeln!(self.writer, "startxref")?;
writeln!(self.writer, "{}", xref_position)?;
writeln!(self.writer, "%%EOF")?;
self.flush()?;
self.stats.total_time = self.start_time.elapsed();
self.stats.finalization_time = start.elapsed();
Ok(())
}
pub fn stats(&self) -> &StreamingStats {
&self.stats
}
pub fn buffer_usage_percent(&self) -> f64 {
(self.buffer_used as f64 / self.options.buffer_size as f64) * 100.0
}
pub fn memory_usage(&self) -> usize {
self.options.buffer_size
+ self.object_positions.len()
* (std::mem::size_of::<u32>() + std::mem::size_of::<u64>())
+ std::mem::size_of::<StreamingStats>()
}
}
#[derive(Debug, Clone)]
pub struct StreamingPageContent {
pub width: f64,
pub height: f64,
pub content_streams: Vec<ContentStream>,
pub resources: PageResources,
}
#[derive(Debug, Clone)]
pub struct ContentStream {
pub data: Vec<u8>,
}
impl ContentStream {
pub fn new(data: Vec<u8>) -> Self {
Self { data }
}
pub fn from_text(text: &str, x: f64, y: f64, font: &str, size: f64) -> Self {
let content = format!(
"BT\n/{} {} Tf\n{} {} Td\n({}) Tj\nET\n",
font, size, x, y, text
);
Self::new(content.into_bytes())
}
}
#[derive(Debug, Clone, Default)]
pub struct PageResources {
pub fonts: HashMap<String, u32>,
pub images: HashMap<String, u32>,
}
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
pub pages_written: u32,
pub objects_written: u32,
pub bytes_written: u64,
pub flushes: u32,
pub total_time: std::time::Duration,
pub total_write_time: std::time::Duration,
pub total_flush_time: std::time::Duration,
pub finalization_time: std::time::Duration,
}
impl StreamingStats {
pub fn pages_per_second(&self) -> f64 {
if self.total_time.as_secs_f64() == 0.0 {
return 0.0;
}
self.pages_written as f64 / self.total_time.as_secs_f64()
}
pub fn average_write_time_per_page(&self) -> std::time::Duration {
if self.pages_written == 0 {
return std::time::Duration::ZERO;
}
self.total_write_time / self.pages_written
}
pub fn write_throughput_mbps(&self) -> f64 {
if self.total_time.as_secs_f64() == 0.0 {
return 0.0;
}
let mb = self.bytes_written as f64 / (1024.0 * 1024.0);
mb / self.total_time.as_secs_f64()
}
pub fn summary(&self) -> String {
format!(
"Streaming Writer Stats:\n\
- Pages Written: {}\n\
- Objects Written: {}\n\
- Bytes Written: {:.1} MB\n\
- Flushes: {}\n\
- Total Time: {:.2}s\n\
- Write Time: {:.2}s\n\
- Flush Time: {:.2}s\n\
- Finalization Time: {:.2}s\n\
- Pages/Second: {:.1}\n\
- Throughput: {:.1} MB/s\n\
- Avg Time/Page: {:.2}ms",
self.pages_written,
self.objects_written,
self.bytes_written as f64 / (1024.0 * 1024.0),
self.flushes,
self.total_time.as_secs_f64(),
self.total_write_time.as_secs_f64(),
self.total_flush_time.as_secs_f64(),
self.finalization_time.as_secs_f64(),
self.pages_per_second(),
self.write_throughput_mbps(),
self.average_write_time_per_page().as_secs_f64() * 1000.0
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_streaming_options_default() {
let options = StreamingOptions::default();
assert_eq!(options.buffer_size, 256 * 1024);
assert!(options.compression);
assert!(options.auto_flush);
}
#[test]
fn test_streaming_options_minimal_memory() {
let options = StreamingOptions::minimal_memory();
assert_eq!(options.buffer_size, 8 * 1024);
assert!(options.compression);
assert_eq!(options.compression_level, 9);
assert_eq!(options.write_strategy, WriteStrategy::Direct);
}
#[test]
fn test_streaming_options_max_speed() {
let options = StreamingOptions::max_speed();
assert_eq!(options.buffer_size, 2 * 1024 * 1024);
assert!(!options.compression);
assert!(!options.auto_flush);
}
#[test]
fn test_streaming_writer_creation() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("test.pdf");
let options = StreamingOptions::default();
let writer = StreamingPdfWriter::create(&file_path, options);
assert!(writer.is_ok());
assert!(file_path.exists());
}
#[test]
fn test_content_stream_creation() {
let stream = ContentStream::new(b"Hello".to_vec());
assert_eq!(stream.data, b"Hello");
let text_stream = ContentStream::from_text("Hello", 100.0, 200.0, "F1", 12.0);
assert!(String::from_utf8_lossy(&text_stream.data).contains("Hello"));
assert!(String::from_utf8_lossy(&text_stream.data).contains("100 200 Td"));
}
#[test]
fn test_page_content() {
let mut resources = PageResources::default();
resources.fonts.insert("F1".to_string(), 1);
let content = StreamingPageContent {
width: 595.0,
height: 842.0,
content_streams: vec![ContentStream::from_text("Hello", 100.0, 200.0, "F1", 12.0)],
resources,
};
assert_eq!(content.width, 595.0);
assert_eq!(content.content_streams.len(), 1);
assert!(content.resources.fonts.contains_key("F1"));
}
#[test]
fn test_streaming_stats() {
let mut stats = StreamingStats::default();
stats.pages_written = 10;
stats.total_time = std::time::Duration::from_secs(2);
stats.bytes_written = 1024 * 1024;
assert_eq!(stats.pages_per_second(), 5.0);
assert_eq!(stats.write_throughput_mbps(), 0.5);
}
#[test]
fn test_buffer_usage_calculation() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("test.pdf");
let options = StreamingOptions::default().with_buffer_size(1000);
let writer = StreamingPdfWriter::create(&file_path, options).unwrap();
assert!(writer.buffer_usage_percent() < 10.0);
}
#[test]
fn test_write_strategy_enum() {
assert_eq!(WriteStrategy::Buffered, WriteStrategy::Buffered);
assert_ne!(WriteStrategy::Buffered, WriteStrategy::Direct);
assert_ne!(WriteStrategy::Direct, WriteStrategy::MemoryMapped);
}
#[test]
fn test_streaming_options_builder() {
let options = StreamingOptions::default()
.with_buffer_size(128 * 1024)
.with_compression(false)
.with_auto_flush(false)
.with_compression_level(3)
.with_write_strategy(WriteStrategy::Direct);
assert_eq!(options.buffer_size, 128 * 1024);
assert!(!options.compression);
assert!(!options.auto_flush);
assert_eq!(options.compression_level, 3);
assert_eq!(options.write_strategy, WriteStrategy::Direct);
}
#[test]
fn test_compression_level_clamping() {
let options = StreamingOptions::default().with_compression_level(15);
assert_eq!(options.compression_level, 9);
let options = StreamingOptions::default().with_compression_level(0);
assert_eq!(options.compression_level, 1); }
}