// rumdl_lib/parallel.rs

//! Parallel file processing module for rumdl.
//!
//! This module implements file-level parallel execution of markdown linting
//! to improve performance when processing multiple files.
5use crate::rule::{LintResult, Rule};
6use rayon::prelude::*;
7use std::time::Instant;
8
/// Configuration for parallel execution.
///
/// The settings are plain data; they are interpreted by whichever processor
/// receives them. `PartialEq`/`Eq` are derived so two configurations can be
/// compared directly (for example in tests or when detecting config changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelConfig {
    /// Enable/disable parallel execution.
    pub enabled: bool,
    /// Number of threads to use (`None` = auto-detect).
    pub thread_count: Option<usize>,
    /// Minimum number of files required before parallel execution is used.
    pub min_file_count: usize,
}

impl Default for ParallelConfig {
    /// Returns the default configuration: parallelism enabled, automatic
    /// thread count, and a two-file minimum.
    fn default() -> Self {
        Self {
            enabled: true,
            // Auto-detect based on CPU cores.
            thread_count: None,
            // At least 2 files are needed to benefit from parallelization.
            min_file_count: 2,
        }
    }
}
29
30/// File-level parallel processing for multiple files
31pub struct FileParallelProcessor {
32    config: ParallelConfig,
33}
34
35impl FileParallelProcessor {
36    pub fn new(config: ParallelConfig) -> Self {
37        Self { config }
38    }
39
40    pub fn with_default_config() -> Self {
41        Self::new(ParallelConfig::default())
42    }
43
44    /// Process multiple files in parallel
45    pub fn process_files(
46        &self,
47        files: &[(String, String)], // (path, content) pairs
48        rules: &[Box<dyn Rule>],
49    ) -> Result<Vec<(String, LintResult)>, String> {
50        if !self.should_use_parallel(files) {
51            // Fall back to sequential processing
52            return Ok(files
53                .iter()
54                .map(|(path, content)| {
55                    let result = crate::lint(content, rules, false, crate::config::MarkdownFlavor::Standard, None);
56                    (path.clone(), result)
57                })
58                .collect());
59        }
60
61        // Set up thread pool if specified
62        if let Some(thread_count) = self.config.thread_count {
63            rayon::ThreadPoolBuilder::new()
64                .num_threads(thread_count)
65                .build_global()
66                .unwrap_or_else(|_| log::warn!("Failed to set thread pool size to {thread_count}"));
67        }
68
69        let results: Vec<(String, LintResult)> = files
70            .par_iter()
71            .map(|(path, content)| {
72                let start = Instant::now();
73                let result = crate::lint(content, rules, false, crate::config::MarkdownFlavor::Standard, None);
74                let duration = start.elapsed();
75
76                if duration.as_millis() > 1000 {
77                    log::debug!("File {path} took {duration:?}");
78                }
79
80                (path.clone(), result)
81            })
82            .collect();
83
84        Ok(results)
85    }
86
87    /// Determine if file-level parallel processing should be used
88    pub fn should_use_parallel(&self, files: &[(String, String)]) -> bool {
89        if !self.config.enabled {
90            return false;
91        }
92
93        // Need at least minimum files to benefit from parallelization
94        if files.len() < self.config.min_file_count {
95            return false;
96        }
97
98        // Check if we have enough CPU cores
99        let cpu_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
100        if cpu_cores < 2 {
101            return false;
102        }
103
104        true
105    }
106}
107
/// Performance comparison between a sequential and a parallel run.
///
/// All fields are derived from the two durations passed to
/// [`ParallelPerformanceComparison::new`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ParallelPerformanceComparison {
    /// Wall-clock time of the sequential run.
    pub sequential_time: std::time::Duration,
    /// Wall-clock time of the parallel run.
    pub parallel_time: std::time::Duration,
    /// `sequential_time / parallel_time`; infinite when `parallel_time` is zero.
    pub speedup_factor: f64,
    /// How much longer the parallel run took than the sequential one, or zero
    /// when the parallel run was not slower.
    pub parallel_overhead: std::time::Duration,
}

impl ParallelPerformanceComparison {
    /// Builds a comparison from the measured sequential and parallel times.
    ///
    /// A zero `parallel_time` yields an infinite speedup factor instead of
    /// dividing by zero (this includes the case where both times are zero).
    pub fn new(sequential_time: std::time::Duration, parallel_time: std::time::Duration) -> Self {
        // Guard against division by zero: if parallel_time is zero, speedup is infinite.
        let speedup_factor = if parallel_time.is_zero() {
            f64::INFINITY
        } else {
            sequential_time.as_secs_f64() / parallel_time.as_secs_f64()
        };
        // Saturates to zero when the parallel run was not slower.
        let parallel_overhead = parallel_time.saturating_sub(sequential_time);

        Self {
            sequential_time,
            parallel_time,
            speedup_factor,
            parallel_overhead,
        }
    }

