oxidize_pdf/batch/
mod.rs

1//! Batch processing for multiple PDF operations
2//!
3//! This module provides efficient batch processing capabilities for handling
4//! multiple PDF files or operations in parallel with progress tracking.
5//!
6//! # Features
7//!
8//! - **Parallel Processing**: Process multiple PDFs concurrently
9//! - **Progress Tracking**: Real-time progress updates for batch operations
10//! - **Resource Management**: Automatic thread pool and memory management
11//! - **Error Collection**: Aggregate errors without stopping the batch
12//! - **Cancellation**: Support for cancelling long-running operations
13//! - **Result Aggregation**: Collect and summarize batch results
14//!
15//! # Example
16//!
17//! ```rust,no_run
18//! use oxidize_pdf::batch::{BatchProcessor, BatchOptions, BatchJob};
19//! use oxidize_pdf::operations::split_pdf;
20//! use std::path::PathBuf;
21//!
22//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
23//! let options = BatchOptions::default()
24//!     .with_parallelism(4)
25//!     .with_progress_callback(|progress| {
26//!         println!("Progress: {:.1}%", progress.percentage());
27//!     });
28//!
29//! let mut processor = BatchProcessor::new(options);
30//!
31//! // Add jobs to the batch
32//! let files = vec!["doc1.pdf", "doc2.pdf", "doc3.pdf"];
33//! for file in files {
34//!     processor.add_job(BatchJob::Split {
35//!         input: PathBuf::from(file),
36//!         output_pattern: format!("{}_page_%d.pdf", file),
37//!         pages_per_file: 1,
38//!     });
39//! }
40//!
41//! // Execute batch with progress tracking
42//! let results = processor.execute()?;
43//!
44//! println!("Processed {} files successfully", results.successful);
45//! println!("Failed: {}", results.failed);
46//! # Ok(())
47//! # }
48//! ```
49
50use crate::error::Result;
51use std::path::{Path, PathBuf};
52use std::sync::{
53    atomic::{AtomicBool, Ordering},
54    Arc, Mutex,
55};
56use std::thread;
57use std::time::{Duration, Instant};
58
59pub mod job;
60pub mod progress;
61pub mod result;
62pub mod worker;
63
64// Re-export main types
65pub use job::{BatchJob, JobStatus, JobType};
66pub use progress::{BatchProgress, ProgressCallback, ProgressInfo};
67pub use result::{BatchResult, BatchSummary, JobResult};
68pub use worker::{WorkerOptions, WorkerPool};
69
70/// Options for batch processing
71#[derive(Clone)]
72pub struct BatchOptions {
73    /// Number of parallel workers
74    pub parallelism: usize,
75    /// Maximum memory per worker (bytes)
76    pub memory_limit_per_worker: usize,
77    /// Progress update interval
78    pub progress_interval: Duration,
79    /// Whether to stop on first error
80    pub stop_on_error: bool,
81    /// Progress callback
82    pub progress_callback: Option<Arc<dyn ProgressCallback>>,
83    /// Timeout for individual jobs
84    pub job_timeout: Option<Duration>,
85}
86
87impl Default for BatchOptions {
88    fn default() -> Self {
89        Self {
90            parallelism: num_cpus::get().min(8),
91            memory_limit_per_worker: 512 * 1024 * 1024, // 512MB
92            progress_interval: Duration::from_millis(100),
93            stop_on_error: false,
94            progress_callback: None,
95            job_timeout: Some(Duration::from_secs(300)), // 5 minutes
96        }
97    }
98}
99
100impl BatchOptions {
101    /// Set the number of parallel workers
102    pub fn with_parallelism(mut self, parallelism: usize) -> Self {
103        self.parallelism = parallelism.max(1);
104        self
105    }
106
107    /// Set memory limit per worker
108    pub fn with_memory_limit(mut self, bytes: usize) -> Self {
109        self.memory_limit_per_worker = bytes;
110        self
111    }
112
113    /// Set progress callback
114    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
115    where
116        F: Fn(&ProgressInfo) + Send + Sync + 'static,
117    {
118        self.progress_callback = Some(Arc::new(callback));
119        self
120    }
121
122    /// Set whether to stop on first error
123    pub fn stop_on_error(mut self, stop: bool) -> Self {
124        self.stop_on_error = stop;
125        self
126    }
127
128    /// Set job timeout
129    pub fn with_job_timeout(mut self, timeout: Duration) -> Self {
130        self.job_timeout = Some(timeout);
131        self
132    }
133}
134
135/// Batch processor for handling multiple PDF operations
136pub struct BatchProcessor {
137    options: BatchOptions,
138    jobs: Vec<BatchJob>,
139    cancelled: Arc<AtomicBool>,
140    progress: Arc<BatchProgress>,
141}
142
143impl BatchProcessor {
144    /// Create a new batch processor
145    pub fn new(options: BatchOptions) -> Self {
146        Self {
147            options,
148            jobs: Vec::new(),
149            cancelled: Arc::new(AtomicBool::new(false)),
150            progress: Arc::new(BatchProgress::new()),
151        }
152    }
153
154    /// Add a job to the batch
155    pub fn add_job(&mut self, job: BatchJob) {
156        self.jobs.push(job);
157        self.progress.add_job();
158    }
159
160    /// Add multiple jobs
161    pub fn add_jobs(&mut self, jobs: impl IntoIterator<Item = BatchJob>) {
162        for job in jobs {
163            self.add_job(job);
164        }
165    }
166
167    /// Cancel the batch processing
168    pub fn cancel(&self) {
169        self.cancelled.store(true, Ordering::SeqCst);
170    }
171
172    /// Check if cancelled
173    pub fn is_cancelled(&self) -> bool {
174        self.cancelled.load(Ordering::SeqCst)
175    }
176
177    /// Execute the batch
178    pub fn execute(self) -> Result<BatchSummary> {
179        let start_time = Instant::now();
180        let total_jobs = self.jobs.len();
181
182        if total_jobs == 0 {
183            return Ok(BatchSummary::empty());
184        }
185
186        // Create worker pool
187        let worker_options = WorkerOptions {
188            num_workers: self.options.parallelism,
189            memory_limit: self.options.memory_limit_per_worker,
190            job_timeout: self.options.job_timeout,
191        };
192
193        let pool = WorkerPool::new(worker_options);
194        let _results = Arc::new(Mutex::new(Vec::<JobResult>::new()));
195        let _errors = Arc::new(Mutex::new(Vec::<String>::new()));
196
197        // Progress tracking thread
198        let progress_handle = if let Some(callback) = &self.options.progress_callback {
199            let progress = Arc::clone(&self.progress);
200            let callback = Arc::clone(callback);
201            let interval = self.options.progress_interval;
202            let cancelled = Arc::clone(&self.cancelled);
203
204            Some(thread::spawn(move || {
205                while !cancelled.load(Ordering::SeqCst) {
206                    let info = progress.get_info();
207                    callback.on_progress(&info);
208
209                    if info.is_complete() {
210                        break;
211                    }
212
213                    thread::sleep(interval);
214                }
215            }))
216        } else {
217            None
218        };
219
220        // Process jobs
221        let job_results = pool.process_jobs(
222            self.jobs,
223            Arc::clone(&self.progress),
224            Arc::clone(&self.cancelled),
225            self.options.stop_on_error,
226        );
227
228        // Collect results
229        let mut successful = 0;
230        let mut failed = 0;
231        let mut all_results = Vec::new();
232
233        for result in job_results {
234            match &result {
235                JobResult::Success { .. } => successful += 1,
236                JobResult::Failed { .. } => failed += 1,
237                JobResult::Cancelled { .. } => {}
238            }
239            all_results.push(result);
240        }
241
242        // Wait for progress thread
243        if let Some(handle) = progress_handle {
244            let _ = handle.join();
245        }
246
247        // Final progress callback
248        if let Some(callback) = &self.options.progress_callback {
249            let final_info = self.progress.get_info();
250            callback.on_progress(&final_info);
251        }
252
253        Ok(BatchSummary {
254            total_jobs,
255            successful,
256            failed,
257            cancelled: self.cancelled.load(Ordering::SeqCst),
258            duration: start_time.elapsed(),
259            results: all_results,
260        })
261    }
262
263    /// Get current progress
264    pub fn get_progress(&self) -> ProgressInfo {
265        self.progress.get_info()
266    }
267}
268
269/// Process multiple PDF files with a common operation
270pub fn batch_process_files<P, F>(
271    files: Vec<P>,
272    operation: F,
273    options: BatchOptions,
274) -> Result<BatchSummary>
275where
276    P: AsRef<Path>,
277    F: Fn(&Path) -> Result<()> + Clone + Send + 'static,
278{
279    let mut processor = BatchProcessor::new(options);
280
281    for file in files {
282        let path = file.as_ref().to_path_buf();
283        let op = operation.clone();
284
285        processor.add_job(BatchJob::Custom {
286            name: format!("Process {}", path.display()),
287            operation: Box::new(move || op(&path)),
288        });
289    }
290
291    processor.execute()
292}
293
294/// Convenience function for batch splitting PDFs
295pub fn batch_split_pdfs<P: AsRef<Path>>(
296    files: Vec<P>,
297    pages_per_file: usize,
298    options: BatchOptions,
299) -> Result<BatchSummary> {
300    let mut processor = BatchProcessor::new(options);
301
302    for file in files {
303        let path = file.as_ref();
304        processor.add_job(BatchJob::Split {
305            input: path.to_path_buf(),
306            output_pattern: format!(
307                "{}_page_%d.pdf",
308                path.file_stem().unwrap().to_str().unwrap()
309            ),
310            pages_per_file,
311        });
312    }
313
314    processor.execute()
315}
316
317/// Convenience function for batch merging PDFs
318pub fn batch_merge_pdfs(
319    merge_groups: Vec<(Vec<PathBuf>, PathBuf)>,
320    options: BatchOptions,
321) -> Result<BatchSummary> {
322    let mut processor = BatchProcessor::new(options);
323
324    for (inputs, output) in merge_groups {
325        processor.add_job(BatchJob::Merge { inputs, output });
326    }
327
328    processor.execute()
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a custom job with the given name whose operation always succeeds.
    fn noop_job(name: &str) -> BatchJob {
        BatchJob::Custom {
            name: name.to_string(),
            operation: Box::new(|| Ok(())),
        }
    }

    /// The default options use between one and eight workers, 512 MiB per
    /// worker, and do not stop on errors.
    #[test]
    fn test_batch_options_default() {
        let defaults = BatchOptions::default();

        assert!((1..=8).contains(&defaults.parallelism));
        assert_eq!(defaults.memory_limit_per_worker, 512 * 1024 * 1024);
        assert!(!defaults.stop_on_error);
    }

    /// Every builder method stores the value it was given.
    #[test]
    fn test_batch_options_builder() {
        let called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&called);

        let one_gib = 1024 * 1024 * 1024;
        let one_minute = Duration::from_secs(60);

        let built = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(one_gib)
            .stop_on_error(true)
            .with_job_timeout(one_minute)
            .with_progress_callback(move |_info| {
                flag.store(true, Ordering::SeqCst);
            });

        assert_eq!(built.parallelism, 4);
        assert_eq!(built.memory_limit_per_worker, one_gib);
        assert!(built.stop_on_error);
        assert_eq!(built.job_timeout, Some(one_minute));
        assert!(built.progress_callback.is_some());
    }

    /// A fresh processor has no jobs and is not cancelled.
    #[test]
    fn test_batch_processor_creation() {
        let fresh = BatchProcessor::new(BatchOptions::default());

        assert_eq!(fresh.jobs.len(), 0);
        assert!(!fresh.is_cancelled());
    }

    /// `add_job` and `add_jobs` both append to the queue.
    #[test]
    fn test_batch_processor_add_jobs() {
        let mut batch = BatchProcessor::new(BatchOptions::default());

        batch.add_job(noop_job("Test Job 1"));
        batch.add_jobs(vec![noop_job("Test Job 2"), noop_job("Test Job 3")]);

        assert_eq!(batch.jobs.len(), 3);
    }

    /// `cancel` flips the flag reported by `is_cancelled`.
    #[test]
    fn test_batch_processor_cancel() {
        let batch = BatchProcessor::new(BatchOptions::default());
        assert!(!batch.is_cancelled());

        batch.cancel();
        assert!(batch.is_cancelled());
    }

    /// Executing an empty batch yields an all-zero, non-cancelled summary.
    #[test]
    fn test_empty_batch_execution() {
        let outcome = BatchProcessor::new(BatchOptions::default())
            .execute()
            .unwrap();

        assert_eq!(outcome.total_jobs, 0);
        assert_eq!(outcome.successful, 0);
        assert_eq!(outcome.failed, 0);
        assert!(!outcome.cancelled);
    }
}