Skip to main content

ferrous_forge/performance/
parallel.rs

1//! Parallel execution for validation tasks
2
3use crate::Result;
4use crate::validation::Violation;
5use rayon::prelude::*;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use tracing::{debug, info};
9
10/// Parallel validator for concurrent file processing
11pub struct ParallelValidator {
12    /// Number of threads to use (0 = auto)
13    thread_count: usize,
14}
15
16impl ParallelValidator {
17    /// Create a new parallel validator
18    pub fn new(thread_count: usize) -> Self {
19        if thread_count > 0 {
20            rayon::ThreadPoolBuilder::new()
21                .num_threads(thread_count)
22                .build_global()
23                .ok();
24        }
25        Self { thread_count }
26    }
27
28    /// Validate multiple files in parallel
29    pub async fn validate_files(&self, files: Vec<PathBuf>) -> Result<Vec<Violation>> {
30        info!(
31            "Validating {} files in parallel (threads: {})",
32            files.len(),
33            if self.thread_count == 0 {
34                rayon::current_num_threads()
35            } else {
36                self.thread_count
37            }
38        );
39
40        let start = std::time::Instant::now();
41
42        // Process files in parallel
43        let violations: Vec<Violation> = files
44            .into_par_iter()
45            .flat_map(|file| {
46                debug!("Validating file: {}", file.display());
47                match validate_single_file(&file) {
48                    Ok(violations) => violations,
49                    Err(e) => {
50                        tracing::error!("Error validating {}: {}", file.display(), e);
51                        vec![]
52                    }
53                }
54            })
55            .collect();
56
57        let elapsed = start.elapsed();
58        info!(
59            "Parallel validation completed in {:.2}s ({} violations)",
60            elapsed.as_secs_f64(),
61            violations.len()
62        );
63
64        Ok(violations)
65    }
66
67    /// Calculate parallel speedup
68    pub fn calculate_speedup(&self, serial_time: f64, parallel_time: f64) -> f64 {
69        if parallel_time > 0.0 {
70            serial_time / parallel_time
71        } else {
72            1.0
73        }
74    }
75}
76
77/// Validate a single file (used in parallel processing)
78fn validate_single_file(path: &Path) -> Result<Vec<Violation>> {
79    // Use sync validation for parallel processing
80    use std::fs;
81
82    let content = fs::read_to_string(path).map_err(crate::Error::Io)?;
83
84    // Simple validation for now - check file size
85    let mut violations = Vec::new();
86    let line_count = content.lines().count();
87
88    if line_count > 300 {
89        violations.push(Violation {
90            violation_type: crate::validation::ViolationType::FileTooLarge,
91            file: path.to_path_buf(),
92            line: line_count,
93            message: format!("File has {} lines, maximum allowed is 300", line_count),
94            severity: crate::validation::Severity::Error,
95        });
96    }
97
98    Ok(violations)
99}
100
101/// Parallel safety check runner
102pub struct ParallelSafetyRunner {
103    thread_pool: Arc<rayon::ThreadPool>,
104}
105
106impl ParallelSafetyRunner {
107    /// Create a new parallel safety runner
108    pub fn new(thread_count: usize) -> Result<Self> {
109        let pool = if thread_count > 0 {
110            rayon::ThreadPoolBuilder::new()
111                .num_threads(thread_count)
112                .build()
113                .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
114        } else {
115            rayon::ThreadPoolBuilder::new()
116                .build()
117                .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
118        };
119
120        Ok(Self {
121            thread_pool: Arc::new(pool),
122        })
123    }
124
125    /// Run checks in parallel
126    pub async fn run_checks_parallel<F, T>(&self, checks: Vec<F>) -> Vec<Result<T>>
127    where
128        F: Fn() -> Result<T> + Send + Sync + 'static,
129        T: Send + 'static,
130    {
131        let pool = self.thread_pool.clone();
132
133        tokio::task::spawn_blocking(move || {
134            pool.install(|| checks.into_par_iter().map(|check| check()).collect())
135        })
136        .await
137        .unwrap_or_else(|e| {
138            tracing::error!("Parallel execution failed: {}", e);
139            vec![]
140        })
141    }
142}
143
/// Batch processor that feeds a list of items to a callback in fixed-size groups.
pub struct BatchProcessor {
    /// Maximum number of items per batch; `new` guarantees this is at least 1
    batch_size: usize,
}

