Skip to main content

rumdl_lib/
parallel.rs

//! Parallel file processing module for rumdl.
//!
//! This module implements file-level parallel execution of markdown linting
//! to improve performance when processing multiple files.
5use crate::rule::{LintResult, Rule};
6use rayon::prelude::*;
7use std::time::Instant;
8
/// Configuration for file-level parallel execution.
///
/// Equality is derived so configurations can be compared directly, e.g. when
/// checking whether a user override differs from [`ParallelConfig::default`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelConfig {
    /// Enable/disable parallel execution. When `false`, files are always
    /// processed sequentially.
    pub enabled: bool,
    /// Number of worker threads to use (`None` = auto-detect).
    pub thread_count: Option<usize>,
    /// Minimum number of files required before parallel execution is used.
    pub min_file_count: usize,
}

impl Default for ParallelConfig {
    /// Parallelism enabled, thread count auto-detected, and at least two files
    /// required, because a single file gains nothing from file-level parallelism.
    fn default() -> Self {
        Self {
            enabled: true,
            thread_count: None,
            min_file_count: 2,
        }
    }
}
29
30/// File-level parallel processing for multiple files
31pub struct FileParallelProcessor {
32    config: ParallelConfig,
33}
34
35impl FileParallelProcessor {
36    pub fn new(config: ParallelConfig) -> Self {
37        Self { config }
38    }
39
40    pub fn with_default_config() -> Self {
41        Self::new(ParallelConfig::default())
42    }
43
44    /// Process multiple files in parallel
45    pub fn process_files(
46        &self,
47        files: &[(String, String)], // (path, content) pairs
48        rules: &[Box<dyn Rule>],
49    ) -> Result<Vec<(String, LintResult)>, String> {
50        if !self.should_use_parallel(files) {
51            // Fall back to sequential processing
52            return Ok(files
53                .iter()
54                .map(|(path, content)| {
55                    let result = crate::lint(
56                        content,
57                        rules,
58                        false,
59                        crate::config::MarkdownFlavor::Standard,
60                        Some(std::path::PathBuf::from(path)),
61                        None,
62                    );
63                    (path.clone(), result)
64                })
65                .collect());
66        }
67
68        // Set up thread pool if specified
69        if let Some(thread_count) = self.config.thread_count {
70            rayon::ThreadPoolBuilder::new()
71                .num_threads(thread_count)
72                .build_global()
73                .unwrap_or_else(|_| log::warn!("Failed to set thread pool size to {thread_count}"));
74        }
75
76        let results: Vec<(String, LintResult)> = files
77            .par_iter()
78            .map(|(path, content)| {
79                let start = Instant::now();
80                let result = crate::lint(
81                    content,
82                    rules,
83                    false,
84                    crate::config::MarkdownFlavor::Standard,
85                    Some(std::path::PathBuf::from(path)),
86                    None,
87                );
88                let duration = start.elapsed();
89
90                if duration.as_millis() > 1000 {
91                    log::debug!("File {path} took {duration:?}");
92                }
93
94                (path.clone(), result)
95            })
96            .collect();
97
98        Ok(results)
99    }
100
101    /// Determine if file-level parallel processing should be used
102    pub fn should_use_parallel(&self, files: &[(String, String)]) -> bool {
103        if !self.config.enabled {
104            return false;
105        }
106
107        // Need at least minimum files to benefit from parallelization
108        if files.len() < self.config.min_file_count {
109            return false;
110        }
111
112        // Check if we have enough CPU cores
113        let cpu_cores = std::thread::available_parallelism()
114            .map(std::num::NonZero::get)
115            .unwrap_or(1);
116        if cpu_cores < 2 {
117            return false;
118        }
119
120        true
121    }
122}
123
/// Timing comparison between a sequential and a parallel run of the same workload.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ParallelPerformanceComparison {
    /// Wall-clock time of the sequential run.
    pub sequential_time: std::time::Duration,
    /// Wall-clock time of the parallel run.
    pub parallel_time: std::time::Duration,
    /// `sequential_time / parallel_time`. Values above `1.0` mean the parallel
    /// run was faster. Set to `f64::INFINITY` when `parallel_time` is zero.
    pub speedup_factor: f64,
    /// How much longer the parallel run took than the sequential run, or
    /// `Duration::ZERO` if the parallel run was not slower.
    pub parallel_overhead: std::time::Duration,
}

