1use crate::error::Result;
51use std::path::{Path, PathBuf};
52use std::sync::{
53 atomic::{AtomicBool, Ordering},
54 Arc, Mutex,
55};
56use std::thread;
57use std::time::{Duration, Instant};
58
59pub mod job;
60pub mod progress;
61pub mod result;
62pub mod worker;
63
64pub use job::{BatchJob, JobStatus, JobType};
66pub use progress::{BatchProgress, ProgressCallback, ProgressInfo};
67pub use result::{BatchResult, BatchSummary, JobResult};
68pub use worker::{WorkerOptions, WorkerPool};
69
70#[derive(Clone)]
72pub struct BatchOptions {
73 pub parallelism: usize,
75 pub memory_limit_per_worker: usize,
77 pub progress_interval: Duration,
79 pub stop_on_error: bool,
81 pub progress_callback: Option<Arc<dyn ProgressCallback>>,
83 pub job_timeout: Option<Duration>,
85}
86
87impl Default for BatchOptions {
88 fn default() -> Self {
89 Self {
90 parallelism: num_cpus::get().min(8),
91 memory_limit_per_worker: 512 * 1024 * 1024, progress_interval: Duration::from_millis(100),
93 stop_on_error: false,
94 progress_callback: None,
95 job_timeout: Some(Duration::from_secs(300)), }
97 }
98}
99
100impl BatchOptions {
101 pub fn with_parallelism(mut self, parallelism: usize) -> Self {
103 self.parallelism = parallelism.max(1);
104 self
105 }
106
107 pub fn with_memory_limit(mut self, bytes: usize) -> Self {
109 self.memory_limit_per_worker = bytes;
110 self
111 }
112
113 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
115 where
116 F: Fn(&ProgressInfo) + Send + Sync + 'static,
117 {
118 self.progress_callback = Some(Arc::new(callback));
119 self
120 }
121
122 pub fn stop_on_error(mut self, stop: bool) -> Self {
124 self.stop_on_error = stop;
125 self
126 }
127
128 pub fn with_job_timeout(mut self, timeout: Duration) -> Self {
130 self.job_timeout = Some(timeout);
131 self
132 }
133}
134
135pub struct BatchProcessor {
137 options: BatchOptions,
138 jobs: Vec<BatchJob>,
139 cancelled: Arc<AtomicBool>,
140 progress: Arc<BatchProgress>,
141}
142
143impl BatchProcessor {
144 pub fn new(options: BatchOptions) -> Self {
146 Self {
147 options,
148 jobs: Vec::new(),
149 cancelled: Arc::new(AtomicBool::new(false)),
150 progress: Arc::new(BatchProgress::new()),
151 }
152 }
153
154 pub fn add_job(&mut self, job: BatchJob) {
156 self.jobs.push(job);
157 self.progress.add_job();
158 }
159
160 pub fn add_jobs(&mut self, jobs: impl IntoIterator<Item = BatchJob>) {
162 for job in jobs {
163 self.add_job(job);
164 }
165 }
166
167 pub fn cancel(&self) {
169 self.cancelled.store(true, Ordering::SeqCst);
170 }
171
172 pub fn is_cancelled(&self) -> bool {
174 self.cancelled.load(Ordering::SeqCst)
175 }
176
177 pub fn execute(self) -> Result<BatchSummary> {
179 let start_time = Instant::now();
180 let total_jobs = self.jobs.len();
181
182 if total_jobs == 0 {
183 return Ok(BatchSummary::empty());
184 }
185
186 let worker_options = WorkerOptions {
188 num_workers: self.options.parallelism,
189 memory_limit: self.options.memory_limit_per_worker,
190 job_timeout: self.options.job_timeout,
191 };
192
193 let pool = WorkerPool::new(worker_options);
194 let _results = Arc::new(Mutex::new(Vec::<JobResult>::new()));
195 let _errors = Arc::new(Mutex::new(Vec::<String>::new()));
196
197 let progress_handle = if let Some(callback) = &self.options.progress_callback {
199 let progress = Arc::clone(&self.progress);
200 let callback = Arc::clone(callback);
201 let interval = self.options.progress_interval;
202 let cancelled = Arc::clone(&self.cancelled);
203
204 Some(thread::spawn(move || {
205 while !cancelled.load(Ordering::SeqCst) {
206 let info = progress.get_info();
207 callback.on_progress(&info);
208
209 if info.is_complete() {
210 break;
211 }
212
213 thread::sleep(interval);
214 }
215 }))
216 } else {
217 None
218 };
219
220 let job_results = pool.process_jobs(
222 self.jobs,
223 Arc::clone(&self.progress),
224 Arc::clone(&self.cancelled),
225 self.options.stop_on_error,
226 );
227
228 let mut successful = 0;
230 let mut failed = 0;
231 let mut all_results = Vec::new();
232
233 for result in job_results {
234 match &result {
235 JobResult::Success { .. } => successful += 1,
236 JobResult::Failed { .. } => failed += 1,
237 JobResult::Cancelled { .. } => {}
238 }
239 all_results.push(result);
240 }
241
242 if let Some(handle) = progress_handle {
244 let _ = handle.join();
245 }
246
247 if let Some(callback) = &self.options.progress_callback {
249 let final_info = self.progress.get_info();
250 callback.on_progress(&final_info);
251 }
252
253 Ok(BatchSummary {
254 total_jobs,
255 successful,
256 failed,
257 cancelled: self.cancelled.load(Ordering::SeqCst),
258 duration: start_time.elapsed(),
259 results: all_results,
260 })
261 }
262
263 pub fn get_progress(&self) -> ProgressInfo {
265 self.progress.get_info()
266 }
267}
268
269pub fn batch_process_files<P, F>(
271 files: Vec<P>,
272 operation: F,
273 options: BatchOptions,
274) -> Result<BatchSummary>
275where
276 P: AsRef<Path>,
277 F: Fn(&Path) -> Result<()> + Clone + Send + 'static,
278{
279 let mut processor = BatchProcessor::new(options);
280
281 for file in files {
282 let path = file.as_ref().to_path_buf();
283 let op = operation.clone();
284
285 processor.add_job(BatchJob::Custom {
286 name: format!("Process {}", path.display()),
287 operation: Box::new(move || op(&path)),
288 });
289 }
290
291 processor.execute()
292}
293
294pub fn batch_split_pdfs<P: AsRef<Path>>(
296 files: Vec<P>,
297 pages_per_file: usize,
298 options: BatchOptions,
299) -> Result<BatchSummary> {
300 let mut processor = BatchProcessor::new(options);
301
302 for file in files {
303 let path = file.as_ref();
304 processor.add_job(BatchJob::Split {
305 input: path.to_path_buf(),
306 output_pattern: format!(
307 "{}_page_%d.pdf",
308 path.file_stem()
309 .and_then(|stem| stem.to_str())
310 .unwrap_or("output")
311 ),
312 pages_per_file,
313 });
314 }
315
316 processor.execute()
317}
318
319pub fn batch_merge_pdfs(
321 merge_groups: Vec<(Vec<PathBuf>, PathBuf)>,
322 options: BatchOptions,
323) -> Result<BatchSummary> {
324 let mut processor = BatchProcessor::new(options);
325
326 for (inputs, output) in merge_groups {
327 processor.add_job(BatchJob::Merge { inputs, output });
328 }
329
330 processor.execute()
331}
332
// Unit tests for `BatchOptions` construction and for `BatchProcessor` execution, counting,
// concurrency limits and progress reporting.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_options_default() {
        let options = BatchOptions::default();
        assert!(options.parallelism > 0);
        assert!(options.parallelism <= 8);
        assert_eq!(options.memory_limit_per_worker, 512 * 1024 * 1024);
        assert!(!options.stop_on_error);
    }

    #[test]
    fn test_batch_options_builder() {
        // The callback is never invoked here; the flag only gives the closure something to capture.
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = Arc::clone(&called);

        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024 * 1024)
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(60))
            .with_progress_callback(move |_info| {
                called_clone.store(true, Ordering::SeqCst);
            });

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
        assert!(options.progress_callback.is_some());
    }

    #[test]
    fn test_batch_processor_creation() {
        let processor = BatchProcessor::new(BatchOptions::default());
        assert_eq!(processor.jobs.len(), 0);
        assert!(!processor.is_cancelled());
    }

    #[test]
    fn test_batch_processor_add_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        processor.add_job(BatchJob::Custom {
            name: "Test Job 1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_jobs(vec![
            BatchJob::Custom {
                name: "Test Job 2".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Test Job 3".to_string(),
                operation: Box::new(|| Ok(())),
            },
        ]);

        assert_eq!(processor.jobs.len(), 3);
    }

    #[test]
    fn test_batch_processor_cancel() {
        let processor = BatchProcessor::new(BatchOptions::default());
        assert!(!processor.is_cancelled());

        processor.cancel();
        assert!(processor.is_cancelled());
    }

    #[test]
    fn test_empty_batch_execution() {
        // With no jobs, `execute` returns the empty summary immediately.
        let processor = BatchProcessor::new(BatchOptions::default());
        let summary = processor.execute().unwrap();

        assert_eq!(summary.total_jobs, 0);
        assert_eq!(summary.successful, 0);
        assert_eq!(summary.failed, 0);
        assert!(!summary.cancelled);
    }

    #[test]
    fn test_batch_options_builder_advanced() {
        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024)
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(60));

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_batch_processor_with_multiple_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    thread::sleep(Duration::from_millis(10));
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 5);
        assert_eq!(summary.successful, 5);
        assert_eq!(summary.failed, 0);
    }

    #[test]
    fn test_batch_processor_with_failing_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        processor.add_job(BatchJob::Custom {
            name: "success".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "failure".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::InvalidStructure(
                    "Test error".to_string(),
                ))
            }),
        });

        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 2);
        assert_eq!(summary.successful, 1);
        assert_eq!(summary.failed, 1);
    }

    #[test]
    fn test_batch_processor_stop_on_error() {
        // A single worker makes job order deterministic: job2 fails before job3 is scheduled.
        let options = BatchOptions {
            stop_on_error: true,
            parallelism: 1,
            ..Default::default()
        };

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "job1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "job2".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::Io(std::io::Error::other(
                    "Test error",
                )))
            }),
        });

        processor.add_job(BatchJob::Custom {
            name: "job3".to_string(),
            operation: Box::new(|| Ok(())),
        });

        // Accepts either an `Err` or a summary that records at least one failure.
        let result = processor.execute();
        assert!(result.is_err() || result.unwrap().failed > 0);
    }

    #[test]
    fn test_batch_processor_parallelism() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let options = BatchOptions {
            parallelism: 4,
            ..Default::default()
        };

        let mut processor = BatchProcessor::new(options);
        // `concurrent_count` tracks jobs currently running; `max_concurrent` records its peak.
        let concurrent_count = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        for i in 0..10 {
            let concurrent = concurrent_count.clone();
            let max = max_concurrent.clone();

            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
                    // Lock-free "fetch max": retry the CAS until our value is stored or exceeded.
                    let mut max_val = max.load(Ordering::SeqCst);
                    while current > max_val {
                        match max.compare_exchange_weak(
                            max_val,
                            current,
                            Ordering::SeqCst,
                            Ordering::SeqCst,
                        ) {
                            Ok(_) => break,
                            Err(x) => max_val = x,
                        }
                    }

                    // Keep the job alive long enough for others to overlap with it.
                    thread::sleep(Duration::from_millis(50));
                    concurrent.fetch_sub(1, Ordering::SeqCst);

                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 10);

        // Some overlap must have happened, but never more than the configured parallelism.
        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
        assert!(max_concurrent.load(Ordering::SeqCst) <= 4);
    }

    #[test]
    fn test_batch_processor_timeout() {
        let options = BatchOptions {
            job_timeout: Some(Duration::from_millis(50)),
            parallelism: 1,
            ..Default::default()
        };

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "timeout_job".to_string(),
            operation: Box::new(|| {
                thread::sleep(Duration::from_millis(200));
                Ok(())
            }),
        });

        // NOTE(review): the job sleeps 200 ms against a 50 ms timeout, yet `failed == 0` is
        // expected. This pins current behavior where an exceeded timeout is not counted as a
        // failure; confirm that is intended.
        let summary = processor.execute().unwrap();
        assert_eq!(summary.failed, 0);
    }

    #[test]
    fn test_batch_processor_memory_limit() {
        // Only checks that the option is stored; nothing here exercises enforcement.
        let options = BatchOptions {
            memory_limit_per_worker: 1024 * 1024,
            ..Default::default()
        };

        let processor = BatchProcessor::new(options);

        assert_eq!(processor.options.memory_limit_per_worker, 1024 * 1024);
    }

    #[test]
    fn test_batch_progress_tracking() {
        use std::sync::{Arc, Mutex};

        // Records the percentage from every progress callback invocation.
        let progress_updates = Arc::new(Mutex::new(Vec::new()));
        let progress_clone = progress_updates.clone();

        let options = BatchOptions {
            progress_callback: Some(Arc::new(move |info: &ProgressInfo| {
                progress_clone.lock().unwrap().push(info.percentage());
            })),
            ..Default::default()
        };

        let mut processor = BatchProcessor::new(options);

        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("job_{}", i),
                operation: Box::new(move || {
                    thread::sleep(Duration::from_millis(10));
                    Ok(())
                }),
            });
        }

        processor.execute().unwrap();

        // `execute` issues a final callback after all jobs return, so the last value is 100%.
        let updates = progress_updates.lock().unwrap();
        assert!(!updates.is_empty());
        assert_eq!(*updates.last().unwrap(), 100.0);
    }

    #[test]
    fn test_batch_processor_cancel_during_execution() {
        // NOTE(review): despite its name, this test never calls `execute`; it only checks that
        // `cancel` sets the flag and that calling it twice is harmless.
        let processor = BatchProcessor::new(BatchOptions::default());

        assert!(!processor.is_cancelled());

        processor.cancel();

        assert!(processor.is_cancelled());

        processor.cancel();
        assert!(processor.is_cancelled());
    }

    #[test]
    fn test_batch_processor_without_progress_callback() {
        let options = BatchOptions::default();
        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "test_job".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_ok());
        let summary = result.unwrap();
        assert_eq!(summary.successful, 1);
    }

    #[test]
    fn test_batch_processor_early_completion_in_progress() {
        use std::sync::{Arc, Mutex};

        let progress_called = Arc::new(Mutex::new(false));
        let progress_called_clone = Arc::clone(&progress_called);

        let options = BatchOptions::default().with_progress_callback(move |info| {
            *progress_called_clone.lock().unwrap() = true;
            if info.is_complete() {
                // NOTE(review): empty branch; looks like a leftover placeholder.
            }
        });

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "fast".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let result = processor.execute();
        assert!(result.is_ok());

        // Even a near-instant job must produce at least one callback.
        assert!(*progress_called.lock().unwrap());
    }

    #[test]
    fn test_batch_options_all_builders() {
        use std::time::Duration;

        let callback_called = Arc::new(AtomicBool::new(false));
        let callback_clone = Arc::clone(&callback_called);

        let options = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(1024 * 1024)
            .with_progress_callback(move |_| {
                callback_clone.store(true, Ordering::SeqCst);
            })
            .stop_on_error(true)
            .with_job_timeout(Duration::from_secs(10));

        assert_eq!(options.parallelism, 4);
        assert_eq!(options.memory_limit_per_worker, 1024 * 1024);
        assert!(options.stop_on_error);
        assert_eq!(options.job_timeout, Some(Duration::from_secs(10)));
        assert!(options.progress_callback.is_some());
    }

    #[test]
    fn test_batch_processor_get_progress() {
        let processor = BatchProcessor::new(BatchOptions::default());

        // An empty batch reports zero counts and 100% progress.
        let progress = processor.get_progress();
        assert_eq!(progress.total_jobs, 0);
        assert_eq!(progress.completed_jobs, 0);
        assert_eq!(progress.failed_jobs, 0);
        assert_eq!(progress.percentage(), 100.0);
    }

    #[test]
    fn test_batch_processor_with_real_timeout() {
        let mut options = BatchOptions::default();
        options.job_timeout = Some(Duration::from_millis(10));
        options.parallelism = 1;

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "should_timeout".to_string(),
            operation: Box::new(|| {
                thread::sleep(Duration::from_millis(100));
                Ok(())
            }),
        });

        // Only the job count is checked; the outcome of the timed-out job is not asserted.
        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 1);
    }

    #[test]
    fn test_batch_processor_memory_limit_enforcement() {
        let mut options = BatchOptions::default();
        options.memory_limit_per_worker = 1024;
        options.parallelism = 2;

        let mut processor = BatchProcessor::new(options);

        for i in 0..5 {
            processor.add_job(BatchJob::Custom {
                name: format!("memory_job_{}", i),
                operation: Box::new(move || {
                    let _data = vec![0u8; 512];
                    Ok(())
                }),
            });
        }

        // Only checks that all jobs are accounted for; nothing asserts the limit is enforced.
        let summary = processor.execute().unwrap();
        assert_eq!(summary.total_jobs, 5);
    }

    #[test]
    fn test_batch_processor_stop_on_error_propagation() {
        let mut options = BatchOptions::default();
        options.stop_on_error = true;
        options.parallelism = 1;
        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "success_1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "failure".to_string(),
            operation: Box::new(|| {
                Err(crate::error::PdfError::InvalidOperation(
                    "Intentional failure".to_string(),
                ))
            }),
        });

        processor.add_job(BatchJob::Custom {
            name: "should_not_run".to_string(),
            operation: Box::new(|| Ok(())),
        });

        // Does not verify that "should_not_run" was skipped; only that a failure surfaced.
        let result = processor.execute();
        assert!(result.is_err() || result.unwrap().failed > 0);
    }

    #[test]
    fn test_batch_processor_concurrent_limit() {
        use std::sync::atomic::AtomicUsize;

        let concurrent_count = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        let mut options = BatchOptions::default();
        options.parallelism = 2;
        let mut processor = BatchProcessor::new(options);

        for i in 0..10 {
            let concurrent = Arc::clone(&concurrent_count);
            let max = Arc::clone(&max_concurrent);

            processor.add_job(BatchJob::Custom {
                name: format!("concurrent_{}", i),
                operation: Box::new(move || {
                    let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;

                    // Lock-free "fetch max" of the observed concurrency.
                    let mut max_val = max.load(Ordering::SeqCst);
                    while current > max_val {
                        match max.compare_exchange_weak(
                            max_val,
                            current,
                            Ordering::SeqCst,
                            Ordering::SeqCst,
                        ) {
                            Ok(_) => break,
                            Err(x) => max_val = x,
                        }
                    }

                    thread::sleep(Duration::from_millis(10));
                    concurrent.fetch_sub(1, Ordering::SeqCst);
                    Ok(())
                }),
            });
        }

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 10);

        // The peak concurrency must never exceed the configured parallelism.
        let max_seen = max_concurrent.load(Ordering::SeqCst);
        assert!(
            max_seen <= 2,
            "Max concurrent was {}, expected <= 2",
            max_seen
        );
    }

    #[test]
    fn test_batch_processor_progress_with_failures() {
        use std::sync::{Arc, Mutex};

        // Each entry is (completed_jobs, failed_jobs, total_jobs) at callback time.
        let progress_updates = Arc::new(Mutex::new(Vec::new()));
        let progress_clone = Arc::clone(&progress_updates);

        let mut options = BatchOptions::default();
        options.progress_callback = Some(Arc::new(move |info: &ProgressInfo| {
            let mut updates = progress_clone.lock().unwrap();
            updates.push((info.completed_jobs, info.failed_jobs, info.total_jobs));
        }));

        let mut processor = BatchProcessor::new(options);

        processor.add_job(BatchJob::Custom {
            name: "success_1".to_string(),
            operation: Box::new(|| Ok(())),
        });

        processor.add_job(BatchJob::Custom {
            name: "fail_1".to_string(),
            operation: Box::new(|| Err(crate::error::PdfError::InvalidFormat("test".to_string()))),
        });

        processor.add_job(BatchJob::Custom {
            name: "success_2".to_string(),
            operation: Box::new(|| Ok(())),
        });

        let summary = processor.execute().unwrap();
        assert_eq!(summary.successful, 2);
        assert_eq!(summary.failed, 1);

        let updates = progress_updates.lock().unwrap();
        assert!(!updates.is_empty());

        // The final snapshot must account for every job, whether it succeeded or failed.
        if let Some(&(completed, failed, total)) = updates.last() {
            assert_eq!(total, 3);
            assert_eq!(completed + failed, 3);
        }
    }
}