1use rayon::prelude::*;
20use std::error::Error;
21use std::fmt;
22use std::path::{Path, PathBuf};
23use std::sync::atomic::{AtomicUsize, Ordering};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26
/// Errors describing failures of a parallel run.
///
/// Nothing in the visible part of this file constructs these values; the
/// variants are described from their payloads and `Display` text only.
#[derive(Clone, Debug)]
pub enum ParallelError {
    /// A thread-pool problem, carrying a free-form description.
    ThreadPoolError(String),
    /// A failure tied to the item at `index`, with its message.
    ProcessingError { index: usize, message: String },
    /// Every task failed; the payload is the count printed in the message.
    AllTasksFailed(usize),
}

impl fmt::Display for ParallelError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ParallelError::ThreadPoolError(reason) => {
                write!(f, "Thread pool error: {}", reason)
            }
            ParallelError::ProcessingError {
                index: position,
                message: detail,
            } => write!(f, "Processing error at index {}: {}", position, detail),
            ParallelError::AllTasksFailed(total) => write!(f, "All {} tasks failed", total),
        }
    }
}

// No underlying source error is exposed, so the default trait methods suffice.
impl Error for ParallelError {}
51
/// Tunables for a parallel run.
#[derive(Clone, Debug)]
pub struct ParallelOptions {
    /// Worker thread count; `0` means "no explicit count".
    pub num_threads: usize,
    /// Number of items per sequentially-processed chunk; `0` disables chunking.
    pub chunk_size: usize,
    /// Whether processing should keep going after an item fails.
    pub continue_on_error: bool,
}

impl Default for ParallelOptions {
    /// No explicit thread count, no chunking, and errors do not stop the run.
    fn default() -> Self {
        ParallelOptions {
            continue_on_error: true,
            chunk_size: 0,
            num_threads: 0,
        }
    }
}
72
73impl ParallelOptions {
74 pub fn with_threads(num_threads: usize) -> Self {
76 Self {
77 num_threads,
78 ..Default::default()
79 }
80 }
81
82 pub fn with_chunks(chunk_size: usize) -> Self {
84 Self {
85 chunk_size,
86 ..Default::default()
87 }
88 }
89
90 pub fn effective_threads(&self) -> usize {
92 if self.num_threads == 0 {
93 num_cpus::get()
94 } else {
95 self.num_threads
96 }
97 }
98}
99
/// Outcome of a parallel run over a list of items.
#[derive(Debug)]
pub struct ParallelResult<T> {
    /// Successful outputs, each paired with the index of its input item.
    pub results: Vec<(usize, T)>,
    /// Failure messages, each paired with the index of its input item.
    pub errors: Vec<(usize, String)>,
    /// Time the run took.
    pub duration: Duration,
    /// Number of items counted as processed.
    pub processed_count: usize,
}

impl<T> ParallelResult<T> {
    /// `true` when no item reported an error.
    pub fn is_success(&self) -> bool {
        let no_failures = self.errors.is_empty();
        no_failures
    }

    /// Percentage (0–100) of processed items that succeeded.
    ///
    /// Returns `0.0` when nothing was processed, avoiding a division by zero.
    pub fn success_rate(&self) -> f64 {
        match self.processed_count {
            0 => 0.0,
            processed => {
                let succeeded = self.results.len() as f64;
                (succeeded / processed as f64) * 100.0
            }
        }
    }

    /// Consumes the result and returns the successful values sorted by input index.
    pub fn ordered_results(self) -> Vec<T> {
        let mut indexed = self.results;
        // Stable sort: entries sharing an index keep their relative order.
        indexed.sort_by_key(|entry| entry.0);
        indexed.into_iter().map(|(_, value)| value).collect()
    }
}
133
/// Shared, thread-safe progress hook taking two `usize` arguments.
///
/// `ParallelProcessor::process` invokes it as `(completed_so_far, total_items)`.
pub type ProgressCallback = Arc<dyn Fn(usize, usize) + Send + Sync>;

