1use crate::error::Result;
51use std::path::{Path, PathBuf};
52use std::sync::{
53 atomic::{AtomicBool, Ordering},
54 Arc, Mutex,
55};
56use std::thread;
57use std::time::{Duration, Instant};
58
59pub mod job;
60pub mod progress;
61pub mod result;
62pub mod worker;
63
64pub use job::{BatchJob, JobStatus, JobType};
66pub use progress::{BatchProgress, ProgressCallback, ProgressInfo};
67pub use result::{BatchResult, BatchSummary, JobResult};
68pub use worker::{WorkerOptions, WorkerPool};
69
70#[derive(Clone)]
72pub struct BatchOptions {
73 pub parallelism: usize,
75 pub memory_limit_per_worker: usize,
77 pub progress_interval: Duration,
79 pub stop_on_error: bool,
81 pub progress_callback: Option<Arc<dyn ProgressCallback>>,
83 pub job_timeout: Option<Duration>,
85}
86
87impl Default for BatchOptions {
88 fn default() -> Self {
89 Self {
90 parallelism: num_cpus::get().min(8),
91 memory_limit_per_worker: 512 * 1024 * 1024, progress_interval: Duration::from_millis(100),
93 stop_on_error: false,
94 progress_callback: None,
95 job_timeout: Some(Duration::from_secs(300)), }
97 }
98}
99
100impl BatchOptions {
101 pub fn with_parallelism(mut self, parallelism: usize) -> Self {
103 self.parallelism = parallelism.max(1);
104 self
105 }
106
107 pub fn with_memory_limit(mut self, bytes: usize) -> Self {
109 self.memory_limit_per_worker = bytes;
110 self
111 }
112
113 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
115 where
116 F: Fn(&ProgressInfo) + Send + Sync + 'static,
117 {
118 self.progress_callback = Some(Arc::new(callback));
119 self
120 }
121
122 pub fn stop_on_error(mut self, stop: bool) -> Self {
124 self.stop_on_error = stop;
125 self
126 }
127
128 pub fn with_job_timeout(mut self, timeout: Duration) -> Self {
130 self.job_timeout = Some(timeout);
131 self
132 }
133}
134
135pub struct BatchProcessor {
137 options: BatchOptions,
138 jobs: Vec<BatchJob>,
139 cancelled: Arc<AtomicBool>,
140 progress: Arc<BatchProgress>,
141}
142
143impl BatchProcessor {
144 pub fn new(options: BatchOptions) -> Self {
146 Self {
147 options,
148 jobs: Vec::new(),
149 cancelled: Arc::new(AtomicBool::new(false)),
150 progress: Arc::new(BatchProgress::new()),
151 }
152 }
153
154 pub fn add_job(&mut self, job: BatchJob) {
156 self.jobs.push(job);
157 self.progress.add_job();
158 }
159
160 pub fn add_jobs(&mut self, jobs: impl IntoIterator<Item = BatchJob>) {
162 for job in jobs {
163 self.add_job(job);
164 }
165 }
166
167 pub fn cancel(&self) {
169 self.cancelled.store(true, Ordering::SeqCst);
170 }
171
172 pub fn is_cancelled(&self) -> bool {
174 self.cancelled.load(Ordering::SeqCst)
175 }
176
177 pub fn execute(self) -> Result<BatchSummary> {
179 let start_time = Instant::now();
180 let total_jobs = self.jobs.len();
181
182 if total_jobs == 0 {
183 return Ok(BatchSummary::empty());
184 }
185
186 let worker_options = WorkerOptions {
188 num_workers: self.options.parallelism,
189 memory_limit: self.options.memory_limit_per_worker,
190 job_timeout: self.options.job_timeout,
191 };
192
193 let pool = WorkerPool::new(worker_options);
194 let _results = Arc::new(Mutex::new(Vec::<JobResult>::new()));
195 let _errors = Arc::new(Mutex::new(Vec::<String>::new()));
196
197 let progress_handle = if let Some(callback) = &self.options.progress_callback {
199 let progress = Arc::clone(&self.progress);
200 let callback = Arc::clone(callback);
201 let interval = self.options.progress_interval;
202 let cancelled = Arc::clone(&self.cancelled);
203
204 Some(thread::spawn(move || {
205 while !cancelled.load(Ordering::SeqCst) {
206 let info = progress.get_info();
207 callback.on_progress(&info);
208
209 if info.is_complete() {
210 break;
211 }
212
213 thread::sleep(interval);
214 }
215 }))
216 } else {
217 None
218 };
219
220 let job_results = pool.process_jobs(
222 self.jobs,
223 Arc::clone(&self.progress),
224 Arc::clone(&self.cancelled),
225 self.options.stop_on_error,
226 );
227
228 let mut successful = 0;
230 let mut failed = 0;
231 let mut all_results = Vec::new();
232
233 for result in job_results {
234 match &result {
235 JobResult::Success { .. } => successful += 1,
236 JobResult::Failed { .. } => failed += 1,
237 JobResult::Cancelled { .. } => {}
238 }
239 all_results.push(result);
240 }
241
242 if let Some(handle) = progress_handle {
244 let _ = handle.join();
245 }
246
247 if let Some(callback) = &self.options.progress_callback {
249 let final_info = self.progress.get_info();
250 callback.on_progress(&final_info);
251 }
252
253 Ok(BatchSummary {
254 total_jobs,
255 successful,
256 failed,
257 cancelled: self.cancelled.load(Ordering::SeqCst),
258 duration: start_time.elapsed(),
259 results: all_results,
260 })
261 }
262
263 pub fn get_progress(&self) -> ProgressInfo {
265 self.progress.get_info()
266 }
267}
268
269pub fn batch_process_files<P, F>(
271 files: Vec<P>,
272 operation: F,
273 options: BatchOptions,
274) -> Result<BatchSummary>
275where
276 P: AsRef<Path>,
277 F: Fn(&Path) -> Result<()> + Clone + Send + 'static,
278{
279 let mut processor = BatchProcessor::new(options);
280
281 for file in files {
282 let path = file.as_ref().to_path_buf();
283 let op = operation.clone();
284
285 processor.add_job(BatchJob::Custom {
286 name: format!("Process {}", path.display()),
287 operation: Box::new(move || op(&path)),
288 });
289 }
290
291 processor.execute()
292}
293
294pub fn batch_split_pdfs<P: AsRef<Path>>(
296 files: Vec<P>,
297 pages_per_file: usize,
298 options: BatchOptions,
299) -> Result<BatchSummary> {
300 let mut processor = BatchProcessor::new(options);
301
302 for file in files {
303 let path = file.as_ref();
304 processor.add_job(BatchJob::Split {
305 input: path.to_path_buf(),
306 output_pattern: format!(
307 "{}_page_%d.pdf",
308 path.file_stem().unwrap().to_str().unwrap()
309 ),
310 pages_per_file,
311 });
312 }
313
314 processor.execute()
315}
316
317pub fn batch_merge_pdfs(
319 merge_groups: Vec<(Vec<PathBuf>, PathBuf)>,
320 options: BatchOptions,
321) -> Result<BatchSummary> {
322 let mut processor = BatchProcessor::new(options);
323
324 for (inputs, output) in merge_groups {
325 processor.add_job(BatchJob::Merge { inputs, output });
326 }
327
328 processor.execute()
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a custom job with the given name whose operation succeeds immediately.
    fn noop_job(name: &str) -> BatchJob {
        BatchJob::Custom {
            name: name.to_string(),
            operation: Box::new(|| Ok(())),
        }
    }

    /// The defaults use between 1 and 8 workers, 512 MiB per worker, and do
    /// not stop on error.
    #[test]
    fn test_batch_options_default() {
        let defaults = BatchOptions::default();

        assert!((1..=8).contains(&defaults.parallelism));
        assert_eq!(defaults.memory_limit_per_worker, 512 * 1024 * 1024);
        assert!(!defaults.stop_on_error);
    }

    /// Every builder method stores the value it was given.
    #[test]
    fn test_batch_options_builder() {
        let flag = Arc::new(AtomicBool::new(false));
        let flag_for_callback = Arc::clone(&flag);
        let one_gib = 1024 * 1024 * 1024;
        let timeout = Duration::from_secs(60);

        let built = BatchOptions::default()
            .with_parallelism(4)
            .with_memory_limit(one_gib)
            .stop_on_error(true)
            .with_job_timeout(timeout)
            .with_progress_callback(move |_info| {
                flag_for_callback.store(true, Ordering::SeqCst);
            });

        assert_eq!(built.parallelism, 4);
        assert_eq!(built.memory_limit_per_worker, one_gib);
        assert!(built.stop_on_error);
        assert_eq!(built.job_timeout, Some(timeout));
        assert!(built.progress_callback.is_some());
    }

    /// A new processor has no jobs and is not cancelled.
    #[test]
    fn test_batch_processor_creation() {
        let fresh = BatchProcessor::new(BatchOptions::default());

        assert!(fresh.jobs.is_empty());
        assert!(!fresh.is_cancelled());
    }

    /// `add_job` and `add_jobs` both append to the queue.
    #[test]
    fn test_batch_processor_add_jobs() {
        let mut processor = BatchProcessor::new(BatchOptions::default());

        processor.add_job(noop_job("Test Job 1"));
        processor.add_jobs(vec![noop_job("Test Job 2"), noop_job("Test Job 3")]);

        assert_eq!(processor.jobs.len(), 3);
    }

    /// `cancel` flips the flag reported by `is_cancelled`.
    #[test]
    fn test_batch_processor_cancel() {
        let processor = BatchProcessor::new(BatchOptions::default());
        assert!(!processor.is_cancelled());

        processor.cancel();

        assert!(processor.is_cancelled());
    }

    /// Executing with no jobs yields an all-zero, non-cancelled summary.
    #[test]
    fn test_empty_batch_execution() {
        let summary = BatchProcessor::new(BatchOptions::default())
            .execute()
            .unwrap();

        assert_eq!(summary.total_jobs, 0);
        assert_eq!(summary.successful, 0);
        assert_eq!(summary.failed, 0);
        assert!(!summary.cancelled);
    }
}