Skip to main content

pascal/
parallel.rs

//! Parallel compilation support using rayon
//!
//! This module provides multi-threaded compilation capabilities for:
//! - Parallel module compilation
//! - Parallel optimization passes
//! - Concurrent PPU file loading
7
8use crate::{Module, ModuleResult};
9use rayon::prelude::*;
10use std::sync::{Arc, Mutex};
11
/// Configuration for parallel compilation.
///
/// All fields are public, so a value can be written as a struct literal,
/// obtained from [`Default`], or assembled with the `with_*` builder methods.
/// The type is comparable (`PartialEq`/`Eq`) so that callers and tests can
/// check a whole configuration in one assertion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelConfig {
    /// Number of worker threads to request for the global thread pool.
    ///
    /// `0` means "do not configure anything": `init_thread_pool` skips the
    /// pool setup entirely in that case, leaving the choice of thread count
    /// to rayon.
    pub num_threads: usize,

    /// Enable parallel module compilation.
    ///
    /// When `false`, `ParallelCompiler::compile_modules_parallel` always runs
    /// sequentially.
    pub parallel_modules: bool,

    /// Enable parallel optimization passes.
    ///
    /// When `false`, `ParallelCompiler::optimize_parallel` always runs
    /// sequentially.
    pub parallel_optimization: bool,

    /// Minimum batch size that is processed in parallel.
    ///
    /// Module compilation and PPU loading fall back to a sequential loop when
    /// they are handed fewer names than this.
    pub min_modules_for_parallel: usize,
}
27
28impl Default for ParallelConfig {
29    fn default() -> Self {
30        Self {
31            num_threads: 0, // Auto-detect
32            parallel_modules: true,
33            parallel_optimization: true,
34            min_modules_for_parallel: 2,
35        }
36    }
37}
38
39impl ParallelConfig {
40    /// Create a new parallel configuration
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    /// Set the number of threads
46    pub fn with_threads(mut self, num_threads: usize) -> Self {
47        self.num_threads = num_threads;
48        self
49    }
50
51    /// Enable or disable parallel module compilation
52    pub fn with_parallel_modules(mut self, enabled: bool) -> Self {
53        self.parallel_modules = enabled;
54        self
55    }
56
57    /// Enable or disable parallel optimization
58    pub fn with_parallel_optimization(mut self, enabled: bool) -> Self {
59        self.parallel_optimization = enabled;
60        self
61    }
62
63    /// Initialize the thread pool
64    pub fn init_thread_pool(&self) -> Result<(), String> {
65        if self.num_threads > 0 {
66            rayon::ThreadPoolBuilder::new()
67                .num_threads(self.num_threads)
68                .build_global()
69                .map_err(|e| format!("Failed to initialize thread pool: {}", e))?;
70        }
71        Ok(())
72    }
73}
74
75/// Parallel module compiler
76pub struct ParallelCompiler {
77    config: ParallelConfig,
78}
79
80impl ParallelCompiler {
81    /// Create a new parallel compiler
82    pub fn new(config: ParallelConfig) -> Self {
83        Self { config }
84    }
85
86    /// Compile multiple modules in parallel
87    /// Returns compiled modules or errors
88    pub fn compile_modules_parallel<F>(
89        &self,
90        module_names: Vec<String>,
91        compile_fn: F,
92    ) -> Vec<ModuleResult<Module>>
93    where
94        F: Fn(&str) -> ModuleResult<Module> + Sync + Send,
95    {
96        if !self.config.parallel_modules
97            || module_names.len() < self.config.min_modules_for_parallel
98        {
99            // Sequential compilation for small workloads
100            return module_names.iter().map(|name| compile_fn(name)).collect();
101        }
102
103        // Parallel compilation
104        module_names
105            .par_iter()
106            .map(|name| compile_fn(name))
107            .collect()
108    }
109
110    /// Process optimization passes in parallel
111    pub fn optimize_parallel<T, F>(&self, items: Vec<T>, optimize_fn: F) -> Vec<T>
112    where
113        T: Send,
114        F: Fn(T) -> T + Sync + Send,
115    {
116        if !self.config.parallel_optimization || items.len() < 2 {
117            return items.into_iter().map(optimize_fn).collect();
118        }
119
120        items.into_par_iter().map(optimize_fn).collect()
121    }
122
123    /// Load multiple PPU files in parallel
124    pub fn load_ppu_files_parallel<F>(
125        &self,
126        unit_names: Vec<String>,
127        load_fn: F,
128    ) -> Vec<ModuleResult<crate::ast::Unit>>
129    where
130        F: Fn(&str) -> ModuleResult<crate::ast::Unit> + Sync + Send,
131    {
132        if unit_names.len() < self.config.min_modules_for_parallel {
133            return unit_names.iter().map(|name| load_fn(name)).collect();
134        }
135
136        unit_names.par_iter().map(|name| load_fn(name)).collect()
137    }
138}
139
/// Thread-safe compilation progress tracker.
///
/// The completed counter and the error list each live behind an
/// `Arc<Mutex<_>>`. Cloning the tracker therefore yields a handle onto the
/// *same* counter and error list, which is how it is meant to be handed to
/// worker threads.
///
/// # Panics
///
/// Every method that touches the shared state panics if the corresponding
/// mutex has been poisoned.
#[derive(Debug, Clone)]
pub struct ProgressTracker {
    total: usize,
    completed: Arc<Mutex<usize>>,
    errors: Arc<Mutex<Vec<String>>>,
}