/// Runs a closure over a list of paths in parallel, driven by `ParallelOptions`.
pub struct ParallelProcessor {
    /// Threading and chunking configuration.
    options: ParallelOptions,
    /// Optional hook invoked by `process` after each item finishes.
    progress_callback: Option<ProgressCallback>,
}
142
143impl ParallelProcessor {
144 pub fn new() -> Self {
146 Self {
147 options: ParallelOptions::default(),
148 progress_callback: None,
149 }
150 }
151
152 pub fn with_options(options: ParallelOptions) -> Self {
154 Self {
155 options,
156 progress_callback: None,
157 }
158 }
159
160 pub fn with_progress<F>(mut self, callback: F) -> Self
162 where
163 F: Fn(usize, usize) + Send + Sync + 'static,
164 {
165 self.progress_callback = Some(Arc::new(callback));
166 self
167 }
168
169 pub fn process<T, E, F>(&self, items: &[PathBuf], processor: F) -> ParallelResult<T>
171 where
172 F: Fn(&Path) -> Result<T, E> + Sync + Send,
173 E: std::fmt::Display,
174 T: Send,
175 {
176 let start = Instant::now();
177 let total = items.len();
178
179 if total == 0 {
180 return ParallelResult {
181 results: vec![],
182 errors: vec![],
183 duration: Duration::ZERO,
184 processed_count: 0,
185 };
186 }
187
188 let completed = Arc::new(AtomicUsize::new(0));
189 let progress_callback = self.progress_callback.clone();
190
191 let pool = if self.options.num_threads > 0 {
193 rayon::ThreadPoolBuilder::new()
194 .num_threads(self.options.num_threads)
195 .build()
196 .ok()
197 } else {
198 None
199 };
200
201 let process_chunk = |chunk: &[(usize, &PathBuf)]| -> Vec<(usize, Result<T, String>)> {
202 chunk
203 .par_iter()
204 .map(|(idx, path)| {
205 let result = processor(path).map_err(|e| e.to_string());
206
207 let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
209 if let Some(ref cb) = progress_callback {
210 cb(done, total);
211 }
212
213 (*idx, result)
214 })
215 .collect()
216 };
217
218 let indexed_items: Vec<_> = items.iter().enumerate().collect();
219
220 let all_results = if self.options.chunk_size > 0 {
221 let mut all_results = Vec::with_capacity(total);
223 for chunk in indexed_items.chunks(self.options.chunk_size) {
224 let chunk_results = if let Some(ref pool) = pool {
225 pool.install(|| process_chunk(chunk))
226 } else {
227 process_chunk(chunk)
228 };
229 all_results.extend(chunk_results);
230 }
231 all_results
232 } else {
233 if let Some(ref pool) = pool {
235 pool.install(|| process_chunk(&indexed_items))
236 } else {
237 process_chunk(&indexed_items)
238 }
239 };
240
241 let mut results = Vec::new();
243 let mut errors = Vec::new();
244
245 for (idx, result) in all_results {
246 match result {
247 Ok(value) => results.push((idx, value)),
248 Err(msg) => errors.push((idx, msg)),
249 }
250 }
251
252 ParallelResult {
253 results,
254 errors,
255 duration: start.elapsed(),
256 processed_count: total,
257 }
258 }
259
260 pub fn map<T, F>(&self, items: &[PathBuf], mapper: F) -> Vec<T>
262 where
263 F: Fn(&Path) -> T + Sync + Send,
264 T: Send,
265 {
266 if self.options.num_threads > 0 {
267 if let Ok(pool) = rayon::ThreadPoolBuilder::new()
268 .num_threads(self.options.num_threads)
269 .build()
270 {
271 return pool.install(|| items.par_iter().map(|p| mapper(p)).collect());
272 }
273 }
274
275 items.par_iter().map(|p| mapper(p)).collect()
276 }
277}
278
279impl Default for ParallelProcessor {
280 fn default() -> Self {
281 Self::new()
282 }
283}
284
285pub fn parallel_process<T, E, F>(
287 inputs: &[PathBuf],
288 processor: F,
289 options: &ParallelOptions,
290) -> ParallelResult<T>
291where
292 F: Fn(&Path) -> Result<T, E> + Sync + Send,
293 E: std::fmt::Display,
294 T: Send,
295{
296 ParallelProcessor::with_options(options.clone()).process(inputs, processor)
297}
298
299pub fn parallel_map<T, F>(inputs: &[PathBuf], mapper: F, num_threads: usize) -> Vec<T>
301where
302 F: Fn(&Path) -> T + Sync + Send,
303 T: Send,
304{
305 ParallelProcessor::with_options(ParallelOptions::with_threads(num_threads)).map(inputs, mapper)
306}
307
308#[cfg(test)]
309mod tests {
310 use super::*;
311 use std::fs::File;
312 use std::io::Write;
313 use tempfile::tempdir;
314
315 #[test]
318 fn test_par001_parallel_process_basic() {
319 let dir = tempdir().unwrap();
320 let paths: Vec<PathBuf> = (0..10)
321 .map(|i| {
322 let path = dir.path().join(format!("file_{}.txt", i));
323 let mut f = File::create(&path).unwrap();
324 writeln!(f, "content {}", i).unwrap();
325 path
326 })
327 .collect();
328
329 let options = ParallelOptions::default();
330 let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);
331
332 assert!(result.is_success());
333 assert_eq!(result.results.len(), 10);
334 assert!(result.success_rate() > 99.0);
335 }
336
337 #[test]
340 fn test_par006_thread_count_options() {
341 let options = ParallelOptions::with_threads(4);
342 assert_eq!(options.effective_threads(), 4);
343
344 let auto_options = ParallelOptions::default();
345 assert!(auto_options.effective_threads() >= 1);
346 }
347
348 #[test]
349 fn test_par006_zero_threads_uses_cpu_count() {
350 let options = ParallelOptions::with_threads(0);
351 assert_eq!(options.effective_threads(), num_cpus::get());
352 }
353
354 #[test]
357 fn test_par007_partial_failure() {
358 let paths: Vec<PathBuf> = (0..5)
359 .map(|i| PathBuf::from(format!("/nonexistent/path_{}", i)))
360 .collect();
361
362 let options = ParallelOptions::default();
363 let result = parallel_process(
364 &paths,
365 |path| {
366 if path.exists() {
367 Ok(true)
368 } else {
369 Err("File not found")
370 }
371 },
372 &options,
373 );
374
375 assert!(!result.is_success());
376 assert_eq!(result.errors.len(), 5);
377 }
378
379 #[test]
380 fn test_par007_continue_on_error() {
381 let dir = tempdir().unwrap();
382 let mut paths: Vec<PathBuf> = vec![];
383
384 for i in 0..3 {
386 let path = dir.path().join(format!("valid_{}.txt", i));
387 File::create(&path).unwrap();
388 paths.push(path);
389 }
390 for i in 0..2 {
392 paths.push(PathBuf::from(format!("/invalid_{}", i)));
393 }
394
395 let options = ParallelOptions {
396 continue_on_error: true,
397 ..Default::default()
398 };
399
400 let result = parallel_process(
401 &paths,
402 |path| {
403 if path.exists() {
404 Ok(true)
405 } else {
406 Err("Not found")
407 }
408 },
409 &options,
410 );
411
412 assert_eq!(result.results.len(), 3);
413 assert_eq!(result.errors.len(), 2);
414 }
415
416 #[test]
419 fn test_par009_chunk_processing() {
420 let dir = tempdir().unwrap();
421 let paths: Vec<PathBuf> = (0..20)
422 .map(|i| {
423 let path = dir.path().join(format!("file_{}.txt", i));
424 File::create(&path).unwrap();
425 path
426 })
427 .collect();
428
429 let options = ParallelOptions::with_chunks(5);
430 let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);
431
432 assert!(result.is_success());
433 assert_eq!(result.results.len(), 20);
434 }
435
436 #[test]
439 fn test_empty_input() {
440 let paths: Vec<PathBuf> = vec![];
441 let options = ParallelOptions::default();
442 let result = parallel_process(&paths, |_| Ok::<_, String>(true), &options);
443
444 assert!(result.is_success());
445 assert_eq!(result.processed_count, 0);
446 }
447
448 #[test]
449 fn test_ordered_results() {
450 let dir = tempdir().unwrap();
451 let paths: Vec<PathBuf> = (0..5)
452 .map(|i| {
453 let path = dir.path().join(format!("{}.txt", i));
454 File::create(&path).unwrap();
455 path
456 })
457 .collect();
458
459 let options = ParallelOptions::default();
460 let result = parallel_process(
461 &paths,
462 |path| {
463 let name = path.file_stem().unwrap().to_str().unwrap();
464 Ok::<_, String>(name.parse::<usize>().unwrap())
465 },
466 &options,
467 );
468
469 let ordered = result.ordered_results();
470 assert_eq!(ordered, vec![0, 1, 2, 3, 4]);
471 }
472
473 #[test]
474 fn test_parallel_processor_builder() {
475 let processor = ParallelProcessor::new();
476 assert!(processor.progress_callback.is_none());
477
478 let processor_with_progress =
479 ParallelProcessor::new().with_progress(|done, total| println!("{}/{}", done, total));
480 assert!(processor_with_progress.progress_callback.is_some());
481 }
482
483 #[test]
484 fn test_parallel_map() {
485 let paths: Vec<PathBuf> = (0..5).map(|i| PathBuf::from(format!("{}", i))).collect();
486
487 let results = parallel_map(&paths, |p| p.to_string_lossy().parse::<i32>().unwrap(), 2);
488
489 assert_eq!(results.len(), 5);
490 }
491
492 #[test]
493 fn test_success_rate_calculation() {
494 let result: ParallelResult<bool> = ParallelResult {
495 results: vec![(0, true), (1, true)],
496 errors: vec![(2, "error".to_string())],
497 duration: Duration::ZERO,
498 processed_count: 3,
499 };
500
501 let rate = result.success_rate();
502 assert!((rate - 66.67).abs() < 1.0);
503 }
504
505 #[test]
506 fn test_error_types() {
507 let err1 = ParallelError::ThreadPoolError("test".to_string());
508 assert!(err1.to_string().contains("Thread pool"));
509
510 let err2 = ParallelError::ProcessingError {
511 index: 5,
512 message: "fail".to_string(),
513 };
514 assert!(err2.to_string().contains("index 5"));
515
516 let err3 = ParallelError::AllTasksFailed(10);
517 assert!(err3.to_string().contains("10 tasks"));
518 }
519
520 #[test]
521 fn test_default_options() {
522 let options = ParallelOptions::default();
523 assert_eq!(options.num_threads, 0);
524 assert_eq!(options.chunk_size, 0);
525 assert!(options.continue_on_error);
526 }
527
528 #[test]
529 fn test_thread_safety() {
530 use std::sync::atomic::AtomicUsize;
531 use std::sync::Arc;
532
533 let counter = Arc::new(AtomicUsize::new(0));
534 let paths: Vec<PathBuf> = (0..100).map(|i| PathBuf::from(format!("{}", i))).collect();
535
536 let counter_clone = Arc::clone(&counter);
537 let results = parallel_map(
538 &paths,
539 move |_| {
540 counter_clone.fetch_add(1, Ordering::Relaxed);
541 true
542 },
543 4,
544 );
545
546 assert_eq!(results.len(), 100);
547 assert_eq!(counter.load(Ordering::Relaxed), 100);
548 }
549
550 #[test]
551 fn test_progress_callback() {
552 use std::sync::atomic::AtomicUsize;
553 use std::sync::Arc;
554
555 let progress_count = Arc::new(AtomicUsize::new(0));
556 let progress_clone = Arc::clone(&progress_count);
557
558 let dir = tempdir().unwrap();
559 let paths: Vec<PathBuf> = (0..10)
560 .map(|i| {
561 let path = dir.path().join(format!("{}.txt", i));
562 File::create(&path).unwrap();
563 path
564 })
565 .collect();
566
567 let processor = ParallelProcessor::new().with_progress(move |_, _| {
568 progress_clone.fetch_add(1, Ordering::Relaxed);
569 });
570
571 let _ = processor.process(&paths, |p| Ok::<_, String>(p.exists()));
572
573 assert_eq!(progress_count.load(Ordering::Relaxed), 10);
574 }
575
576 #[test]
579 fn test_custom_thread_pool() {
580 let dir = tempdir().unwrap();
581 let paths: Vec<PathBuf> = (0..20)
582 .map(|i| {
583 let path = dir.path().join(format!("file_{}.txt", i));
584 File::create(&path).unwrap();
585 path
586 })
587 .collect();
588
589 let options = ParallelOptions::with_threads(2);
591 let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);
592
593 assert!(result.is_success());
594 assert_eq!(result.results.len(), 20);
595 }
596
597 #[test]
598 fn test_large_chunk_size() {
599 let dir = tempdir().unwrap();
600 let paths: Vec<PathBuf> = (0..50)
601 .map(|i| {
602 let path = dir.path().join(format!("file_{}.txt", i));
603 File::create(&path).unwrap();
604 path
605 })
606 .collect();
607
608 let options = ParallelOptions::with_chunks(100);
610 let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);
611
612 assert!(result.is_success());
613 assert_eq!(result.results.len(), 50);
614 }
615
616 #[test]
617 fn test_mixed_success_failure() {
618 let dir = tempdir().unwrap();
619 let mut paths = Vec::new();
620
621 for i in 0..5 {
623 let path = dir.path().join(format!("valid_{}.txt", i));
624 File::create(&path).unwrap();
625 paths.push(path);
626 }
627 for i in 0..5 {
629 paths.push(PathBuf::from(format!("/nonexistent/path_{}", i)));
630 }
631
632 let options = ParallelOptions::default();
633 let result = parallel_process(
634 &paths,
635 |path| {
636 if path.exists() {
637 Ok(path.to_string_lossy().to_string())
638 } else {
639 Err("File not found")
640 }
641 },
642 &options,
643 );
644
645 assert_eq!(result.results.len(), 5);
646 assert_eq!(result.errors.len(), 5);
647 assert!((result.success_rate() - 50.0).abs() < 0.1);
648 }
649
650 #[test]
651 fn test_processor_with_custom_options() {
652 let dir = tempdir().unwrap();
653 let paths: Vec<PathBuf> = (0..10)
654 .map(|i| {
655 let path = dir.path().join(format!("{}.txt", i));
656 File::create(&path).unwrap();
657 path
658 })
659 .collect();
660
661 let options = ParallelOptions {
662 num_threads: 4,
663 chunk_size: 3,
664 continue_on_error: true,
665 };
666
667 let processor = ParallelProcessor::with_options(options);
668 let result = processor.process(&paths, |p| Ok::<_, String>(p.exists()));
669
670 assert!(result.is_success());
671 assert_eq!(result.processed_count, 10);
672 }
673
674 #[test]
675 fn test_parallel_result_duration() {
676 let dir = tempdir().unwrap();
677 let paths: Vec<PathBuf> = (0..5)
678 .map(|i| {
679 let path = dir.path().join(format!("{}.txt", i));
680 File::create(&path).unwrap();
681 path
682 })
683 .collect();
684
685 let options = ParallelOptions::default();
686 let result = parallel_process(&paths, |path| Ok::<_, String>(path.exists()), &options);
687
688 assert!(result.duration.as_nanos() > 0);
690 }
691
692 #[test]
693 fn test_single_item_processing() {
694 let dir = tempdir().unwrap();
695 let path = dir.path().join("single.txt");
696 File::create(&path).unwrap();
697 let paths = vec![path];
698
699 let options = ParallelOptions::default();
700 let result = parallel_process(&paths, |p| Ok::<_, String>(p.exists()), &options);
701
702 assert!(result.is_success());
703 assert_eq!(result.results.len(), 1);
704 assert_eq!(result.processed_count, 1);
705 }
706
707 #[test]
708 fn test_parallel_map_preserves_order() {
709 let paths: Vec<PathBuf> = (0..20).map(|i| PathBuf::from(format!("{:02}", i))).collect();
710
711 let results = parallel_map(&paths, |p| p.to_string_lossy().to_string(), 4);
712
713 assert_eq!(results.len(), 20);
715 for result in &results {
717 assert!(result.len() == 2); }
719 }
720}