1use crate::rule::{LintResult, Rule};
6use rayon::prelude::*;
7use std::time::Instant;
8
/// Tuning knobs that decide whether, and how, files are linted in parallel.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Master switch; when `false` the processor never takes the parallel path.
    pub enabled: bool,
    /// Number of worker threads to request from rayon; `None` keeps rayon's default.
    pub thread_count: Option<usize>,
    /// Smallest batch size (in files) for which parallel processing is considered.
    pub min_file_count: usize,
}

impl Default for ParallelConfig {
    /// Parallelism on, rayon-chosen thread count, and a two-file minimum batch.
    fn default() -> Self {
        ParallelConfig {
            min_file_count: 2,
            thread_count: None,
            enabled: true,
        }
    }
}
29
30pub struct FileParallelProcessor {
32 config: ParallelConfig,
33}
34
35impl FileParallelProcessor {
36 pub fn new(config: ParallelConfig) -> Self {
37 Self { config }
38 }
39
40 pub fn with_default_config() -> Self {
41 Self::new(ParallelConfig::default())
42 }
43
44 pub fn process_files(
46 &self,
47 files: &[(String, String)], rules: &[Box<dyn Rule>],
49 ) -> Result<Vec<(String, LintResult)>, String> {
50 if !self.should_use_parallel(files) {
51 return Ok(files
53 .iter()
54 .map(|(path, content)| {
55 let result = crate::lint(
56 content,
57 rules,
58 false,
59 crate::config::MarkdownFlavor::Standard,
60 Some(std::path::PathBuf::from(path)),
61 None,
62 );
63 (path.clone(), result)
64 })
65 .collect());
66 }
67
68 if let Some(thread_count) = self.config.thread_count {
70 rayon::ThreadPoolBuilder::new()
71 .num_threads(thread_count)
72 .build_global()
73 .unwrap_or_else(|_| log::warn!("Failed to set thread pool size to {thread_count}"));
74 }
75
76 let results: Vec<(String, LintResult)> = files
77 .par_iter()
78 .map(|(path, content)| {
79 let start = Instant::now();
80 let result = crate::lint(
81 content,
82 rules,
83 false,
84 crate::config::MarkdownFlavor::Standard,
85 Some(std::path::PathBuf::from(path)),
86 None,
87 );
88 let duration = start.elapsed();
89
90 if duration.as_millis() > 1000 {
91 log::debug!("File {path} took {duration:?}");
92 }
93
94 (path.clone(), result)
95 })
96 .collect();
97
98 Ok(results)
99 }
100
101 pub fn should_use_parallel(&self, files: &[(String, String)]) -> bool {
103 if !self.config.enabled {
104 return false;
105 }
106
107 if files.len() < self.config.min_file_count {
109 return false;
110 }
111
112 let cpu_cores = std::thread::available_parallelism()
114 .map(std::num::NonZero::get)
115 .unwrap_or(1);
116 if cpu_cores < 2 {
117 return false;
118 }
119
120 true
121 }
122}
123
/// Side-by-side timing of a sequential run and a parallel run of the same work.
#[derive(Debug, Clone)]
pub struct ParallelPerformanceComparison {
    /// Time taken by the sequential run.
    pub sequential_time: std::time::Duration,
    /// Time taken by the parallel run.
    pub parallel_time: std::time::Duration,
    /// `sequential_time / parallel_time`; `f64::INFINITY` when `parallel_time` is zero.
    pub speedup_factor: f64,
    /// How much longer the parallel run took than the sequential one, or zero
    /// when the parallel run was not slower.
    pub parallel_overhead: std::time::Duration,
}

impl ParallelPerformanceComparison {
    /// Derives the speedup factor and overhead from the two measured durations.
    ///
    /// A zero `parallel_time` yields an infinite speedup rather than a
    /// division by zero (this includes the case where both durations are zero).
    pub fn new(sequential_time: std::time::Duration, parallel_time: std::time::Duration) -> Self {
        let speedup_factor = if parallel_time.is_zero() {
            f64::INFINITY
        } else {
            sequential_time.as_secs_f64() / parallel_time.as_secs_f64()
        };
        // Clamps to zero when the parallel run was not slower; cannot panic.
        let parallel_overhead = parallel_time.saturating_sub(sequential_time);

        Self {
            sequential_time,
            parallel_time,
            speedup_factor,
            parallel_overhead,
        }
    }