impl ParallelPerformanceComparison {
    /// Builds a comparison from the two measured durations.
    ///
    /// A zero `parallel_time` yields an infinite speedup instead of dividing
    /// by zero. This includes the case where both durations are zero.
    pub fn new(sequential_time: std::time::Duration, parallel_time: std::time::Duration) -> Self {
        let speedup_factor = if parallel_time.is_zero() {
            f64::INFINITY
        } else {
            sequential_time.as_secs_f64() / parallel_time.as_secs_f64()
        };
        // Clamps to zero when the parallel run was at least as fast; no panic path.
        let parallel_overhead = parallel_time.saturating_sub(sequential_time);

        Self {
            sequential_time,
            parallel_time,
            speedup_factor,
            parallel_overhead,
        }
    }

    /// Prints a human-readable summary of the comparison to stdout.
    pub fn print_comparison(&self) {
        println!("🔄 Parallel vs Sequential Performance:");
        println!("   Sequential time: {:.3}ms", Self::millis(self.sequential_time));
        println!("   Parallel time: {:.3}ms", Self::millis(self.parallel_time));
        println!("   Speedup factor: {:.2}x", self.speedup_factor);

        if self.speedup_factor > 1.0 {
            let improvement = (self.speedup_factor - 1.0) * 100.0;
            println!("   Performance improvement: {improvement:.1}%");
        } else {
            let degradation = (1.0 - self.speedup_factor) * 100.0;
            println!("   Performance degradation: {degradation:.1}%");
            if self.parallel_overhead > std::time::Duration::ZERO {
                println!("   Parallel overhead: {:.3}ms", Self::millis(self.parallel_overhead));
            }
        }
    }

