ferrous_forge/performance/
parallel.rs1use crate::Result;
4use crate::validation::Violation;
5use rayon::prelude::*;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use tracing::{debug, info};
9
10pub struct ParallelValidator {
12 thread_count: usize,
14}
15
16impl ParallelValidator {
17 pub fn new(thread_count: usize) -> Self {
19 if thread_count > 0 {
20 rayon::ThreadPoolBuilder::new()
21 .num_threads(thread_count)
22 .build_global()
23 .ok();
24 }
25 Self { thread_count }
26 }
27
28 pub async fn validate_files(&self, files: Vec<PathBuf>) -> Result<Vec<Violation>> {
35 info!(
36 "Validating {} files in parallel (threads: {})",
37 files.len(),
38 if self.thread_count == 0 {
39 rayon::current_num_threads()
40 } else {
41 self.thread_count
42 }
43 );
44
45 let start = std::time::Instant::now();
46
47 let violations: Vec<Violation> = files
49 .into_par_iter()
50 .flat_map(|file| {
51 debug!("Validating file: {}", file.display());
52 match validate_single_file(&file) {
53 Ok(violations) => violations,
54 Err(e) => {
55 tracing::error!("Error validating {}: {}", file.display(), e);
56 vec![]
57 }
58 }
59 })
60 .collect();
61
62 let elapsed = start.elapsed();
63 info!(
64 "Parallel validation completed in {:.2}s ({} violations)",
65 elapsed.as_secs_f64(),
66 violations.len()
67 );
68
69 Ok(violations)
70 }
71
72 pub fn calculate_speedup(&self, serial_time: f64, parallel_time: f64) -> f64 {
74 if parallel_time > 0.0 {
75 serial_time / parallel_time
76 } else {
77 1.0
78 }
79 }
80}
81
82fn validate_single_file(path: &Path) -> Result<Vec<Violation>> {
84 use std::fs;
86
87 let content = fs::read_to_string(path).map_err(crate::Error::Io)?;
88
89 let mut violations = Vec::new();
91 let line_count = content.lines().count();
92
93 if line_count > 300 {
94 violations.push(Violation {
95 violation_type: crate::validation::ViolationType::FileTooLarge,
96 file: path.to_path_buf(),
97 line: line_count,
98 message: format!("File has {} lines, maximum allowed is 300", line_count),
99 severity: crate::validation::Severity::Error,
100 });
101 }
102
103 Ok(violations)
104}
105
106pub struct ParallelSafetyRunner {
108 thread_pool: Arc<rayon::ThreadPool>,
109}
110
111impl ParallelSafetyRunner {
112 pub fn new(thread_count: usize) -> Result<Self> {
118 let pool = if thread_count > 0 {
119 rayon::ThreadPoolBuilder::new()
120 .num_threads(thread_count)
121 .build()
122 .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
123 } else {
124 rayon::ThreadPoolBuilder::new()
125 .build()
126 .map_err(|e| crate::Error::config(format!("Failed to create thread pool: {}", e)))?
127 };
128
129 Ok(Self {
130 thread_pool: Arc::new(pool),
131 })
132 }
133
134 pub async fn run_checks_parallel<F, T>(&self, checks: Vec<F>) -> Vec<Result<T>>
136 where
137 F: Fn() -> Result<T> + Send + Sync + 'static,
138 T: Send + 'static,
139 {
140 let pool = self.thread_pool.clone();
141
142 tokio::task::spawn_blocking(move || {
143 pool.install(|| checks.into_par_iter().map(|check| check()).collect())
144 })
145 .await
146 .unwrap_or_else(|e| {
147 tracing::error!("Parallel execution failed: {}", e);
148 vec![]
149 })
150 }
151}
152
153pub struct BatchProcessor {
155 batch_size: usize,
156}
157
158impl BatchProcessor {
159 pub fn new(batch_size: usize) -> Self {
161 Self {
162 batch_size: batch_size.max(1),
163 }
164 }
165
166 pub fn process_batches<T, F, R>(&self, items: Vec<T>, processor: F) -> Vec<R>
168 where
169 T: Send + Clone,
170 F: Fn(Vec<T>) -> Vec<R> + Send + Sync,
171 R: Send,
172 {
173 let mut results = Vec::new();
174 for chunk in items.chunks(self.batch_size) {
175 results.extend(processor(chunk.to_vec()));
176 }
177 results
178 }
179}
180
181#[cfg(test)]
182#[allow(clippy::expect_used)]
183#[allow(clippy::unwrap_used)]
184mod tests {
185 use super::*;
186
187 #[test]
188 fn test_parallel_validator_creation() {
189 let validator = ParallelValidator::new(4);
190 assert_eq!(validator.thread_count, 4);
191 }
192
193 #[test]
194 fn test_speedup_calculation() {
195 let validator = ParallelValidator::new(0);
196 let speedup = validator.calculate_speedup(10.0, 2.5);
197 assert_eq!(speedup, 4.0);
198 }
199
200 #[test]
201 fn test_batch_processor() {
202 let processor = BatchProcessor::new(3);
203 let items = vec![1, 2, 3, 4, 5, 6, 7, 8];
204
205 let results =
206 processor.process_batches(items, |batch| batch.iter().map(|x| x * 2).collect());
207
208 assert_eq!(results, vec![2, 4, 6, 8, 10, 12, 14, 16]);
209 }
210
211 #[tokio::test]
212 async fn test_parallel_safety_runner() {
213 let runner = ParallelSafetyRunner::new(2).expect("Failed to create runner");
214
215 let checks = vec![
216 || Ok::<i32, crate::Error>(1),
217 || Ok::<i32, crate::Error>(2),
218 || Ok::<i32, crate::Error>(3),
219 ];
220
221 let results = runner.run_checks_parallel(checks).await;
222 assert_eq!(results.len(), 3);
223 }
224}