    /// Prints a human-readable summary of the comparison to standard output.
    ///
    /// A speedup factor above 1.0 is reported as an improvement; anything else
    /// is reported as a degradation, followed by the overhead when it is non-zero.
    pub fn print_comparison(&self) {
        let to_millis = |d: std::time::Duration| d.as_secs_f64() * 1000.0;

        println!("🔄 Parallel vs Sequential Performance:");
        println!(" Sequential time: {:.3}ms", to_millis(self.sequential_time));
        println!(" Parallel time: {:.3}ms", to_millis(self.parallel_time));
        println!(" Speedup factor: {:.2}x", self.speedup_factor);

        if self.speedup_factor > 1.0 {
            let improvement = (self.speedup_factor - 1.0) * 100.0;
            println!(" Performance improvement: {improvement:.1}%");
            return;
        }

        let degradation = (1.0 - self.speedup_factor) * 100.0;
        println!(" Performance degradation: {degradation:.1}%");
        if !self.parallel_overhead.is_zero() {
            println!(" Parallel overhead: {:.3}ms", to_millis(self.parallel_overhead));
        }
    }
}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::rules::all_rules;

    /// Builds one `(path, content)` entry in the shape `process_files` expects.
    fn file(path: &str, content: &str) -> (String, String) {
        (path.to_string(), content.to_string())
    }

    #[test]
    fn test_parallel_config_defaults() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_file_count, 2);
        assert!(config.thread_count.is_none());
    }

    #[test]
    fn test_parallel_config_custom() {
        let config = ParallelConfig {
            enabled: false,
            thread_count: Some(4),
            min_file_count: 5,
        };
        assert!(!config.enabled);
        assert_eq!(config.thread_count, Some(4));
        assert_eq!(config.min_file_count, 5);
    }

    #[test]
    fn test_should_use_parallel_logic() {
        let processor = FileParallelProcessor::with_default_config();

        // A single file is below the default minimum batch size.
        let single_file = vec![file("test.md", "# Test")];
        assert!(!processor.should_use_parallel(&single_file));

        // Two files meet the default minimum.
        let multiple_files = vec![file("test1.md", "# Test 1"), file("test2.md", "# Test 2")];
        assert!(processor.should_use_parallel(&multiple_files));

        // Disabling parallelism wins regardless of batch size.
        let disabled_processor = FileParallelProcessor::new(ParallelConfig {
            enabled: false,
            ..Default::default()
        });
        assert!(!disabled_processor.should_use_parallel(&multiple_files));

        // A threshold above the batch size keeps processing sequential.
        let high_threshold_processor = FileParallelProcessor::new(ParallelConfig {
            enabled: true,
            min_file_count: 10,
            ..Default::default()
        });
        assert!(!high_threshold_processor.should_use_parallel(&multiple_files));
    }

    #[test]
    fn test_file_parallel_processing() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            file("test1.md", "# Test 1\n\nContent"),
            file("test2.md", "# Test 2\n\nMore content"),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|(_, result)| result.is_ok()));
    }

    #[test]
    fn test_empty_files_handling() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        let empty_files: Vec<(String, String)> = Vec::new();
        let results = processor.process_files(&empty_files, &rules).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_large_file_count() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..100)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    format!("# Test {i}\n\nContent with trailing spaces \n"),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 100);

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
            let warnings = result.as_ref().unwrap();
            assert!(!warnings.is_empty(), "Expected warnings for {path}");
        }
    }

    #[test]
    fn test_error_propagation() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        // Awkward inputs: empty, multi-byte text, emoji, one huge line, many lines.
        let test_files = vec![
            file("empty.md", ""),
            file("unicode.md", "# 测试标题\n\n这是中文内容。"),
            file("emoji.md", "# Title with 🚀 emoji\n\n🎉 Content!"),
            ("very_long_line.md".to_string(), "a".repeat(10000)),
            ("many_lines.md".to_string(), "Line\n".repeat(10000)),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 5);

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_thread_count_configuration() {
        let rules = all_rules(&Config::default());

        let processor = FileParallelProcessor::new(ParallelConfig {
            enabled: true,
            thread_count: Some(2),
            min_file_count: 2,
        });

        let test_files = vec![
            file("test1.md", "# Test 1"),
            file("test2.md", "# Test 2"),
            file("test3.md", "# Test 3"),
            file("test4.md", "# Test 4"),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);
    }

    #[test]
    fn test_result_ordering_preservation() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..20)
            .map(|i| (format!("test{i:02}.md"), format!("# Test {i}")))
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();

        // Results must come back in input order even when produced in parallel.
        for (i, (path, _)) in results.iter().enumerate() {
            assert_eq!(path, &format!("test{i:02}.md"));
        }
    }

    #[test]
    fn test_concurrent_rule_execution_safety() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        // Identical content in every file, so every file must yield the same warning count.
        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    "# Heading\n\n- List item\n- Another item\n\n[link](url)\n`code`".to_string(),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 10);

        let expected_count = results[0].1.as_ref().unwrap().len();
        for (_, result) in results.iter().skip(1) {
            let warnings = result.as_ref().unwrap();
            assert_eq!(warnings.len(), expected_count);
        }
    }

    #[test]
    fn test_performance_comparison() {
        let seq_time = std::time::Duration::from_millis(1000);
        let par_time = std::time::Duration::from_millis(400);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert_eq!(comparison.sequential_time, seq_time);
        assert_eq!(comparison.parallel_time, par_time);
        assert!((comparison.speedup_factor - 2.5).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_with_overhead() {
        let seq_time = std::time::Duration::from_millis(100);
        let par_time = std::time::Duration::from_millis(150);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert!((comparison.speedup_factor - 0.667).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(50));
    }

    #[test]
    fn test_fallback_to_sequential() {
        let rules = all_rules(&Config::default());

        let processor = FileParallelProcessor::new(ParallelConfig {
            enabled: false,
            ..Default::default()
        });

        let test_files = vec![file("test1.md", "# Test 1"), file("test2.md", "# Test 2")];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_mixed_content_types() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            file("plain.md", "Just plain text"),
            file("code.md", "```rust\nfn main() {}\n```"),
            file("table.md", "| A | B |\n|---|---|\n| 1 | 2 |"),
            file("front_matter.md", "---\ntitle: Test\n---\n# Content"),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);
        assert!(results.iter().all(|(_, result)| result.is_ok()));
    }

    #[test]
    fn test_deterministic_results() {
        let rules = all_rules(&Config::default());
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces \n")))
            .collect();

        let results1 = processor.process_files(&test_files, &rules).unwrap();
        let results2 = processor.process_files(&test_files, &rules).unwrap();
        let results3 = processor.process_files(&test_files, &rules).unwrap();

        // Guard the zip below: a short run must fail rather than be skipped silently.
        assert_eq!(results1.len(), test_files.len());
        assert_eq!(results2.len(), test_files.len());
        assert_eq!(results3.len(), test_files.len());

        for ((first, second), third) in results1.iter().zip(&results2).zip(&results3) {
            let warnings1 = first.1.as_ref().unwrap();
            let warnings2 = second.1.as_ref().unwrap();
            let warnings3 = third.1.as_ref().unwrap();

            assert_eq!(warnings1.len(), warnings2.len());
            assert_eq!(warnings2.len(), warnings3.len());
        }
    }

    #[test]
    fn test_performance_comparison_normal() {
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::from_millis(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert_eq!(comparison.sequential_time, sequential);
        assert_eq!(comparison.parallel_time, parallel);
        assert!((comparison.speedup_factor - 2.0).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_zero_parallel_time() {
        let sequential = std::time::Duration::from_millis(100);
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        // Division by zero is avoided by reporting positive infinity.
        assert!(comparison.speedup_factor.is_infinite());
        assert!(comparison.speedup_factor.is_sign_positive());
    }

    #[test]
    fn test_performance_comparison_both_zero() {
        let sequential = std::time::Duration::ZERO;
        let parallel = std::time::Duration::ZERO;

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!(comparison.speedup_factor.is_infinite());
    }

    #[test]
    fn test_performance_comparison_parallel_slower() {
        let sequential = std::time::Duration::from_millis(10);
        let parallel = std::time::Duration::from_millis(20);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!((comparison.speedup_factor - 0.5).abs() < 0.001);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_performance_comparison_very_small_times() {
        let sequential = std::time::Duration::from_nanos(100);
        let parallel = std::time::Duration::from_nanos(50);

        let comparison = ParallelPerformanceComparison::new(sequential, parallel);

        assert!(comparison.speedup_factor > 1.0);
    }
}