oxidize_pdf/batch/
mod.rs

1//! Batch processing for multiple PDF operations
2//!
3//! This module provides efficient batch processing capabilities for handling
4//! multiple PDF files or operations in parallel with progress tracking.
5//!
6//! # Features
7//!
8//! - **Parallel Processing**: Process multiple PDFs concurrently
9//! - **Progress Tracking**: Real-time progress updates for batch operations
10//! - **Resource Management**: Automatic thread pool and memory management
11//! - **Error Collection**: Aggregate errors without stopping the batch
12//! - **Cancellation**: Support for cancelling long-running operations
13//! - **Result Aggregation**: Collect and summarize batch results
14//!
15//! # Example
16//!
17//! ```rust,no_run
18//! use oxidize_pdf::batch::{BatchProcessor, BatchOptions, BatchJob};
19//! use oxidize_pdf::operations::split_pdf;
20//! use std::path::PathBuf;
21//!
22//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
23//! let options = BatchOptions::default()
24//!     .with_parallelism(4)
25//!     .with_progress_callback(|progress| {
26//!         println!("Progress: {:.1}%", progress.percentage());
27//!     });
28//!
29//! let mut processor = BatchProcessor::new(options);
30//!
31//! // Add jobs to the batch
32//! let files = vec!["doc1.pdf", "doc2.pdf", "doc3.pdf"];
33//! for file in files {
34//!     processor.add_job(BatchJob::Split {
35//!         input: PathBuf::from(file),
36//!         output_pattern: format!("{}_page_%d.pdf", file),
37//!         pages_per_file: 1,
38//!     });
39//! }
40//!
41//! // Execute batch with progress tracking
42//! let results = processor.execute()?;
43//!
44//! println!("Processed {} files successfully", results.successful);
45//! println!("Failed: {}", results.failed);
46//! # Ok(())
47//! # }
48//! ```
49
50use crate::error::Result;
51use std::path::{Path, PathBuf};
52use std::sync::{
53    atomic::{AtomicBool, Ordering},
54    Arc, Mutex,
55};
56use std::thread;
57use std::time::{Duration, Instant};
58
59pub mod job;
60pub mod progress;
61pub mod result;
62pub mod worker;
63
64// Re-export main types
65pub use job::{BatchJob, JobStatus, JobType};
66pub use progress::{BatchProgress, ProgressCallback, ProgressInfo};
67pub use result::{BatchResult, BatchSummary, JobResult};
68pub use worker::{WorkerOptions, WorkerPool};
69
/// Options for batch processing
///
/// Configuration consumed by [`BatchProcessor`]. Start from
/// [`BatchOptions::default`] and adjust it with the builder methods, or set
/// the public fields directly.
#[derive(Clone)]
pub struct BatchOptions {
    /// Number of parallel workers (handed to the worker pool as `num_workers`)
    pub parallelism: usize,
    /// Maximum memory per worker (bytes); handed to the worker pool as
    /// `memory_limit`
    pub memory_limit_per_worker: usize,
    /// Progress update interval: how long the progress-reporting thread waits
    /// between two invocations of the progress callback
    pub progress_interval: Duration,
    /// Whether to stop on first error (forwarded to the worker pool)
    pub stop_on_error: bool,
    /// Progress callback; when `None`, no progress-reporting thread is started
    pub progress_callback: Option<Arc<dyn ProgressCallback>>,
    /// Timeout for individual jobs (forwarded to the worker pool).
    // NOTE(review): tests in this module state that the timeout is not
    // enforced yet — confirm against the worker implementation.
    pub job_timeout: Option<Duration>,
}
86
87impl Default for BatchOptions {
88    fn default() -> Self {
89        Self {
90            parallelism: num_cpus::get().min(8),
91            memory_limit_per_worker: 512 * 1024 * 1024, // 512MB
92            progress_interval: Duration::from_millis(100),
93            stop_on_error: false,
94            progress_callback: None,
95            job_timeout: Some(Duration::from_secs(300)), // 5 minutes
96        }
97    }
98}
99
100impl BatchOptions {
101    /// Set the number of parallel workers
102    pub fn with_parallelism(mut self, parallelism: usize) -> Self {
103        self.parallelism = parallelism.max(1);
104        self
105    }
106
107    /// Set memory limit per worker
108    pub fn with_memory_limit(mut self, bytes: usize) -> Self {
109        self.memory_limit_per_worker = bytes;
110        self
111    }
112
113    /// Set progress callback
114    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
115    where
116        F: Fn(&ProgressInfo) + Send + Sync + 'static,
117    {
118        self.progress_callback = Some(Arc::new(callback));
119        self
120    }
121
122    /// Set whether to stop on first error
123    pub fn stop_on_error(mut self, stop: bool) -> Self {
124        self.stop_on_error = stop;
125        self
126    }
127
128    /// Set job timeout
129    pub fn with_job_timeout(mut self, timeout: Duration) -> Self {
130        self.job_timeout = Some(timeout);
131        self
132    }
133}
134
/// Batch processor for handling multiple PDF operations
///
/// Jobs are queued with `add_job` / `add_jobs` and run by `execute`, which
/// consumes the processor and returns a [`BatchSummary`].
pub struct BatchProcessor {
    /// Configuration used when the batch is executed.
    options: BatchOptions,
    /// Jobs queued so far, in insertion order.
    jobs: Vec<BatchJob>,
    /// Cancellation flag shared with the worker pool and the progress thread.
    cancelled: Arc<AtomicBool>,
    /// Progress tracker shared with the worker pool and the progress thread.
    progress: Arc<BatchProgress>,
}
142
143impl BatchProcessor {
144    /// Create a new batch processor
145    pub fn new(options: BatchOptions) -> Self {
146        Self {
147            options,
148            jobs: Vec::new(),
149            cancelled: Arc::new(AtomicBool::new(false)),
150            progress: Arc::new(BatchProgress::new()),
151        }
152    }
153
154    /// Add a job to the batch
155    pub fn add_job(&mut self, job: BatchJob) {
156        self.jobs.push(job);
157        self.progress.add_job();
158    }
159
160    /// Add multiple jobs
161    pub fn add_jobs(&mut self, jobs: impl IntoIterator<Item = BatchJob>) {
162        for job in jobs {
163            self.add_job(job);
164        }
165    }
166
167    /// Cancel the batch processing
168    pub fn cancel(&self) {
169        self.cancelled.store(true, Ordering::SeqCst);
170    }
171
172    /// Check if cancelled
173    pub fn is_cancelled(&self) -> bool {
174        self.cancelled.load(Ordering::SeqCst)
175    }
176
177    /// Execute the batch
178    pub fn execute(self) -> Result<BatchSummary> {
179        let start_time = Instant::now();
180        let total_jobs = self.jobs.len();
181
182        if total_jobs == 0 {
183            return Ok(BatchSummary::empty());
184        }
185
186        // Create worker pool
187        let worker_options = WorkerOptions {
188            num_workers: self.options.parallelism,
189            memory_limit: self.options.memory_limit_per_worker,
190            job_timeout: self.options.job_timeout,
191        };
192
193        let pool = WorkerPool::new(worker_options);
194        let _results = Arc::new(Mutex::new(Vec::<JobResult>::new()));
195        let _errors = Arc::new(Mutex::new(Vec::<String>::new()));
196
197        // Progress tracking thread
198        let progress_handle = if let Some(callback) = &self.options.progress_callback {
199            let progress = Arc::clone(&self.progress);
200            let callback = Arc::clone(callback);
201            let interval = self.options.progress_interval;
202            let cancelled = Arc::clone(&self.cancelled);
203
204            Some(thread::spawn(move || {
205                while !cancelled.load(Ordering::SeqCst) {
206                    let info = progress.get_info();
207                    callback.on_progress(&info);
208
209                    if info.is_complete() {
210                        break;
211                    }
212
213                    thread::sleep(interval);
214                }
215            }))
216        } else {
217            None
218        };
219
220        // Process jobs
221        let job_results = pool.process_jobs(
222            self.jobs,
223            Arc::clone(&self.progress),
224            Arc::clone(&self.cancelled),
225            self.options.stop_on_error,
226        );
227
228        // Collect results
229        let mut successful = 0;
230        let mut failed = 0;
231        let mut all_results = Vec::new();
232
233        for result in job_results {
234            match &result {
235                JobResult::Success { .. } => successful += 1,
236                JobResult::Failed { .. } => failed += 1,
237                JobResult::Cancelled { .. } => {}
238            }
239            all_results.push(result);
240        }
241
242        // Wait for progress thread
243        if let Some(handle) = progress_handle {
244            let _ = handle.join();
245        }
246
247        // Final progress callback
248        if let Some(callback) = &self.options.progress_callback {
249            let final_info = self.progress.get_info();
250            callback.on_progress(&final_info);
251        }
252
253        Ok(BatchSummary {
254            total_jobs,
255            successful,
256            failed,
257            cancelled: self.cancelled.load(Ordering::SeqCst),
258            duration: start_time.elapsed(),
259            results: all_results,
260        })
261    }
262
263    /// Get current progress
264    pub fn get_progress(&self) -> ProgressInfo {
265        self.progress.get_info()
266    }
267}
268
269/// Process multiple PDF files with a common operation
270pub fn batch_process_files<P, F>(
271    files: Vec<P>,
272    operation: F,
273    options: BatchOptions,
274) -> Result<BatchSummary>
275where
276    P: AsRef<Path>,
277    F: Fn(&Path) -> Result<()> + Clone + Send + 'static,
278{
279    let mut processor = BatchProcessor::new(options);
280
281    for file in files {
282        let path = file.as_ref().to_path_buf();
283        let op = operation.clone();
284
285        processor.add_job(BatchJob::Custom {
286            name: format!("Process {}", path.display()),
287            operation: Box::new(move || op(&path)),
288        });
289    }
290
291    processor.execute()
292}
293
294/// Convenience function for batch splitting PDFs
295pub fn batch_split_pdfs<P: AsRef<Path>>(
296    files: Vec<P>,
297    pages_per_file: usize,
298    options: BatchOptions,
299) -> Result<BatchSummary> {
300    let mut processor = BatchProcessor::new(options);
301
302    for file in files {
303        let path = file.as_ref();
304        processor.add_job(BatchJob::Split {
305            input: path.to_path_buf(),
306            output_pattern: format!(
307                "{}_page_%d.pdf",
308                path.file_stem()
309                    .and_then(|stem| stem.to_str())
310                    .unwrap_or("output")
311            ),
312            pages_per_file,
313        });
314    }
315
316    processor.execute()
317}
318
319/// Convenience function for batch merging PDFs
320pub fn batch_merge_pdfs(
321    merge_groups: Vec<(Vec<PathBuf>, PathBuf)>,
322    options: BatchOptions,
323) -> Result<BatchSummary> {
324    let mut processor = BatchProcessor::new(options);
325
326    for (inputs, output) in merge_groups {
327        processor.add_job(BatchJob::Merge { inputs, output });
328    }
329
330    processor.execute()
331}
332
// Unit tests for the batch options builder and for `BatchProcessor`
// (job queuing, cancellation flag, execution, progress reporting).
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults: between 1 and 8 workers, 512 MiB per worker, keep going on errors.
    #[test]
    fn test_batch_options_default() {
        let options = BatchOptions::default();
        assert!(options.parallelism > 0);
        assert!(options.parallelism <= 8);
        assert_eq!(options.memory_limit_per_worker, 512 * 1024 * 1024);
        assert!(!options.stop_on_error);
    }

    // Every builder method stores its value; the callback is only installed
    // here, never invoked.
    #[test]
    fn test_batch_options_builder() {
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = Arc::clone(&called);

        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024 * 1024)
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(60))
            .with_progress_callback(move |_info| {
                called_clone.store(true, Ordering::SeqCst);
            });

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
        assert!(options.progress_callback.is_some());
    }

    // A fresh processor has no jobs and is not cancelled.
    #[test]
    fn test_batch_processor_creation() {
        let processor = BatchProcessor::new(BatchOptions::default());
        assert_eq!(processor.jobs.len(), 0);
        assert!(!processor.is_cancelled());
    }

    // `add_job` and `add_jobs` both append to the job queue.
    #[test]
    fn test_batch_processor_add_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        processor.add_job(BatchJob::Custom {
            name: "Test Job 1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_jobs(vec![
            BatchJob::Custom {
                name: "Test Job 2".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Test Job 3".to_string(),
                operation: Box::new(|| Ok(())),
            },
        ]);

        assert_eq!(processor.jobs.len(), 3);
    }

    // `cancel` raises the flag reported by `is_cancelled`.
    #[test]
    fn test_batch_processor_cancel() {
        let processor = BatchProcessor::new(BatchOptions::default());
        assert!(!processor.is_cancelled());

        processor.cancel();
        assert!(processor.is_cancelled());
    }

    // Executing with no jobs yields an empty, non-cancelled summary.
    #[test]
    fn test_empty_batch_execution() {
        let processor = BatchProcessor::new(BatchOptions::default());
        let summary = processor.execute().unwrap();

        assert_eq!(summary.total_jobs, 0);
        assert_eq!(summary.successful, 0);
        assert_eq!(summary.failed, 0);
        assert!(!summary.cancelled);
    }

    // Builder chain without a progress callback.
    #[test]
    fn test_batch_options_builder_advanced() {
        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024)
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(60));

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
    }

    // Five succeeding jobs are all counted as successful.
    #[test]
    fn test_batch_processor_with_multiple_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        // Add multiple test jobs
        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    // Simulate some work
                    thread::sleep(Duration::from_millis(10));
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 5);
        assert_eq!(summary.successful, 5);
        assert_eq!(summary.failed, 0);
    }

    // A failing job is counted in `failed` without failing `execute` itself.
    #[test]
    fn test_batch_processor_with_failing_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        // Add a mix of successful and failing jobs
        processor.add_job(BatchJob::Custom {
            name: "success".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "failure".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::InvalidStructure(
                    "Test error".to_string(),
                ))
            }),
        });

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 2);
        assert_eq!(summary.successful, 1);
        assert_eq!(summary.failed, 1);
    }

    // With `stop_on_error`, the failure must surface either as an `Err` or
    // as a non-zero `failed` count; the test accepts both outcomes.
    #[test]
    fn test_batch_processor_stop_on_error() {
        let mut options = BatchOptions::default();
        options.stop_on_error = true;
        options.parallelism = 1; // Sequential to ensure order

        let mut processor = BatchProcessor::new(options);

        // Add jobs where the second one fails
        processor.add_job(BatchJob::Custom {
            name: "job1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "job2".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::Io(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "Test error",
                )))
            }),
        });

        processor.add_job(BatchJob::Custom {
            name: "job3".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_err() || result.unwrap().failed > 0);
    }

    // Jobs record the highest number of simultaneously running jobs; with
    // parallelism 4 it must be above 1 and at most 4.
    #[test]
    fn test_batch_processor_parallelism() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let mut options = BatchOptions::default();
        options.parallelism = 4;

        let mut processor = BatchProcessor::new(options);
        let concurrent_count = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        // Add jobs that track concurrency
        for i in 0..10 {
            let concurrent = concurrent_count.clone();
            let max = max_concurrent.clone();

            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
                    // Raise the recorded maximum to `current` with a CAS loop.
                    let mut max_val = max.load(Ordering::SeqCst);
                    while current > max_val {
                        match max.compare_exchange_weak(
                            max_val,
                            current,
                            Ordering::SeqCst,
                            Ordering::SeqCst,
                        ) {
                            Ok(_) => break,
                            Err(x) => max_val = x,
                        }
                    }

                    thread::sleep(Duration::from_millis(50));
                    concurrent.fetch_sub(1, Ordering::SeqCst);

                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 10);

        // Verify parallelism was used (max concurrent should be > 1)
        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
        assert!(max_concurrent.load(Ordering::SeqCst) <= 4);
    }

    // A job that runs longer than `job_timeout` is still expected not to fail.
    #[test]
    fn test_batch_processor_timeout() {
        let mut options = BatchOptions::default();
        options.job_timeout = Some(Duration::from_millis(50));
        options.parallelism = 1;

        let mut processor = BatchProcessor::new(options);

        // Add a job that takes too long
        processor.add_job(BatchJob::Custom {
            name: "timeout_job".to_string(),
            operation: Box::new(|| {
                thread::sleep(Duration::from_millis(200));
                Ok(())
            }),
        });

        let summary = processor.execute().unwrap();
        // Job should complete (timeout not implemented yet)
        assert_eq!(summary.failed, 0);
    }

    // The configured memory limit is stored on the processor's options.
    #[test]
    fn test_batch_processor_memory_limit() {
        let mut options = BatchOptions::default();
        options.memory_limit_per_worker = 1024 * 1024; // 1MB

        let processor = BatchProcessor::new(options);

        // Verify memory limit is set
        assert_eq!(processor.options.memory_limit_per_worker, 1024 * 1024);
    }

    // The callback is invoked at least once and the last update reports 100%.
    #[test]
    fn test_batch_progress_tracking() {
        use std::sync::{Arc, Mutex};

        let progress_updates = Arc::new(Mutex::new(Vec::new()));
        let progress_clone = progress_updates.clone();

        let mut options = BatchOptions::default();
        options.progress_callback = Some(Arc::new(move |info: &ProgressInfo| {
            progress_clone.lock().unwrap().push(info.percentage());
        }));

        let mut processor = BatchProcessor::new(options);

        // Add some jobs
        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    thread::sleep(Duration::from_millis(10));
                    Ok(())
                }),
            });
        }

        processor.execute().unwrap();

        // Should have received progress updates
        let updates = progress_updates.lock().unwrap();
        assert!(!updates.is_empty());
        // Final progress should be 100%
        assert_eq!(*updates.last().unwrap(), 100.0);
    }

    // Despite the name, nothing is executed here: this only checks that
    // `cancel` sets the flag and that calling it twice is harmless.
    #[test]
    fn test_batch_processor_cancel_during_execution() {
        // Test the cancel() and is_cancelled() methods
        let processor = BatchProcessor::new(BatchOptions::default());

        // Initially not cancelled
        assert!(!processor.is_cancelled());

        // Cancel the processor
        processor.cancel();

        // Should be cancelled now
        assert!(processor.is_cancelled());

        // Cancel again should be idempotent
        processor.cancel();
        assert!(processor.is_cancelled());
    }

    #[test]
    fn test_batch_processor_without_progress_callback() {
        // Executes a batch with no progress callback configured, the path in
        // `execute` where no progress thread is spawned
        let options = BatchOptions::default(); // No progress callback
        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "test_job".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_ok());
        let summary = result.unwrap();
        assert_eq!(summary.successful, 1);
    }

    #[test]
    fn test_batch_processor_early_completion_in_progress() {
        // Runs the progress callback against a job that finishes almost
        // immediately, so the batch may already be complete when it is polled
        use std::sync::{Arc, Mutex};

        let progress_called = Arc::new(Mutex::new(false));
        let progress_called_clone = Arc::clone(&progress_called);

        let options = BatchOptions::default().with_progress_callback(move |info| {
            *progress_called_clone.lock().unwrap() = true;
            // Check if complete
            if info.is_complete() {
                // Progress is complete
            }
        });

        let mut processor = BatchProcessor::new(options);

        // Add a fast job that completes quickly
        processor.add_job(BatchJob::Custom {
            name: "fast".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_ok());

        // Progress callback should have been called
        assert!(*progress_called.lock().unwrap());
    }

    #[test]
    fn test_batch_options_all_builders() {
        // Test all builder methods for BatchOptions
        use std::time::Duration;

        let callback_called = Arc::new(AtomicBool::new(false));
        let callback_clone = Arc::clone(&callback_called);

        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024)
            .with_progress_callback(move |_| {
                callback_clone.store(true, Ordering::SeqCst);
            })
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(10));

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(10)));
        assert!(options.progress_callback.is_some());
    }

    #[test]
    fn test_batch_processor_get_progress() {
        // Test the get_progress() method on a processor with no jobs
        let processor = BatchProcessor::new(BatchOptions::default());

        let progress = processor.get_progress();
        assert_eq!(progress.total_jobs, 0);
        assert_eq!(progress.completed_jobs, 0);
        assert_eq!(progress.failed_jobs, 0);
        assert_eq!(progress.percentage(), 100.0); // 0 jobs = 100% complete
    }

    #[test]
    fn test_batch_processor_with_real_timeout() {
        // Passes a very short job timeout through to the worker options;
        // only the job count is asserted, not the timeout itself
        let mut options = BatchOptions::default();
        options.job_timeout = Some(Duration::from_millis(10));
        options.parallelism = 1;

        let mut processor = BatchProcessor::new(options);

        // Add a job that should timeout
        processor.add_job(BatchJob::Custom {
            name: "should_timeout".to_string(),
            operation: Box::new(|| {
                thread::sleep(Duration::from_millis(100));
                Ok(())
            }),
        });

        let summary = processor.execute().unwrap();
        // Currently timeout is not enforced, but test the setup
        assert_eq!(summary.total_jobs, 1);
    }

    #[test]
    fn test_batch_processor_memory_limit_enforcement() {
        // Passes a very small per-worker memory limit through to the worker
        // options; only the job count is asserted, not enforcement
        let mut options = BatchOptions::default();
        options.memory_limit_per_worker = 1024; // Very small limit
        options.parallelism = 2;

        let mut processor = BatchProcessor::new(options);

        // Add jobs that would use memory
        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("memory_job_{}", i),
                operation: Box::new(move || {
                    // Simulate memory usage
                    let _data = vec![0u8; 512];
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 5);
    }

    #[test]
    fn test_batch_processor_stop_on_error_propagation() {
        // Test that stop_on_error is forwarded to the worker pool
        let mut options = BatchOptions::default();
        options.stop_on_error = true;
        options.parallelism = 1; // Sequential to ensure order

        let mut processor = BatchProcessor::new(options);

        // Add job that succeeds
        processor.add_job(BatchJob::Custom {
            name: "success_1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        // Add job that fails
        processor.add_job(BatchJob::Custom {
            name: "failure".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::InvalidOperation(
                    "Intentional failure".to_string(),
                ))
            }),
        });

        // Add job that should not execute
        processor.add_job(BatchJob::Custom {
            name: "should_not_run".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        // With stop_on_error, execution should stop after failure
        assert!(result.is_err() || result.unwrap().failed > 0);
    }

    #[test]
    fn test_batch_processor_concurrent_limit() {
        // Test that no more than `parallelism` jobs run at the same time
        use std::sync::atomic::AtomicUsize;

        let concurrent_count = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        let mut options = BatchOptions::default();
        options.parallelism = 2; // Limit to 2 concurrent jobs

        let mut processor = BatchProcessor::new(options);

        // Add jobs that track concurrency
        for i in 0..10 {
            let concurrent = Arc::clone(&concurrent_count);
            let max = Arc::clone(&max_concurrent);

            processor.add_job(BatchJob::Custom {
                name: format!("concurrent_{}", i),
                operation: Box::new(move || {
                    let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;

                    // Update max if needed
                    let mut max_val = max.load(Ordering::SeqCst);
                    while current > max_val {
                        match max.compare_exchange_weak(
                            max_val,
                            current,
                            Ordering::SeqCst,
                            Ordering::SeqCst,
                        ) {
                            Ok(_) => break,
                            Err(x) => max_val = x,
                        }
                    }

                    thread::sleep(Duration::from_millis(10));
                    concurrent.fetch_sub(1, Ordering::SeqCst);
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 10);

        // Verify parallelism limit was respected
        let max_seen = max_concurrent.load(Ordering::SeqCst);
        assert!(
            max_seen <= 2,
            "Max concurrent was {}, expected <= 2",
            max_seen
        );
    }

    #[test]
    fn test_batch_processor_progress_with_failures() {
        // Test progress tracking and result counting when some jobs fail
        use std::sync::{Arc, Mutex};

        let progress_updates = Arc::new(Mutex::new(Vec::new()));
        let progress_clone = Arc::clone(&progress_updates);

        let mut options = BatchOptions::default();
        options.progress_callback = Some(Arc::new(move |info: &ProgressInfo| {
            let mut updates = progress_clone.lock().unwrap();
            updates.push((info.completed_jobs, info.failed_jobs, info.total_jobs));
        }));

        let mut processor = BatchProcessor::new(options);

        // Add mix of successful and failing jobs
        processor.add_job(BatchJob::Custom {
            name: "success_1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "fail_1".to_string(),
            operation: Box::new(|| Err(crate::error::PdfError::InvalidFormat("test".to_string()))),
        });

        processor.add_job(BatchJob::Custom {
            name: "success_2".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 2);
        assert_eq!(summary.failed, 1);

        // Check that progress was tracked correctly
        let updates = progress_updates.lock().unwrap();
        assert!(!updates.is_empty());

        // Final update should show correct counts
        if let Some(&(completed, failed, total)) = updates.last() {
            assert_eq!(total, 3);
            assert_eq!(completed + failed, 3);
        }
    }
}