oxidize_pdf/batch/worker.rs

//! Worker pool for parallel batch processing

use crate::batch::{BatchJob, BatchProgress, JobResult};
use crate::error::PdfError;
use crate::operations::page_extraction::extract_pages_to_file;
use crate::operations::{merge_pdfs, split_pdf};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Options for worker pool
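///
/// # Example
///
/// A minimal configuration sketch (the values are illustrative; note that
/// `memory_limit` and `job_timeout` are currently accepted but not yet
/// enforced by the workers):
///
/// ```ignore
/// use std::time::Duration;
///
/// let options = WorkerOptions {
///     num_workers: 4,
///     memory_limit: 256 * 1024 * 1024,
///     job_timeout: Some(Duration::from_secs(30)),
/// };
/// let pool = WorkerPool::new(options);
/// pool.shutdown();
/// ```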
#[derive(Debug, Clone)]
pub struct WorkerOptions {
    /// Number of worker threads
    pub num_workers: usize,
    /// Memory limit per worker (bytes)
    pub memory_limit: usize,
    /// Timeout for individual jobs
    pub job_timeout: Option<Duration>,
}

/// Message sent to workers
enum WorkerMessage {
    Job(usize, BatchJob),
    Shutdown,
}

/// Worker pool for parallel processing
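///
/// Jobs are handed to workers over a single `mpsc` channel whose receiver is
/// shared behind an `Arc<Mutex<_>>`: whichever idle worker locks the receiver
/// next takes the next job. Dropping the sender closes the channel and lets
/// the workers shut down.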
pub struct WorkerPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<WorkerMessage>,
}

impl WorkerPool {
    /// Create a new worker pool
    pub fn new(options: WorkerOptions) -> Self {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(options.num_workers);

        for id in 0..options.num_workers {
            workers.push(Worker::new(
                id,
                Arc::clone(&receiver),
                options.memory_limit,
                options.job_timeout,
            ));
        }

        Self { workers, sender }
    }

    /// Process a batch of jobs
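    ///
    /// Results are collected by job index, so they come back in submission
    /// order. A minimal sketch using a custom job (marked `ignore` because the
    /// import paths may differ depending on how the batch module is
    /// re-exported):
    ///
    /// ```ignore
    /// use std::sync::atomic::AtomicBool;
    /// use std::sync::Arc;
    ///
    /// let pool = WorkerPool::new(WorkerOptions {
    ///     num_workers: 2,
    ///     memory_limit: 64 * 1024 * 1024,
    ///     job_timeout: None,
    /// });
    ///
    /// let progress = Arc::new(BatchProgress::new());
    /// progress.add_job();
    /// let cancelled = Arc::new(AtomicBool::new(false));
    ///
    /// let jobs = vec![BatchJob::Custom {
    ///     name: "noop".to_string(),
    ///     operation: Box::new(|| Ok(())),
    /// }];
    ///
    /// let results = pool.process_jobs(jobs, progress, cancelled, false);
    /// assert_eq!(results.len(), 1);
    /// ```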
    pub fn process_jobs(
        self,
        jobs: Vec<BatchJob>,
        progress: Arc<BatchProgress>,
        cancelled: Arc<AtomicBool>,
        stop_on_error: bool,
    ) -> Vec<JobResult> {
        let num_jobs = jobs.len();
        let (result_sender, result_receiver) = mpsc::channel();

        // Spawn result collector thread
        let results = vec![None; num_jobs];
        let results_handle = {
            let mut results = results;
            thread::spawn(move || {
                for (idx, result) in result_receiver {
                    results[idx] = Some(result);
                }
                results
            })
        };

        // Send jobs to workers
        for (idx, job) in jobs.into_iter().enumerate() {
            if cancelled.load(Ordering::SeqCst) {
                let _ = result_sender.send((
                    idx,
                    JobResult::Cancelled {
                        job_name: job.display_name(),
                    },
                ));
                continue;
            }

            let job_name = job.display_name();
            let progress_clone = Arc::clone(&progress);
            let result_sender_clone = result_sender.clone();
            let cancelled_clone = Arc::clone(&cancelled);

            // Wrap job with progress tracking
            let wrapped_job = match job {
                BatchJob::Custom { name, operation } => BatchJob::Custom {
                    name,
                    operation: Box::new(move || {
                        progress_clone.start_job();
                        let start = Instant::now();

                        let result = if cancelled_clone.load(Ordering::SeqCst) {
                            Err(PdfError::OperationCancelled)
                        } else {
                            operation()
                        };

                        let duration = start.elapsed();

                        match result {
                            Ok(()) => {
                                progress_clone.complete_job();
                                let _ = result_sender_clone.send((
                                    idx,
                                    JobResult::Success {
                                        job_name: job_name.clone(),
                                        duration,
                                        output_files: vec![],
                                    },
                                ));
                            }
                            Err(ref e) => {
                                progress_clone.fail_job();
                                let _ = result_sender_clone.send((
                                    idx,
                                    JobResult::Failed {
                                        job_name: job_name.clone(),
                                        duration,
                                        error: e.to_string(),
                                    },
                                ));
                            }
                        }

                        result
                    }),
                },
                _ => {
                    // Handle other job types
                    let progress_clone2 = Arc::clone(&progress);
                    let result_sender_clone2 = result_sender.clone();

                    BatchJob::Custom {
                        name: job_name.clone(),
                        operation: Box::new(move || {
                            progress_clone2.start_job();
                            let start = Instant::now();

                            let result = execute_job(job);
                            let duration = start.elapsed();

                            match &result {
                                Ok(output_files) => {
                                    progress_clone2.complete_job();
                                    let _ = result_sender_clone2.send((
                                        idx,
                                        JobResult::Success {
                                            job_name: job_name.clone(),
                                            duration,
                                            output_files: output_files.clone(),
                                        },
                                    ));
                                }
                                Err(e) => {
                                    progress_clone2.fail_job();
                                    let _ = result_sender_clone2.send((
                                        idx,
                                        JobResult::Failed {
                                            job_name: job_name.clone(),
                                            duration,
                                            error: e.to_string(),
                                        },
                                    ));

                                    if stop_on_error {
                                        cancelled_clone.store(true, Ordering::SeqCst);
                                    }
                                }
                            }

                            result.map(|_| ())
                        }),
                    }
                }
            };

            if self
                .sender
                .send(WorkerMessage::Job(idx, wrapped_job))
                .is_err()
            {
                break;
            }
        }

        // Drop our senders: closing the job channel lets the workers exit, and
        // the result channel closes once all in-flight clones are dropped
        drop(result_sender);
        drop(self.sender);

        // Wait for workers to finish
        for worker in self.workers {
            worker.join();
        }

        // Collect results
        let results = results_handle.join().unwrap_or_else(|_| {
            tracing::debug!("Result collection thread panicked");
            Vec::new()
        });
        results.into_iter().flatten().collect()
    }

    /// Shutdown the worker pool
    pub fn shutdown(self) {
        for _ in &self.workers {
            let _ = self.sender.send(WorkerMessage::Shutdown);
        }

        for worker in self.workers {
            worker.join();
        }
    }
}

/// Worker thread
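///
/// Each worker loops on the shared receiver: it locks the mutex, blocks on
/// `recv()` for the next message, releases the lock, and only then runs the
/// job, so job execution itself is not serialized across workers.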
struct Worker {
    #[allow(dead_code)]
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    /// Create a new worker
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<WorkerMessage>>>,
        _memory_limit: usize,
        job_timeout: Option<Duration>,
    ) -> Self {
        let thread = thread::spawn(move || {
            loop {
                let message = {
                    let receiver = match receiver.lock() {
                        Ok(r) => r,
                        Err(_) => {
                            tracing::debug!("Worker {} receiver lock poisoned", id);
                            break;
                        }
                    };
                    receiver.recv()
                };

                match message {
                    Ok(WorkerMessage::Job(_idx, job)) => {
                        // `job_timeout` is not enforced yet; jobs run to
                        // completion regardless of the configured timeout
                        let _ = job_timeout;
                        if let BatchJob::Custom { operation, .. } = job {
                            let _ = operation();
                        }
                    }
                    Ok(WorkerMessage::Shutdown) => break,
                    Err(_) => break,
                }
            }
        });

        Self {
            id,
            thread: Some(thread),
        }
    }

    /// Wait for the worker to finish
    fn join(mut self) {
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }
}

/// Execute a non-custom job
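///
/// Returns the paths of the files that were written. `Split` currently returns
/// an empty list because `split_pdf` does not report the paths it generates.
///
/// A sketch of a job this function handles (the paths are placeholders and the
/// field types are assumed to be path-like):
///
/// ```ignore
/// let job = BatchJob::Merge {
///     inputs: vec!["a.pdf".into(), "b.pdf".into()],
///     output: "merged.pdf".into(),
/// };
/// let written = execute_job(job).expect("merge failed");
/// assert_eq!(written.len(), 1);
/// ```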
fn execute_job(job: BatchJob) -> std::result::Result<Vec<PathBuf>, PdfError> {
    match job {
        BatchJob::Split {
            input,
            output_pattern,
            pages_per_file,
        } => {
            // Create split options
            let options = crate::operations::SplitOptions {
                mode: crate::operations::SplitMode::ChunkSize(pages_per_file),
                output_pattern,
                preserve_metadata: true,
                optimize: false,
            };

            split_pdf(&input, options).map_err(|e| PdfError::InvalidStructure(e.to_string()))?;

            // Return generated files (simplified - would need to track actual outputs)
            Ok(vec![])
        }

        BatchJob::Merge { inputs, output } => {
            let merge_inputs: Vec<_> = inputs
                .into_iter()
                .map(crate::operations::MergeInput::new)
                .collect();
            let options = crate::operations::MergeOptions::default();
            merge_pdfs(merge_inputs, &output, options)
                .map_err(|e| PdfError::InvalidStructure(e.to_string()))?;
            Ok(vec![output])
        }

        BatchJob::Rotate {
            input,
            output,
            rotation: _,
            pages: _,
        } => {
            // Rotate not implemented in current API, just copy
            std::fs::copy(&input, &output)?;
            Ok(vec![output])
        }

        BatchJob::Extract {
            input,
            output,
            pages,
        } => {
            extract_pages_to_file(&input, &pages, &output)
                .map_err(|e| PdfError::InvalidStructure(e.to_string()))?;
            Ok(vec![output])
        }

        BatchJob::Compress {
            input,
            output,
            quality: _,
        } => {
            // Compression not implemented yet, just copy
            std::fs::copy(&input, &output)?;
            Ok(vec![output])
        }

        BatchJob::Custom { .. } => {
            unreachable!("Custom jobs should be handled separately")
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_worker_pool_creation() {
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        assert_eq!(pool.workers.len(), 2);

        pool.shutdown();
    }

    #[test]
    fn test_worker_pool_empty_jobs() {
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let results = pool.process_jobs(vec![], progress, cancelled, false);
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_worker_pool_custom_jobs() {
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Test Job 1".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Test Job 2".to_string(),
                operation: Box::new(|| Ok(())),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_success()));
    }

    #[test]
    fn test_worker_pool_with_failures() {
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Success Job".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Failing Job".to_string(),
                operation: Box::new(|| Err(PdfError::InvalidStructure("Test error".to_string()))),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 2);
        assert!(results[0].is_success());
        assert!(results[1].is_failed());
    }

    #[test]
    fn test_worker_pool_shutdown_with_active_jobs() {
        // Test graceful shutdown while jobs are running
        let options = WorkerOptions {
            num_workers: 2,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        // Jobs that take time to complete
        let jobs = vec![BatchJob::Custom {
            name: "Long Running Job".to_string(),
            operation: Box::new(|| {
                std::thread::sleep(std::time::Duration::from_millis(50));
                Ok(())
            }),
        }];

        progress.add_job();

        // Process jobs and shutdown - should complete gracefully
        let results = pool.process_jobs(jobs, progress, cancelled, false);
        assert_eq!(results.len(), 1);
        assert!(results[0].is_success());
    }

    #[test]
    fn test_worker_pool_job_panic_recovery() {
        // Test that the pool keeps processing after a failing job (a real panic is simulated with an error)
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Panicking Job".to_string(),
                operation: Box::new(|| {
                    // Convert panic to error for testing
                    Err(PdfError::InvalidStructure("Simulated panic".to_string()))
                }),
            },
            BatchJob::Custom {
                name: "Normal Job".to_string(),
                operation: Box::new(|| Ok(())),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 2);
        assert!(results[0].is_failed());
        assert!(results[1].is_success());
    }

    #[test]
    fn test_worker_pool_memory_pressure() {
        // Test behavior under memory constraints
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024, // Very low limit
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![BatchJob::Custom {
            name: "Memory Test Job".to_string(),
            operation: Box::new(|| {
                // Simulate work that could use memory
                let _data = vec![0u8; 512]; // Small allocation
                Ok(())
            }),
        }];

        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);
        assert_eq!(results.len(), 1);
        // Should succeed with small allocation
        assert!(results[0].is_success());
    }

    #[test]
    fn test_worker_pool_cancellation_during_processing() {
        // Test cancellation when the flag is set before jobs are dispatched
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let cancelled_clone = Arc::clone(&cancelled);
        let jobs = vec![
            BatchJob::Custom {
                name: "Job Before Cancel".to_string(),
                operation: Box::new(|| Ok(())),
            },
            BatchJob::Custom {
                name: "Job After Cancel".to_string(),
                operation: Box::new(move || {
                    // This operation is never reached once the flag is set
                    if cancelled_clone.load(Ordering::SeqCst) {
                        Err(PdfError::InvalidStructure("Cancelled".to_string()))
                    } else {
                        Ok(())
                    }
                }),
            },
        ];

        progress.add_job();
        progress.add_job();

        // Cancel before processing starts
        cancelled.store(true, Ordering::SeqCst);

        let results = pool.process_jobs(jobs, progress, cancelled, false);
        assert_eq!(results.len(), 2);
        // Both jobs are reported as cancelled because the flag was set before processing
    }

    #[test]
    fn test_worker_pool_timeout_handling() {
        // Configure a job timeout and verify jobs still complete (timeouts are not enforced yet)
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: Some(std::time::Duration::from_millis(10)), // Very short timeout
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(false));

        let jobs = vec![
            BatchJob::Custom {
                name: "Quick Job".to_string(),
                operation: Box::new(|| Ok(())), // Should complete quickly
            },
            BatchJob::Custom {
                name: "Slow Job".to_string(),
                operation: Box::new(|| {
                    // Sleeps briefly; since timeouts are not enforced, this still succeeds
                    std::thread::sleep(std::time::Duration::from_millis(5));
                    Ok(())
                }),
            },
        ];

        progress.add_job();
        progress.add_job();

        let results = pool.process_jobs(jobs, Arc::clone(&progress), cancelled, false);

        assert_eq!(results.len(), 2);
        assert_eq!(results.iter().filter(|r| r.is_success()).count(), 2);
        assert_eq!(results.iter().filter(|r| r.is_failed()).count(), 0);

        let info = progress.get_info();
        assert_eq!(info.completed_jobs, 2);
        assert_eq!(info.failed_jobs, 0);
    }

    #[test]
    fn test_worker_pool_cancellation() {
        let options = WorkerOptions {
            num_workers: 1,
            memory_limit: 1024 * 1024,
            job_timeout: None,
        };

        let pool = WorkerPool::new(options);
        let progress = Arc::new(BatchProgress::new());
        let cancelled = Arc::new(AtomicBool::new(true)); // Pre-cancelled

        let jobs = vec![BatchJob::Custom {
            name: "Should be cancelled".to_string(),
            operation: Box::new(|| Ok(())),
        }];

        progress.add_job();

        let results = pool.process_jobs(jobs, progress, cancelled, false);

        assert_eq!(results.len(), 1);
        assert!(results[0].is_cancelled());
    }
}