ferrous_forge/performance/
parallel.rs1use crate::Result;
4use crate::validation::Violation;
5use rayon::prelude::*;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use tracing::{debug, info};
9
10pub struct ParallelValidator {
12 thread_count: usize,
14}
15
16impl ParallelValidator {
17 pub fn new(thread_count: usize) -> Self {
19 if thread_count > 0 {
20 rayon::ThreadPoolBuilder::new()
21 .num_threads(thread_count)
22 .build_global()
23 .ok();
24 }
25 Self { thread_count }
26 }
27
28 pub async fn validate_files(&self, files: Vec<PathBuf>) -> Result<Vec<Violation>> {
30 info!(
31 "Validating {} files in parallel (threads: {})",
32 files.len(),
33 if self.thread_count == 0 {
34 rayon::current_num_threads()
35 } else {
36 self.thread_count
37 }
38 );
39
40 let start = std::time::Instant::now();
41
42 let violations: Vec<Violation> = files
44 .into_par_iter()
45 .flat_map(|file| {
46 debug!("Validating file: {}", file.display());
47 match validate_single_file(&file) {
48 Ok(violations) => violations,
49 Err(e) => {
50 tracing::error!("Error validating {}: {}", file.display(), e);
51 vec![]
52 }
53 }
54 })
55 .collect();
56
57 let elapsed = start.elapsed();
58 info!(
59 "Parallel validation completed in {:.2}s ({} violations)",
60 elapsed.as_secs_f64(),
61 violations.len()
62 );
63
64 Ok(violations)
65 }
66
67 pub fn calculate_speedup(&self, serial_time: f64, parallel_time: f64) -> f64 {
69 if parallel_time > 0.0 {
70 serial_time / parallel_time
71 } else {
72 1.0
73 }
74 }
75}
76
77fn validate_single_file(path: &Path) -> Result<Vec<Violation>> {
79 use std::fs;
81
82 let content = fs::read_to_string(path).map_err(crate::Error::Io)?;
83
84 let mut violations = Vec::new();
86 let line_count = content.lines().count();
87
88 if line_count > 300 {
89 violations.push(Violation {
90 violation_type: crate::validation::ViolationType::FileTooLarge,
91 file: path.to_path_buf(),
92 line: line_count,
93 message: format!("File has {} lines, maximum allowed is 300", line_count),
94 severity: crate::validation::Severity::Error,
95 });
96 }
97
98 Ok(violations)
99}
100
101pub struct ParallelSafetyRunner {
103 thread_pool: Arc<rayon::ThreadPool>,
104}
105
106impl ParallelSafetyRunner {
107 pub fn new(thread_count: usize) -> Result<Self> {
109 let pool = if thread_count > 0 {
110 rayon::ThreadPoolBuilder::new()
111 .num_threads(thread_count)
112 .build()
113 .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
114 } else {
115 rayon::ThreadPoolBuilder::new()
116 .build()
117 .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
118 };
119
120 Ok(Self {
121 thread_pool: Arc::new(pool),
122 })
123 }
124
125 pub async fn run_checks_parallel<F, T>(&self, checks: Vec<F>) -> Vec<Result<T>>
127 where
128 F: Fn() -> Result<T> + Send + Sync + 'static,
129 T: Send + 'static,
130 {
131 let pool = self.thread_pool.clone();
132
133 tokio::task::spawn_blocking(move || {
134 pool.install(|| checks.into_par_iter().map(|check| check()).collect())
135 })
136 .await
137 .unwrap_or_else(|e| {
138 tracing::error!("Parallel execution failed: {}", e);
139 vec![]
140 })
141 }
142}
143
144pub struct BatchProcessor {
146 batch_size: usize,
147}
148
149impl BatchProcessor {
150 pub fn new(batch_size: usize) -> Self {
152 Self {
153 batch_size: batch_size.max(1),
154 }
155 }
156
157 pub fn process_batches<T, F, R>(&self, items: Vec<T>, processor: F) -> Vec<R>
159 where
160 T: Send + Clone,
161 F: Fn(Vec<T>) -> Vec<R> + Send + Sync,
162 R: Send,
163 {
164 let mut results = Vec::new();
165 for chunk in items.chunks(self.batch_size) {
166 results.extend(processor(chunk.to_vec()));
167 }
168 results
169 }
170}
171
172#[cfg(test)]
173#[allow(clippy::expect_used)]
174#[allow(clippy::unwrap_used)]
175mod tests {
176 use super::*;
177
178 #[test]
179 fn test_parallel_validator_creation() {
180 let validator = ParallelValidator::new(4);
181 assert_eq!(validator.thread_count, 4);
182 }
183
184 #[test]
185 fn test_speedup_calculation() {
186 let validator = ParallelValidator::new(0);
187 let speedup = validator.calculate_speedup(10.0, 2.5);
188 assert_eq!(speedup, 4.0);
189 }
190
191 #[test]
192 fn test_batch_processor() {
193 let processor = BatchProcessor::new(3);
194 let items = vec![1, 2, 3, 4, 5, 6, 7, 8];
195
196 let results =
197 processor.process_batches(items, |batch| batch.iter().map(|x| x * 2).collect());
198
199 assert_eq!(results, vec![2, 4, 6, 8, 10, 12, 14, 16]);
200 }
201
202 #[tokio::test]
203 async fn test_parallel_safety_runner() {
204 let runner = ParallelSafetyRunner::new(2).expect("Failed to create runner");
205
206 let checks = vec![
207 || Ok::<i32, crate::Error>(1),
208 || Ok::<i32, crate::Error>(2),
209 || Ok::<i32, crate::Error>(3),
210 ];
211
212 let results = runner.run_checks_parallel(checks).await;
213 assert_eq!(results.len(), 3);
214 }
215}