Skip to main content

ferrous_forge/performance/
parallel.rs

1//! Parallel execution for validation tasks
2
3use crate::Result;
4use crate::validation::Violation;
5use rayon::prelude::*;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use tracing::{debug, info};
9
10/// Parallel validator for concurrent file processing
11pub struct ParallelValidator {
12    /// Number of threads to use (0 = auto)
13    thread_count: usize,
14}
15
16impl ParallelValidator {
17    /// Create a new parallel validator
18    pub fn new(thread_count: usize) -> Self {
19        if thread_count > 0 {
20            rayon::ThreadPoolBuilder::new()
21                .num_threads(thread_count)
22                .build_global()
23                .ok();
24        }
25        Self { thread_count }
26    }
27
28    /// Validate multiple files in parallel
29    ///
30    /// # Errors
31    ///
32    /// This method is infallible in practice; individual file validation
33    /// errors are logged and the file is skipped.
34    pub async fn validate_files(&self, files: Vec<PathBuf>) -> Result<Vec<Violation>> {
35        info!(
36            "Validating {} files in parallel (threads: {})",
37            files.len(),
38            if self.thread_count == 0 {
39                rayon::current_num_threads()
40            } else {
41                self.thread_count
42            }
43        );
44
45        let start = std::time::Instant::now();
46
47        // Process files in parallel
48        let violations: Vec<Violation> = files
49            .into_par_iter()
50            .flat_map(|file| {
51                debug!("Validating file: {}", file.display());
52                match validate_single_file(&file) {
53                    Ok(violations) => violations,
54                    Err(e) => {
55                        tracing::error!("Error validating {}: {}", file.display(), e);
56                        vec![]
57                    }
58                }
59            })
60            .collect();
61
62        let elapsed = start.elapsed();
63        info!(
64            "Parallel validation completed in {:.2}s ({} violations)",
65            elapsed.as_secs_f64(),
66            violations.len()
67        );
68
69        Ok(violations)
70    }
71
72    /// Calculate parallel speedup
73    pub fn calculate_speedup(&self, serial_time: f64, parallel_time: f64) -> f64 {
74        if parallel_time > 0.0 {
75            serial_time / parallel_time
76        } else {
77            1.0
78        }
79    }
80}
81
82/// Validate a single file (used in parallel processing)
83fn validate_single_file(path: &Path) -> Result<Vec<Violation>> {
84    // Use sync validation for parallel processing
85    use std::fs;
86
87    let content = fs::read_to_string(path).map_err(crate::Error::Io)?;
88
89    // Simple validation for now - check file size
90    let mut violations = Vec::new();
91    let line_count = content.lines().count();
92
93    if line_count > 300 {
94        violations.push(Violation {
95            violation_type: crate::validation::ViolationType::FileTooLarge,
96            file: path.to_path_buf(),
97            line: line_count,
98            message: format!("File has {} lines, maximum allowed is 300", line_count),
99            severity: crate::validation::Severity::Error,
100        });
101    }
102
103    Ok(violations)
104}
105
106/// Parallel safety check runner
107pub struct ParallelSafetyRunner {
108    thread_pool: Arc<rayon::ThreadPool>,
109}
110
111impl ParallelSafetyRunner {
112    /// Create a new parallel safety runner
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the thread pool cannot be created.
117    pub fn new(thread_count: usize) -> Result<Self> {
118        let pool = if thread_count > 0 {
119            rayon::ThreadPoolBuilder::new()
120                .num_threads(thread_count)
121                .build()
122                .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
123        } else {
124            rayon::ThreadPoolBuilder::new()
125                .build()
126                .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
127        };
128
129        Ok(Self {
130            thread_pool: Arc::new(pool),
131        })
132    }
133
134    /// Run checks in parallel
135    pub async fn run_checks_parallel<F, T>(&self, checks: Vec<F>) -> Vec<Result<T>>
136    where
137        F: Fn() -> Result<T> + Send + Sync + 'static,
138        T: Send + 'static,
139    {
140        let pool = self.thread_pool.clone();
141
142        tokio::task::spawn_blocking(move || {
143            pool.install(|| checks.into_par_iter().map(|check| check()).collect())
144        })
145        .await
146        .unwrap_or_else(|e| {
147            tracing::error!("Parallel execution failed: {}", e);
148            vec![]
149        })
150    }
151}
152
/// Splits a list of items into fixed-size batches and processes them in order.
#[derive(Debug, Clone)]
pub struct BatchProcessor {
    /// Maximum number of items handed to the processor per call (always >= 1).
    batch_size: usize,
}

impl BatchProcessor {
    /// Create a new batch processor.
    ///
    /// A `batch_size` of `0` is clamped to `1` so that processing always
    /// makes progress.
    pub fn new(batch_size: usize) -> Self {
        Self {
            batch_size: batch_size.max(1),
        }
    }

    /// Process items in batches.
    ///
    /// `items` is consumed and handed to `processor` in consecutive batches
    /// of at most `batch_size` elements (only the last batch may be
    /// shorter). Batches are processed sequentially, and the outputs are
    /// concatenated in batch order. `processor` is never called for an
    /// empty input.
    pub fn process_batches<T, F, R>(&self, items: Vec<T>, processor: F) -> Vec<R>
    where
        T: Send,
        F: Fn(Vec<T>) -> Vec<R> + Send + Sync,
        R: Send,
    {
        let mut results = Vec::new();
        // Move items out of the input instead of cloning each chunk.
        let mut remaining = items.into_iter();

        loop {
            let batch: Vec<T> = remaining.by_ref().take(self.batch_size).collect();
            if batch.is_empty() {
                break;
            }
            results.extend(processor(batch));
        }

        results
    }
}
180
#[cfg(test)]
#[allow(clippy::expect_used)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// The constructor stores the requested thread count verbatim.
    #[test]
    fn test_parallel_validator_creation() {
        assert_eq!(ParallelValidator::new(4).thread_count, 4);
    }

    /// Speedup is the plain ratio of serial to parallel time.
    #[test]
    fn test_speedup_calculation() {
        let speedup = ParallelValidator::new(0).calculate_speedup(10.0, 2.5);
        assert_eq!(speedup, 4.0);
    }

    /// Eight items in batches of three come back doubled and in order.
    #[test]
    fn test_batch_processor() {
        let items: Vec<i32> = (1..=8).collect();
        let expected: Vec<i32> = (1..=8).map(|x| x * 2).collect();

        let doubled = BatchProcessor::new(3)
            .process_batches(items, |batch| batch.iter().map(|x| x * 2).collect());

        assert_eq!(doubled, expected);
    }

    /// Every submitted check produces exactly one result.
    #[tokio::test]
    async fn test_parallel_safety_runner() {
        let runner = ParallelSafetyRunner::new(2).expect("Failed to create runner");

        let checks: Vec<_> = (1..=3)
            .map(|n| move || Ok::<i32, crate::Error>(n))
            .collect();

        let results = runner.run_checks_parallel(checks).await;
        assert_eq!(results.len(), 3);
    }
}
224}