1use crate::{Module, ModuleResult};
9use rayon::prelude::*;
10use std::sync::{Arc, Mutex};
11
/// Settings that decide when work is spread across threads.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Thread count passed to rayon by `init_thread_pool`.
    ///
    /// `0` means `init_thread_pool` does not configure a pool at all.
    pub num_threads: usize,

    /// When `false`, `ParallelCompiler::compile_modules_parallel` always runs
    /// sequentially.
    pub parallel_modules: bool,

    /// When `false`, `ParallelCompiler::optimize_parallel` always runs
    /// sequentially.
    pub parallel_optimization: bool,

    /// Batches with fewer module/unit names than this are processed
    /// sequentially.
    pub min_modules_for_parallel: usize,
}

impl Default for ParallelConfig {
    /// Both kinds of parallelism enabled, no explicit thread count, and a
    /// minimum batch size of two.
    fn default() -> Self {
        ParallelConfig {
            min_modules_for_parallel: 2,
            parallel_optimization: true,
            parallel_modules: true,
            num_threads: 0,
        }
    }
}
38
39impl ParallelConfig {
40 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn with_threads(mut self, num_threads: usize) -> Self {
47 self.num_threads = num_threads;
48 self
49 }
50
51 pub fn with_parallel_modules(mut self, enabled: bool) -> Self {
53 self.parallel_modules = enabled;
54 self
55 }
56
57 pub fn with_parallel_optimization(mut self, enabled: bool) -> Self {
59 self.parallel_optimization = enabled;
60 self
61 }
62
63 pub fn init_thread_pool(&self) -> Result<(), String> {
65 if self.num_threads > 0 {
66 rayon::ThreadPoolBuilder::new()
67 .num_threads(self.num_threads)
68 .build_global()
69 .map_err(|e| format!("Failed to initialize thread pool: {}", e))?;
70 }
71 Ok(())
72 }
73}
74
75pub struct ParallelCompiler {
77 config: ParallelConfig,
78}
79
80impl ParallelCompiler {
81 pub fn new(config: ParallelConfig) -> Self {
83 Self { config }
84 }
85
86 pub fn compile_modules_parallel<F>(
89 &self,
90 module_names: Vec<String>,
91 compile_fn: F,
92 ) -> Vec<ModuleResult<Module>>
93 where
94 F: Fn(&str) -> ModuleResult<Module> + Sync + Send,
95 {
96 if !self.config.parallel_modules
97 || module_names.len() < self.config.min_modules_for_parallel
98 {
99 return module_names.iter().map(|name| compile_fn(name)).collect();
101 }
102
103 module_names
105 .par_iter()
106 .map(|name| compile_fn(name))
107 .collect()
108 }
109
110 pub fn optimize_parallel<T, F>(&self, items: Vec<T>, optimize_fn: F) -> Vec<T>
112 where
113 T: Send,
114 F: Fn(T) -> T + Sync + Send,
115 {
116 if !self.config.parallel_optimization || items.len() < 2 {
117 return items.into_iter().map(optimize_fn).collect();
118 }
119
120 items.into_par_iter().map(optimize_fn).collect()
121 }
122
123 pub fn load_ppu_files_parallel<F>(
125 &self,
126 unit_names: Vec<String>,
127 load_fn: F,
128 ) -> Vec<ModuleResult<crate::ast::Unit>>
129 where
130 F: Fn(&str) -> ModuleResult<crate::ast::Unit> + Sync + Send,
131 {
132 if unit_names.len() < self.config.min_modules_for_parallel {
133 return unit_names.iter().map(|name| load_fn(name)).collect();
134 }
135
136 unit_names.par_iter().map(|name| load_fn(name)).collect()
137 }
138}
139
/// Bookkeeping for a batch of `total` work items: a completion counter and a
/// list of error messages, each behind its own mutex.
///
/// Cloning produces another handle onto the *same* counter and error list
/// (the `Arc`s are shared), so clones can be moved into worker threads.
#[derive(Debug, Clone)]
pub struct ProgressTracker {
    total: usize,
    completed: Arc<Mutex<usize>>,
    errors: Arc<Mutex<Vec<String>>>,
}

