1use crate::rule::{LintResult, Rule};
6use rayon::prelude::*;
7use std::time::Instant;
8
/// Settings that decide whether, and how, [`FileParallelProcessor`] fans work out.
#[derive(Clone, Debug)]
pub struct ParallelConfig {
    /// Master switch: `false` forces the sequential path regardless of other settings.
    pub enabled: bool,
    /// Thread count to request from rayon; `None` leaves rayon's pool size untouched.
    pub thread_count: Option<usize>,
    /// Smallest batch size (in files) for which the parallel path is considered.
    pub min_file_count: usize,
}
19
20impl Default for ParallelConfig {
21 fn default() -> Self {
22 Self {
23 enabled: true,
24 thread_count: None, min_file_count: 2, }
27 }
28}
29
30pub struct FileParallelProcessor {
32 config: ParallelConfig,
33}
34
35impl FileParallelProcessor {
36 pub fn new(config: ParallelConfig) -> Self {
37 Self { config }
38 }
39
40 pub fn with_default_config() -> Self {
41 Self::new(ParallelConfig::default())
42 }
43
44 pub fn process_files(
46 &self,
47 files: &[(String, String)], rules: &[Box<dyn Rule>],
49 ) -> Result<Vec<(String, LintResult)>, String> {
50 if !self.should_use_parallel(files) {
51 return Ok(files
53 .iter()
54 .map(|(path, content)| {
55 let result = crate::lint(content, rules, false, crate::config::MarkdownFlavor::Standard);
56 (path.clone(), result)
57 })
58 .collect());
59 }
60
61 if let Some(thread_count) = self.config.thread_count {
63 rayon::ThreadPoolBuilder::new()
64 .num_threads(thread_count)
65 .build_global()
66 .unwrap_or_else(|_| log::warn!("Failed to set thread pool size to {thread_count}"));
67 }
68
69 let results: Vec<(String, LintResult)> = files
70 .par_iter()
71 .map(|(path, content)| {
72 let start = Instant::now();
73 let result = crate::lint(content, rules, false, crate::config::MarkdownFlavor::Standard);
74 let duration = start.elapsed();
75
76 if duration.as_millis() > 1000 {
77 log::debug!("File {path} took {duration:?}");
78 }
79
80 (path.clone(), result)
81 })
82 .collect();
83
84 Ok(results)
85 }
86
87 pub fn should_use_parallel(&self, files: &[(String, String)]) -> bool {
89 if !self.config.enabled {
90 return false;
91 }
92
93 if files.len() < self.config.min_file_count {
95 return false;
96 }
97
98 let cpu_cores = num_cpus::get();
100 if cpu_cores < 2 {
101 return false;
102 }
103
104 true
105 }
106}
107
/// Wall-clock comparison between a sequential and a parallel run of the same workload.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ParallelPerformanceComparison {
    /// Time taken by the sequential run.
    pub sequential_time: std::time::Duration,
    /// Time taken by the parallel run.
    pub parallel_time: std::time::Duration,
    /// Ratio `sequential_time / parallel_time`, as computed by
    /// `ParallelPerformanceComparison::new`; above 1.0 means the parallel run was faster.
    pub speedup_factor: f64,
    /// How much longer the parallel run took than the sequential one, as computed
    /// by `ParallelPerformanceComparison::new`; zero when it was not slower.
    pub parallel_overhead: std::time::Duration,
}
115
116impl ParallelPerformanceComparison {
117 pub fn new(sequential_time: std::time::Duration, parallel_time: std::time::Duration) -> Self {
118 let speedup_factor = sequential_time.as_secs_f64() / parallel_time.as_secs_f64();
119 let parallel_overhead = if parallel_time > sequential_time {
120 parallel_time - sequential_time
121 } else {
122 std::time::Duration::ZERO
123 };
124
125 Self {
126 sequential_time,
127 parallel_time,
128 speedup_factor,
129 parallel_overhead,
130 }
131 }
132
133 pub fn print_comparison(&self) {
134 println!("๐ Parallel vs Sequential Performance:");
135 println!(
136 " Sequential time: {:.3}ms",
137 self.sequential_time.as_secs_f64() * 1000.0
138 );
139 println!(" Parallel time: {:.3}ms", self.parallel_time.as_secs_f64() * 1000.0);
140 println!(" Speedup factor: {:.2}x", self.speedup_factor);
141
142 if self.speedup_factor > 1.0 {
143 let improvement = (self.speedup_factor - 1.0) * 100.0;
144 println!(" Performance improvement: {improvement:.1}%");
145 } else {
146 let degradation = (1.0 - self.speedup_factor) * 100.0;
147 println!(" Performance degradation: {degradation:.1}%");
148 if self.parallel_overhead > std::time::Duration::ZERO {
149 println!(
150 " Parallel overhead: {:.3}ms",
151 self.parallel_overhead.as_secs_f64() * 1000.0
152 );
153 }
154 }
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::rules::all_rules;

    #[test]
    fn test_parallel_config_defaults() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_file_count, 2);
        assert!(config.thread_count.is_none());
    }

    #[test]
    fn test_parallel_config_custom() {
        let config = ParallelConfig {
            enabled: false,
            thread_count: Some(4),
            min_file_count: 5,
        };
        assert!(!config.enabled);
        assert_eq!(config.thread_count, Some(4));
        assert_eq!(config.min_file_count, 5);
    }

    #[test]
    fn test_should_use_parallel_logic() {
        let processor = FileParallelProcessor::with_default_config();
        // `should_use_parallel` also requires at least two CPUs, so the positive
        // case must follow the host instead of being hard-coded (single-core CI).
        let multi_core = num_cpus::get() >= 2;

        // One file is below the default `min_file_count` of 2.
        let single_file = vec![("test.md".to_string(), "# Test".to_string())];
        assert!(!processor.should_use_parallel(&single_file));

        let multiple_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];
        assert_eq!(processor.should_use_parallel(&multiple_files), multi_core);

        // Disabling parallelism always forces the sequential path.
        let disabled_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let disabled_processor = FileParallelProcessor::new(disabled_config);
        assert!(!disabled_processor.should_use_parallel(&multiple_files));

        // Two files are below a raised threshold.
        let high_threshold_config = ParallelConfig {
            enabled: true,
            min_file_count: 10,
            ..Default::default()
        };
        let high_threshold_processor = FileParallelProcessor::new(high_threshold_config);
        assert!(!high_threshold_processor.should_use_parallel(&multiple_files));
    }

    #[test]
    fn test_file_parallel_processing() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1\n\nContent".to_string()),
            ("test2.md".to_string(), "# Test 2\n\nMore content".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_empty_files_handling() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let empty_files: Vec<(String, String)> = vec![];
        let results = processor.process_files(&empty_files, &rules).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_large_file_count() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..100)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    format!("# Test {i}\n\nContent with trailing spaces \n"),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 100);

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
            let warnings = result.as_ref().unwrap();
            // The trailing space on the content line should always be reported.
            assert!(!warnings.is_empty(), "Expected warnings for {path}");
        }
    }

    /// Unusual but valid inputs (empty, non-ASCII, emoji, huge line, many
    /// lines) must all lint without error.
    #[test]
    fn test_unusual_content_handling() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("empty.md".to_string(), String::new()),
            ("unicode.md".to_string(), "# 测试标题\n\n这是中文内容。".to_string()),
            (
                "emoji.md".to_string(),
                "# Title with 🚀 emoji\n\n🎉 Content!".to_string(),
            ),
            ("very_long_line.md".to_string(), "a".repeat(10000)),
            ("many_lines.md".to_string(), "Line\n".repeat(10000)),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), test_files.len());

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_thread_count_configuration() {
        let config = Config::default();
        let rules = all_rules(&config);

        let parallel_config = ParallelConfig {
            enabled: true,
            thread_count: Some(2),
            min_file_count: 2,
        };
        let processor = FileParallelProcessor::new(parallel_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
            ("test3.md".to_string(), "# Test 3".to_string()),
            ("test4.md".to_string(), "# Test 4".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);

        // A custom thread count must not change ordering or outcomes.
        for ((expected_path, _), (path, result)) in test_files.iter().zip(&results) {
            assert_eq!(path, expected_path);
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_result_ordering_preservation() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..20)
            .map(|i| (format!("test{i:02}.md"), format!("# Test {i}")))
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        // Without this, an empty result would pass the ordering loop vacuously.
        assert_eq!(results.len(), test_files.len());

        for (i, (path, _)) in results.iter().enumerate() {
            assert_eq!(path, &format!("test{i:02}.md"));
        }
    }

    #[test]
    fn test_concurrent_rule_execution_safety() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| {
                (
                    format!("test{i}.md"),
                    "# Heading\n\n- List item\n- Another item\n\n[link](url)\n`code`".to_string(),
                )
            })
            .collect();

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 10);

        // Every file has identical content, so every file must get the same warning count.
        let first_count = results[0].1.as_ref().unwrap().len();
        for (path, result) in results.iter().skip(1) {
            let warnings = result.as_ref().unwrap();
            assert_eq!(warnings.len(), first_count, "Warning count differs for {path}");
        }
    }

    #[test]
    fn test_performance_comparison() {
        let seq_time = std::time::Duration::from_millis(1000);
        let par_time = std::time::Duration::from_millis(400);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert_eq!(comparison.sequential_time, seq_time);
        assert_eq!(comparison.parallel_time, par_time);
        assert!((comparison.speedup_factor - 2.5).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::ZERO);
    }

    #[test]
    fn test_performance_comparison_with_overhead() {
        let seq_time = std::time::Duration::from_millis(100);
        let par_time = std::time::Duration::from_millis(150);

        let comparison = ParallelPerformanceComparison::new(seq_time, par_time);

        assert!((comparison.speedup_factor - 0.667).abs() < 0.01);
        assert_eq!(comparison.parallel_overhead, std::time::Duration::from_millis(50));
    }

    #[test]
    fn test_fallback_to_sequential() {
        let config = Config::default();
        let rules = all_rules(&config);

        let sequential_config = ParallelConfig {
            enabled: false,
            ..Default::default()
        };
        let processor = FileParallelProcessor::new(sequential_config);

        let test_files = vec![
            ("test1.md".to_string(), "# Test 1".to_string()),
            ("test2.md".to_string(), "# Test 2".to_string()),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, "test1.md");
        assert_eq!(results[1].0, "test2.md");
    }

    #[test]
    fn test_mixed_content_types() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files = vec![
            ("plain.md".to_string(), "Just plain text".to_string()),
            ("code.md".to_string(), "```rust\nfn main() {}\n```".to_string()),
            ("table.md".to_string(), "| A | B |\n|---|---|\n| 1 | 2 |".to_string()),
            (
                "front_matter.md".to_string(),
                "---\ntitle: Test\n---\n# Content".to_string(),
            ),
        ];

        let results = processor.process_files(&test_files, &rules).unwrap();
        assert_eq!(results.len(), 4);

        for (path, result) in &results {
            assert!(result.is_ok(), "Failed processing {path}");
        }
    }

    #[test]
    fn test_deterministic_results() {
        let config = Config::default();
        let rules = all_rules(&config);
        let processor = FileParallelProcessor::with_default_config();

        let test_files: Vec<(String, String)> = (0..10)
            .map(|i| (format!("test{i}.md"), format!("# Heading {i}\n\nTrailing spaces \n")))
            .collect();

        let results1 = processor.process_files(&test_files, &rules).unwrap();
        let results2 = processor.process_files(&test_files, &rules).unwrap();
        let results3 = processor.process_files(&test_files, &rules).unwrap();

        assert_eq!(results1.len(), test_files.len());
        assert_eq!(results2.len(), test_files.len());
        assert_eq!(results3.len(), test_files.len());

        for ((r1, r2), r3) in results1.iter().zip(&results2).zip(&results3) {
            assert_eq!(r1.0, r2.0);
            assert_eq!(r2.0, r3.0);

            let warnings1 = r1.1.as_ref().unwrap();
            let warnings2 = r2.1.as_ref().unwrap();
            let warnings3 = r3.1.as_ref().unwrap();

            assert_eq!(warnings1.len(), warnings2.len());
            assert_eq!(warnings2.len(), warnings3.len());
        }
    }
}