impl ProgressTracker {
    /// Creates a tracker that expects `total` items, with nothing completed
    /// and no errors recorded.
    pub fn new(total: usize) -> Self {
        Self {
            total,
            completed: Arc::new(Mutex::new(0)),
            errors: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Marks one item as completed.
    pub fn complete_one(&self) {
        let mut completed = self.completed.lock().unwrap();
        *completed += 1;
    }

    /// Records an error message.
    pub fn add_error(&self, error: String) {
        self.errors.lock().unwrap().push(error);
    }

    /// Returns the current progress as a fraction in `0.0..=1.0`.
    ///
    /// A tracker with `total == 0` is reported as fully done (`1.0`). If more
    /// items were marked complete than `total`, the result is capped at `1.0`
    /// instead of overshooting the documented range.
    pub fn progress(&self) -> f64 {
        if self.total == 0 {
            return 1.0;
        }
        // Cap the numerator so over-completion cannot push the ratio past 1.
        let done = self.completed().min(self.total);
        done as f64 / self.total as f64
    }

    /// Returns the raw number of completed items (not capped at `total`).
    pub fn completed(&self) -> usize {
        *self.completed.lock().unwrap()
    }

    /// Returns a snapshot copy of all recorded errors, in insertion order.
    pub fn errors(&self) -> Vec<String> {
        self.errors.lock().unwrap().clone()
    }

    /// Returns `true` once at least `total` items have been completed.
    pub fn is_complete(&self) -> bool {
        self.completed() >= self.total
    }
}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198    use std::sync::atomic::{AtomicUsize, Ordering};
199    use std::thread;
200    use crate::ModuleError;
201    use std::time::Duration;
202
203    // Helper function to create a test module
204    fn create_test_module(name: &str) -> Module {
205        Module {
206            name: name.to_string(),
207            unit: crate::ast::Unit {
208                name: name.to_string(),
209                uses: vec![],
210                interface: crate::ast::UnitInterface {
211                    uses: vec![],
212                    types: vec![],
213                    constants: vec![],
214                    variables: vec![],
215                    procedures: vec![],
216                    functions: vec![],
217                    classes: vec![],
218                    interfaces: vec![],
219                },
220                implementation: crate::ast::UnitImplementation {
221                    uses: vec![],
222                    types: vec![],
223                    constants: vec![],
224                    variables: vec![],
225                    procedures: vec![],
226                    functions: vec![],
227                    classes: vec![],
228                    interfaces: vec![],
229                    initialization: None,
230                    finalization: None,
231                },
232            },
233            dependencies: vec![],
234        }
235    }
236
237    #[test]
238    fn test_parallel_config_default() {
239        let config = ParallelConfig::default();
240
241        assert_eq!(config.num_threads, 0); // Auto-detect
242        assert!(config.parallel_modules);
243        assert!(config.parallel_optimization);
244        assert_eq!(config.min_modules_for_parallel, 2);
245    }
246
247    #[test]
248    fn test_parallel_config_builder() {
249        let config = ParallelConfig::new()
250            .with_threads(4)
251            .with_parallel_modules(true)
252            .with_parallel_optimization(false);
253
254        assert_eq!(config.num_threads, 4);
255        assert!(config.parallel_modules);
256        assert!(!config.parallel_optimization);
257    }
258
259    #[test]
260    fn test_parallel_config_chaining() {
261        let config = ParallelConfig::new()
262            .with_threads(8)
263            .with_parallel_modules(false)
264            .with_parallel_optimization(true);
265
266        assert_eq!(config.num_threads, 8);
267        assert!(!config.parallel_modules);
268        assert!(config.parallel_optimization);
269    }
270
271    #[test]
272    fn test_progress_tracker_basic() {
273        let tracker = ProgressTracker::new(10);
274
275        assert_eq!(tracker.completed(), 0);
276        assert_eq!(tracker.progress(), 0.0);
277        assert!(!tracker.is_complete());
278
279        tracker.complete_one();
280        assert_eq!(tracker.completed(), 1);
281        assert_eq!(tracker.progress(), 0.1);
282
283        tracker.add_error("Test error".to_string());
284        assert_eq!(tracker.errors().len(), 1);
285    }
286
287    #[test]
288    fn test_progress_tracker_completion() {
289        let tracker = ProgressTracker::new(5);
290
291        for _ in 0..5 {
292            tracker.complete_one();
293        }
294
295        assert_eq!(tracker.completed(), 5);
296        assert_eq!(tracker.progress(), 1.0);
297        assert!(tracker.is_complete());
298    }
299
300    #[test]
301    fn test_progress_tracker_zero_total() {
302        let tracker = ProgressTracker::new(0);
303
304        assert_eq!(tracker.progress(), 1.0);
305        assert!(tracker.is_complete());
306    }
307
308    #[test]
309    fn test_progress_tracker_multiple_errors() {
310        let tracker = ProgressTracker::new(10);
311
312        tracker.add_error("Error 1".to_string());
313        tracker.add_error("Error 2".to_string());
314        tracker.add_error("Error 3".to_string());
315
316        let errors = tracker.errors();
317        assert_eq!(errors.len(), 3);
318        assert_eq!(errors[0], "Error 1");
319        assert_eq!(errors[1], "Error 2");
320        assert_eq!(errors[2], "Error 3");
321    }
322
323    #[test]
324    fn test_progress_tracker_thread_safety() {
325        let tracker = ProgressTracker::new(100);
326        let mut handles = vec![];
327
328        // Spawn 10 threads, each completing 10 items
329        for _ in 0..10 {
330            let tracker_clone = ProgressTracker {
331                total: tracker.total,
332                completed: Arc::clone(&tracker.completed),
333                errors: Arc::clone(&tracker.errors),
334            };
335
336            let handle = thread::spawn(move || {
337                for _ in 0..10 {
338                    tracker_clone.complete_one();
339                    thread::sleep(Duration::from_micros(1));
340                }
341            });
342            handles.push(handle);
343        }
344
345        // Wait for all threads
346        for handle in handles {
347            handle.join().unwrap();
348        }
349
350        assert_eq!(tracker.completed(), 100);
351        assert_eq!(tracker.progress(), 1.0);
352    }
353
354    #[test]
355    fn test_parallel_compiler_basic() {
356        let config = ParallelConfig::new();
357        let compiler = ParallelCompiler::new(config);
358
359        let items = vec![1, 2, 3, 4, 5];
360        let results = compiler.optimize_parallel(items, |x| x * 2);
361
362        assert_eq!(results, vec![2, 4, 6, 8, 10]);
363    }
364
365    #[test]
366    fn test_parallel_compiler_large_dataset() {
367        let config = ParallelConfig::new();
368        let compiler = ParallelCompiler::new(config);
369
370        let items: Vec<i32> = (1..=1000).collect();
371        let results = compiler.optimize_parallel(items, |x| x * x);
372
373        assert_eq!(results.len(), 1000);
374        assert_eq!(results[0], 1);
375        assert_eq!(results[999], 1000000);
376    }
377
378    #[test]
379    fn test_parallel_compiler_sequential_fallback() {
380        let config = ParallelConfig::new().with_parallel_optimization(false);
381        let compiler = ParallelCompiler::new(config);
382
383        let items = vec![1, 2, 3];
384        let results = compiler.optimize_parallel(items, |x| x + 1);
385
386        assert_eq!(results, vec![2, 3, 4]);
387    }
388
389    #[test]
390    fn test_parallel_compiler_small_workload_fallback() {
391        let config = ParallelConfig::new();
392        let compiler = ParallelCompiler::new(config);
393
394        // Single item should use sequential processing
395        let items = vec![42];
396        let results = compiler.optimize_parallel(items, |x| x * 2);
397
398        assert_eq!(results, vec![84]);
399    }
400
401    #[test]
402    fn test_compile_modules_parallel_success() {
403        let config = ParallelConfig::new();
404        let compiler = ParallelCompiler::new(config);
405
406        let modules = vec![
407            "Module1".to_string(),
408            "Module2".to_string(),
409            "Module3".to_string(),
410        ];
411
412        let results = compiler.compile_modules_parallel(modules, |name| {
413            // Simulate successful compilation
414            Ok(create_test_module(name))
415        });
416
417        assert_eq!(results.len(), 3);
418        assert!(results.iter().all(|r| r.is_ok()));
419    }
420
421    #[test]
422    fn test_compile_modules_parallel_with_errors() {
423        let config = ParallelConfig::new();
424        let compiler = ParallelCompiler::new(config);
425
426        let modules = vec!["Good1".to_string(), "Bad".to_string(), "Good2".to_string()];
427
428        let results = compiler.compile_modules_parallel(modules, |name| {
429            if name == "Bad" {
430                Err(ModuleError::LoadError(
431                    name.to_string(),
432                    "Simulated error".to_string(),
433                ))
434            } else {
435                Ok(create_test_module(name))
436            }
437        });
438
439        assert_eq!(results.len(), 3);
440        assert!(results[0].is_ok());
441        assert!(results[1].is_err());
442        assert!(results[2].is_ok());
443    }
444
445    #[test]
446    fn test_compile_modules_sequential_for_small_workload() {
447        let config = ParallelConfig::new();
448        let compiler = ParallelCompiler::new(config);
449
450        // Single module should use sequential processing
451        let modules = vec!["SingleModule".to_string()];
452        let call_count = Arc::new(AtomicUsize::new(0));
453        let call_count_clone = Arc::clone(&call_count);
454
455        let results = compiler.compile_modules_parallel(modules, move |name| {
456            call_count_clone.fetch_add(1, Ordering::SeqCst);
457            Ok(create_test_module(name))
458        });
459
460        assert_eq!(results.len(), 1);
461        assert_eq!(call_count.load(Ordering::SeqCst), 1);
462    }
463
464    #[test]
465    fn test_parallel_optimization_preserves_order() {
466        let config = ParallelConfig::new();
467        let compiler = ParallelCompiler::new(config);
468
469        let items: Vec<usize> = (0..100).collect();
470        let results = compiler.optimize_parallel(items, |x| x);
471
472        // Verify order is preserved
473        for (i, &val) in results.iter().enumerate() {
474            assert_eq!(i, val);
475        }
476    }
477
478    #[test]
479    fn test_parallel_compiler_with_complex_computation() {
480        let config = ParallelConfig::new();
481        let compiler = ParallelCompiler::new(config);
482
483        let items = vec![10, 20, 30, 40, 50];
484        let results = compiler.optimize_parallel(items, |x| {
485            // Simulate complex computation
486            let mut sum = 0;
487            for i in 0..x {
488                sum += i;
489            }
490            sum
491        });
492
493        assert_eq!(results.len(), 5);
494        assert!(results.iter().all(|&x| x >= 0));
495    }
496}