Skip to main content

rumdl_lib/
parallel.rs

//! Parallel file processing module for rumdl.
//!
//! This module implements file-level parallel execution of markdown linting
//! to improve performance when processing multiple files.
5use crate::rule::{LintResult, Rule};
6use rayon::prelude::*;
7use std::time::Instant;
8
/// Settings that control file-level parallel linting.
///
/// Construct with [`ParallelConfig::default`] and override individual fields
/// with struct-update syntax, e.g. `ParallelConfig { enabled: false, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Master switch: when `false`, files are always linted sequentially.
    pub enabled: bool,
    /// Explicit worker-thread count; `None` leaves the choice to the runtime
    /// (auto-detect).
    pub thread_count: Option<usize>,
    /// Smallest batch of files for which parallel execution is attempted.
    pub min_file_count: usize,
}

impl Default for ParallelConfig {
    /// Parallelism on, automatic thread count, and a two-file threshold.
    fn default() -> Self {
        // A lone file gains nothing from file-level parallelism.
        const MIN_FILES_FOR_PARALLELISM: usize = 2;

        ParallelConfig {
            enabled: true,
            thread_count: None,
            min_file_count: MIN_FILES_FOR_PARALLELISM,
        }
    }
}
29
30/// File-level parallel processing for multiple files
31pub struct FileParallelProcessor {
32    config: ParallelConfig,
33}
34
35impl FileParallelProcessor {
36    pub fn new(config: ParallelConfig) -> Self {
37        Self { config }
38    }
39
40    pub fn with_default_config() -> Self {
41        Self::new(ParallelConfig::default())
42    }
43
44    /// Process multiple files in parallel
45    pub fn process_files(
46        &self,
47        files: &[(String, String)], // (path, content) pairs
48        rules: &[Box<dyn Rule>],
49    ) -> Result<Vec<(String, LintResult)>, String> {
50        if !self.should_use_parallel(files) {
51            // Fall back to sequential processing
52            return Ok(files
53                .iter()
54                .map(|(path, content)| {
55                    let result = crate::lint(
56                        content,
57                        rules,
58                        false,
59                        crate::config::MarkdownFlavor::Standard,
60                        Some(std::path::PathBuf::from(path)),
61                        None,
62                    );
63                    (path.clone(), result)
64                })
65                .collect());
66        }
67
68        // Set up thread pool if specified
69        if let Some(thread_count) = self.config.thread_count {
70            rayon::ThreadPoolBuilder::new()
71                .num_threads(thread_count)
72                .build_global()
73                .unwrap_or_else(|_| log::warn!("Failed to set thread pool size to {thread_count}"));
74        }
75
76        let results: Vec<(String, LintResult)> = files
77            .par_iter()
78            .map(|(path, content)| {
79                let start = Instant::now();
80                let result = crate::lint(
81                    content,
82                    rules,
83                    false,
84                    crate::config::MarkdownFlavor::Standard,
85                    Some(std::path::PathBuf::from(path)),
86                    None,
87                );
88                let duration = start.elapsed();
89
90                if duration.as_millis() > 1000 {
91                    log::debug!("File {path} took {duration:?}");
92                }
93
94                (path.clone(), result)
95            })
96            .collect();
97
98        Ok(results)
99    }
100
101    /// Determine if file-level parallel processing should be used
102    pub fn should_use_parallel(&self, files: &[(String, String)]) -> bool {
103        if !self.config.enabled {
104            return false;
105        }
106
107        // Need at least minimum files to benefit from parallelization
108        if files.len() < self.config.min_file_count {
109            return false;
110        }
111
112        // Check if we have enough CPU cores
113        let cpu_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
114        if cpu_cores < 2 {
115            return false;
116        }
117
118        true
119    }
120}
121
/// Side-by-side timing of a sequential run and a parallel run of the same work.
#[derive(Debug, Clone)]
pub struct ParallelPerformanceComparison {
    /// Wall-clock time of the sequential run.
    pub sequential_time: std::time::Duration,
    /// Wall-clock time of the parallel run.
    pub parallel_time: std::time::Duration,
    /// `sequential_time / parallel_time`; `f64::INFINITY` when `parallel_time` is zero.
    pub speedup_factor: f64,
    /// How much slower the parallel run was; `Duration::ZERO` if it was not slower.
    pub parallel_overhead: std::time::Duration,
}

impl ParallelPerformanceComparison {
    /// Builds a comparison from the two measured durations.
    pub fn new(sequential_time: std::time::Duration, parallel_time: std::time::Duration) -> Self {
        // Guard against division by zero: a zero parallel time is reported as
        // an infinite speedup (this includes the 0/0 case).
        let speedup_factor = if parallel_time.is_zero() {
            f64::INFINITY
        } else {
            sequential_time.as_secs_f64() / parallel_time.as_secs_f64()
        };
        // Only a slower parallel run has overhead; saturate at zero otherwise.
        let parallel_overhead = parallel_time.saturating_sub(sequential_time);

        Self {
            sequential_time,
            parallel_time,
            speedup_factor,
            parallel_overhead,
        }
    }

