1use crate::rule::{LintResult, Rule};
6use rayon::prelude::*;
7use std::time::Instant;
8
/// Settings that decide whether, and how, files are linted in parallel.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Master switch; when `false` files are always processed sequentially.
    pub enabled: bool,
    /// Number of rayon worker threads requested for parallel runs.
    /// `None` leaves rayon's own pool sizing untouched.
    pub thread_count: Option<usize>,
    /// Inputs with fewer files than this are processed sequentially.
    pub min_file_count: usize,
}

impl Default for ParallelConfig {
    /// Parallel processing on, no explicit thread count, and a two-file threshold.
    fn default() -> Self {
        ParallelConfig {
            min_file_count: 2,
            thread_count: None,
            enabled: true,
        }
    }
}
29
30pub struct FileParallelProcessor {
32 config: ParallelConfig,
33}
34
35impl FileParallelProcessor {
36 pub fn new(config: ParallelConfig) -> Self {
37 Self { config }
38 }
39
40 pub fn with_default_config() -> Self {
41 Self::new(ParallelConfig::default())
42 }
43
44 pub fn process_files(
46 &self,
47 files: &[(String, String)], rules: &[Box<dyn Rule>],
49 ) -> Result<Vec<(String, LintResult)>, String> {
50 if !self.should_use_parallel(files) {
51 return Ok(files
53 .iter()
54 .map(|(path, content)| {
55 let result = crate::lint(
56 content,
57 rules,
58 false,
59 crate::config::MarkdownFlavor::Standard,
60 Some(std::path::PathBuf::from(path)),
61 None,
62 );
63 (path.clone(), result)
64 })
65 .collect());
66 }
67
68 if let Some(thread_count) = self.config.thread_count {
70 rayon::ThreadPoolBuilder::new()
71 .num_threads(thread_count)
72 .build_global()
73 .unwrap_or_else(|_| log::warn!("Failed to set thread pool size to {thread_count}"));
74 }
75
76 let results: Vec<(String, LintResult)> = files
77 .par_iter()
78 .map(|(path, content)| {
79 let start = Instant::now();
80 let result = crate::lint(
81 content,
82 rules,
83 false,
84 crate::config::MarkdownFlavor::Standard,
85 Some(std::path::PathBuf::from(path)),
86 None,
87 );
88 let duration = start.elapsed();
89
90 if duration.as_millis() > 1000 {
91 log::debug!("File {path} took {duration:?}");
92 }
93
94 (path.clone(), result)
95 })
96 .collect();
97
98 Ok(results)
99 }
100
101 pub fn should_use_parallel(&self, files: &[(String, String)]) -> bool {
103 if !self.config.enabled {
104 return false;
105 }
106
107 if files.len() < self.config.min_file_count {
109 return false;
110 }
111
112 let cpu_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
114 if cpu_cores < 2 {
115 return false;
116 }
117
118 true
119 }
120}
121
/// Timing comparison between a sequential and a parallel run of the same work.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ParallelPerformanceComparison {
    /// Time taken by the sequential run.
    pub sequential_time: std::time::Duration,
    /// Time taken by the parallel run.
    pub parallel_time: std::time::Duration,
    /// `sequential_time / parallel_time`; infinite when `parallel_time` is zero.
    pub speedup_factor: f64,
    /// How much longer the parallel run took than the sequential one, or zero
    /// when the parallel run was not slower.
    pub parallel_overhead: std::time::Duration,
}

impl ParallelPerformanceComparison {
    /// Builds a comparison from the two measured durations.
    ///
    /// A zero `parallel_time` yields an infinite speedup factor (including the
    /// case where both durations are zero) rather than dividing by zero.
    pub fn new(sequential_time: std::time::Duration, parallel_time: std::time::Duration) -> Self {
        let speedup_factor = if parallel_time.is_zero() {
            f64::INFINITY
        } else {
            sequential_time.as_secs_f64() / parallel_time.as_secs_f64()
        };

        // Saturates to zero whenever the parallel run was at least as fast.
        let parallel_overhead = parallel_time.saturating_sub(sequential_time);

        Self {
            sequential_time,
            parallel_time,
            speedup_factor,
            parallel_overhead,
        }
    }

