use crate::error::Result;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::thread;
use std::time::{Duration, Instant};
pub mod job;
pub mod progress;
pub mod result;
pub mod worker;
pub use job::{BatchJob, JobStatus, JobType};
pub use progress::{BatchProgress, ProgressCallback, ProgressInfo};
pub use result::{BatchResult, BatchSummary, JobResult};
pub use worker::{WorkerOptions, WorkerPool};
#[derive(Clone)]
pub struct BatchOptions {
pub parallelism: usize,
pub memory_limit_per_worker: usize,
pub progress_interval: Duration,
pub stop_on_error: bool,
pub progress_callback: Option<Arc<dyn ProgressCallback>>,
pub job_timeout: Option<Duration>,
}
impl Default for BatchOptions {
fn default() -> Self {
Self {
parallelism: num_cpus::get().min(8),
memory_limit_per_worker: 512 * 1024 * 1024, progress_interval: Duration::from_millis(100),
stop_on_error: false,
progress_callback: None,
job_timeout: Some(Duration::from_secs(300)), }
}
}
impl BatchOptions {
pub fn with_parallelism(mut self, parallelism: usize) -> Self {
self.parallelism = parallelism.max(1);
self
}
pub fn with_memory_limit(mut self, bytes: usize) -> Self {
self.memory_limit_per_worker = bytes;
self
}
pub fn with_progress_callback<F>(mut self, callback: F) -> Self
where
F: Fn(&ProgressInfo) + Send + Sync + 'static,
{
self.progress_callback = Some(Arc::new(callback));
self
}
pub fn stop_on_error(mut self, stop: bool) -> Self {
self.stop_on_error = stop;
self
}
pub fn with_job_timeout(mut self, timeout: Duration) -> Self {
self.job_timeout = Some(timeout);
self
}
}
pub struct BatchProcessor {
options: BatchOptions,
jobs: Vec<BatchJob>,
cancelled: Arc<AtomicBool>,
progress: Arc<BatchProgress>,
}
impl BatchProcessor {
pub fn new(options: BatchOptions) -> Self {
Self {
options,
jobs: Vec::new(),
cancelled: Arc::new(AtomicBool::new(false)),
progress: Arc::new(BatchProgress::new()),
}
}
pub fn add_job(&mut self, job: BatchJob) {
self.jobs.push(job);
self.progress.add_job();
}
pub fn add_jobs(&mut self, jobs: impl IntoIterator<Item = BatchJob>) {
for job in jobs {
self.add_job(job);
}
}
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::SeqCst);
}
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::SeqCst)
}
pub fn execute(self) -> Result<BatchSummary> {
let start_time = Instant::now();
let total_jobs = self.jobs.len();
if total_jobs == 0 {
return Ok(BatchSummary::empty());
}
let worker_options = WorkerOptions {
num_workers: self.options.parallelism,
memory_limit: self.options.memory_limit_per_worker,
job_timeout: self.options.job_timeout,
};
let pool = WorkerPool::new(worker_options);
let _results = Arc::new(Mutex::new(Vec::<JobResult>::new()));
let _errors = Arc::new(Mutex::new(Vec::<String>::new()));
let progress_handle = if let Some(callback) = &self.options.progress_callback {
let progress = Arc::clone(&self.progress);
let callback = Arc::clone(callback);
let interval = self.options.progress_interval;
let cancelled = Arc::clone(&self.cancelled);
Some(thread::spawn(move || {
while !cancelled.load(Ordering::SeqCst) {
let info = progress.get_info();
callback.on_progress(&info);
if info.is_complete() {
break;
}
thread::sleep(interval);
}
}))
} else {
None
};
let job_results = pool.process_jobs(
self.jobs,
Arc::clone(&self.progress),
Arc::clone(&self.cancelled),
self.options.stop_on_error,
);
let mut successful = 0;
let mut failed = 0;
let mut all_results = Vec::new();
for result in job_results {
match &result {
JobResult::Success { .. } => successful += 1,
JobResult::Failed { .. } => failed += 1,
JobResult::Cancelled { .. } => {}
}
all_results.push(result);
}
if let Some(handle) = progress_handle {
let _ = handle.join();
}
if let Some(callback) = &self.options.progress_callback {
let final_info = self.progress.get_info();
callback.on_progress(&final_info);
}
Ok(BatchSummary {
total_jobs,
successful,
failed,
cancelled: self.cancelled.load(Ordering::SeqCst),
duration: start_time.elapsed(),
results: all_results,
})
}
pub fn get_progress(&self) -> ProgressInfo {
self.progress.get_info()
}
}
pub fn batch_process_files<P, F>(
files: Vec<P>,
operation: F,
options: BatchOptions,
) -> Result<BatchSummary>
where
P: AsRef<Path>,
F: Fn(&Path) -> Result<()> + Clone + Send + 'static,
{
let mut processor = BatchProcessor::new(options);
for file in files {
let path = file.as_ref().to_path_buf();
let op = operation.clone();
processor.add_job(BatchJob::Custom {
name: format!("Process {}", path.display()),
operation: Box::new(move || op(&path)),
});
}
processor.execute()
}
pub fn batch_split_pdfs<P: AsRef<Path>>(
files: Vec<P>,
pages_per_file: usize,
options: BatchOptions,
) -> Result<BatchSummary> {
let mut processor = BatchProcessor::new(options);
for file in files {
let path = file.as_ref();
processor.add_job(BatchJob::Split {
input: path.to_path_buf(),
output_pattern: format!(
"{}_page_%d.pdf",
path.file_stem()
.and_then(|stem| stem.to_str())
.unwrap_or("output")
),
pages_per_file,
});
}
processor.execute()
}
pub fn batch_merge_pdfs(
merge_groups: Vec<(Vec<PathBuf>, PathBuf)>,
options: BatchOptions,
) -> Result<BatchSummary> {
let mut processor = BatchProcessor::new(options);
for (inputs, output) in merge_groups {
processor.add_job(BatchJob::Merge { inputs, output });
}
processor.execute()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_batch_options_default() {
let options = BatchOptions::default();
assert!(options.parallelism > 0);
assert!(options.parallelism <= 8);
assert_eq!(options.memory_limit_per_worker, 512 * 1024 * 1024);
assert!(!options.stop_on_error);
}
#[test]
fn test_batch_options_builder() {
let called = Arc::new(AtomicBool::new(false));
let called_clone = Arc::clone(&called);
let options = BatchOptions::default()
.with_parallelism(4)
.with_memory_limit(1024 * 1024 * 1024)
.stop_on_error(true)
.with_job_timeout(Duration::from_secs(60))
.with_progress_callback(move |_info| {
called_clone.store(true, Ordering::SeqCst);
});
assert_eq!(options.parallelism, 4);
assert_eq!(options.memory_limit_per_worker, 1024 * 1024 * 1024);
assert!(options.stop_on_error);
assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
assert!(options.progress_callback.is_some());
}
#[test]
fn test_batch_processor_creation() {
let processor = BatchProcessor::new(BatchOptions::default());
assert_eq!(processor.jobs.len(), 0);
assert!(!processor.is_cancelled());
}
#[test]
fn test_batch_processor_add_jobs() {
let mut processor = BatchProcessor::new(BatchOptions::default());
processor.add_job(BatchJob::Custom {
name: "Test Job 1".to_string(),
operation: Box::new(|| Ok(())),
});
processor.add_jobs(vec![
BatchJob::Custom {
name: "Test Job 2".to_string(),
operation: Box::new(|| Ok(())),
},
BatchJob::Custom {
name: "Test Job 3".to_string(),
operation: Box::new(|| Ok(())),
},
]);
assert_eq!(processor.jobs.len(), 3);
}
#[test]
fn test_batch_processor_cancel() {
let processor = BatchProcessor::new(BatchOptions::default());
assert!(!processor.is_cancelled());
processor.cancel();
assert!(processor.is_cancelled());
}
#[test]
fn test_empty_batch_execution() {
let processor = BatchProcessor::new(BatchOptions::default());
let summary = processor.execute().unwrap();
assert_eq!(summary.total_jobs, 0);
assert_eq!(summary.successful, 0);
assert_eq!(summary.failed, 0);
assert!(!summary.cancelled);
}
#[test]
fn test_batch_options_builder_advanced() {
let options = BatchOptions::default()
.with_parallelism(4)
.with_memory_limit(1024 * 1024)
.stop_on_error(true)
.with_job_timeout(Duration::from_secs(60));
assert_eq!(options.parallelism, 4);
assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
assert!(options.stop_on_error);
assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
}
#[test]
fn test_batch_processor_with_multiple_jobs() {
let mut processor = BatchProcessor::new(BatchOptions::default());
for i in 0..5 {
processor.add_job(BatchJob::Custom {
name: format!("job_{}", i),
operation: Box::new(move || {
thread::sleep(Duration::from_millis(10));
Ok(())
}),
});
}
let summary = processor.execute().unwrap();
assert_eq!(summary.total_jobs, 5);
assert_eq!(summary.successful, 5);
assert_eq!(summary.failed, 0);
}
#[test]
fn test_batch_processor_with_failing_jobs() {
let mut processor = BatchProcessor::new(BatchOptions::default());
processor.add_job(BatchJob::Custom {
name: "success".to_string(),
operation: Box::new(|| Ok(())),
});
processor.add_job(BatchJob::Custom {
name: "failure".to_string(),
operation: Box::new(|| {
Err(crate::error::PdfError::InvalidStructure(
"Test error".to_string(),
))
}),
});
let summary = processor.execute().unwrap();
assert_eq!(summary.total_jobs, 2);
assert_eq!(summary.successful, 1);
assert_eq!(summary.failed, 1);
}
#[test]
fn test_batch_processor_stop_on_error() {
let options = BatchOptions {
stop_on_error: true,
parallelism: 1,
..Default::default()
};
let mut processor = BatchProcessor::new(options);
processor.add_job(BatchJob::Custom {
name: "job1".to_string(),
operation: Box::new(|| Ok(())),
});
processor.add_job(BatchJob::Custom {
name: "job2".to_string(),
operation: Box::new(|| {
Err(crate::error::PdfError::Io(std::io::Error::other(
"Test error",
)))
}),
});
processor.add_job(BatchJob::Custom {
name: "job3".to_string(),
operation: Box::new(|| Ok(())),
});
let result = processor.execute();
assert!(result.is_err() || result.unwrap().failed > 0);
}
#[test]
fn test_batch_processor_parallelism() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let options = BatchOptions {
parallelism: 4,
..Default::default()
};
let mut processor = BatchProcessor::new(options);
let concurrent_count = Arc::new(AtomicUsize::new(0));
let max_concurrent = Arc::new(AtomicUsize::new(0));
for i in 0..10 {
let concurrent = concurrent_count.clone();
let max = max_concurrent.clone();
processor.add_job(BatchJob::Custom {
name: format!("job_{}", i),
operation: Box::new(move || {
let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
let mut max_val = max.load(Ordering::SeqCst);
while current > max_val {
match max.compare_exchange_weak(
max_val,
current,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(x) => max_val = x,
}
}
thread::sleep(Duration::from_millis(50));
concurrent.fetch_sub(1, Ordering::SeqCst);
Ok(())
}),
});
}
let summary = processor.execute().unwrap();
assert_eq!(summary.successful, 10);
assert!(max_concurrent.load(Ordering::SeqCst) > 1);
assert!(max_concurrent.load(Ordering::SeqCst) <= 4);
}
#[test]
fn test_batch_processor_timeout() {
let options = BatchOptions {
job_timeout: Some(Duration::from_millis(50)),
parallelism: 1,
..Default::default()
};
let mut processor = BatchProcessor::new(options);
processor.add_job(BatchJob::Custom {
name: "timeout_job".to_string(),
operation: Box::new(|| {
thread::sleep(Duration::from_millis(200));
Ok(())
}),
});
let summary = processor.execute().unwrap();
assert_eq!(summary.failed, 0);
}
#[test]
fn test_batch_processor_memory_limit() {
let options = BatchOptions {
memory_limit_per_worker: 1024 * 1024, ..Default::default()
};
let processor = BatchProcessor::new(options);
assert_eq!(processor.options.memory_limit_per_worker, 1024 * 1024);
}
#[test]
fn test_batch_progress_tracking() {
use std::sync::{Arc, Mutex};
let progress_updates = Arc::new(Mutex::new(Vec::new()));
let progress_clone = progress_updates.clone();
let options = BatchOptions {
progress_callback: Some(Arc::new(move |info: &ProgressInfo| {
progress_clone.lock().unwrap().push(info.percentage());
})),
..Default::default()
};
let mut processor = BatchProcessor::new(options);
for i in 0..5 {
processor.add_job(BatchJob::Custom {
name: format!("job_{}", i),
operation: Box::new(move || {
thread::sleep(Duration::from_millis(10));
Ok(())
}),
});
}
processor.execute().unwrap();
let updates = progress_updates.lock().unwrap();
assert!(!updates.is_empty());
assert_eq!(*updates.last().unwrap(), 100.0);
}
#[test]
fn test_batch_processor_cancel_during_execution() {
let processor = BatchProcessor::new(BatchOptions::default());
assert!(!processor.is_cancelled());
processor.cancel();
assert!(processor.is_cancelled());
processor.cancel();
assert!(processor.is_cancelled());
}
#[test]
fn test_batch_processor_without_progress_callback() {
let options = BatchOptions::default(); let mut processor = BatchProcessor::new(options);
processor.add_job(BatchJob::Custom {
name: "test_job".to_string(),
operation: Box::new(|| Ok(())),
});
let result = processor.execute();
assert!(result.is_ok());
let summary = result.unwrap();
assert_eq!(summary.successful, 1);
}
#[test]
fn test_batch_processor_early_completion_in_progress() {
use std::sync::{Arc, Mutex};
let progress_called = Arc::new(Mutex::new(false));
let progress_called_clone = Arc::clone(&progress_called);
let options = BatchOptions::default().with_progress_callback(move |info| {
*progress_called_clone.lock().unwrap() = true;
if info.is_complete() {
}
});
let mut processor = BatchProcessor::new(options);
processor.add_job(BatchJob::Custom {
name: "fast".to_string(),
operation: Box::new(|| Ok(())),
});
let result = processor.execute();
assert!(result.is_ok());
assert!(*progress_called.lock().unwrap());
}
#[test]
fn test_batch_options_all_builders() {
use std::time::Duration;
let callback_called = Arc::new(AtomicBool::new(false));
let callback_clone = Arc::clone(&callback_called);
let options = BatchOptions::default()
.with_parallelism(4)
.with_memory_limit(1024 * 1024)
.with_progress_callback(move |_| {
callback_clone.store(true, Ordering::SeqCst);
})
.stop_on_error(true)
.with_job_timeout(Duration::from_secs(10));
assert_eq!(options.parallelism, 4);
assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
assert!(options.stop_on_error);
assert_eq!(options.job_timeout, Some(Duration::from_secs(10)));
assert!(options.progress_callback.is_some());
}
#[test]
fn test_batch_processor_get_progress() {
let processor = BatchProcessor::new(BatchOptions::default());
let progress = processor.get_progress();
assert_eq!(progress.total_jobs, 0);
assert_eq!(progress.completed_jobs, 0);
assert_eq!(progress.failed_jobs, 0);
assert_eq!(progress.percentage(), 100.0); }
#[test]
fn test_batch_processor_with_real_timeout() {
let mut options = BatchOptions::default();
options.job_timeout = Some(Duration::from_millis(10));
options.parallelism = 1;
let mut processor = BatchProcessor::new(options);
processor.add_job(BatchJob::Custom {
name: "should_timeout".to_string(),
operation: Box::new(|| {
thread::sleep(Duration::from_millis(100));
Ok(())
}),
});
let summary = processor.execute().unwrap();
assert_eq!(summary.total_jobs, 1);
}
#[test]
fn test_batch_processor_memory_limit_enforcement() {
let mut options = BatchOptions::default();
options.memory_limit_per_worker = 1024; options.parallelism = 2;
let mut processor = BatchProcessor::new(options);
for i in 0..5 {
processor.add_job(BatchJob::Custom {
name: format!("memory_job_{}", i),
operation: Box::new(move || {
let _data = vec![0u8; 512];
Ok(())
}),
});
}
let summary = processor.execute().unwrap();
assert_eq!(summary.total_jobs, 5);
}
#[test]
fn test_batch_processor_stop_on_error_propagation() {
let mut options = BatchOptions::default();
options.stop_on_error = true;
options.parallelism = 1;
let mut processor = BatchProcessor::new(options);
processor.add_job(BatchJob::Custom {
name: "success_1".to_string(),
operation: Box::new(|| Ok(())),
});
processor.add_job(BatchJob::Custom {
name: "failure".to_string(),
operation: Box::new(|| {
Err(crate::error::PdfError::InvalidOperation(
"Intentional failure".to_string(),
))
}),
});
processor.add_job(BatchJob::Custom {
name: "should_not_run".to_string(),
operation: Box::new(|| Ok(())),
});
let result = processor.execute();
assert!(result.is_err() || result.unwrap().failed > 0);
}
#[test]
fn test_batch_processor_concurrent_limit() {
use std::sync::atomic::AtomicUsize;
let concurrent_count = Arc::new(AtomicUsize::new(0));
let max_concurrent = Arc::new(AtomicUsize::new(0));
let mut options = BatchOptions::default();
options.parallelism = 2;
let mut processor = BatchProcessor::new(options);
for i in 0..10 {
let concurrent = Arc::clone(&concurrent_count);
let max = Arc::clone(&max_concurrent);
processor.add_job(BatchJob::Custom {
name: format!("concurrent_{}", i),
operation: Box::new(move || {
let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
let mut max_val = max.load(Ordering::SeqCst);
while current > max_val {
match max.compare_exchange_weak(
max_val,
current,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(x) => max_val = x,
}
}
thread::sleep(Duration::from_millis(10));
concurrent.fetch_sub(1, Ordering::SeqCst);
Ok(())
}),
});
}
let summary = processor.execute().unwrap();
assert_eq!(summary.successful, 10);
let max_seen = max_concurrent.load(Ordering::SeqCst);
assert!(
max_seen <= 2,
"Max concurrent was {}, expected <= 2",
max_seen
);
}
#[test]
fn test_batch_processor_progress_with_failures() {
use std::sync::{Arc, Mutex};
let progress_updates = Arc::new(Mutex::new(Vec::new()));
let progress_clone = Arc::clone(&progress_updates);
let mut options = BatchOptions::default();
options.progress_callback = Some(Arc::new(move |info: &ProgressInfo| {
let mut updates = progress_clone.lock().unwrap();
updates.push((info.completed_jobs, info.failed_jobs, info.total_jobs));
}));
let mut processor = BatchProcessor::new(options);
processor.add_job(BatchJob::Custom {
name: "success_1".to_string(),
operation: Box::new(|| Ok(())),
});
processor.add_job(BatchJob::Custom {
name: "fail_1".to_string(),
operation: Box::new(|| Err(crate::error::PdfError::InvalidFormat("test".to_string()))),
});
processor.add_job(BatchJob::Custom {
name: "success_2".to_string(),
operation: Box::new(|| Ok(())),
});
let summary = processor.execute().unwrap();
assert_eq!(summary.successful, 2);
assert_eq!(summary.failed, 1);
let updates = progress_updates.lock().unwrap();
assert!(!updates.is_empty());
if let Some(&(completed, failed, total)) = updates.last() {
assert_eq!(total, 3);
assert_eq!(completed + failed, 3);
}
}
}