impl BatchProcessor {
    /// Create a new batch processor.
    ///
    /// A `batch_size` of 0 is raised to 1 so that processing always makes progress.
    pub fn new(batch_size: usize) -> Self {
        Self {
            batch_size: batch_size.max(1),
        }
    }

    /// Process items in batches, sequentially and in order.
    ///
    /// `items` is split into consecutive batches of at most `batch_size`
    /// elements (the last batch may be shorter). Each batch is handed to
    /// `processor` by value, and the returned vectors are concatenated in batch
    /// order. `processor` is never called with an empty batch, so an empty
    /// `items` yields an empty result without invoking it.
    ///
    /// Items are moved into their batch rather than cloned, so `T` does not
    /// need to implement `Clone`.
    pub fn process_batches<T, F, R>(&self, items: Vec<T>, processor: F) -> Vec<R>
    where
        T: Send,
        F: Fn(Vec<T>) -> Vec<R> + Send + Sync,
        R: Send,
    {
        let mut results = Vec::new();
        let mut remaining = items.into_iter();

        loop {
            // `by_ref` lets each round take the next slice of the same iterator
            let batch: Vec<T> = remaining.by_ref().take(self.batch_size).collect();
            if batch.is_empty() {
                break;
            }
            results.extend(processor(batch));
        }

        results
    }
}
171
#[cfg(test)]
// Tests may use `expect`/`unwrap`: a panic is the intended failure signal here.
#[allow(clippy::expect_used)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// The requested thread count is stored as given.
    #[test]
    fn test_parallel_validator_creation() {
        let validator = ParallelValidator::new(4);
        assert_eq!(validator.thread_count, 4);
    }

    /// Speedup is the plain ratio, with a neutral fallback for a zero divisor.
    #[test]
    fn test_speedup_calculation() {
        let validator = ParallelValidator::new(0);
        let speedup = validator.calculate_speedup(10.0, 2.5);
        assert_eq!(speedup, 4.0);

        // A non-positive parallel time must not divide; it yields 1.0
        assert_eq!(validator.calculate_speedup(10.0, 0.0), 1.0);
    }

    /// Batches are processed in order and their outputs concatenated.
    #[test]
    fn test_batch_processor() {
        let processor = BatchProcessor::new(3);
        let items = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let results: Vec<i32> =
            processor.process_batches(items, |batch| batch.iter().map(|x| x * 2).collect());

        assert_eq!(results, vec![2, 4, 6, 8, 10, 12, 14, 16]);
    }

    /// A zero batch size is clamped to 1 instead of panicking or looping forever.
    #[test]
    fn test_batch_processor_zero_batch_size() {
        let processor = BatchProcessor::new(0);
        assert_eq!(processor.batch_size, 1);

        let sizes: Vec<usize> =
            processor.process_batches(vec![1, 2, 3], |batch| vec![batch.len()]);
        assert_eq!(sizes, vec![1, 1, 1]);
    }

    /// Every check produces a result, and results keep the input order.
    #[tokio::test]
    async fn test_parallel_safety_runner() {
        let runner = ParallelSafetyRunner::new(2).expect("Failed to create runner");

        let checks = vec![
            || Ok::<i32, crate::Error>(1),
            || Ok::<i32, crate::Error>(2),
            || Ok::<i32, crate::Error>(3),
        ];

        let results = runner.run_checks_parallel(checks).await;
        assert_eq!(results.len(), 3);

        // Check the values too, not just the count
        let values: Vec<i32> = results
            .into_iter()
            .map(|r| r.expect("check should succeed"))
            .collect();
        assert_eq!(values, vec![1, 2, 3]);
    }
}