impl ProgressTracker {
    /// Creates a tracker expecting `total` completions, with nothing
    /// completed and no errors recorded.
    pub fn new(total: usize) -> Self {
        Self {
            total,
            completed: Arc::new(Mutex::new(0)),
            errors: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Locks `mutex`, recovering the guard if the mutex is poisoned.
    ///
    /// The guarded values (a counter and a list of strings) stay valid even
    /// if a thread panicked while holding the lock, so a poisoned mutex must
    /// not turn every later progress query into a panic.
    fn lock<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Records that one more work item has finished.
    pub fn complete_one(&self) {
        *Self::lock(&self.completed) += 1;
    }

    /// Appends `error` to the recorded error messages.
    pub fn add_error(&self, error: String) {
        Self::lock(&self.errors).push(error);
    }

    /// Returns the completed fraction in the range `0.0..=1.0`.
    ///
    /// A tracker with `total == 0` reports `1.0`. The value is capped at
    /// `1.0` even if `complete_one` was called more than `total` times.
    pub fn progress(&self) -> f64 {
        if self.total == 0 {
            return 1.0;
        }

        let fraction = self.completed() as f64 / self.total as f64;
        fraction.min(1.0)
    }

    /// Returns how many times `complete_one` has been called.
    pub fn completed(&self) -> usize {
        *Self::lock(&self.completed)
    }

    /// Returns a snapshot copy of the recorded errors, in insertion order.
    pub fn errors(&self) -> Vec<String> {
        Self::lock(&self.errors).clone()
    }

    /// Returns `true` once at least `total` completions have been recorded.
    pub fn is_complete(&self) -> bool {
        self.completed() >= self.total
    }
}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ModuleError;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;
    use std::time::Duration;

    /// Compares two floats with a small tolerance instead of exact equality.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < 1e-12,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    /// Builds an empty unit carrying only `name`.
    fn create_test_unit(name: &str) -> crate::ast::Unit {
        crate::ast::Unit {
            name: name.to_string(),
            uses: vec![],
            interface: crate::ast::UnitInterface {
                uses: vec![],
                types: vec![],
                constants: vec![],
                variables: vec![],
                procedures: vec![],
                functions: vec![],
                classes: vec![],
                interfaces: vec![],
            },
            implementation: crate::ast::UnitImplementation {
                uses: vec![],
                types: vec![],
                constants: vec![],
                variables: vec![],
                procedures: vec![],
                functions: vec![],
                classes: vec![],
                interfaces: vec![],
                initialization: None,
                finalization: None,
            },
        }
    }

    /// Builds a module with no dependencies wrapping an empty unit.
    fn create_test_module(name: &str) -> Module {
        Module {
            name: name.to_string(),
            unit: create_test_unit(name),
            dependencies: vec![],
        }
    }

    #[test]
    fn test_parallel_config_default() {
        let config = ParallelConfig::default();

        assert_eq!(config.num_threads, 0);
        assert!(config.parallel_modules);
        assert!(config.parallel_optimization);
        assert_eq!(config.min_modules_for_parallel, 2);
    }

    #[test]
    fn test_parallel_config_builder() {
        let config = ParallelConfig::new()
            .with_threads(4)
            .with_parallel_modules(true)
            .with_parallel_optimization(false);

        assert_eq!(config.num_threads, 4);
        assert!(config.parallel_modules);
        assert!(!config.parallel_optimization);
    }

    #[test]
    fn test_parallel_config_chaining() {
        let config = ParallelConfig::new()
            .with_threads(8)
            .with_parallel_modules(false)
            .with_parallel_optimization(true);

        assert_eq!(config.num_threads, 8);
        assert!(!config.parallel_modules);
        assert!(config.parallel_optimization);
    }

    #[test]
    fn test_progress_tracker_basic() {
        let tracker = ProgressTracker::new(10);

        assert_eq!(tracker.completed(), 0);
        assert_close(tracker.progress(), 0.0);
        assert!(!tracker.is_complete());

        tracker.complete_one();
        assert_eq!(tracker.completed(), 1);
        assert_close(tracker.progress(), 0.1);

        tracker.add_error("Test error".to_string());
        assert_eq!(tracker.errors().len(), 1);
    }

    #[test]
    fn test_progress_tracker_completion() {
        let tracker = ProgressTracker::new(5);

        (0..5).for_each(|_| tracker.complete_one());

        assert_eq!(tracker.completed(), 5);
        assert_close(tracker.progress(), 1.0);
        assert!(tracker.is_complete());
    }

    #[test]
    fn test_progress_tracker_zero_total() {
        let tracker = ProgressTracker::new(0);

        assert_close(tracker.progress(), 1.0);
        assert!(tracker.is_complete());
    }

    #[test]
    fn test_progress_tracker_multiple_errors() {
        let tracker = ProgressTracker::new(10);

        tracker.add_error("Error 1".to_string());
        tracker.add_error("Error 2".to_string());
        tracker.add_error("Error 3".to_string());

        // Errors must come back in insertion order.
        assert_eq!(tracker.errors(), vec!["Error 1", "Error 2", "Error 3"]);
    }

    #[test]
    fn test_progress_tracker_thread_safety() {
        let tracker = ProgressTracker::new(100);

        let handles: Vec<_> = (0..10)
            .map(|_| {
                // Second handle sharing the same counter and error list.
                let worker_view = ProgressTracker {
                    total: tracker.total,
                    completed: Arc::clone(&tracker.completed),
                    errors: Arc::clone(&tracker.errors),
                };

                thread::spawn(move || {
                    for _ in 0..10 {
                        worker_view.complete_one();
                        // Tiny pause to encourage interleaving between workers.
                        thread::sleep(Duration::from_micros(1));
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(tracker.completed(), 100);
        assert_close(tracker.progress(), 1.0);
    }

    #[test]
    fn test_parallel_compiler_basic() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let results = compiler.optimize_parallel(vec![1, 2, 3, 4, 5], |x| x * 2);

        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_parallel_compiler_large_dataset() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let items: Vec<i32> = (1..=1000).collect();
        let results = compiler.optimize_parallel(items, |x| x * x);

        assert_eq!(results.len(), 1000);
        assert_eq!(results[0], 1);
        assert_eq!(results[999], 1000000);
    }

    #[test]
    fn test_parallel_compiler_sequential_fallback() {
        let config = ParallelConfig::new().with_parallel_optimization(false);
        let compiler = ParallelCompiler::new(config);

        let results = compiler.optimize_parallel(vec![1, 2, 3], |x| x + 1);

        assert_eq!(results, vec![2, 3, 4]);
    }

    #[test]
    fn test_parallel_compiler_small_workload_fallback() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        // A single item is below the parallel threshold of two.
        let results = compiler.optimize_parallel(vec![42], |x| x * 2);

        assert_eq!(results, vec![84]);
    }

    #[test]
    fn test_compile_modules_parallel_success() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let modules = vec![
            "Module1".to_string(),
            "Module2".to_string(),
            "Module3".to_string(),
        ];

        let results =
            compiler.compile_modules_parallel(modules, |name| Ok(create_test_module(name)));

        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|r| r.is_ok()));

        // Results must line up with the input order.
        let names: Vec<&str> = results
            .iter()
            .filter_map(|r| r.as_ref().ok())
            .map(|module| module.name.as_str())
            .collect();
        assert_eq!(names, vec!["Module1", "Module2", "Module3"]);
    }

    #[test]
    fn test_compile_modules_parallel_with_errors() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let modules = vec!["Good1".to_string(), "Bad".to_string(), "Good2".to_string()];

        let results = compiler.compile_modules_parallel(modules, |name| {
            if name == "Bad" {
                Err(ModuleError::LoadError(
                    name.to_string(),
                    "Simulated error".to_string(),
                ))
            } else {
                Ok(create_test_module(name))
            }
        });

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[2].is_ok());
    }

    #[test]
    fn test_compile_modules_sequential_for_small_workload() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let modules = vec!["SingleModule".to_string()];
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count_clone = Arc::clone(&call_count);
        let caller = thread::current().id();

        let results = compiler.compile_modules_parallel(modules, move |name| {
            // The sequential path runs the callback on the calling thread.
            assert_eq!(thread::current().id(), caller);
            call_count_clone.fetch_add(1, Ordering::SeqCst);
            Ok(create_test_module(name))
        });

        assert_eq!(results.len(), 1);
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_compile_modules_sequential_when_disabled() {
        let config = ParallelConfig::new().with_parallel_modules(false);
        let compiler = ParallelCompiler::new(config);

        let modules: Vec<String> = (0..8).map(|i| format!("Module{}", i)).collect();
        let caller = thread::current().id();

        let results = compiler.compile_modules_parallel(modules, move |name| {
            // With module parallelism off, every call stays on the caller's thread.
            assert_eq!(thread::current().id(), caller);
            Ok(create_test_module(name))
        });

        assert_eq!(results.len(), 8);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[test]
    fn test_load_ppu_files_parallel_success() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let units = vec!["UnitA".to_string(), "UnitB".to_string(), "UnitC".to_string()];

        let results = compiler.load_ppu_files_parallel(units, |name| Ok(create_test_unit(name)));

        assert_eq!(results.len(), 3);
        let names: Vec<&str> = results
            .iter()
            .filter_map(|r| r.as_ref().ok())
            .map(|unit| unit.name.as_str())
            .collect();
        assert_eq!(names, vec!["UnitA", "UnitB", "UnitC"]);
    }

    #[test]
    fn test_parallel_optimization_preserves_order() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let items: Vec<usize> = (0..100).collect();
        let results = compiler.optimize_parallel(items, |x| x);

        // The identity map must leave every value at its original index.
        for (i, &val) in results.iter().enumerate() {
            assert_eq!(i, val);
        }
    }

    #[test]
    fn test_parallel_compiler_with_complex_computation() {
        let compiler = ParallelCompiler::new(ParallelConfig::new());

        let items = vec![10, 20, 30, 40, 50];
        let results = compiler.optimize_parallel(items, |x| {
            let mut sum = 0;
            for i in 0..x {
                sum += i;
            }
            sum
        });

        // Sum of 0..x is x * (x - 1) / 2; pin exact values, not just the sign.
        assert_eq!(results, vec![45, 190, 435, 780, 1225]);
    }
}