use crate::batch::{BatchJob, BatchProgress, JobResult};
use crate::error::PdfError;
use crate::operations::page_extraction::extract_pages_to_file;
use crate::operations::{merge_pdfs, split_pdf};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

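/// Configuration for a [`WorkerPool`].
///
/// `num_workers` controls how many worker threads are spawned. `memory_limit`
/// (in bytes) and `job_timeout` are carried through to the workers but are not
/// yet enforced by them.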
#[derive(Debug, Clone)]
pub struct WorkerOptions {
    pub num_workers: usize,
    pub memory_limit: usize,
    pub job_timeout: Option<Duration>,
}

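/// Messages sent from the pool to its workers over the shared channel.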
enum WorkerMessage {
    Job(usize, BatchJob),
    Shutdown,
}

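/// A fixed-size pool of worker threads that drain a shared queue of
/// [`BatchJob`]s.
///
/// A minimal usage sketch (marked `ignore`); it assumes the same `BatchJob`
/// and `BatchProgress` APIs exercised by the tests at the bottom of this file:
///
/// ```ignore
/// use std::sync::atomic::AtomicBool;
/// use std::sync::Arc;
///
/// let pool = WorkerPool::new(WorkerOptions {
///     num_workers: 4,
///     memory_limit: 64 * 1024 * 1024,
///     job_timeout: None,
/// });
///
/// let jobs = vec![BatchJob::Custom {
///     name: "example".to_string(),
///     operation: Box::new(|| Ok(())),
/// }];
///
/// let progress = Arc::new(BatchProgress::new());
/// progress.add_job();
/// let cancelled = Arc::new(AtomicBool::new(false));
///
/// let results = pool.process_jobs(jobs, progress, cancelled, false);
/// assert_eq!(results.len(), 1);
/// ```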
pub struct WorkerPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<WorkerMessage>,
}

impl WorkerPool {
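    /// Creates a pool with `options.num_workers` worker threads, all pulling
    /// jobs from a single shared channel.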
    pub fn new(options: WorkerOptions) -> Self {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(options.num_workers);

        for id in 0..options.num_workers {
            workers.push(Worker::new(
                id,
                Arc::clone(&receiver),
                options.memory_limit,
                options.job_timeout,
            ));
        }

        Self { workers, sender }
    }

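    /// Submits `jobs` to the workers and blocks until every job has been
    /// processed (or reported as cancelled), returning the collected
    /// [`JobResult`]s in submission order. Consumes the pool and joins its
    /// worker threads before returning.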
    pub fn process_jobs(
        self,
        jobs: Vec<BatchJob>,
        progress: Arc<BatchProgress>,
        cancelled: Arc<AtomicBool>,
        stop_on_error: bool,
    ) -> Vec<JobResult> {
        let num_jobs = jobs.len();
        let (result_sender, result_receiver) = mpsc::channel();

        // Collect results on a dedicated thread, indexed by submission order.
        let results = vec![None; num_jobs];
        let results_handle = {
            let mut results = results;
            thread::spawn(move || {
                for (idx, result) in result_receiver {
                    results[idx] = Some(result);
                }
                results
            })
        };

        for (idx, job) in jobs.into_iter().enumerate() {
            // Once the batch is cancelled, report remaining jobs as cancelled
            // without running them.
            if cancelled.load(Ordering::SeqCst) {
                let _ = result_sender.send((
                    idx,
                    JobResult::Cancelled {
                        job_name: job.display_name(),
                    },
                ));
                continue;
            }

            let job_name = job.display_name();
            let progress_clone = Arc::clone(&progress);
            let result_sender_clone = result_sender.clone();
            let cancelled_clone = Arc::clone(&cancelled);

            let wrapped_job = match job {
                BatchJob::Custom { name, operation } => BatchJob::Custom {
                    name,
                    operation: Box::new(move || {
                        progress_clone.start_job();
                        let start = Instant::now();

                        let result = if cancelled_clone.load(Ordering::SeqCst) {
                            Err(PdfError::OperationCancelled)
                        } else {
                            operation()
                        };

                        let duration = start.elapsed();

                        match result {
                            Ok(()) => {
                                progress_clone.complete_job();
                                let _ = result_sender_clone.send((
                                    idx,
                                    JobResult::Success {
                                        job_name: job_name.clone(),
                                        duration,
                                        output_files: vec![],
                                    },
                                ));
                            }
                            Err(ref e) => {
                                progress_clone.fail_job();
                                let _ = result_sender_clone.send((
                                    idx,
                                    JobResult::Failed {
                                        job_name: job_name.clone(),
                                        duration,
                                        error: e.to_string(),
                                    },
                                ));

                                // Honor stop_on_error for custom jobs too,
                                // mirroring the non-custom path below.
                                if stop_on_error {
                                    cancelled_clone.store(true, Ordering::SeqCst);
                                }
                            }
                        }

                        result
                    }),
                },
                _ => {
                    let progress_clone2 = Arc::clone(&progress);
                    let result_sender_clone2 = result_sender.clone();

                    BatchJob::Custom {
                        name: job_name.clone(),
                        operation: Box::new(move || {
                            progress_clone2.start_job();
                            let start = Instant::now();

                            let result = execute_job(job);
                            let duration = start.elapsed();

                            match &result {
                                Ok(output_files) => {
                                    progress_clone2.complete_job();
                                    let _ = result_sender_clone2.send((
                                        idx,
                                        JobResult::Success {
                                            job_name: job_name.clone(),
                                            duration,
                                            output_files: output_files.clone(),
                                        },
                                    ));
                                }
                                Err(e) => {
                                    progress_clone2.fail_job();
                                    let _ = result_sender_clone2.send((
                                        idx,
                                        JobResult::Failed {
                                            job_name: job_name.clone(),
                                            duration,
                                            error: e.to_string(),
                                        },
                                    ));

                                    if stop_on_error {
                                        cancelled_clone.store(true, Ordering::SeqCst);
                                    }
                                }
                            }

                            result.map(|_| ())
                        }),
                    }
                }
            };

            if self
                .sender
                .send(WorkerMessage::Job(idx, wrapped_job))
                .is_err()
            {
                break;
            }
        }

        // Drop our result sender so the collector thread exits once every
        // worker has finished and dropped its own clone.
        drop(result_sender);
        // Dropping the job sender closes the channel, letting idle workers
        // leave their receive loop.
        drop(self.sender);

        for worker in self.workers {
            worker.join();
        }

        let results = results_handle.join().unwrap_or_else(|_| {
            tracing::debug!("Result collection thread panicked");
            Vec::new()
        });
        results.into_iter().flatten().collect()
    }

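    /// Signals every worker to shut down and joins their threads.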
    pub fn shutdown(self) {
        for _ in &self.workers {
            let _ = self.sender.send(WorkerMessage::Shutdown);
        }

        for worker in self.workers {
            worker.join();
        }
    }
}

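/// A single worker thread that pulls [`WorkerMessage`]s off the shared channel
/// until it is told to shut down or the channel closes.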
struct Worker {
    #[allow(dead_code)]
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
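    /// Spawns the worker thread. `_memory_limit` is accepted for future use
    /// and currently ignored; `job_timeout` is recorded but not yet enforced.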
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<WorkerMessage>>>,
        _memory_limit: usize,
        job_timeout: Option<Duration>,
    ) -> Self {
        let thread = thread::spawn(move || {
            loop {
                let message = {
                    let receiver = match receiver.lock() {
                        Ok(r) => r,
                        Err(_) => {
                            tracing::debug!("Worker {} receiver lock poisoned", id);
                            break;
                        }
                    };
                    receiver.recv()
                };

                match message {
                    Ok(WorkerMessage::Job(_idx, job)) => {
                        // NOTE: `job_timeout` is accepted but not yet enforced;
                        // the job runs to completion either way.
                        let _ = job_timeout;
                        if let BatchJob::Custom { operation, .. } = job {
                            let _ = operation();
                        }
                    }
                    Ok(WorkerMessage::Shutdown) => break,
                    Err(_) => break,
                }
            }
        });

        Self {
            id,
            thread: Some(thread),
        }
    }

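    /// Waits for the worker thread to finish, ignoring any panic it may have
    /// propagated.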
    fn join(mut self) {
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }
}

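/// Runs a single non-custom [`BatchJob`] and returns the paths of the files it
/// produced. `Rotate` and `Compress` currently copy the input through
/// unchanged, and `Split` does not report its output paths.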
fn execute_job(job: BatchJob) -> std::result::Result<Vec<PathBuf>, PdfError> {
    match job {
        BatchJob::Split {
            input,
            output_pattern,
            pages_per_file,
        } => {
            let options = crate::operations::SplitOptions {
                mode: crate::operations::SplitMode::ChunkSize(pages_per_file),
                output_pattern,
                preserve_metadata: true,
                optimize: false,
            };

            split_pdf(&input, options).map_err(|e| PdfError::InvalidStructure(e.to_string()))?;

            // The paths of the files produced by split_pdf are not collected
            // here yet.
            Ok(vec![])
        }

        BatchJob::Merge { inputs, output } => {
            let merge_inputs: Vec<_> = inputs
                .into_iter()
                .map(crate::operations::MergeInput::new)
                .collect();
            let options = crate::operations::MergeOptions::default();
            merge_pdfs(merge_inputs, &output, options)
                .map_err(|e| PdfError::InvalidStructure(e.to_string()))?;
            Ok(vec![output])
        }

        BatchJob::Rotate {
            input,
            output,
            rotation: _,
            pages: _,
        } => {
            // Rotation is not applied yet; the input is copied to the output
            // path unchanged.
            std::fs::copy(&input, &output)?;
            Ok(vec![output])
        }

        BatchJob::Extract {
            input,
            output,
            pages,
        } => {
            extract_pages_to_file(&input, &pages, &output)
                .map_err(|e| PdfError::InvalidStructure(e.to_string()))?;
            Ok(vec![output])
        }

        BatchJob::Compress {
            input,
            output,
            quality: _,
        } => {
            // Compression is not applied yet; the input is copied to the
            // output path unchanged.
            std::fs::copy(&input, &output)?;
            Ok(vec![output])
        }

        BatchJob::Custom { .. } => {
            unreachable!("Custom jobs should be handled separately")
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_worker_pool_creation() {
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        assert_eq!(pool.workers.len(), 2);

        pool.shutdown();
    }

    #[test]
    fn test_worker_pool_empty_jobs() {
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let results = pool.process_jobs(vec![], progress, cancelled, false);
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_worker_pool_custom_jobs() {
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Test Job 1".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Test Job 2".to_string(),
                operation: Box::new(|| Ok(())),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_success()));
    }

    #[test]
    fn test_worker_pool_with_failures() {
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Success Job".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Failing Job".to_string(),
                operation: Box::new(|| Err(PdfError::InvalidStructure("Test error".to_string()))),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 2);
        assert!(results[0].is_success());
        assert!(results[1].is_failed());
    }

    #[test]
    fn test_worker_pool_shutdown_with_active_jobs() {
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![BatchJob::Custom {
            name: "Long Running Job".to_string(),
            operation: Box::new(|| {
                std::thread::sleep(std::time::Duration::from_millis(50));
                Ok(())
            }),
        }];

        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);
        assert_eq!(results.len(), 1);
        assert!(results[0].is_success());
    }

    #[test]
    fn test_worker_pool_job_panic_recovery() {
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Panicking Job".to_string(),
                operation: Box::new(|| {
                    // Simulated failure: the closure returns an error rather
                    // than actually panicking.
                    Err(PdfError::InvalidStructure("Simulated panic".to_string()))
                }),
            },
            BatchJob::Custom {
                name: "Normal Job".to_string(),
                operation: Box::new(|| Ok(())),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 2);
        assert!(results[0].is_failed());
        assert!(results[1].is_success());
    }

    #[test]
    fn test_worker_pool_memory_pressure() {
        let options = WorkerOptions {
            num_workers: 1,
            // A deliberately small limit; memory_limit is currently advisory
            // only, so the job below is still expected to succeed.
            memory_limit: 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![BatchJob::Custom {
            name: "Memory Test Job".to_string(),
            operation: Box::new(|| {
                let _data = vec![0u8; 512];
                Ok(())
            }),
        }];

        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);
        assert_eq!(results.len(), 1);
        assert!(results[0].is_success());
    }

    #[test]
    fn test_worker_pool_cancellation_during_processing() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let cancelled_clone = Arc::clone(&cancelled);
        let jobs = vec![
            BatchJob::Custom {
                name: "Job Before Cancel".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Job After Cancel".to_string(),
                operation: Box::new(move || {
                    if cancelled_clone.load(Ordering::SeqCst) {
                        Err(PdfError::InvalidStructure("Cancelled".to_string()))
                    } else {
                        Ok(())
                    }
                }),
            },
        ];

        progress.add_job();
        progress.add_job();

        cancelled.store(true, Ordering::SeqCst);

        let results = pool.process_jobs(jobs, progress, cancelled, false);
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_worker_pool_timeout_handling() {
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: Some(std::time::Duration::from_millis(10)),
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Quick Job".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Slow Job".to_string(),
                operation: Box::new(|| {
                    std::thread::sleep(std::time::Duration::from_millis(5));
                    Ok(())
                }),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, Arc::clone(&progress), cancelled, false);

        // job_timeout is not enforced yet, so both jobs are expected to
        // complete successfully.
        assert_eq!(results.len(), 2);
        assert_eq!(results.iter().filter(|r| r.is_success()).count(), 2);
        assert_eq!(results.iter().filter(|r| r.is_failed()).count(), 0);

        let info = progress.get_info();
        assert_eq!(info.completed_jobs, 2);
        assert_eq!(info.failed_jobs, 0);
    }

    #[test]
    fn test_worker_pool_cancellation() {
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(true));

        let jobs = vec![BatchJob::Custom {
            name: "Should be cancelled".to_string(),
            operation: Box::new(|| Ok(())),
        }];

        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 1);
        assert!(results[0].is_cancelled());
    }
}