    /// Prints a human-readable summary of the comparison to standard output.
    ///
    /// A speedup above 1.0 is reported as an improvement; anything else is
    /// reported as a degradation, followed by the overhead when it is non-zero.
    pub fn print_comparison(&self) {
        let as_millis = |d: std::time::Duration| d.as_secs_f64() * 1000.0;

        // NOTE(review): the leading glyph looks like a mis-encoded emoji — confirm the intended character.
        println!("๐ Parallel vs Sequential Performance:");
        println!(" Sequential time: {:.3}ms", as_millis(self.sequential_time));
        println!(" Parallel time: {:.3}ms", as_millis(self.parallel_time));
        println!(" Speedup factor: {:.2}x", self.speedup_factor);

        if self.speedup_factor > 1.0 {
            let improvement = (self.speedup_factor - 1.0) * 100.0;
            println!(" Performance improvement: {improvement:.1}%");
            return;
        }

        let degradation = (1.0 - self.speedup_factor) * 100.0;
        println!(" Performance degradation: {degradation:.1}%");
        if !self.parallel_overhead.is_zero() {
            println!(" Parallel overhead: {:.3}ms", as_millis(self.parallel_overhead));
        }
    }
}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::rules::all_rules;

    /// Mirrors the core-count check in `should_use_parallel` so assertions stay
    /// correct on single-core hosts (e.g. constrained CI containers).
    fn host_is_multi_core() -> bool {
        std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1) >= 2
    }

    #[test]
    fn test_parallel_config_defaults() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_file_count, 2);
        assert!(config.thread_count.is_none());
    }

    #[test]
    fn test_parallel_config_custom() {
        let config = ParallelConfig {
            enabled: false,
            thread_count: Some(4),
            min_file_count: 5,
        };
        assert!(!config.enabled);
        assert_eq!(config.thread_count, Some(4));
        assert_eq!(config.min_file_count, 5);
    }

    #[test]
    fn test_should_use_parallel_logic() {
        let processor = FileParallelProcessor::with_default_config();

        // A single file is below the default threshold of two.
        let single_file = vec![("test.md".to_string(), "# Test".to_string())];
        assert!(!processor.should_use_parallel(&single_file));

        // Two files meet the threshold; the answer then depends only on the
        // number of cores available to the test process.
        let multiple_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];
        assert_eq!(processor.should_use_parallel(&multiple_files), host_is_multi_core());

        // Disabling parallelism always wins.
        let disabled_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let disabled_processor = FileParallelProcessor::new(disabled_config);
        assert!(!disabled_processor.should_use_parallel(&multiple_files));

        // A threshold above the file count forces sequential processing.
        let high_threshold_config = ParallelConfig {
            enabled: true,
            min_file_count: 10,
            ..Default::default()
        };
        let high_threshold_processor = FileParallelProcessor::new(high_threshold_config);
        assert!(!high_threshold_processor.should_use_parallel(&multiple_files));
    }

    #[test]
    fn test_file_parallel_processing() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1\n\nContent".to_string()),
            ("test2.md".to_string(), "# Test 2\n\nMore content".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);

        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_empty_files_handling() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let empty_files: Vec<(String, String)> = vec![];
        let results = processor.process_files(&empty_files, &rules).unwrap();
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_large_file_count() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..100)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    format!("# Test {i}\n\nContent with trailing spaces \n"),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 100);

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
            let warnings = result.as_ref().unwrap();
            assert!(!warnings.is_empty(), "Expected warnings for {path}");
        }
    }

    // Despite its name, this test checks that unusual content (empty input,
    // multi-byte text, very long lines, many lines) lints without errors.
    #[test]
    fn test_error_propagation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("empty.md".to_string(), "".to_string()),
            ("unicode.md".to_string(), "# 测试标题\n\n这是中文内容。".to_string()),
            (
                "emoji.md".to_string(),
                "# Title with 🚀 emoji\n\n🎉 Content!".to_string(),
            ),
            ("very_long_line.md".to_string(), "a".repeat(10000)),
            ("many_lines.md".to_string(), "Line\n".repeat(10000)),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 5);

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_thread_count_configuration() {
        let config = Config::default();
        let rules = all_rules(&config);

        let parallel_config = ParallelConfig {
            enabled: true,
            thread_count: Some(2),
            min_file_count: 2,
        };
        let processor = FileParallelProcessor::new(parallel_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
            ("test3.md".to_string(), "# Test 3".to_string()),
            ("test4.md".to_string(), "# Test 4".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);
    }

    #[test]
    fn test_result_ordering_preservation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..20)
            .map(|i| (format!("test{i:02}.md"), format!("# Test {i}")))
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();

        // Without this, an empty result set would pass the loop below vacuously.
        assert_eq!(results.len(), test_files.len());

        for (i, (path, _)) in results.iter().enumerate() {
            assert_eq!(path, &format!("test{i:02}.md"));
        }
    }

    #[test]
    fn test_concurrent_rule_execution_safety() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        // Identical content in every file, so every file must yield the same
        // number of warnings regardless of which thread linted it.
        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    "# Heading\n\n- List item\n- Another item\n\n[link](url)\n`code`".to_string(),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 10);

        let first_warnings = results[0].1.as_ref().unwrap();
        for (_, result) in results.iter().skip(1) {
            let warnings = result.as_ref().unwrap();
            assert_eq!(warnings.len(), first_warnings.len());
        }
    }

    #[test]
    fn test_performance_comparison() {
        let seq_time = std::time::Duration::from_millis(1000);
        let par_time = std::time::Duration::from_millis(400);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert_eq!(comparison.sequential_time, seq_time);
        assert_eq!(comparison.parallel_time, par_time);
        assert!((comparison.speedup_factor - 2.5).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_with_overhead() {
        let seq_time = std::time::Duration::from_millis(100);
        let par_time = std::time::Duration::from_millis(150);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert!((comparison.speedup_factor - 0.667).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(50));
    }

    #[test]
    fn test_fallback_to_sequential() {
        let config = Config::default();
        let rules = all_rules(&config);

        let sequential_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let processor = FileParallelProcessor::new(sequential_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_mixed_content_types() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("plain.md".to_string(), "Just plain text".to_string()),
            ("code.md".to_string(), "```rust\nfn main() {}\n```".to_string()),
            ("table.md".to_string(), "| A | B |\n|---|---|\n| 1 | 2 |".to_string()),
            (
                "front_matter.md".to_string(),
                "---\ntitle: Test\n---\n# Content".to_string(),
            ),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);

        for (_, result) in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_deterministic_results() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces \n")))
            .collect();

        // Repeated runs over the same input must agree with each other.
        let results1 = processor.process_files(&test_files, &rules).unwrap();
        let results2 = processor.process_files(&test_files, &rules).unwrap();
        let results3 = processor.process_files(&test_files, &rules).unwrap();

        assert_eq!(results1.len(), test_files.len());
        assert_eq!(results2.len(), test_files.len());
        assert_eq!(results3.len(), test_files.len());

        for ((first, second), third) in results1.iter().zip(&results2).zip(&results3) {
            let warnings1 = first.1.as_ref().unwrap();
            let warnings2 = second.1.as_ref().unwrap();
            let warnings3 = third.1.as_ref().unwrap();

            assert_eq!(warnings1.len(), warnings2.len());
            assert_eq!(warnings2.len(), warnings3.len());
        }
    }

    #[test]
    fn test_performance_comparison_normal() {
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::from_millis(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert_eq!(comparison.sequential_time, sequential);
        assert_eq!(comparison.parallel_time, parallel);
        assert!((comparison.speedup_factor - 2.0).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_zero_parallel_time() {
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // A zero denominator is reported as positive infinity, not NaN or a panic.
        assert!(comparison.speedup_factor.is_infinite());
        assert!(comparison.speedup_factor.is_sign_positive());
    }

    #[test]
    fn test_performance_comparison_both_zero() {
        let sequential = std::time::Duration::ZERO;
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // 0/0 is still reported as infinity because the zero check comes first.
        assert!(comparison.speedup_factor.is_infinite());
    }

    #[test]
    fn test_performance_comparison_parallel_slower() {
        let sequential = std::time::Duration::from_millis(10);
        let parallel = std::time::Duration::from_millis(20);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!((comparison.speedup_factor - 0.5).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_performance_comparison_very_small_times() {
        let sequential = std::time::Duration::from_nanos(100);
        let parallel = std::time::Duration::from_nanos(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!(comparison.speedup_factor > 1.0);
    }
}