1use crate::error::Result;
51use std::path::{Path, PathBuf};
52use std::sync::{
53 atomic::{AtomicBool, Ordering},
54 Arc, Mutex,
55};
56use std::thread;
57use std::time::{Duration, Instant};
58
59pub mod job;
60pub mod progress;
61pub mod result;
62pub mod worker;
63
64pub use job::{BatchJob, JobStatus, JobType};
66pub use progress::{BatchProgress, ProgressCallback, ProgressInfo};
67pub use result::{BatchResult, BatchSummary, JobResult};
68pub use worker::{WorkerOptions, WorkerPool};
69
70#[derive(Clone)]
72pub struct BatchOptions {
73 pub parallelism: usize,
75 pub memory_limit_per_worker: usize,
77 pub progress_interval: Duration,
79 pub stop_on_error: bool,
81 pub progress_callback: Option<Arc<dyn ProgressCallback>>,
83 pub job_timeout: Option<Duration>,
85}
86
87impl Default for BatchOptions {
88 fn default() -> Self {
89 Self {
90 parallelism: num_cpus::get().min(8),
91 memory_limit_per_worker: 512 * 1024 * 1024, progress_interval: Duration::from_millis(100),
93 stop_on_error: false,
94 progress_callback: None,
95 job_timeout: Some(Duration::from_secs(300)), }
97 }
98}
99
100impl BatchOptions {
101 pub fn with_parallelism(mut self, parallelism: usize) -> Self {
103 self.parallelism = parallelism.max(1);
104 self
105 }
106
107 pub fn with_memory_limit(mut self, bytes: usize) -> Self {
109 self.memory_limit_per_worker = bytes;
110 self
111 }
112
113 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
115 where
116 F: Fn(&ProgressInfo) + Send + Sync + 'static,
117 {
118 self.progress_callback = Some(Arc::new(callback));
119 self
120 }
121
122 pub fn stop_on_error(mut self, stop: bool) -> Self {
124 self.stop_on_error = stop;
125 self
126 }
127
128 pub fn with_job_timeout(mut self, timeout: Duration) -> Self {
130 self.job_timeout = Some(timeout);
131 self
132 }
133}
134
135pub struct BatchProcessor {
137 options: BatchOptions,
138 jobs: Vec<BatchJob>,
139 cancelled: Arc<AtomicBool>,
140 progress: Arc<BatchProgress>,
141}
142
143impl BatchProcessor {
144 pub fn new(options: BatchOptions) -> Self {
146 Self {
147 options,
148 jobs: Vec::new(),
149 cancelled: Arc::new(AtomicBool::new(false)),
150 progress: Arc::new(BatchProgress::new()),
151 }
152 }
153
154 pub fn add_job(&mut self, job: BatchJob) {
156 self.jobs.push(job);
157 self.progress.add_job();
158 }
159
160 pub fn add_jobs(&mut self, jobs: impl IntoIterator<Item = BatchJob>) {
162 for job in jobs {
163 self.add_job(job);
164 }
165 }
166
167 pub fn cancel(&self) {
169 self.cancelled.store(true, Ordering::SeqCst);
170 }
171
172 pub fn is_cancelled(&self) -> bool {
174 self.cancelled.load(Ordering::SeqCst)
175 }
176
177 pub fn execute(self) -> Result<BatchSummary> {
179 let start_time = Instant::now();
180 let total_jobs = self.jobs.len();
181
182 if total_jobs == 0 {
183 return Ok(BatchSummary::empty());
184 }
185
186 let worker_options = WorkerOptions {
188 num_workers: self.options.parallelism,
189 memory_limit: self.options.memory_limit_per_worker,
190 job_timeout: self.options.job_timeout,
191 };
192
193 let pool = WorkerPool::new(worker_options);
194 let _results = Arc::new(Mutex::new(Vec::<JobResult>::new()));
195 let _errors = Arc::new(Mutex::new(Vec::<String>::new()));
196
197 let progress_handle = if let Some(callback) = &self.options.progress_callback {
199 let progress = Arc::clone(&self.progress);
200 let callback = Arc::clone(callback);
201 let interval = self.options.progress_interval;
202 let cancelled = Arc::clone(&self.cancelled);
203
204 Some(thread::spawn(move || {
205 while !cancelled.load(Ordering::SeqCst) {
206 let info = progress.get_info();
207 callback.on_progress(&info);
208
209 if info.is_complete() {
210 break;
211 }
212
213 thread::sleep(interval);
214 }
215 }))
216 } else {
217 None
218 };
219
220 let job_results = pool.process_jobs(
222 self.jobs,
223 Arc::clone(&self.progress),
224 Arc::clone(&self.cancelled),
225 self.options.stop_on_error,
226 );
227
228 let mut successful = 0;
230 let mut failed = 0;
231 let mut all_results = Vec::new();
232
233 for result in job_results {
234 match &result {
235 JobResult::Success { .. } => successful += 1,
236 JobResult::Failed { .. } => failed += 1,
237 JobResult::Cancelled { .. } => {}
238 }
239 all_results.push(result);
240 }
241
242 if let Some(handle) = progress_handle {
244 let _ = handle.join();
245 }
246
247 if let Some(callback) = &self.options.progress_callback {
249 let final_info = self.progress.get_info();
250 callback.on_progress(&final_info);
251 }
252
253 Ok(BatchSummary {
254 total_jobs,
255 successful,
256 failed,
257 cancelled: self.cancelled.load(Ordering::SeqCst),
258 duration: start_time.elapsed(),
259 results: all_results,
260 })
261 }
262
263 pub fn get_progress(&self) -> ProgressInfo {
265 self.progress.get_info()
266 }
267}
268
269pub fn batch_process_files<P, F>(
271 files: Vec<P>,
272 operation: F,
273 options: BatchOptions,
274) -> Result<BatchSummary>
275where
276 P: AsRef<Path>,
277 F: Fn(&Path) -> Result<()> + Clone + Send + 'static,
278{
279 let mut processor = BatchProcessor::new(options);
280
281 for file in files {
282 let path = file.as_ref().to_path_buf();
283 let op = operation.clone();
284
285 processor.add_job(BatchJob::Custom {
286 name: format!("Process {}", path.display()),
287 operation: Box::new(move || op(&path)),
288 });
289 }
290
291 processor.execute()
292}
293
294pub fn batch_split_pdfs<P: AsRef<Path>>(
296 files: Vec<P>,
297 pages_per_file: usize,
298 options: BatchOptions,
299) -> Result<BatchSummary> {
300 let mut processor = BatchProcessor::new(options);
301
302 for file in files {
303 let path = file.as_ref();
304 processor.add_job(BatchJob::Split {
305 input: path.to_path_buf(),
306 output_pattern: format!(
307 "{}_page_%d.pdf",
308 path.file_stem()
309 .and_then(|stem| stem.to_str())
310 .unwrap_or("output")
311 ),
312 pages_per_file,
313 });
314 }
315
316 processor.execute()
317}
318
319pub fn batch_merge_pdfs(
321 merge_groups: Vec<(Vec<PathBuf>, PathBuf)>,
322 options: BatchOptions,
323) -> Result<BatchSummary> {
324 let mut processor = BatchProcessor::new(options);
325
326 for (inputs, output) in merge_groups {
327 processor.add_job(BatchJob::Merge { inputs, output });
328 }
329
330 processor.execute()
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_options_default() {
        let options = BatchOptions::default();
        assert!(options.parallelism > 0);
        assert!(options.parallelism <= 8);
        assert_eq!(options.memory_limit_per_worker, 512 * 1024 * 1024);
        assert!(!options.stop_on_error);
    }

    #[test]
    fn test_batch_options_builder() {
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = Arc::clone(&called);

        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024 * 1024)
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(60))
            .with_progress_callback(move |_info| {
                called_clone.store(true, Ordering::SeqCst);
            });

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
        assert!(options.progress_callback.is_some());
    }

    #[test]
    fn test_batch_processor_creation() {
        let processor = BatchProcessor::new(BatchOptions::default());
        assert_eq!(processor.jobs.len(), 0);
        assert!(!processor.is_cancelled());
    }

    #[test]
    fn test_batch_processor_add_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        processor.add_job(BatchJob::Custom {
            name: "Test Job 1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_jobs(vec![
            BatchJob::Custom {
                name: "Test Job 2".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Test Job 3".to_string(),
                operation: Box::new(|| Ok(())),
            },
        ]);

        assert_eq!(processor.jobs.len(), 3);
    }

    #[test]
    fn test_batch_processor_cancel() {
        let processor = BatchProcessor::new(BatchOptions::default());
        assert!(!processor.is_cancelled());

        processor.cancel();
        assert!(processor.is_cancelled());
    }

    #[test]
    fn test_empty_batch_execution() {
        let processor = BatchProcessor::new(BatchOptions::default());
        let summary = processor.execute().unwrap();

        assert_eq!(summary.total_jobs, 0);
        assert_eq!(summary.successful, 0);
        assert_eq!(summary.failed, 0);
        assert!(!summary.cancelled);
    }

    #[test]
    fn test_batch_options_builder_advanced() {
        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024)
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(60));

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_batch_processor_with_multiple_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    thread::sleep(Duration::from_millis(10));
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 5);
        assert_eq!(summary.successful, 5);
        assert_eq!(summary.failed, 0);
    }

    #[test]
    fn test_batch_processor_with_failing_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        processor.add_job(BatchJob::Custom {
            name: "success".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "failure".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::InvalidStructure(
                    "Test error".to_string(),
                ))
            }),
        });

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 2);
        assert_eq!(summary.successful, 1);
        assert_eq!(summary.failed, 1);
    }

    #[test]
    fn test_batch_processor_stop_on_error() {
        let mut options = BatchOptions::default();
        options.stop_on_error = true;
        options.parallelism = 1;
        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "job1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "job2".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::Io(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "Test error",
                )))
            }),
        });

        processor.add_job(BatchJob::Custom {
            name: "job3".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_err() || result.unwrap().failed > 0);
    }

    #[test]
    fn test_batch_processor_parallelism() {
        use std::sync::atomic::AtomicUsize;

        let mut options = BatchOptions::default();
        options.parallelism = 4;

        let mut processor = BatchProcessor::new(options);
        let concurrent_count = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        for i in 0..10 {
            let concurrent = Arc::clone(&concurrent_count);
            let max = Arc::clone(&max_concurrent);

            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
                    let mut max_val = max.load(Ordering::SeqCst);
                    while current > max_val {
                        match max.compare_exchange_weak(
                            max_val,
                            current,
                            Ordering::SeqCst,
                            Ordering::SeqCst,
                        ) {
                            Ok(_) => break,
                            Err(x) => max_val = x,
                        }
                    }

                    thread::sleep(Duration::from_millis(50));
                    concurrent.fetch_sub(1, Ordering::SeqCst);

                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 10);

        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
        assert!(max_concurrent.load(Ordering::SeqCst) <= 4);
    }

    #[test]
    fn test_batch_processor_timeout() {
        let mut options = BatchOptions::default();
        options.job_timeout = Some(Duration::from_millis(50));
        options.parallelism = 1;

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "timeout_job".to_string(),
            operation: Box::new(|| {
                thread::sleep(Duration::from_millis(200));
                Ok(())
            }),
        });

        let summary = processor.execute().unwrap();
        // NOTE(review): pins the pool's current behaviour — an over-budget job is not
        // counted as failed. Confirm this is intended before relying on job_timeout.
        assert_eq!(summary.failed, 0);
    }

    #[test]
    fn test_batch_processor_memory_limit() {
        let mut options = BatchOptions::default();
        options.memory_limit_per_worker = 1024 * 1024;
        let processor = BatchProcessor::new(options);

        assert_eq!(processor.options.memory_limit_per_worker, 1024 * 1024);
    }

    #[test]
    fn test_batch_progress_tracking() {
        use std::sync::Mutex;

        let progress_updates = Arc::new(Mutex::new(Vec::new()));
        let progress_clone = Arc::clone(&progress_updates);

        let mut options = BatchOptions::default();
        options.progress_callback = Some(Arc::new(move |info: &ProgressInfo| {
            progress_clone.lock().unwrap().push(info.percentage());
        }));

        let mut processor = BatchProcessor::new(options);

        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    thread::sleep(Duration::from_millis(10));
                    Ok(())
                }),
            });
        }

        processor.execute().unwrap();

        let updates = progress_updates.lock().unwrap();
        assert!(!updates.is_empty());
        assert_eq!(*updates.last().unwrap(), 100.0);
    }

    #[test]
    fn test_batch_processor_cancel_is_idempotent() {
        let processor = BatchProcessor::new(BatchOptions::default());

        assert!(!processor.is_cancelled());

        processor.cancel();
        assert!(processor.is_cancelled());

        // A second cancel keeps the flag set.
        processor.cancel();
        assert!(processor.is_cancelled());
    }

    #[test]
    fn test_batch_processor_without_progress_callback() {
        let options = BatchOptions::default();
        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "test_job".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_ok());
        let summary = result.unwrap();
        assert_eq!(summary.successful, 1);
    }

    #[test]
    fn test_batch_processor_early_completion_in_progress() {
        use std::sync::Mutex;

        let progress_called = Arc::new(Mutex::new(false));
        let progress_called_clone = Arc::clone(&progress_called);

        let options = BatchOptions::default().with_progress_callback(move |_info| {
            *progress_called_clone.lock().unwrap() = true;
        });

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "fast".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_ok());

        assert!(*progress_called.lock().unwrap());
    }

    #[test]
    fn test_batch_options_all_builders() {
        let callback_called = Arc::new(AtomicBool::new(false));
        let callback_clone = Arc::clone(&callback_called);

        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024)
            .with_progress_callback(move |_| {
                callback_clone.store(true, Ordering::SeqCst);
            })
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(10));

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(10)));
        assert!(options.progress_callback.is_some());
    }

    #[test]
    fn test_batch_processor_get_progress() {
        let processor = BatchProcessor::new(BatchOptions::default());

        let progress = processor.get_progress();
        assert_eq!(progress.total_jobs, 0);
        assert_eq!(progress.completed_jobs, 0);
        assert_eq!(progress.failed_jobs, 0);
        assert_eq!(progress.percentage(), 100.0);
    }

    #[test]
    fn test_batch_processor_with_real_timeout() {
        let mut options = BatchOptions::default();
        options.job_timeout = Some(Duration::from_millis(10));
        options.parallelism = 1;

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "should_timeout".to_string(),
            operation: Box::new(|| {
                thread::sleep(Duration::from_millis(100));
                Ok(())
            }),
        });

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 1);
    }

    #[test]
    fn test_batch_processor_memory_limit_enforcement() {
        let mut options = BatchOptions::default();
        options.memory_limit_per_worker = 1024;
        options.parallelism = 2;

        let mut processor = BatchProcessor::new(options);

        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("memory_job_{}", i),
                operation: Box::new(move || {
                    let _data = vec![0u8; 512];
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 5);
    }

    #[test]
    fn test_batch_processor_stop_on_error_propagation() {
        let mut options = BatchOptions::default();
        options.stop_on_error = true;
        options.parallelism = 1;
        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "success_1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "failure".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::InvalidOperation(
                    "Intentional failure".to_string(),
                ))
            }),
        });

        processor.add_job(BatchJob::Custom {
            name: "should_not_run".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_err() || result.unwrap().failed > 0);
    }

    #[test]
    fn test_batch_processor_concurrent_limit() {
        use std::sync::atomic::AtomicUsize;

        let concurrent_count = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        let mut options = BatchOptions::default();
        options.parallelism = 2;
        let mut processor = BatchProcessor::new(options);

        for i in 0..10 {
            let concurrent = Arc::clone(&concurrent_count);
            let max = Arc::clone(&max_concurrent);

            processor.add_job(BatchJob::Custom {
                name: format!("concurrent_{}", i),
                operation: Box::new(move || {
                    let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;

                    let mut max_val = max.load(Ordering::SeqCst);
                    while current > max_val {
                        match max.compare_exchange_weak(
                            max_val,
                            current,
                            Ordering::SeqCst,
                            Ordering::SeqCst,
                        ) {
                            Ok(_) => break,
                            Err(x) => max_val = x,
                        }
                    }

                    thread::sleep(Duration::from_millis(10));
                    concurrent.fetch_sub(1, Ordering::SeqCst);
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 10);

        let max_seen = max_concurrent.load(Ordering::SeqCst);
        assert!(
            max_seen <= 2,
            "Max concurrent was {}, expected <= 2",
            max_seen
        );
    }

    #[test]
    fn test_batch_processor_progress_with_failures() {
        use std::sync::Mutex;

        let progress_updates = Arc::new(Mutex::new(Vec::new()));
        let progress_clone = Arc::clone(&progress_updates);

        let mut options = BatchOptions::default();
        options.progress_callback = Some(Arc::new(move |info: &ProgressInfo| {
            let mut updates = progress_clone.lock().unwrap();
            updates.push((info.completed_jobs, info.failed_jobs, info.total_jobs));
        }));

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "success_1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "fail_1".to_string(),
            operation: Box::new(|| Err(crate::error::PdfError::InvalidFormat("test".to_string()))),
        });

        processor.add_job(BatchJob::Custom {
            name: "success_2".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 2);
        assert_eq!(summary.failed, 1);

        let updates = progress_updates.lock().unwrap();
        let &(completed, failed, total) = updates
            .last()
            .expect("the final progress snapshot is always reported");
        assert_eq!(total, 3);
        assert_eq!(completed + failed, 3);
    }
}