    /// Prints a human-readable summary of the comparison to stdout.
    ///
    /// Reports an improvement percentage when the speedup factor exceeds 1.0;
    /// otherwise reports the degradation and, if non-zero, the overhead.
    pub fn print_comparison(&self) {
        println!("🔄 Parallel vs Sequential Performance:");
        println!(
            "   Sequential time: {:.3}ms",
            self.sequential_time.as_secs_f64() * 1000.0
        );
        println!("   Parallel time: {:.3}ms", self.parallel_time.as_secs_f64() * 1000.0);
        println!("   Speedup factor: {:.2}x", self.speedup_factor);

        if self.speedup_factor > 1.0 {
            let improvement = (self.speedup_factor - 1.0) * 100.0;
            println!("   Performance improvement: {improvement:.1}%");
        } else {
            let degradation = (1.0 - self.speedup_factor) * 100.0;
            println!("   Performance degradation: {degradation:.1}%");
            if !self.parallel_overhead.is_zero() {
                println!(
                    "   Parallel overhead: {:.3}ms",
                    self.parallel_overhead.as_secs_f64() * 1000.0
                );
            }
        }
    }
}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::rules::all_rules;

    /// Mirrors the core-count check in `should_use_parallel`, so assertions
    /// about the parallel path stay valid on single-core hosts.
    fn host_has_multiple_cores() -> bool {
        std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1) >= 2
    }

    #[test]
    fn test_parallel_config_defaults() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_file_count, 2);
        assert!(config.thread_count.is_none());
    }

    #[test]
    fn test_parallel_config_custom() {
        let config = ParallelConfig {
            enabled: false,
            thread_count: Some(4),
            min_file_count: 5,
        };
        assert!(!config.enabled);
        assert_eq!(config.thread_count, Some(4));
        assert_eq!(config.min_file_count, 5);
    }

    #[test]
    fn test_should_use_parallel_logic() {
        let processor = FileParallelProcessor::with_default_config();

        // Single file should not use parallel
        let single_file = vec![("test.md".to_string(), "# Test".to_string())];
        assert!(!processor.should_use_parallel(&single_file));

        // Multiple files should use parallel, but only when the host reports
        // at least two cores; on a single-core machine the processor
        // deliberately stays sequential.
        let multiple_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];
        assert_eq!(
            processor.should_use_parallel(&multiple_files),
            host_has_multiple_cores()
        );

        // Test with disabled parallel
        let disabled_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let disabled_processor = FileParallelProcessor::new(disabled_config);
        assert!(!disabled_processor.should_use_parallel(&multiple_files));

        // Test with high min_file_count
        let high_threshold_config = ParallelConfig {
            enabled: true,
            min_file_count: 10,
            ..Default::default()
        };
        let high_threshold_processor = FileParallelProcessor::new(high_threshold_config);
        assert!(!high_threshold_processor.should_use_parallel(&multiple_files));
    }

    #[test]
    fn test_file_parallel_processing() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1\n\nContent".to_string()),
            ("test2.md".to_string(), "# Test 2\n\nMore content".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);

        // Verify all results are Ok
        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_empty_files_handling() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let empty_files: Vec<(String, String)> = vec![];
        let results = processor.process_files(&empty_files, &rules).unwrap();
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_large_file_count() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Create many files to test parallel processing scalability
        let test_files: Vec<(String, String)> = (0..100)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    format!("# Test {i}\n\nContent with trailing spaces   \n"),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 100);

        // Verify all results are Ok and have expected warnings
        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
            let warnings = result.as_ref().unwrap();
            // Should have at least one warning for trailing spaces
            assert!(!warnings.is_empty(), "Expected warnings for {path}");
        }
    }

    #[test]
    fn test_error_propagation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Include files with various edge cases that might trigger errors
        let test_files = vec![
            ("empty.md".to_string(), "".to_string()),
            ("unicode.md".to_string(), "# 测试标题\n\n这是中文内容。".to_string()),
            (
                "emoji.md".to_string(),
                "# Title with 🚀 emoji\n\n🎉 Content!".to_string(),
            ),
            ("very_long_line.md".to_string(), "a".repeat(10000)), // Very long single line
            ("many_lines.md".to_string(), "Line\n".repeat(10000)), // Many lines
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 5);

        // All should process successfully even with edge cases
        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_thread_count_configuration() {
        let config = Config::default();
        let rules = all_rules(&config);

        // Test with specific thread count
        let parallel_config = ParallelConfig {
            enabled: true,
            thread_count: Some(2),
            min_file_count: 2,
        };
        let processor = FileParallelProcessor::new(parallel_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
            ("test3.md".to_string(), "# Test 3".to_string()),
            ("test4.md".to_string(), "# Test 4".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);
    }

    #[test]
    fn test_result_ordering_preservation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..20)
            .map(|i| (format!("test{i:02}.md"), format!("# Test {i}")))
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();

        // Without this, an empty result would make the ordering loop pass vacuously.
        assert_eq!(results.len(), test_files.len());

        // Verify results maintain the same order as input
        for (i, (path, _)) in results.iter().enumerate() {
            assert_eq!(path, &format!("test{i:02}.md"));
        }
    }

    #[test]
    fn test_concurrent_rule_execution_safety() {
        // This test ensures rules can be safely executed concurrently
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Create files that will trigger the same rules
        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    "# Heading\n\n- List item\n- Another item\n\n[link](url)\n`code`".to_string(),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 10);

        // All files should produce the same warnings
        let first_warnings = results[0].1.as_ref().unwrap();
        for (_, result) in results.iter().skip(1) {
            let warnings = result.as_ref().unwrap();
            assert_eq!(warnings.len(), first_warnings.len());
        }
    }

    #[test]
    fn test_performance_comparison() {
        let seq_time = std::time::Duration::from_millis(1000);
        let par_time = std::time::Duration::from_millis(400);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert_eq!(comparison.sequential_time, seq_time);
        assert_eq!(comparison.parallel_time, par_time);
        assert!((comparison.speedup_factor - 2.5).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_with_overhead() {
        let seq_time = std::time::Duration::from_millis(100);
        let par_time = std::time::Duration::from_millis(150);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert!((comparison.speedup_factor - 0.667).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(50));
    }

    #[test]
    fn test_fallback_to_sequential() {
        let config = Config::default();
        let rules = all_rules(&config);

        // Force sequential processing
        let sequential_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let processor = FileParallelProcessor::new(sequential_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_mixed_content_types() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("plain.md".to_string(), "Just plain text".to_string()),
            ("code.md".to_string(), "```rust\nfn main() {}\n```".to_string()),
            ("table.md".to_string(), "| A | B |\n|---|---|\n| 1 | 2 |".to_string()),
            (
                "front_matter.md".to_string(),
                "---\ntitle: Test\n---\n# Content".to_string(),
            ),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);

        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_deterministic_results() {
        // Ensure parallel processing produces the same results every time
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces   \n")))
            .collect();

        // Run multiple times
        let results1 = processor.process_files(&test_files, &rules).unwrap();
        let results2 = processor.process_files(&test_files, &rules).unwrap();
        let results3 = processor.process_files(&test_files, &rules).unwrap();

        // `zip` stops at the shortest input, so pin the lengths first.
        assert_eq!(results1.len(), test_files.len());
        assert_eq!(results2.len(), test_files.len());
        assert_eq!(results3.len(), test_files.len());

        // Compare warning counts for each file
        for ((run1, run2), run3) in results1.iter().zip(&results2).zip(&results3) {
            let warnings1 = run1.1.as_ref().unwrap();
            let warnings2 = run2.1.as_ref().unwrap();
            let warnings3 = run3.1.as_ref().unwrap();

            assert_eq!(warnings1.len(), warnings2.len());
            assert_eq!(warnings2.len(), warnings3.len());
        }
    }

    // =========================================================================
    // Tests for ParallelPerformanceComparison edge cases
    // =========================================================================

    #[test]
    fn test_performance_comparison_normal() {
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::from_millis(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert_eq!(comparison.sequential_time, sequential);
        assert_eq!(comparison.parallel_time, parallel);
        assert!((comparison.speedup_factor - 2.0).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_zero_parallel_time() {
        // Edge case: parallel_time is zero (instant completion)
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should not panic, speedup should be infinity
        assert!(comparison.speedup_factor.is_infinite());
        assert!(comparison.speedup_factor.is_sign_positive());
    }

    #[test]
    fn test_performance_comparison_both_zero() {
        // Edge case: both times are zero
        let sequential = std::time::Duration::ZERO;
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should not panic, speedup should be infinity (0/0 guarded)
        assert!(comparison.speedup_factor.is_infinite());
    }

    #[test]
    fn test_performance_comparison_parallel_slower() {
        // Case where parallel is actually slower (overhead dominates)
        let sequential = std::time::Duration::from_millis(10);
        let parallel = std::time::Duration::from_millis(20);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!((comparison.speedup_factor - 0.5).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_performance_comparison_very_small_times() {
        // Very small durations (nanoseconds)
        let sequential = std::time::Duration::from_nanos(100);
        let parallel = std::time::Duration::from_nanos(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should handle small durations without precision issues
        assert!(comparison.speedup_factor > 1.0);
    }
}
537        assert!(comparison.speedup_factor > 1.0);
538    }
539}