rumdl_lib/
parallel.rs

1/// Parallel file processing module for rumdl
2///
3/// This module implements file-level parallel execution of markdown linting
4/// to improve performance when processing multiple files.
5use crate::rule::{LintResult, Rule};
6use rayon::prelude::*;
7use std::time::Instant;
8
9/// Configuration for parallel execution
10#[derive(Debug, Clone)]
11pub struct ParallelConfig {
12    /// Enable/disable parallel execution
13    pub enabled: bool,
14    /// Number of threads to use (None = auto-detect)
15    pub thread_count: Option<usize>,
16    /// Minimum number of files to enable parallel execution
17    pub min_file_count: usize,
18}
19
20impl Default for ParallelConfig {
21    fn default() -> Self {
22        Self {
23            enabled: true,
24            thread_count: None, // Auto-detect based on CPU cores
25            min_file_count: 2,  // At least 2 files to benefit from parallelization
26        }
27    }
28}
29
30/// File-level parallel processing for multiple files
31pub struct FileParallelProcessor {
32    config: ParallelConfig,
33}
34
35impl FileParallelProcessor {
36    pub fn new(config: ParallelConfig) -> Self {
37        Self { config }
38    }
39
40    pub fn with_default_config() -> Self {
41        Self::new(ParallelConfig::default())
42    }
43
44    /// Process multiple files in parallel
45    pub fn process_files(
46        &self,
47        files: &[(String, String)], // (path, content) pairs
48        rules: &[Box<dyn Rule>],
49    ) -> Result<Vec<(String, LintResult)>, String> {
50        if !self.should_use_parallel(files) {
51            // Fall back to sequential processing
52            return Ok(files
53                .iter()
54                .map(|(path, content)| {
55                    let result = crate::lint(content, rules, false, crate::config::MarkdownFlavor::Standard);
56                    (path.clone(), result)
57                })
58                .collect());
59        }
60
61        // Set up thread pool if specified
62        if let Some(thread_count) = self.config.thread_count {
63            rayon::ThreadPoolBuilder::new()
64                .num_threads(thread_count)
65                .build_global()
66                .unwrap_or_else(|_| log::warn!("Failed to set thread pool size to {thread_count}"));
67        }
68
69        let results: Vec<(String, LintResult)> = files
70            .par_iter()
71            .map(|(path, content)| {
72                let start = Instant::now();
73                let result = crate::lint(content, rules, false, crate::config::MarkdownFlavor::Standard);
74                let duration = start.elapsed();
75
76                if duration.as_millis() > 1000 {
77                    log::debug!("File {path} took {duration:?}");
78                }
79
80                (path.clone(), result)
81            })
82            .collect();
83
84        Ok(results)
85    }
86
87    /// Determine if file-level parallel processing should be used
88    pub fn should_use_parallel(&self, files: &[(String, String)]) -> bool {
89        if !self.config.enabled {
90            return false;
91        }
92
93        // Need at least minimum files to benefit from parallelization
94        if files.len() < self.config.min_file_count {
95            return false;
96        }
97
98        // Check if we have enough CPU cores
99        let cpu_cores = num_cpus::get();
100        if cpu_cores < 2 {
101            return false;
102        }
103
104        true
105    }
106}
107
108/// Performance comparison utilities
109pub struct ParallelPerformanceComparison {
110    pub sequential_time: std::time::Duration,
111    pub parallel_time: std::time::Duration,
112    pub speedup_factor: f64,
113    pub parallel_overhead: std::time::Duration,
114}
115
116impl ParallelPerformanceComparison {
117    pub fn new(sequential_time: std::time::Duration, parallel_time: std::time::Duration) -> Self {
118        let speedup_factor = sequential_time.as_secs_f64() / parallel_time.as_secs_f64();
119        let parallel_overhead = if parallel_time > sequential_time {
120            parallel_time - sequential_time
121        } else {
122            std::time::Duration::ZERO
123        };
124
125        Self {
126            sequential_time,
127            parallel_time,
128            speedup_factor,
129            parallel_overhead,
130        }
131    }
132
133    pub fn print_comparison(&self) {
134        println!("๐Ÿ”„ Parallel vs Sequential Performance:");
135        println!(
136            "   Sequential time: {:.3}ms",
137            self.sequential_time.as_secs_f64() * 1000.0
138        );
139        println!("   Parallel time: {:.3}ms", self.parallel_time.as_secs_f64() * 1000.0);
140        println!("   Speedup factor: {:.2}x", self.speedup_factor);
141
142        if self.speedup_factor > 1.0 {
143            let improvement = (self.speedup_factor - 1.0) * 100.0;
144            println!("   Performance improvement: {improvement:.1}%");
145        } else {
146            let degradation = (1.0 - self.speedup_factor) * 100.0;
147            println!("   Performance degradation: {degradation:.1}%");
148            if self.parallel_overhead > std::time::Duration::ZERO {
149                println!(
150                    "   Parallel overhead: {:.3}ms",
151                    self.parallel_overhead.as_secs_f64() * 1000.0
152                );
153            }
154        }
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161    use crate::config::Config;
162    use crate::rules::all_rules;
163
164    #[test]
165    fn test_parallel_config_defaults() {
166        let config = ParallelConfig::default();
167        assert!(config.enabled);
168        assert_eq!(config.min_file_count, 2);
169        assert!(config.thread_count.is_none());
170    }
171
172    #[test]
173    fn test_parallel_config_custom() {
174        let config = ParallelConfig {
175            enabled: false,
176            thread_count: Some(4),
177            min_file_count: 5,
178        };
179        assert!(!config.enabled);
180        assert_eq!(config.thread_count, Some(4));
181        assert_eq!(config.min_file_count, 5);
182    }
183
184    #[test]
185    fn test_should_use_parallel_logic() {
186        let processor = FileParallelProcessor::with_default_config();
187
188        // Single file should not use parallel
189        let single_file = vec![("test.md".to_string(), "# Test".to_string())];
190        assert!(!processor.should_use_parallel(&single_file));
191
192        // Multiple files should use parallel
193        let multiple_files = vec![
194            ("test1.md".to_string(), "# Test 1".to_string()),
195            ("test2.md".to_string(), "# Test 2".to_string()),
196        ];
197        assert!(processor.should_use_parallel(&multiple_files));
198
199        // Test with disabled parallel
200        let disabled_config = ParallelConfig {
201            enabled: false,
202            ..Default::default()
203        };
204        let disabled_processor = FileParallelProcessor::new(disabled_config);
205        assert!(!disabled_processor.should_use_parallel(&multiple_files));
206
207        // Test with high min_file_count
208        let high_threshold_config = ParallelConfig {
209            enabled: true,
210            min_file_count: 10,
211            ..Default::default()
212        };
213        let high_threshold_processor = FileParallelProcessor::new(high_threshold_config);
214        assert!(!high_threshold_processor.should_use_parallel(&multiple_files));
215    }
216
217    #[test]
218    fn test_file_parallel_processing() {
219        let config = Config::default();
220        let rules = all_rules(&config);
221        let processor = FileParallelProcessor::with_default_config();
222
223        let test_files = vec![
224            ("test1.md".to_string(), "# Test 1\n\nContent".to_string()),
225            ("test2.md".to_string(), "# Test 2\n\nMore content".to_string()),
226        ];
227
228        let results = processor.process_files(&test_files, &rules).unwrap();
229        assert_eq!(results.len(), 2);
230
231        // Verify all results are Ok
232        for (_, result) in results {
233            assert!(result.is_ok());
234        }
235    }
236
237    #[test]
238    fn test_empty_files_handling() {
239        let config = Config::default();
240        let rules = all_rules(&config);
241        let processor = FileParallelProcessor::with_default_config();
242
243        let empty_files: Vec<(String, String)> = vec![];
244        let results = processor.process_files(&empty_files, &rules).unwrap();
245        assert_eq!(results.len(), 0);
246    }
247
248    #[test]
249    fn test_large_file_count() {
250        let config = Config::default();
251        let rules = all_rules(&config);
252        let processor = FileParallelProcessor::with_default_config();
253
254        // Create many files to test parallel processing scalability
255        let test_files: Vec<(String, String)> = (0..100)
256            .map(|i| {
257                (
258                    format!("test{i}.md"),
259                    format!("# Test {i}\n\nContent with trailing spaces   \n"),
260                )
261            })
262            .collect();
263
264        let results = processor.process_files(&test_files, &rules).unwrap();
265        assert_eq!(results.len(), 100);
266
267        // Verify all results are Ok and have expected warnings
268        for (path, result) in &results {
269            assert!(result.is_ok(), "Failed processing {path}");
270            let warnings = result.as_ref().unwrap();
271            // Should have at least one warning for trailing spaces
272            assert!(!warnings.is_empty(), "Expected warnings for {path}");
273        }
274    }
275
276    #[test]
277    fn test_error_propagation() {
278        let config = Config::default();
279        let rules = all_rules(&config);
280        let processor = FileParallelProcessor::with_default_config();
281
282        // Include files with various edge cases that might trigger errors
283        let test_files = vec![
284            ("empty.md".to_string(), "".to_string()),
285            ("unicode.md".to_string(), "# ๆต‹่ฏ•ๆ ‡้ข˜\n\n่ฟ™ๆ˜ฏไธญๆ–‡ๅ†…ๅฎนใ€‚".to_string()),
286            (
287                "emoji.md".to_string(),
288                "# Title with ๐Ÿš€ emoji\n\n๐ŸŽ‰ Content!".to_string(),
289            ),
290            ("very_long_line.md".to_string(), "a".repeat(10000)), // Very long single line
291            ("many_lines.md".to_string(), "Line\n".repeat(10000)), // Many lines
292        ];
293
294        let results = processor.process_files(&test_files, &rules).unwrap();
295        assert_eq!(results.len(), 5);
296
297        // All should process successfully even with edge cases
298        for (path, result) in &results {
299            assert!(result.is_ok(), "Failed processing {path}");
300        }
301    }
302
303    #[test]
304    fn test_thread_count_configuration() {
305        let config = Config::default();
306        let rules = all_rules(&config);
307
308        // Test with specific thread count
309        let parallel_config = ParallelConfig {
310            enabled: true,
311            thread_count: Some(2),
312            min_file_count: 2,
313        };
314        let processor = FileParallelProcessor::new(parallel_config);
315
316        let test_files = vec![
317            ("test1.md".to_string(), "# Test 1".to_string()),
318            ("test2.md".to_string(), "# Test 2".to_string()),
319            ("test3.md".to_string(), "# Test 3".to_string()),
320            ("test4.md".to_string(), "# Test 4".to_string()),
321        ];
322
323        let results = processor.process_files(&test_files, &rules).unwrap();
324        assert_eq!(results.len(), 4);
325    }
326
327    #[test]
328    fn test_result_ordering_preservation() {
329        let config = Config::default();
330        let rules = all_rules(&config);
331        let processor = FileParallelProcessor::with_default_config();
332
333        let test_files: Vec<(String, String)> = (0..20)
334            .map(|i| (format!("test{i:02}.md"), format!("# Test {i}")))
335            .collect();
336
337        let results = processor.process_files(&test_files, &rules).unwrap();
338
339        // Verify results maintain the same order as input
340        for (i, (path, _)) in results.iter().enumerate() {
341            assert_eq!(path, &format!("test{i:02}.md"));
342        }
343    }
344
345    #[test]
346    fn test_concurrent_rule_execution_safety() {
347        // This test ensures rules can be safely executed concurrently
348        let config = Config::default();
349        let rules = all_rules(&config);
350        let processor = FileParallelProcessor::with_default_config();
351
352        // Create files that will trigger the same rules
353        let test_files: Vec<(String, String)> = (0..10)
354            .map(|i| {
355                (
356                    format!("test{i}.md"),
357                    "# Heading\n\n- List item\n- Another item\n\n[link](url)\n`code`".to_string(),
358                )
359            })
360            .collect();
361
362        let results = processor.process_files(&test_files, &rules).unwrap();
363        assert_eq!(results.len(), 10);
364
365        // All files should produce the same warnings
366        let first_warnings = &results[0].1.as_ref().unwrap();
367        for (_, result) in results.iter().skip(1) {
368            let warnings = result.as_ref().unwrap();
369            assert_eq!(warnings.len(), first_warnings.len());
370        }
371    }
372
373    #[test]
374    fn test_performance_comparison() {
375        let seq_time = std::time::Duration::from_millis(1000);
376        let par_time = std::time::Duration::from_millis(400);
377
378        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);
379
380        assert_eq!(comparison.sequential_time, seq_time);
381        assert_eq!(comparison.parallel_time, par_time);
382        assert!((comparison.speedup_factor - 2.5).abs() < 0.01);
383        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
384    }
385
386    #[test]
387    fn test_performance_comparison_with_overhead() {
388        let seq_time = std::time::Duration::from_millis(100);
389        let par_time = std::time::Duration::from_millis(150);
390
391        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);
392
393        assert!((comparison.speedup_factor - 0.667).abs() < 0.01);
394        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(50));
395    }
396
397    #[test]
398    fn test_fallback_to_sequential() {
399        let config = Config::default();
400        let rules = all_rules(&config);
401
402        // Force sequential processing
403        let sequential_config = ParallelConfig {
404            enabled: false,
405            ..Default::default()
406        };
407        let processor = FileParallelProcessor::new(sequential_config);
408
409        let test_files = vec![
410            ("test1.md".to_string(), "# Test 1".to_string()),
411            ("test2.md".to_string(), "# Test 2".to_string()),
412        ];
413
414        let results = processor.process_files(&test_files, &rules).unwrap();
415        assert_eq!(results.len(), 2);
416    }
417
418    #[test]
419    fn test_mixed_content_types() {
420        let config = Config::default();
421        let rules = all_rules(&config);
422        let processor = FileParallelProcessor::with_default_config();
423
424        let test_files = vec![
425            ("plain.md".to_string(), "Just plain text".to_string()),
426            ("code.md".to_string(), "```rust\nfn main() {}\n```".to_string()),
427            ("table.md".to_string(), "| A | B |\n|---|---|\n| 1 | 2 |".to_string()),
428            (
429                "front_matter.md".to_string(),
430                "---\ntitle: Test\n---\n# Content".to_string(),
431            ),
432        ];
433
434        let results = processor.process_files(&test_files, &rules).unwrap();
435        assert_eq!(results.len(), 4);
436
437        for (_, result) in results {
438            assert!(result.is_ok());
439        }
440    }
441
442    #[test]
443    fn test_deterministic_results() {
444        // Ensure parallel processing produces the same results every time
445        let config = Config::default();
446        let rules = all_rules(&config);
447        let processor = FileParallelProcessor::with_default_config();
448
449        let test_files: Vec<(String, String)> = (0..10)
450            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces   \n")))
451            .collect();
452
453        // Run multiple times
454        let results1 = processor.process_files(&test_files, &rules).unwrap();
455        let results2 = processor.process_files(&test_files, &rules).unwrap();
456        let results3 = processor.process_files(&test_files, &rules).unwrap();
457
458        // Compare warning counts for each file
459        for i in 0..test_files.len() {
460            let warnings1 = results1[i].1.as_ref().unwrap();
461            let warnings2 = results2[i].1.as_ref().unwrap();
462            let warnings3 = results3[i].1.as_ref().unwrap();
463
464            assert_eq!(warnings1.len(), warnings2.len());
465            assert_eq!(warnings2.len(), warnings3.len());
466        }
467    }
468}