#[cfg(feature = "rayon")]
use rayon::prelude::*;
use crate::error::Result;
use crate::performance::{MemoryPool, PerformancePage, ResourcePool};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct ParallelGenerationOptions {
pub max_threads: usize,
pub chunk_size: usize,
pub load_balancing: bool,
pub max_memory_per_thread: usize,
pub progress_reporting: bool,
pub thread_pool_config: ThreadPoolConfig,
}
impl Default for ParallelGenerationOptions {
fn default() -> Self {
Self {
max_threads: num_cpus::get().min(8),
chunk_size: 4,
load_balancing: true,
max_memory_per_thread: 64 * 1024 * 1024, progress_reporting: false,
thread_pool_config: ThreadPoolConfig::default(),
}
}
}
impl ParallelGenerationOptions {
pub fn max_throughput() -> Self {
Self {
max_threads: num_cpus::get(),
chunk_size: 2, load_balancing: true,
max_memory_per_thread: 128 * 1024 * 1024, progress_reporting: false, thread_pool_config: ThreadPoolConfig::max_performance(),
}
}
pub fn memory_efficient() -> Self {
Self {
max_threads: (num_cpus::get() / 2).max(1),
chunk_size: 8, load_balancing: false,
max_memory_per_thread: 16 * 1024 * 1024, progress_reporting: true, thread_pool_config: ThreadPoolConfig::memory_efficient(),
}
}
pub fn with_max_threads(mut self, threads: usize) -> Self {
self.max_threads = threads.max(1);
self
}
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size.max(1);
self
}
pub fn with_load_balancing(mut self, enabled: bool) -> Self {
self.load_balancing = enabled;
self
}
pub fn with_max_memory_per_thread(mut self, bytes: usize) -> Self {
self.max_memory_per_thread = bytes;
self
}
pub fn with_progress_reporting(mut self, enabled: bool) -> Self {
self.progress_reporting = enabled;
self
}
}
#[derive(Debug, Clone)]
pub struct ThreadPoolConfig {
pub stack_size: usize,
pub thread_priority: ThreadPriority,
pub thread_name_prefix: String,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ThreadPriority {
Low,
Normal,
High,
}
impl Default for ThreadPoolConfig {
fn default() -> Self {
Self {
stack_size: 2 * 1024 * 1024, thread_priority: ThreadPriority::Normal,
thread_name_prefix: "pdf-worker".to_string(),
}
}
}
impl ThreadPoolConfig {
pub fn max_performance() -> Self {
Self {
stack_size: 4 * 1024 * 1024, thread_priority: ThreadPriority::High,
thread_name_prefix: "pdf-fast".to_string(),
}
}
pub fn memory_efficient() -> Self {
Self {
stack_size: 512 * 1024, thread_priority: ThreadPriority::Low,
thread_name_prefix: "pdf-mem".to_string(),
}
}
}
pub struct ParallelPageGenerator {
options: ParallelGenerationOptions,
resource_pool: Arc<ResourcePool>,
stats: Arc<Mutex<ParallelStats>>,
#[cfg(feature = "rayon")]
thread_pool: Option<rayon::ThreadPool>,
}
impl ParallelPageGenerator {
pub fn new(options: ParallelGenerationOptions) -> Result<Self> {
let resource_pool = Arc::new(ResourcePool::new());
let stats = Arc::new(Mutex::new(ParallelStats::default()));
#[cfg(feature = "rayon")]
let thread_pool = Self::create_thread_pool(&options)?;
#[cfg(not(feature = "rayon"))]
let thread_pool: Option<rayon::ThreadPool> = None;
Ok(Self {
options,
resource_pool,
stats,
#[cfg(feature = "rayon")]
thread_pool,
})
}
#[cfg(feature = "rayon")]
fn create_thread_pool(
options: &ParallelGenerationOptions,
) -> Result<Option<rayon::ThreadPool>> {
let thread_name_prefix = options.thread_pool_config.thread_name_prefix.clone();
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(options.max_threads)
.stack_size(options.thread_pool_config.stack_size)
.thread_name(move |index| format!("{}-{}", thread_name_prefix, index))
.build()
.map_err(|e| {
crate::error::PdfError::Internal(format!("Failed to create thread pool: {}", e))
})?;
Ok(Some(pool))
}
#[cfg(feature = "rayon")]
pub fn process_pages_parallel(&self, pages: Vec<PageSpec>) -> Result<Vec<ProcessedPage>> {
let start_time = Instant::now();
if let Some(ref pool) = self.thread_pool {
let result = pool.install(|| self.process_pages_internal(pages));
let mut stats = self.stats.lock().unwrap();
stats.total_processing_time = start_time.elapsed();
stats.parallel_executions += 1;
result
} else {
self.process_pages_sequential(pages)
}
}
#[cfg(not(feature = "rayon"))]
pub fn process_pages_parallel(&self, pages: Vec<PageSpec>) -> Result<Vec<ProcessedPage>> {
self.process_pages_sequential(pages)
}
#[cfg(feature = "rayon")]
fn process_pages_internal(&self, pages: Vec<PageSpec>) -> Result<Vec<ProcessedPage>> {
let chunk_size = self.options.chunk_size;
let resource_pool = Arc::clone(&self.resource_pool);
let stats = Arc::clone(&self.stats);
let results: Result<Vec<Vec<ProcessedPage>>> = pages
.chunks(chunk_size)
.enumerate()
.collect::<Vec<_>>()
.par_iter()
.map(|(chunk_idx, chunk)| {
self.process_chunk(
*chunk_idx,
chunk,
Arc::clone(&resource_pool),
Arc::clone(&stats),
)
})
.collect();
let processed_chunks = results?;
let final_results: Vec<ProcessedPage> = processed_chunks.into_iter().flatten().collect();
let mut stats_guard = self.stats.lock().unwrap();
stats_guard.total_pages_processed = final_results.len();
Ok(final_results)
}
fn process_chunk(
&self,
_chunk_idx: usize,
chunk: &[PageSpec],
resource_pool: Arc<ResourcePool>,
stats: Arc<Mutex<ParallelStats>>,
) -> Result<Vec<ProcessedPage>> {
let start = Instant::now();
let thread_id = self.get_current_thread_id();
let memory_pool = MemoryPool::new(self.options.max_memory_per_thread);
let mut processed = Vec::with_capacity(chunk.len());
for (_page_idx, spec) in chunk.iter().enumerate() {
let page_start = Instant::now();
let processor = PageProcessor::new(Arc::clone(&resource_pool), &memory_pool, thread_id);
let processed_page = processor.process_page(spec)?;
processed.push(processed_page);
if self.options.progress_reporting {
let mut stats_guard = stats.lock().unwrap();
stats_guard.pages_completed += 1;
stats_guard.total_page_time += page_start.elapsed();
let current_count = *stats_guard.thread_usage.get(&thread_id).unwrap_or(&0);
stats_guard
.thread_usage
.insert(thread_id, current_count + 1);
}
}
let mut stats_guard = stats.lock().unwrap();
stats_guard.chunks_processed += 1;
stats_guard.total_chunk_time += start.elapsed();
stats_guard.chunk_sizes.push(chunk.len());
Ok(processed)
}
fn process_pages_sequential(&self, pages: Vec<PageSpec>) -> Result<Vec<ProcessedPage>> {
let start_time = Instant::now();
let memory_pool = MemoryPool::new(self.options.max_memory_per_thread);
let processor = PageProcessor::new(
Arc::clone(&self.resource_pool),
&memory_pool,
0, );
let mut results = Vec::with_capacity(pages.len());
for spec in pages {
let processed = processor.process_page(&spec)?;
results.push(processed);
}
let mut stats = self.stats.lock().unwrap();
stats.total_processing_time = start_time.elapsed();
stats.total_pages_processed = results.len();
stats.sequential_executions += 1;
Ok(results)
}
fn get_current_thread_id(&self) -> usize {
#[cfg(feature = "rayon")]
{
rayon::current_thread_index().unwrap_or(0)
}
#[cfg(not(feature = "rayon"))]
{
0
}
}
pub fn stats(&self) -> ParallelStats {
self.stats.lock().unwrap().clone()
}
pub fn reset_stats(&self) {
*self.stats.lock().unwrap() = ParallelStats::default();
}
pub fn is_parallel_available(&self) -> bool {
#[cfg(feature = "rayon")]
{
self.thread_pool.is_some()
}
#[cfg(not(feature = "rayon"))]
{
false
}
}
pub fn optimal_chunk_size(&self, total_pages: usize) -> usize {
let threads = self.options.max_threads;
let base_chunk_size = (total_pages / threads).max(1);
let memory_per_page = 1024 * 1024; let max_chunk_by_memory = self.options.max_memory_per_thread / memory_per_page;
base_chunk_size.min(max_chunk_by_memory).max(1)
}
}
pub struct PageProcessor {
#[allow(dead_code)]
resource_pool: Arc<ResourcePool>,
memory_pool: MemoryPool,
thread_id: usize,
}
impl PageProcessor {
pub fn new(
resource_pool: Arc<ResourcePool>,
memory_pool: &MemoryPool,
thread_id: usize,
) -> Self {
Self {
resource_pool,
memory_pool: MemoryPool::new(memory_pool.memory_usage()), thread_id,
}
}
pub fn process_page(&self, spec: &PageSpec) -> Result<ProcessedPage> {
let start = Instant::now();
let performance_page = PerformancePage {
index: spec.index,
width: spec.width,
height: spec.height,
content_refs: spec.resource_keys.clone(),
estimated_size: self.estimate_page_size(spec),
};
let processing_time = start.elapsed();
Ok(ProcessedPage {
page: performance_page,
processing_time,
thread_id: self.thread_id,
memory_used: self.memory_pool.memory_usage(),
})
}
fn estimate_page_size(&self, spec: &PageSpec) -> usize {
let base_size = 2048; let content_size = spec.content_length;
let resource_overhead = spec.resource_keys.len() * 512;
base_size + content_size + resource_overhead
}
}
#[derive(Debug, Clone)]
pub struct PageSpec {
pub index: u32,
pub width: f64,
pub height: f64,
pub content_length: usize,
pub resource_keys: Vec<super::ResourceKey>,
pub complexity_score: f32, }
impl PageSpec {
pub fn new(index: u32, width: f64, height: f64) -> Self {
Self {
index,
width,
height,
content_length: 0,
resource_keys: Vec::new(),
complexity_score: 0.5, }
}
pub fn with_content_length(mut self, length: usize) -> Self {
self.content_length = length;
self
}
pub fn with_resources(mut self, keys: Vec<super::ResourceKey>) -> Self {
self.resource_keys = keys;
self
}
pub fn with_complexity(mut self, score: f32) -> Self {
self.complexity_score = score.clamp(0.0, 1.0);
self
}
}
#[derive(Debug, Clone)]
pub struct ProcessedPage {
pub page: PerformancePage,
pub processing_time: Duration,
pub thread_id: usize,
pub memory_used: usize,
}
#[derive(Debug, Clone, Default)]
pub struct ParallelStats {
pub total_pages_processed: usize,
pub pages_completed: usize,
pub chunks_processed: usize,
pub parallel_executions: u32,
pub sequential_executions: u32,
pub total_processing_time: Duration,
pub total_page_time: Duration,
pub total_chunk_time: Duration,
pub thread_usage: HashMap<usize, usize>,
pub chunk_sizes: Vec<usize>,
}
impl ParallelStats {
pub fn pages_per_second(&self) -> f64 {
if self.total_processing_time.as_secs_f64() == 0.0 {
return 0.0;
}
self.total_pages_processed as f64 / self.total_processing_time.as_secs_f64()
}
pub fn average_time_per_page(&self) -> Duration {
if self.total_pages_processed == 0 {
return Duration::ZERO;
}
self.total_processing_time / self.total_pages_processed as u32
}
pub fn parallel_efficiency(&self) -> f64 {
let total_executions = self.parallel_executions + self.sequential_executions;
if total_executions == 0 {
return 0.0;
}
self.parallel_executions as f64 / total_executions as f64
}
pub fn thread_balance(&self) -> f64 {
if self.thread_usage.is_empty() {
return 1.0;
}
let values: Vec<usize> = self.thread_usage.values().copied().collect();
if values.is_empty() {
return 1.0;
}
let max_usage = *values.iter().max().unwrap() as f64;
let min_usage = *values.iter().min().unwrap() as f64;
if max_usage == 0.0 {
return 1.0;
}
min_usage / max_usage
}
pub fn summary(&self) -> String {
format!(
"Parallel Processing Stats:\n\
- Pages Processed: {}\n\
- Total Time: {:.2}s\n\
- Pages/Second: {:.1}\n\
- Average Time/Page: {:.2}ms\n\
- Chunks Processed: {}\n\
- Parallel Executions: {}\n\
- Sequential Executions: {}\n\
- Parallel Efficiency: {:.1}%\n\
- Thread Balance: {:.1}%\n\
- Active Threads: {}",
self.total_pages_processed,
self.total_processing_time.as_secs_f64(),
self.pages_per_second(),
self.average_time_per_page().as_secs_f64() * 1000.0,
self.chunks_processed,
self.parallel_executions,
self.sequential_executions,
self.parallel_efficiency() * 100.0,
self.thread_balance() * 100.0,
self.thread_usage.len()
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parallel_generation_options_default() {
let options = ParallelGenerationOptions::default();
assert!(options.max_threads <= 8);
assert_eq!(options.chunk_size, 4);
assert!(options.load_balancing);
}
#[test]
fn test_parallel_generation_options_max_throughput() {
let options = ParallelGenerationOptions::max_throughput();
assert_eq!(options.max_threads, num_cpus::get());
assert_eq!(options.chunk_size, 2);
assert!(!options.progress_reporting);
}
#[test]
fn test_parallel_generation_options_memory_efficient() {
let options = ParallelGenerationOptions::memory_efficient();
assert!(options.max_threads <= num_cpus::get());
assert_eq!(options.chunk_size, 8);
assert!(options.progress_reporting);
}
#[test]
fn test_page_spec_creation() {
let spec = PageSpec::new(0, 595.0, 842.0)
.with_content_length(1024)
.with_complexity(0.8);
assert_eq!(spec.index, 0);
assert_eq!(spec.content_length, 1024);
assert_eq!(spec.complexity_score, 0.8);
}
#[test]
fn test_page_spec_complexity_clamping() {
let spec = PageSpec::new(0, 595.0, 842.0).with_complexity(1.5);
assert_eq!(spec.complexity_score, 1.0);
let spec2 = PageSpec::new(0, 595.0, 842.0).with_complexity(-0.5);
assert_eq!(spec2.complexity_score, 0.0);
}
#[test]
fn test_thread_pool_config() {
let config = ThreadPoolConfig::default();
assert_eq!(config.stack_size, 2 * 1024 * 1024);
assert_eq!(config.thread_priority, ThreadPriority::Normal);
let fast_config = ThreadPoolConfig::max_performance();
assert_eq!(fast_config.thread_priority, ThreadPriority::High);
assert!(fast_config.stack_size > config.stack_size);
let mem_config = ThreadPoolConfig::memory_efficient();
assert_eq!(mem_config.thread_priority, ThreadPriority::Low);
assert!(mem_config.stack_size < config.stack_size);
}
#[test]
fn test_parallel_stats() {
let mut stats = ParallelStats::default();
stats.total_pages_processed = 100;
stats.total_processing_time = Duration::from_secs(10);
stats.parallel_executions = 3;
stats.sequential_executions = 1;
assert_eq!(stats.pages_per_second(), 10.0);
assert_eq!(stats.average_time_per_page(), Duration::from_millis(100));
assert_eq!(stats.parallel_efficiency(), 0.75);
}
#[test]
fn test_thread_balance_calculation() {
let mut stats = ParallelStats::default();
stats.thread_usage.insert(0, 10);
stats.thread_usage.insert(1, 10);
stats.thread_usage.insert(2, 10);
assert_eq!(stats.thread_balance(), 1.0);
stats.thread_usage.insert(3, 5);
assert_eq!(stats.thread_balance(), 0.5); }
#[test]
fn test_parallel_generator_creation() {
let options = ParallelGenerationOptions::default();
let generator = ParallelPageGenerator::new(options);
assert!(generator.is_ok());
}
#[test]
fn test_page_processor_creation() {
let resource_pool = Arc::new(ResourcePool::new());
let memory_pool = MemoryPool::new(1024 * 1024);
let processor = PageProcessor::new(resource_pool, &memory_pool, 0);
let spec = PageSpec::new(0, 595.0, 842.0);
let result = processor.process_page(&spec);
assert!(result.is_ok());
let processed = result.unwrap();
assert_eq!(processed.page.index, 0);
assert_eq!(processed.thread_id, 0);
}
#[test]
fn test_optimal_chunk_size() {
let options = ParallelGenerationOptions::default().with_max_threads(4);
let generator = ParallelPageGenerator::new(options).unwrap();
let chunk_size = generator.optimal_chunk_size(100);
assert!(chunk_size >= 1);
assert!(chunk_size <= 100);
let small_chunk = generator.optimal_chunk_size(2);
assert_eq!(small_chunk, 1); }
#[cfg(feature = "rayon")]
#[test]
fn test_parallel_processing() {
let options = ParallelGenerationOptions::default().with_max_threads(2);
let generator = ParallelPageGenerator::new(options).unwrap();
let pages = vec![
PageSpec::new(0, 595.0, 842.0),
PageSpec::new(1, 595.0, 842.0),
PageSpec::new(2, 595.0, 842.0),
];
let result = generator.process_pages_parallel(pages);
assert!(result.is_ok());
let processed = result.unwrap();
assert_eq!(processed.len(), 3);
let stats = generator.stats();
assert_eq!(stats.total_pages_processed, 3);
assert!(stats.parallel_executions > 0 || stats.sequential_executions > 0);
}
#[test]
fn test_sequential_fallback() {
let options = ParallelGenerationOptions::default();
let generator = ParallelPageGenerator::new(options).unwrap();
let pages = vec![
PageSpec::new(0, 595.0, 842.0),
PageSpec::new(1, 595.0, 842.0),
];
let result = generator.process_pages_sequential(pages);
assert!(result.is_ok());
let processed = result.unwrap();
assert_eq!(processed.len(), 2);
}
}