Skip to main content

superbook_pdf/
parallel.rs

1//! Parallel processing utilities for image pipeline
2//!
3//! # Overview
4//!
5//! This module provides parallel processing capabilities using Rayon
6//! to speed up image processing operations across multiple CPU cores.
7//!
8//! # Example
9//!
10//! ```ignore
11//! use superbook_pdf::parallel::{ParallelOptions, parallel_process};
12//!
13//! let options = ParallelOptions::default();
14//! let results = parallel_process(&image_paths, |path| {
15//!     process_image(path)
16//! }, &options);
17//! ```
18
19use rayon::prelude::*;
20use std::error::Error;
21use std::fmt;
22use std::path::{Path, PathBuf};
23use std::sync::atomic::{AtomicUsize, Ordering};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26
/// Failures that can occur while running work in parallel.
#[derive(Debug, Clone)]
pub enum ParallelError {
    /// The worker thread pool could not be created; carries the reason.
    ThreadPoolError(String),
    /// One item failed; `index` is the (page) index of that item and
    /// `message` describes the failure.
    ProcessingError { index: usize, message: String },
    /// Every task failed; carries the number of tasks.
    AllTasksFailed(usize),
}

impl fmt::Display for ParallelError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ParallelError::ThreadPoolError(reason) => {
                f.write_fmt(format_args!("Thread pool error: {}", reason))
            }
            ParallelError::ProcessingError { index, message } => f.write_fmt(format_args!(
                "Processing error at index {}: {}",
                index, message
            )),
            ParallelError::AllTasksFailed(task_count) => {
                f.write_fmt(format_args!("All {} tasks failed", task_count))
            }
        }
    }
}

// No source/cause chaining: the default `Error` methods are sufficient.
impl Error for ParallelError {}
51
/// Tuning knobs for a parallel run.
#[derive(Debug, Clone)]
pub struct ParallelOptions {
    /// Worker thread count; `0` means auto-detect from the CPU core count.
    pub num_threads: usize,
    /// Maximum number of items handled per batch, used to bound memory;
    /// `0` means all items are processed in a single batch.
    pub chunk_size: usize,
    /// Whether processing should continue after an item fails.
    pub continue_on_error: bool,
}

impl Default for ParallelOptions {
    /// Auto-detected thread count, no chunking, and continue on errors.
    fn default() -> Self {
        Self {
            continue_on_error: true,
            chunk_size: 0,
            num_threads: 0,
        }
    }
}
72
73impl ParallelOptions {
74    /// Create options with specific thread count
75    pub fn with_threads(num_threads: usize) -> Self {
76        Self {
77            num_threads,
78            ..Default::default()
79        }
80    }
81
82    /// Create options with chunk processing for memory control
83    pub fn with_chunks(chunk_size: usize) -> Self {
84        Self {
85            chunk_size,
86            ..Default::default()
87        }
88    }
89
90    /// Get effective thread count
91    pub fn effective_threads(&self) -> usize {
92        if self.num_threads == 0 {
93            num_cpus::get()
94        } else {
95            self.num_threads
96        }
97    }
98}
99
/// Outcome of a parallel batch run.
#[derive(Debug)]
pub struct ParallelResult<T> {
    /// Values produced by successful items, each paired with the item's
    /// index in the original input.
    pub results: Vec<(usize, T)>,
    /// Error messages of failed items, each paired with the item's index in
    /// the original input.
    pub errors: Vec<(usize, String)>,
    /// Wall-clock time spent on the whole run.
    pub duration: Duration,
    /// Number of items processed.
    pub processed_count: usize,
}

impl<T> ParallelResult<T> {
    /// Returns `true` when no item reported an error.
    pub fn is_success(&self) -> bool {
        self.errors.is_empty()
    }