    /// Prints a human-readable summary of the comparison to stdout.
    pub fn print_comparison(&self) {
        println!("🔄 Parallel vs Sequential Performance:");
        println!(
            "   Sequential time: {:.3}ms",
            self.sequential_time.as_secs_f64() * 1000.0
        );
        println!("   Parallel time: {:.3}ms", self.parallel_time.as_secs_f64() * 1000.0);
        println!("   Speedup factor: {:.2}x", self.speedup_factor);

        if self.speedup_factor > 1.0 {
            let improvement = (self.speedup_factor - 1.0) * 100.0;
            println!("   Performance improvement: {improvement:.1}%");
        } else {
            let degradation = (1.0 - self.speedup_factor) * 100.0;
            println!("   Performance degradation: {degradation:.1}%");
            if self.parallel_overhead > std::time::Duration::ZERO {
                println!(
                    "   Parallel overhead: {:.3}ms",
                    self.parallel_overhead.as_secs_f64() * 1000.0
                );
            }
        }
    }
}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::rules::all_rules;

    #[test]
    fn test_parallel_config_defaults() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_file_count, 2);
        assert!(config.thread_count.is_none());
    }

    #[test]
    fn test_parallel_config_custom() {
        let config = ParallelConfig {
            enabled: false,
            thread_count: Some(4),
            min_file_count: 5,
        };
        assert!(!config.enabled);
        assert_eq!(config.thread_count, Some(4));
        assert_eq!(config.min_file_count, 5);
    }

    #[test]
    fn test_should_use_parallel_logic() {
        let processor = FileParallelProcessor::with_default_config();

        // Single file should not use parallel
        let single_file = vec![("test.md".to_string(), "# Test".to_string())];
        assert!(!processor.should_use_parallel(&single_file));

        // Multiple files should use parallel
        let multiple_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];
        assert!(processor.should_use_parallel(&multiple_files));

        // Test with disabled parallel
        let disabled_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let disabled_processor = FileParallelProcessor::new(disabled_config);
        assert!(!disabled_processor.should_use_parallel(&multiple_files));

        // Test with high min_file_count
        let high_threshold_config = ParallelConfig {
            enabled: true,
            min_file_count: 10,
            ..Default::default()
        };
        let high_threshold_processor = FileParallelProcessor::new(high_threshold_config);
        assert!(!high_threshold_processor.should_use_parallel(&multiple_files));
    }

    #[test]
    fn test_file_parallel_processing() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1\n\nContent".to_string()),
            ("test2.md".to_string(), "# Test 2\n\nMore content".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);

        // Verify all results are Ok
        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_empty_files_handling() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let empty_files: Vec<(String, String)> = vec![];
        let results = processor.process_files(&empty_files, &rules).unwrap();
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_large_file_count() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Create many files to test parallel processing scalability
        let test_files: Vec<(String, String)> = (0..100)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    format!("# Test {i}\n\nContent with trailing spaces   \n"),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 100);

        // Verify all results are Ok and have expected warnings
        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
            let warnings = result.as_ref().unwrap();
            // Should have at least one warning for trailing spaces
            assert!(!warnings.is_empty(), "Expected warnings for {path}");
        }
    }

    #[test]
    fn test_error_propagation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Include files with various edge cases that might trigger errors.
        // The CJK and emoji inputs deliberately contain 3- and 4-byte UTF-8
        // characters to exercise byte/char offset handling in rules.
        let test_files = vec![
            ("empty.md".to_string(), "".to_string()),
            ("unicode.md".to_string(), "# 测试标题\n\n这是中文内容。".to_string()),
            (
                "emoji.md".to_string(),
                "# Title with 🚀 emoji\n\n🎉 Content!".to_string(),
            ),
            ("very_long_line.md".to_string(), "a".repeat(10000)), // Very long single line
            ("many_lines.md".to_string(), "Line\n".repeat(10000)), // Many lines
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 5);

        // All should process successfully even with edge cases
        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_thread_count_configuration() {
        let config = Config::default();
        let rules = all_rules(&config);

        // Test with specific thread count
        let parallel_config = ParallelConfig {
            enabled: true,
            thread_count: Some(2),
            min_file_count: 2,
        };
        let processor = FileParallelProcessor::new(parallel_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
            ("test3.md".to_string(), "# Test 3".to_string()),
            ("test4.md".to_string(), "# Test 4".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);
    }

    #[test]
    fn test_thread_count_applies_on_repeated_calls() {
        // Different explicit thread counts across calls in the same process
        // must all succeed and keep input order.
        let config = Config::default();
        let rules = all_rules(&config);
        let test_files: Vec<(String, String)> = (0..8)
            .map(|i| (format!("test{i}.md"), format!("# Test {i}\n")))
            .collect();

        for threads in [1, 2, 3] {
            let processor = FileParallelProcessor::new(ParallelConfig {
                enabled: true,
                thread_count: Some(threads),
                min_file_count: 2,
            });
            let results = processor.process_files(&test_files, &rules).unwrap();
            assert_eq!(results.len(), test_files.len());
            for (i, (path, result)) in results.iter().enumerate() {
                assert_eq!(path, &format!("test{i}.md"));
                assert!(result.is_ok(), "Failed processing {path} with {threads} threads");
            }
        }
    }

    #[test]
    fn test_result_ordering_preservation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..20)
            .map(|i| (format!("test{i:02}.md"), format!("# Test {i}")))
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();

        // Verify results maintain the same order as input
        for (i, (path, _)) in results.iter().enumerate() {
            assert_eq!(path, &format!("test{i:02}.md"));
        }
    }

    #[test]
    fn test_concurrent_rule_execution_safety() {
        // This test ensures rules can be safely executed concurrently
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Create files that will trigger the same rules
        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    "# Heading\n\n- List item\n- Another item\n\n[link](url)\n`code`".to_string(),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 10);

        // All files should produce the same warnings
        let first_warnings = results[0].1.as_ref().unwrap();
        for (_, result) in results.iter().skip(1) {
            let warnings = result.as_ref().unwrap();
            assert_eq!(warnings.len(), first_warnings.len());
        }
    }

    #[test]
    fn test_performance_comparison() {
        let seq_time = std::time::Duration::from_millis(1000);
        let par_time = std::time::Duration::from_millis(400);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert_eq!(comparison.sequential_time, seq_time);
        assert_eq!(comparison.parallel_time, par_time);
        assert!((comparison.speedup_factor - 2.5).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_with_overhead() {
        let seq_time = std::time::Duration::from_millis(100);
        let par_time = std::time::Duration::from_millis(150);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert!((comparison.speedup_factor - 0.667).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(50));
    }

    #[test]
    fn test_fallback_to_sequential() {
        let config = Config::default();
        let rules = all_rules(&config);

        // Force sequential processing
        let sequential_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let processor = FileParallelProcessor::new(sequential_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_mixed_content_types() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("plain.md".to_string(), "Just plain text".to_string()),
            ("code.md".to_string(), "```rust\nfn main() {}\n```".to_string()),
            ("table.md".to_string(), "| A | B |\n|---|---|\n| 1 | 2 |".to_string()),
            (
                "front_matter.md".to_string(),
                "---\ntitle: Test\n---\n# Content".to_string(),
            ),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);

        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_deterministic_results() {
        // Ensure parallel processing produces the same results every time
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces   \n")))
            .collect();

        // Run multiple times
        let results1 = processor.process_files(&test_files, &rules).unwrap();
        let results2 = processor.process_files(&test_files, &rules).unwrap();
        let results3 = processor.process_files(&test_files, &rules).unwrap();

        // Compare warning counts for each file
        for i in 0..test_files.len() {
            let warnings1 = results1[i].1.as_ref().unwrap();
            let warnings2 = results2[i].1.as_ref().unwrap();
            let warnings3 = results3[i].1.as_ref().unwrap();

            assert_eq!(warnings1.len(), warnings2.len());
            assert_eq!(warnings2.len(), warnings3.len());
        }
    }

    // =========================================================================
    // Tests for ParallelPerformanceComparison edge cases
    // =========================================================================

    #[test]
    fn test_performance_comparison_normal() {
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::from_millis(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert_eq!(comparison.sequential_time, sequential);
        assert_eq!(comparison.parallel_time, parallel);
        assert!((comparison.speedup_factor - 2.0).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_zero_parallel_time() {
        // Edge case: parallel_time is zero (instant completion)
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should not panic, speedup should be infinity
        assert!(comparison.speedup_factor.is_infinite());
        assert!(comparison.speedup_factor.is_sign_positive());
    }

    #[test]
    fn test_performance_comparison_both_zero() {
        // Edge case: both times are zero
        let sequential = std::time::Duration::ZERO;
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should not panic, speedup should be infinity (0/0 guarded)
        assert!(comparison.speedup_factor.is_infinite());
    }

    #[test]
    fn test_performance_comparison_parallel_slower() {
        // Case where parallel is actually slower (overhead dominates)
        let sequential = std::time::Duration::from_millis(10);
        let parallel = std::time::Duration::from_millis(20);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!((comparison.speedup_factor - 0.5).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_performance_comparison_very_small_times() {
        // Very small durations (nanoseconds)
        let sequential = std::time::Duration::from_nanos(100);
        let parallel = std::time::Duration::from_nanos(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Should handle small durations without precision issues
        assert!(comparison.speedup_factor > 1.0);
    }
}