    /// Converts a duration to fractional milliseconds for display.
    fn millis(duration: std::time::Duration) -> f64 {
        duration.as_secs_f64() * 1000.0
    }
}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::rules::all_rules;

    /// Mirrors the CPU-core requirement in `should_use_parallel`, so assertions
    /// about parallel execution also hold on single-core machines.
    fn machine_supports_parallelism() -> bool {
        std::thread::available_parallelism().is_ok_and(|n| n.get() >= 2)
    }

    #[test]
    fn test_parallel_config_defaults() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_file_count, 2);
        assert!(config.thread_count.is_none());
    }

    #[test]
    fn test_parallel_config_custom() {
        let config = ParallelConfig {
            enabled: false,
            thread_count: Some(4),
            min_file_count: 5,
        };
        assert!(!config.enabled);
        assert_eq!(config.thread_count, Some(4));
        assert_eq!(config.min_file_count, 5);
    }

    #[test]
    fn test_should_use_parallel_logic() {
        let processor = FileParallelProcessor::with_default_config();

        // Single file should not use parallel
        let single_file = vec![("test.md".to_string(), "# Test".to_string())];
        assert!(!processor.should_use_parallel(&single_file));

        // Multiple files should use parallel, provided the machine has at least two cores
        let multiple_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];
        assert_eq!(
            processor.should_use_parallel(&multiple_files),
            machine_supports_parallelism()
        );

        // Test with disabled parallel
        let disabled_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let disabled_processor = FileParallelProcessor::new(disabled_config);
        assert!(!disabled_processor.should_use_parallel(&multiple_files));

        // Test with high min_file_count
        let high_threshold_config = ParallelConfig {
            enabled: true,
            min_file_count: 10,
            ..Default::default()
        };
        let high_threshold_processor = FileParallelProcessor::new(high_threshold_config);
        assert!(!high_threshold_processor.should_use_parallel(&multiple_files));
    }

    #[test]
    fn test_file_parallel_processing() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1\n\nContent".to_string()),
            ("test2.md".to_string(), "# Test 2\n\nMore content".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);

        // Verify all results are Ok
        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_empty_files_handling() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let empty_files: Vec<(String, String)> = vec![];
        let results = processor.process_files(&empty_files, &rules).unwrap();
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_large_file_count() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Create many files to test parallel processing scalability
        let test_files: Vec<(String, String)> = (0..100)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    format!("# Test {i}\n\nContent with trailing spaces   \n"),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 100);

        // Verify all results are Ok and have expected warnings
        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
            let warnings = result.as_ref().unwrap();
            // Should have at least one warning for trailing spaces
            assert!(!warnings.is_empty(), "Expected warnings for {path}");
        }
    }

    #[test]
    fn test_error_propagation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Include files with various edge cases that might trigger errors.
        // The CJK and emoji fixtures deliberately contain 3- and 4-byte UTF-8 sequences.
        let test_files = vec![
            ("empty.md".to_string(), "".to_string()),
            ("unicode.md".to_string(), "# 测试标题\n\n这是中文内容。".to_string()),
            (
                "emoji.md".to_string(),
                "# Title with 🚀 emoji\n\n🎉 Content!".to_string(),
            ),
            ("very_long_line.md".to_string(), "a".repeat(10000)), // Very long single line
            ("many_lines.md".to_string(), "Line\n".repeat(10000)), // Many lines
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 5);

        // All should process successfully even with edge cases
        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_thread_count_configuration() {
        let config = Config::default();
        let rules = all_rules(&config);

        // Test with specific thread count
        let parallel_config = ParallelConfig {
            enabled: true,
            thread_count: Some(2),
            min_file_count: 2,
        };
        let processor = FileParallelProcessor::new(parallel_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
            ("test3.md".to_string(), "# Test 3".to_string()),
            ("test4.md".to_string(), "# Test 4".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);
    }

    #[test]
    fn test_processors_with_different_thread_counts() {
        // Several processors with distinct thread counts in one process must
        // all work and keep input order.
        let config = Config::default();
        let rules = all_rules(&config);

        let test_files: Vec<(String, String)> = (0..6)
            .map(|i| (format!("test{i}.md"), format!("# Test {i}")))
            .collect();
        let expected: Vec<&str> = test_files.iter().map(|(path, _)| path.as_str()).collect();

        for thread_count in [1, 2, 3] {
            let processor = FileParallelProcessor::new(ParallelConfig {
                thread_count: Some(thread_count),
                ..Default::default()
            });
            let results = processor.process_files(&test_files, &rules).unwrap();
            let paths: Vec<&str> = results.iter().map(|(path, _)| path.as_str()).collect();
            assert_eq!(paths, expected, "thread_count = {thread_count}");
        }
    }

    #[test]
    fn test_result_ordering_preservation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..20)
            .map(|i| (format!("test{i:02}.md"), format!("# Test {i}")))
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();

        // Verify results maintain the same order as input
        for (i, (path, _)) in results.iter().enumerate() {
            assert_eq!(path, &format!("test{i:02}.md"));
        }
    }

    #[test]
    fn test_concurrent_rule_execution_safety() {
        // This test ensures rules can be safely executed concurrently
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Create files that will trigger the same rules
        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    "# Heading\n\n- List item\n- Another item\n\n[link](url)\n`code`".to_string(),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 10);

        // All files should produce the same warnings
        let first_warnings = results[0].1.as_ref().unwrap();
        for (_, result) in results.iter().skip(1) {
            let warnings = result.as_ref().unwrap();
            assert_eq!(warnings.len(), first_warnings.len());
        }
    }

    #[test]
    fn test_sequential_and_parallel_agree() {
        // The sequential fallback and the parallel path must produce the same
        // per-file results in the same order.
        let config = Config::default();
        let rules = all_rules(&config);
        let parallel = FileParallelProcessor::with_default_config();
        let sequential = FileParallelProcessor::new(ParallelConfig {
            enabled: false,
            ..Default::default()
        });

        let test_files: Vec<(String, String)> = (0..8)
            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces   \n")))
            .collect();

        let par_results = parallel.process_files(&test_files, &rules).unwrap();
        let seq_results = sequential.process_files(&test_files, &rules).unwrap();
        assert_eq!(par_results.len(), seq_results.len());

        for ((par_path, par_result), (seq_path, seq_result)) in par_results.iter().zip(&seq_results) {
            assert_eq!(par_path, seq_path);
            assert_eq!(
                par_result.as_ref().unwrap().len(),
                seq_result.as_ref().unwrap().len(),
                "Warning count differs for {par_path}"
            );
        }
    }

    #[test]
    fn test_performance_comparison() {
        let seq_time = std::time::Duration::from_millis(1000);
        let par_time = std::time::Duration::from_millis(400);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert_eq!(comparison.sequential_time, seq_time);
        assert_eq!(comparison.parallel_time, par_time);
        assert!((comparison.speedup_factor - 2.5).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_with_overhead() {
        let seq_time = std::time::Duration::from_millis(100);
        let par_time = std::time::Duration::from_millis(150);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert!((comparison.speedup_factor - 0.667).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(50));
    }

    #[test]
    fn test_fallback_to_sequential() {
        let config = Config::default();
        let rules = all_rules(&config);

        // Force sequential processing
        let sequential_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let processor = FileParallelProcessor::new(sequential_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_mixed_content_types() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("plain.md".to_string(), "Just plain text".to_string()),
            ("code.md".to_string(), "```rust\nfn main() {}\n```".to_string()),
            ("table.md".to_string(), "| A | B |\n|---|---|\n| 1 | 2 |".to_string()),
            (
                "front_matter.md".to_string(),
                "---\ntitle: Test\n---\n# Content".to_string(),
            ),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);

        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_deterministic_results() {
        // Ensure parallel processing produces the same results every time
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces   \n")))
            .collect();

        // Run multiple times
        let results1 = processor.process_files(&test_files, &rules).unwrap();
        let results2 = processor.process_files(&test_files, &rules).unwrap();
        let results3 = processor.process_files(&test_files, &rules).unwrap();

        // Compare warning counts for each file
        for i in 0..test_files.len() {
            let warnings1 = results1[i].1.as_ref().unwrap();
            let warnings2 = results2[i].1.as_ref().unwrap();
            let warnings3 = results3[i].1.as_ref().unwrap();

            assert_eq!(warnings1.len(), warnings2.len());
            assert_eq!(warnings2.len(), warnings3.len());
        }
    }

    // =========================================================================
    // Tests for ParallelPerformanceComparison edge cases
    // =========================================================================

    #[test]
    fn test_performance_comparison_normal() {
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::from_millis(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert_eq!(comparison.sequential_time, sequential);
        assert_eq!(comparison.parallel_time, parallel);
        assert!((comparison.speedup_factor - 2.0).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_zero_parallel_time() {
        // Edge case: parallel_time is zero (instant completion)
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should not panic, speedup should be infinity
        assert!(comparison.speedup_factor.is_infinite());
        assert!(comparison.speedup_factor.is_sign_positive());
    }

    #[test]
    fn test_performance_comparison_both_zero() {
        // Edge case: both times are zero
        let sequential = std::time::Duration::ZERO;
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should not panic, speedup should be infinity (0/0 guarded)
        assert!(comparison.speedup_factor.is_infinite());
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_parallel_slower() {
        // Case where parallel is actually slower (overhead dominates)
        let sequential = std::time::Duration::from_millis(10);
        let parallel = std::time::Duration::from_millis(20);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!((comparison.speedup_factor - 0.5).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_performance_comparison_very_small_times() {
        // Very small durations (nanoseconds)
        let sequential = std::time::Duration::from_nanos(100);
        let parallel = std::time::Duration::from_nanos(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should handle small durations without precision issues
        assert!(comparison.speedup_factor > 1.0);
    }
}