    /// Returns the share of successful items as a percentage in `0.0..=100.0`.
    ///
    /// Yields `0.0` when nothing was processed.
    pub fn success_rate(&self) -> f64 {
        match self.processed_count {
            0 => 0.0,
            processed => {
                let succeeded = self.results.len() as f64;
                (succeeded / processed as f64) * 100.0
            }
        }
    }

    /// Consumes the result and returns the successful values sorted by their
    /// original input index (errors are dropped).
    pub fn ordered_results(self) -> Vec<T> {
        let mut indexed = self.results;
        // Stable sort: entries sharing an index keep their relative order.
        indexed.sort_by(|left, right| left.0.cmp(&right.0));
        indexed.into_iter().map(|(_, value)| value).collect()
    }
}
133
134/// Progress callback type
135pub type ProgressCallback = Arc<dyn Fn(usize, usize) + Send + Sync>;
136
137/// Parallel processor for batch operations
138pub struct ParallelProcessor {
139    options: ParallelOptions,
140    progress_callback: Option<ProgressCallback>,
141}
142
143impl ParallelProcessor {
144    /// Create a new parallel processor with default options
145    pub fn new() -> Self {
146        Self {
147            options: ParallelOptions::default(),
148            progress_callback: None,
149        }
150    }
151
152    /// Create a parallel processor with specific options
153    pub fn with_options(options: ParallelOptions) -> Self {
154        Self {
155            options,
156            progress_callback: None,
157        }
158    }
159
160    /// Set a progress callback
161    pub fn with_progress<F>(mut self, callback: F) -> Self
162    where
163        F: Fn(usize, usize) + Send + Sync + 'static,
164    {
165        self.progress_callback = Some(Arc::new(callback));
166        self
167    }
168
169    /// Process items in parallel
170    pub fn process<T, E, F>(&self, items: &[PathBuf], processor: F) -> ParallelResult<T>
171    where
172        F: Fn(&Path) -> Result<T, E> + Sync + Send,
173        E: std::fmt::Display,
174        T: Send,
175    {
176        let start = Instant::now();
177        let total = items.len();
178
179        if total == 0 {
180            return ParallelResult {
181                results: vec![],
182                errors: vec![],
183                duration: Duration::ZERO,
184                processed_count: 0,
185            };
186        }
187
188        let completed = Arc::new(AtomicUsize::new(0));
189        let progress_callback = self.progress_callback.clone();
190
191        // Build thread pool if custom thread count specified
192        let pool = if self.options.num_threads > 0 {
193            rayon::ThreadPoolBuilder::new()
194                .num_threads(self.options.num_threads)
195                .build()
196                .ok()
197        } else {
198            None
199        };
200
201        let process_chunk = |chunk: &[(usize, &PathBuf)]| -> Vec<(usize, Result<T, String>)> {
202            chunk
203                .par_iter()
204                .map(|(idx, path)| {
205                    let result = processor(path).map_err(|e| e.to_string());
206
207                    // Update progress
208                    let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
209                    if let Some(ref cb) = progress_callback {
210                        cb(done, total);
211                    }
212
213                    (*idx, result)
214                })
215                .collect()
216        };
217
218        let indexed_items: Vec<_> = items.iter().enumerate().collect();
219
220        let all_results = if self.options.chunk_size > 0 {
221            // Process in chunks for memory control
222            let mut all_results = Vec::with_capacity(total);
223            for chunk in indexed_items.chunks(self.options.chunk_size) {
224                let chunk_results = if let Some(ref pool) = pool {
225                    pool.install(|| process_chunk(chunk))
226                } else {
227                    process_chunk(chunk)
228                };
229                all_results.extend(chunk_results);
230            }
231            all_results
232        } else {
233            // Process all at once
234            if let Some(ref pool) = pool {
235                pool.install(|| process_chunk(&indexed_items))
236            } else {
237                process_chunk(&indexed_items)
238            }
239        };
240
241        // Separate successes and errors
242        let mut results = Vec::new();
243        let mut errors = Vec::new();
244
245        for (idx, result) in all_results {
246            match result {
247                Ok(value) => results.push((idx, value)),
248                Err(msg) => errors.push((idx, msg)),
249            }
250        }
251
252        ParallelResult {
253            results,
254            errors,
255            duration: start.elapsed(),
256            processed_count: total,
257        }
258    }
259
260    /// Process items with a simple function (no error handling)
261    pub fn map<T, F>(&self, items: &[PathBuf], mapper: F) -> Vec<T>
262    where
263        F: Fn(&Path) -> T + Sync + Send,
264        T: Send,
265    {
266        if self.options.num_threads > 0 {
267            if let Ok(pool) = rayon::ThreadPoolBuilder::new()
268                .num_threads(self.options.num_threads)
269                .build()
270            {
271                return pool.install(|| items.par_iter().map(|p| mapper(p)).collect());
272            }
273        }
274
275        items.par_iter().map(|p| mapper(p)).collect()
276    }
277}
278
279impl Default for ParallelProcessor {
280    fn default() -> Self {
281        Self::new()
282    }
283}
284
285/// Convenience function for parallel processing
286pub fn parallel_process<T, E, F>(
287    inputs: &[PathBuf],
288    processor: F,
289    options: &ParallelOptions,
290) -> ParallelResult<T>
291where
292    F: Fn(&Path) -> Result<T, E> + Sync + Send,
293    E: std::fmt::Display,
294    T: Send,
295{
296    ParallelProcessor::with_options(options.clone()).process(inputs, processor)
297}
298
299/// Parallel map with simple function
300pub fn parallel_map<T, F>(inputs: &[PathBuf], mapper: F, num_threads: usize) -> Vec<T>
301where
302    F: Fn(&Path) -> T + Sync + Send,
303    T: Send,
304{
305    ParallelProcessor::with_options(ParallelOptions::with_threads(num_threads)).map(inputs, mapper)
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    /// Creates `count` empty files inside `dir`, naming file `i` with
    /// `name(i)`, and returns their paths in index order.
    fn touch_files(dir: &Path, count: usize, name: impl Fn(usize) -> String) -> Vec<PathBuf> {
        (0..count)
            .map(|i| {
                let path = dir.join(name(i));
                File::create(&path).unwrap();
                path
            })
            .collect()
    }

    // ============ TC PAR-001: Basic parallel processing ============

    #[test]
    fn test_par001_parallel_process_basic() {
        let dir = tempdir().unwrap();
        let paths: Vec<PathBuf> = (0..10)
            .map(|i| {
                let path = dir.path().join(format!("file_{}.txt", i));
                let mut f = File::create(&path).unwrap();
                writeln!(f, "content {}", i).unwrap();
                path
            })
            .collect();

        let options = ParallelOptions::default();
        let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);

        assert!(result.is_success());
        assert_eq!(result.results.len(), 10);
        assert!(result.success_rate() > 99.0);
    }

    // ============ TC PAR-006: Thread count control ============

    #[test]
    fn test_par006_thread_count_options() {
        let options = ParallelOptions::with_threads(4);
        assert_eq!(options.effective_threads(), 4);

        let auto_options = ParallelOptions::default();
        assert!(auto_options.effective_threads() >= 1);
    }

    #[test]
    fn test_par006_zero_threads_uses_cpu_count() {
        let options = ParallelOptions::with_threads(0);
        assert_eq!(options.effective_threads(), num_cpus::get());
    }

    // ============ TC PAR-007: Error handling ============

    #[test]
    fn test_par007_partial_failure() {
        let paths: Vec<PathBuf> = (0..5)
            .map(|i| PathBuf::from(format!("/nonexistent/path_{}", i)))
            .collect();

        let options = ParallelOptions::default();
        let result = parallel_process(
            &paths,
            |path| {
                if path.exists() {
                    Ok(true)
                } else {
                    Err("File not found")
                }
            },
            &options,
        );

        assert!(!result.is_success());
        assert_eq!(result.errors.len(), 5);
    }

    #[test]
    fn test_par007_continue_on_error() {
        let dir = tempdir().unwrap();

        // Three valid files followed by two invalid paths
        let mut paths = touch_files(dir.path(), 3, |i| format!("valid_{}.txt", i));
        paths.extend((0..2).map(|i| PathBuf::from(format!("/invalid_{}", i))));

        let options = ParallelOptions {
            continue_on_error: true,
            ..Default::default()
        };

        let result = parallel_process(
            &paths,
            |path| {
                if path.exists() {
                    Ok(true)
                } else {
                    Err("Not found")
                }
            },
            &options,
        );

        assert_eq!(result.results.len(), 3);
        assert_eq!(result.errors.len(), 2);
    }

    // ============ TC PAR-009: Memory control with chunks ============

    #[test]
    fn test_par009_chunk_processing() {
        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 20, |i| format!("file_{}.txt", i));

        let options = ParallelOptions::with_chunks(5);
        let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);

        assert!(result.is_success());
        assert_eq!(result.results.len(), 20);
    }

    // ============ Additional tests ============

    #[test]
    fn test_empty_input() {
        let paths: Vec<PathBuf> = vec![];
        let options = ParallelOptions::default();
        let result = parallel_process(&paths, |_| Ok::<_, String>(true), &options);

        assert!(result.is_success());
        assert_eq!(result.processed_count, 0);
    }

    #[test]
    fn test_ordered_results() {
        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 5, |i| format!("{}.txt", i));

        let options = ParallelOptions::default();
        let result = parallel_process(
            &paths,
            |path| {
                let name = path.file_stem().unwrap().to_str().unwrap();
                Ok::<_, String>(name.parse::<usize>().unwrap())
            },
            &options,
        );

        let ordered = result.ordered_results();
        assert_eq!(ordered, vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_parallel_processor_builder() {
        let processor = ParallelProcessor::new();
        assert!(processor.progress_callback.is_none());

        let processor_with_progress =
            ParallelProcessor::new().with_progress(|done, total| println!("{}/{}", done, total));
        assert!(processor_with_progress.progress_callback.is_some());
    }

    #[test]
    fn test_parallel_map() {
        let paths: Vec<PathBuf> = (0..5).map(|i| PathBuf::from(format!("{}", i))).collect();

        let results = parallel_map(&paths, |p| p.to_string_lossy().parse::<i32>().unwrap(), 2);

        assert_eq!(results.len(), 5);
        // Collecting an indexed parallel iterator into a Vec keeps input order.
        assert_eq!(results, vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_success_rate_calculation() {
        let result: ParallelResult<bool> = ParallelResult {
            results: vec![(0, true), (1, true)],
            errors: vec![(2, "error".to_string())],
            duration: Duration::ZERO,
            processed_count: 3,
        };

        let rate = result.success_rate();
        assert!((rate - 66.67).abs() < 1.0);
    }

    #[test]
    fn test_error_types() {
        let err1 = ParallelError::ThreadPoolError("test".to_string());
        assert!(err1.to_string().contains("Thread pool"));

        let err2 = ParallelError::ProcessingError {
            index: 5,
            message: "fail".to_string(),
        };
        assert!(err2.to_string().contains("index 5"));

        let err3 = ParallelError::AllTasksFailed(10);
        assert!(err3.to_string().contains("10 tasks"));
    }

    #[test]
    fn test_default_options() {
        let options = ParallelOptions::default();
        assert_eq!(options.num_threads, 0);
        assert_eq!(options.chunk_size, 0);
        assert!(options.continue_on_error);
    }

    #[test]
    fn test_thread_safety() {
        let counter = Arc::new(AtomicUsize::new(0));
        let paths: Vec<PathBuf> = (0..100).map(|i| PathBuf::from(format!("{}", i))).collect();

        let counter_clone = Arc::clone(&counter);
        let results = parallel_map(
            &paths,
            move |_| {
                counter_clone.fetch_add(1, Ordering::Relaxed);
                true
            },
            4,
        );

        assert_eq!(results.len(), 100);
        assert_eq!(counter.load(Ordering::Relaxed), 100);
    }

    #[test]
    fn test_progress_callback() {
        let progress_count = Arc::new(AtomicUsize::new(0));
        let progress_clone = Arc::clone(&progress_count);

        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 10, |i| format!("{}.txt", i));

        let processor = ParallelProcessor::new().with_progress(move |_, _| {
            progress_clone.fetch_add(1, Ordering::Relaxed);
        });

        let _ = processor.process(&paths, |p| Ok::<_, String>(p.exists()));

        assert_eq!(progress_count.load(Ordering::Relaxed), 10);
    }

    // ============ Additional edge case tests ============

    #[test]
    fn test_custom_thread_pool() {
        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 20, |i| format!("file_{}.txt", i));

        // Use exactly 2 threads
        let options = ParallelOptions::with_threads(2);
        let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);

        assert!(result.is_success());
        assert_eq!(result.results.len(), 20);
    }

    #[test]
    fn test_large_chunk_size() {
        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 50, |i| format!("file_{}.txt", i));

        // Chunk size larger than input
        let options = ParallelOptions::with_chunks(100);
        let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);

        assert!(result.is_success());
        assert_eq!(result.results.len(), 50);
    }

    #[test]
    fn test_mixed_success_failure() {
        let dir = tempdir().unwrap();

        // Five valid files followed by five invalid paths
        let mut paths = touch_files(dir.path(), 5, |i| format!("valid_{}.txt", i));
        paths.extend((0..5).map(|i| PathBuf::from(format!("/nonexistent/path_{}", i))));

        let options = ParallelOptions::default();
        let result = parallel_process(
            &paths,
            |path| {
                if path.exists() {
                    Ok(path.to_string_lossy().to_string())
                } else {
                    Err("File not found")
                }
            },
            &options,
        );

        assert_eq!(result.results.len(), 5);
        assert_eq!(result.errors.len(), 5);
        assert!((result.success_rate() - 50.0).abs() < 0.1);
    }

    #[test]
    fn test_processor_with_custom_options() {
        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 10, |i| format!("{}.txt", i));

        let options = ParallelOptions {
            num_threads: 4,
            chunk_size: 3,
            continue_on_error: true,
        };

        let processor = ParallelProcessor::with_options(options);
        let result = processor.process(&paths, |p| Ok::<_, String>(p.exists()));

        assert!(result.is_success());
        assert_eq!(result.processed_count, 10);
    }

    #[test]
    fn test_parallel_result_duration() {
        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 5, |i| format!("{}.txt", i));

        let options = ParallelOptions::default();
        let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);

        // Duration should be non-zero
        assert!(result.duration.as_nanos() > 0);
    }

    #[test]
    fn test_single_item_processing() {
        let dir = tempdir().unwrap();
        let paths = touch_files(dir.path(), 1, |_| "single.txt".to_string());

        let options = ParallelOptions::default();
        let result = parallel_process(&paths, |p| Ok::<_, String>(p.exists()), &options);

        assert!(result.is_success());
        assert_eq!(result.results.len(), 1);
        assert_eq!(result.processed_count, 1);
    }

    #[test]
    fn test_parallel_map_preserves_order() {
        let expected: Vec<String> = (0..20).map(|i| format!("{:02}", i)).collect();
        let paths: Vec<PathBuf> = expected.iter().map(PathBuf::from).collect();

        let results = parallel_map(&paths, |p| p.to_string_lossy().to_string(), 4);

        // Work is distributed across threads, but the collected Vec must
        // still follow the input order: "00" through "19".
        assert_eq!(results, expected);
    }
}