1#[cfg(feature = "parallel")]
21pub use rayon::prelude::*;
22
23#[cfg(feature = "parallel")]
25pub use rayon::{ThreadPool, ThreadPoolBuilder};
26
#[cfg(not(feature = "parallel"))]
mod sequential_fallbacks {
    //! Sequential stand-ins for the rayon items used by this module when the
    //! `parallel` feature is disabled.
    //!
    //! Every "parallel" iterator produced here is an ordinary [`Iterator`], so
    //! adaptors such as `map`, `filter`, `collect`, `sum` or `count` resolve to
    //! the standard `Iterator` methods. The traits below deliberately do NOT
    //! redeclare those methods: a second trait method with the same name makes
    //! every call like `.map(..)` ambiguous (E0034) while both traits are in
    //! scope.

    use std::marker::PhantomData;

    /// Sequential counterpart of rayon's `IntoParallelIterator`.
    pub trait IntoParallelIterator: Sized {
        /// Iterator returned by [`IntoParallelIterator::into_par_iter`].
        type Iter: Iterator<Item = Self::Item>;
        /// Item yielded by that iterator.
        type Item;

        /// Converts `self` into a plain sequential iterator.
        fn into_par_iter(self) -> Self::Iter;
    }

    /// Sequential counterpart of rayon's `ParallelIterator`.
    ///
    /// Implemented for every [`Iterator`]. Only methods whose names do not
    /// clash with `Iterator` methods are provided here; everything else is
    /// the standard `Iterator` API.
    pub trait ParallelIterator: Iterator + Sized {
        /// Mirrors rayon's `reduce_with`: combines all items with `op`, or
        /// returns `None` when the iterator is empty.
        fn reduce_with<F>(self, op: F) -> Option<Self::Item>
        where
            F: FnMut(Self::Item, Self::Item) -> Self::Item,
        {
            Iterator::reduce(self, op)
        }
    }

    /// Sequential counterpart of rayon's `ParallelBridge`.
    pub trait ParallelBridge: Iterator + Sized {
        /// Returns the iterator unchanged.
        fn par_bridge(self) -> Self {
            self
        }
    }

    impl IntoParallelIterator for std::ops::Range<usize> {
        type Item = usize;
        type Iter = std::ops::Range<usize>;

        fn into_par_iter(self) -> Self::Iter {
            self
        }
    }

    impl<T> IntoParallelIterator for Vec<T> {
        type Item = T;
        type Iter = std::vec::IntoIter<T>;

        fn into_par_iter(self) -> Self::Iter {
            self.into_iter()
        }
    }

    // `&Vec<T>` / `&mut Vec<T>` impls let `(&v).into_par_iter()` borrow instead
    // of auto-dereferencing to the by-value `Vec<T>` impl, as rayon does.
    impl<'a, T> IntoParallelIterator for &'a Vec<T> {
        type Item = &'a T;
        type Iter = std::slice::Iter<'a, T>;

        fn into_par_iter(self) -> Self::Iter {
            self.iter()
        }
    }

    impl<'a, T> IntoParallelIterator for &'a mut Vec<T> {
        type Item = &'a mut T;
        type Iter = std::slice::IterMut<'a, T>;

        fn into_par_iter(self) -> Self::Iter {
            self.iter_mut()
        }
    }

    impl<'a, T> IntoParallelIterator for &'a [T] {
        type Item = &'a T;
        type Iter = std::slice::Iter<'a, T>;

        fn into_par_iter(self) -> Self::Iter {
            self.iter()
        }
    }

    impl<'a, T> IntoParallelIterator for &'a mut [T] {
        type Item = &'a mut T;
        type Iter = std::slice::IterMut<'a, T>;

        fn into_par_iter(self) -> Self::Iter {
            self.iter_mut()
        }
    }

    impl<I: Iterator> ParallelIterator for I {}

    impl<I: Iterator> ParallelBridge for I {}

    /// Sequential counterpart of rayon's `Scope`: jobs passed to
    /// [`Scope::spawn`] run immediately on the calling thread.
    #[derive(Debug)]
    pub struct Scope<'scope> {
        // Carries the `'scope` lifetime like rayon's type, without storing data.
        marker: PhantomData<&'scope ()>,
    }

    impl<'scope> Scope<'scope> {
        /// Runs `body` to completion right away, passing this scope so the job
        /// can spawn further (also inline) jobs.
        pub fn spawn<BODY>(&self, body: BODY)
        where
            BODY: FnOnce(&Scope<'scope>) + 'scope,
        {
            body(self)
        }
    }

    /// Sequential counterpart of `rayon::scope`: calls `f` with a [`Scope`]
    /// whose `spawn` runs jobs inline, and returns `f`'s result.
    pub fn scope<'scope, F, R>(f: F) -> R
    where
        F: FnOnce(&Scope<'scope>) -> R,
    {
        f(&Scope {
            marker: PhantomData,
        })
    }

    /// Sequential counterpart of `rayon::join`: runs `a`, then `b`, on the
    /// calling thread and returns both results.
    pub fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA,
        B: FnOnce() -> RB,
    {
        (a(), b())
    }
}
187
188#[cfg(not(feature = "parallel"))]
190pub use sequential_fallbacks::*;
191
192#[allow(dead_code)]
194pub fn par_range(start: usize, end: usize) -> impl ParallelIterator<Item = usize> {
195 (start..end).into_par_iter()
196}
197
/// Splits `slice` into consecutive chunks of `chunk_size` elements (the last
/// chunk may be shorter), yielded by a rayon parallel iterator.
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn par_chunks<T: Sync>(slice: &[T], chunk_size: usize) -> rayon::slice::Chunks<'_, T> {
    slice.par_chunks(chunk_size)
}

/// Sequential fallback for `par_chunks`: splits `slice` into consecutive
/// chunks of `chunk_size` elements (the last chunk may be shorter).
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn par_chunks<T>(slice: &[T], chunk_size: usize) -> std::slice::Chunks<'_, T> {
    slice.chunks(chunk_size)
}
211
/// Splits `slice` into consecutive mutable chunks of `chunk_size` elements
/// (the last chunk may be shorter), yielded by a rayon parallel iterator.
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn par_chunks_mut<T: Send>(
    slice: &mut [T],
    chunk_size: usize,
) -> rayon::slice::ChunksMut<'_, T> {
    slice.par_chunks_mut(chunk_size)
}

/// Sequential fallback for `par_chunks_mut`: splits `slice` into consecutive
/// mutable chunks of `chunk_size` elements (the last chunk may be shorter).
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn par_chunks_mut<T>(slice: &mut [T], chunk_size: usize) -> std::slice::ChunksMut<'_, T> {
    slice.chunks_mut(chunk_size)
}
228
/// Applies `f` to every element of `items` on rayon's pool and gathers the
/// results in input order.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map<T, U, F>(items: &[T], f: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync + Send,
{
    use rayon::prelude::*;
    let mapped = items.par_iter().map(f);
    mapped.collect()
}

/// Sequential fallback for `parallel_map`: applies `f` to every element of
/// `items`, in order.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map<T, U, F>(items: &[T], f: F) -> Vec<U>
where
    F: Fn(&T) -> U,
{
    let mut out = Vec::with_capacity(items.len());
    out.extend(items.iter().map(f));
    out
}
251
/// Applies the fallible `f` to every element of `items` on rayon's pool.
///
/// Returns all results in input order, or an error if any call fails. When
/// several calls fail, which error is returned is not deterministic.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map_result<T, U, E, F>(items: &[T], f: F) -> Result<Vec<U>, E>
where
    T: Sync,
    U: Send,
    E: Send,
    F: Fn(&T) -> Result<U, E> + Sync + Send,
{
    use rayon::prelude::*;
    let outcomes = items.par_iter().map(f);
    outcomes.collect::<Result<Vec<U>, E>>()
}

/// Sequential fallback for `parallel_map_result`: applies `f` in order and
/// stops at the first error, which is returned.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_result<T, U, E, F>(items: &[T], f: F) -> Result<Vec<U>, E>
where
    F: Fn(&T) -> Result<U, E>,
{
    let mut out = Vec::with_capacity(items.len());
    for item in items {
        out.push(f(item)?);
    }
    Ok(out)
}
275
/// Reports whether this build was compiled with the `parallel` feature
/// (rayon backend) rather than the sequential fallbacks.
#[allow(dead_code)]
pub fn is_parallel_enabled() -> bool {
    const PARALLEL_FEATURE: bool = cfg!(feature = "parallel");
    PARALLEL_FEATURE
}
281
/// Number of worker threads available for parallel work.
///
/// Returns `rayon::current_num_threads()` with the `parallel` feature and
/// `1` in the sequential build.
#[allow(dead_code)]
pub fn num_threads() -> usize {
    #[cfg(feature = "parallel")]
    {
        rayon::current_num_threads()
    }
    #[cfg(not(feature = "parallel"))]
    {
        1
    }
}
295
/// Rayon-named accessor for the worker thread count.
///
/// Forwards to `rayon::current_num_threads()` when the `parallel` feature is
/// enabled; the sequential build always reports a single thread.
#[allow(dead_code)]
pub fn current_num_threads() -> usize {
    #[cfg(not(feature = "parallel"))]
    {
        1
    }
    #[cfg(feature = "parallel")]
    {
        rayon::current_num_threads()
    }
}
312
313#[allow(dead_code)]
315pub fn get_num_threads() -> usize {
316 num_threads()
317}
318
/// Configures the number of threads in rayon's global thread pool.
///
/// Passing `0` lets rayon choose the thread count automatically. The global
/// pool can be configured only once, before it is first used.
///
/// # Panics
///
/// Panics if the global thread pool has already been initialized (rayon's
/// `build_global` returns an error in that case).
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn set_num_threads(numthreads: usize) {
    rayon::ThreadPoolBuilder::new()
        .num_threads(numthreads)
        .build_global()
        .expect("Failed to initialize thread pool");
}

/// Sequential build: there is no thread pool to configure, so this is a
/// no-op. It exists so callers of `set_num_threads` compile whether or not
/// the `parallel` feature is enabled.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn set_num_threads(_numthreads: usize) {}

/// Former name of the sequential no-op; forwards to `set_num_threads`.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn threads(numthreads: usize) {
    set_num_threads(numthreads)
}
335
336#[cfg(feature = "parallel")]
338pub use rayon::scope as par_scope;
339
340#[cfg(not(feature = "parallel"))]
342pub use sequential_fallbacks::scope as par_scope;
343
344#[cfg(feature = "parallel")]
346pub use rayon::join as par_join;
347
348#[cfg(not(feature = "parallel"))]
350pub use sequential_fallbacks::join as par_join;
351
/// Applies `mapper` to `data` and returns its result.
///
/// Despite the name, no reduction happens: `data` is a single value, so
/// `mapper` runs exactly once and the reducer is never called.
///
/// NOTE(review): the `parallel` and sequential builds expose different
/// signatures (the sequential one takes an extra, ignored chunk-size
/// argument), so no call site compiles under both configurations. Unifying
/// them is an API decision left to the maintainers.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map_reduce<D, T, M, Red>(data: D, mapper: M, _reducer: Red) -> T
where
    D: Send + Sync,
    T: Send + Clone,
    M: Fn(D) -> T + Sync + Send + Clone,
    Red: Fn(T, T) -> T + Sync + Send,
{
    mapper(data)
}

/// Sequential build of `parallel_map_reduce`: applies `mapper` to `data` once.
///
/// The chunk-size and reducer arguments are accepted but unused (see the
/// NOTE on the `parallel` build about the mismatched signatures).
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_reduce<D, T, M, Red>(
    data: D,
    _chunk_size: usize,
    mapper: M,
    _reducer: Red,
) -> T
where
    T: Clone,
    M: Fn(D) -> T,
    Red: Fn(T, T) -> T,
{
    mapper(data)
}
391
/// Consumes `items` as a rayon parallel iterator, maps each item through
/// `mapper`, and collects the results in input order.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map_collect<I, T, U, M>(items: I, mapper: M) -> Vec<U>
where
    I: IntoParallelIterator<Item = T>,
    T: Send,
    U: Send,
    M: Fn(T) -> U + Sync + Send,
{
    use rayon::prelude::*;
    let source = items.into_par_iter();
    source.map(mapper).collect()
}

/// Sequential fallback for `parallel_map_collect`: maps every item of
/// `items` through `mapper`, in order.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_collect<I, T, U, M>(items: I, mapper: M) -> Vec<U>
where
    I: IntoIterator<Item = T>,
    M: Fn(T) -> U,
{
    let source = items.into_iter();
    let mut out = Vec::with_capacity(source.size_hint().0);
    for item in source {
        out.push(mapper(item));
    }
    out
}
427
/// Collects the indices produced by `range`, splits them into chunks of
/// `chunk_size`, maps each chunk with `mapper` on rayon's pool, and combines
/// the per-chunk results with `reducer`.
///
/// An empty `range` yields `mapper(&[])`. `reducer` should be associative,
/// because partial results are combined in parallel.
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map_reduce_indexed<R, T, M, Red>(
    range: R,
    chunk_size: usize,
    mapper: M,
    reducer: Red,
) -> T
where
    R: Iterator<Item = usize> + Send,
    T: Send + Clone,
    M: Fn(&[usize]) -> T + Sync + Send,
    Red: Fn(T, T) -> T + Sync + Send,
{
    use rayon::prelude::*;

    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let indices: Vec<usize> = range.collect();

    // Chunk the index vector directly in parallel; no intermediate Vec of slices.
    indices
        .par_chunks(chunk_size)
        .map(&mapper)
        .reduce_with(reducer)
        .unwrap_or_else(|| mapper(&[]))
}

/// Sequential fallback for `parallel_map_reduce_indexed`: maps each
/// `chunk_size`-sized chunk of the collected indices with `mapper` and folds
/// the results left-to-right with `reducer`.
///
/// An empty `range` yields `mapper(&[])`.
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_reduce_indexed<R, T, M, Red>(
    range: R,
    chunk_size: usize,
    mapper: M,
    reducer: Red,
) -> T
where
    R: Iterator<Item = usize>,
    T: Clone,
    M: Fn(&[usize]) -> T,
    Red: Fn(T, T) -> T,
{
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let indices: Vec<usize> = range.collect();

    // Lazy chain: no buffer of per-chunk results is materialized.
    indices
        .chunks(chunk_size)
        .map(&mapper)
        .reduce(reducer)
        .unwrap_or_else(|| mapper(&[]))
}
498
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    //! Tests that must pass both with and without the `parallel` feature.

    use super::*;

    #[test]
    fn test_par_range() {
        let result: Vec<usize> = par_range(0, 10).collect();
        assert_eq!(result, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn test_par_map() {
        let data = vec![1, 2, 3, 4, 5];
        let result: Vec<i32> = data.into_par_iter().map(|x| x * 2).collect();
        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_par_filter() {
        let data = vec![1, 2, 3, 4, 5, 6];
        let result: Vec<i32> = data.into_par_iter().filter(|x| x % 2 == 0).collect();
        assert_eq!(result, vec![2, 4, 6]);
    }

    #[test]
    fn test_par_try_for_each() {
        let data = vec![1, 2, 3, 4, 5];
        let result =
            data.into_par_iter()
                .try_for_each(|x| if x < 6 { Ok(()) } else { Err("Too large") });
        assert!(result.is_ok());
    }

    #[test]
    fn test_par_chunks() {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let chunks: Vec<Vec<i32>> = par_chunks(&data, 3).map(|chunk| chunk.to_vec()).collect();
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0], vec![1, 2, 3]);
        assert_eq!(chunks[1], vec![4, 5, 6]);
        assert_eq!(chunks[2], vec![7, 8]);
    }

    #[test]
    fn test_is_parallel_enabled() {
        let enabled = is_parallel_enabled();
        #[cfg(feature = "parallel")]
        assert!(enabled);
        #[cfg(not(feature = "parallel"))]
        assert!(!enabled);
    }

    #[test]
    fn test_num_threads() {
        let threads = num_threads();
        #[cfg(feature = "parallel")]
        assert!(threads > 0);
        #[cfg(not(feature = "parallel"))]
        assert_eq!(threads, 1);
    }

    #[test]
    fn test_parallel_map_preserves_order() {
        assert_eq!(parallel_map(&[1, 2, 3, 4], |x| x * 10), vec![10, 20, 30, 40]);
    }

    #[test]
    fn test_parallel_map_result_propagates_error() {
        let ok: Result<Vec<i32>, String> = parallel_map_result(&[1, 2, 3], |&x| Ok(x + 1));
        assert_eq!(ok, Ok(vec![2, 3, 4]));
        // Only one element fails, so the returned error is deterministic in both builds.
        let err: Result<Vec<i32>, String> = parallel_map_result(&[1, 2, 3], |&x| {
            if x == 2 { Err("two".to_string()) } else { Ok(x) }
        });
        assert_eq!(err, Err("two".to_string()));
    }

    #[test]
    fn test_parallel_map_collect() {
        assert_eq!(parallel_map_collect(vec![1, 2, 3], |x: i32| x * x), vec![1, 4, 9]);
    }

    #[test]
    fn test_parallel_scan_prefix_sums() {
        assert_eq!(parallel_scan(&[1, 2, 3, 4], 0, |a, b| a + b), vec![1, 3, 6, 10]);
        assert!(parallel_scan(&[] as &[i32], 0, |a, b| a + b).is_empty());
    }

    #[test]
    fn test_parallel_zip_truncates_to_shorter() {
        assert_eq!(parallel_zip(&[1, 2, 3], &[10, 20], |a, b| a + b), vec![11, 22]);
    }

    #[test]
    fn test_parallel_sort_by() {
        let mut v = vec![3, 1, 2];
        parallel_sort(&mut v, |a, b| b.cmp(a));
        assert_eq!(v, vec![3, 2, 1]);
    }

    #[test]
    fn test_par_join_and_scope() {
        assert_eq!(par_join(|| 1 + 1, || "two"), (2, "two"));
        assert_eq!(par_scope(|_| 42), 42);
    }
}
561
/// Inclusive left-to-right scan shared by both `parallel_scan` builds.
///
/// Output element 0 is `op(&init, &items[0])`; element `i > 0` is
/// `op(&out[i - 1], &items[i])`. Builds the output without cloning `init`.
fn scan_inclusive<T, F>(items: &[T], init: T, op: F) -> Vec<T>
where
    F: Fn(&T, &T) -> T,
{
    let mut out: Vec<T> = Vec::with_capacity(items.len());
    for item in items {
        // The first element folds onto `init`; later ones onto the previous output.
        let next = match out.last() {
            Some(prev) => op(prev, item),
            None => op(&init, item),
        };
        out.push(next);
    }
    out
}

/// Computes the inclusive prefix scan of `items` under `op`, seeded with
/// `init`; an empty slice yields an empty vector.
///
/// NOTE: the `parallel` build also scans sequentially. A parallel scan would
/// require `op` to be associative with `init` as its identity, and this
/// signature promises neither.
#[cfg(feature = "parallel")]
pub fn parallel_scan<T, F>(items: &[T], init: T, op: F) -> Vec<T>
where
    T: Clone + Send + Sync,
    F: Fn(&T, &T) -> T + Sync,
{
    scan_inclusive(items, init, op)
}

/// Computes the inclusive prefix scan of `items` under `op`, seeded with
/// `init`; an empty slice yields an empty vector.
#[cfg(not(feature = "parallel"))]
pub fn parallel_scan<T, F>(items: &[T], init: T, op: F) -> Vec<T>
where
    T: Clone,
    F: Fn(&T, &T) -> T,
{
    scan_inclusive(items, init, op)
}
609
/// Applies `op` to every row of `matrix` on rayon's pool and returns the
/// per-row results in row order.
#[cfg(feature = "parallel")]
pub fn parallel_matrix_rows<T, U, F>(matrix: &[&[T]], op: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&[T]) -> U + Sync,
{
    use rayon::prelude::*;
    let per_row = matrix.par_iter().map(|&row| op(row));
    per_row.collect()
}

/// Sequential fallback for `parallel_matrix_rows`: applies `op` to every
/// row of `matrix`, in row order.
#[cfg(not(feature = "parallel"))]
pub fn parallel_matrix_rows<T, U, F>(matrix: &[&[T]], op: F) -> Vec<U>
where
    F: Fn(&[T]) -> U,
{
    matrix.iter().copied().map(&op).collect()
}
633
/// Combines `a` and `b` element-wise with `op` on rayon's pool.
///
/// The result has the length of the shorter slice and follows input order.
#[cfg(feature = "parallel")]
pub fn parallel_zip<T, U, V, F>(a: &[T], b: &[U], op: F) -> Vec<V>
where
    T: Sync,
    U: Sync,
    V: Send,
    F: Fn(&T, &U) -> V + Sync,
{
    use rayon::prelude::*;
    let pairs = a.par_iter().zip(b.par_iter());
    pairs.map(|(left, right)| op(left, right)).collect()
}

/// Sequential fallback for `parallel_zip`: combines `a` and `b` element-wise
/// with `op`, stopping at the end of the shorter slice.
#[cfg(not(feature = "parallel"))]
pub fn parallel_zip<T, U, V, F>(a: &[T], b: &[U], op: F) -> Vec<V>
where
    F: Fn(&T, &U) -> V,
{
    let mut out = Vec::with_capacity(a.len().min(b.len()));
    for (left, right) in a.iter().zip(b.iter()) {
        out.push(op(left, right));
    }
    out
}
661
/// Stably sorts `items` in place using `compare`, with rayon's parallel
/// merge sort.
#[cfg(feature = "parallel")]
pub fn parallel_sort<T, F>(items: &mut [T], compare: F)
where
    T: Send,
    F: Fn(&T, &T) -> std::cmp::Ordering + Sync,
{
    use rayon::slice::ParallelSliceMut;
    ParallelSliceMut::par_sort_by(items, compare);
}

/// Sequential fallback for `parallel_sort`: stably sorts `items` in place
/// using `compare`.
#[cfg(not(feature = "parallel"))]
pub fn parallel_sort<T, F>(items: &mut [T], compare: F)
where
    F: Fn(&T, &T) -> std::cmp::Ordering,
{
    <[T]>::sort_by(items, compare);
}
684
/// Maps `op` over `items` on rayon's work-stealing pool; output order
/// matches input order.
///
/// Rayon will not split the input into pieces shorter than
/// `len / (4 * num_threads())` items (at least one). This caps per-item
/// scheduling overhead while leaving enough pieces for idle workers to steal.
#[cfg(feature = "parallel")]
pub fn parallel_map_work_stealing<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    use rayon::prelude::*;

    // Target roughly four pieces per worker; the guards keep the divisor and
    // the minimum length non-zero.
    let target_pieces = num_threads().saturating_mul(4).max(1);
    let min_piece_len = (items.len() / target_pieces).max(1);

    // `with_min_len` keeps the iterator indexed, so `collect` writes straight
    // into the output. The previous `par_chunks().flat_map(par_iter)` nested a
    // second parallel split inside each chunk, which defeated the chunk sizing.
    items
        .par_iter()
        .with_min_len(min_piece_len)
        .map(&op)
        .collect()
}

/// Sequential fallback for `parallel_map_work_stealing`: maps `op` over
/// `items` in order.
#[cfg(not(feature = "parallel"))]
pub fn parallel_map_work_stealing<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    F: Fn(&T) -> U,
{
    items.iter().map(op).collect()
}
715
/// Maps `op` over `items` in parallel. Each worker processes contiguous
/// batches whose length is a multiple of 64 elements (roughly one batch per
/// worker thread). Output order matches input order.
///
/// NOTE(review): despite the name, nothing here queries NUMA topology. The
/// 64 is an element count, so batch boundaries land on 64-byte cache-line
/// boundaries only for 1-byte `T` with a suitably aligned slice start.
#[cfg(feature = "parallel")]
pub fn parallel_map_numa_aware<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    use rayon::prelude::*;

    const BATCH_MULTIPLE: usize = 64;

    // About one batch per worker, rounded up to a multiple of BATCH_MULTIPLE.
    let per_thread = (items.len() / num_threads().max(1)).max(1);
    let batch_len = per_thread + (BATCH_MULTIPLE - per_thread % BATCH_MULTIPLE) % BATCH_MULTIPLE;

    // `flat_map_iter` runs each batch serially on one worker, so the batch
    // boundaries above are what workers actually see. A nested `par_iter`
    // would split batches further.
    items
        .par_chunks(batch_len)
        .flat_map_iter(|batch| batch.iter().map(&op))
        .collect()
}

/// Sequential fallback for `parallel_map_numa_aware`: maps `op` over
/// `items` in order.
#[cfg(not(feature = "parallel"))]
pub fn parallel_map_numa_aware<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    F: Fn(&T) -> U,
{
    items.iter().map(op).collect()
}
749
/// Reduces `items` with `op`, combining partial results in parallel across
/// rayon's workers. Returns `None` for an empty slice.
///
/// `op` should be associative, since the grouping of combinations is decided
/// by rayon.
#[cfg(feature = "parallel")]
pub fn parallel_tree_reduce<T, F>(items: &[T], op: F) -> Option<T>
where
    T: Clone + Send + Sync,
    F: Fn(T, T) -> T + Sync,
{
    use rayon::prelude::*;

    // `reduce_with` needs no identity element. The previous
    // `reduce(|| items[0].clone(), op)` used `items[0]` as the "identity".
    // Rayon may invoke the identity once per split, so `items[0]` was folded
    // in several times (e.g. summing [1, 2, 3] could exceed 6).
    items.par_iter().cloned().reduce_with(&op)
}

/// Sequential fallback for `parallel_tree_reduce`: folds `items`
/// left-to-right with `op`; returns `None` for an empty slice.
#[cfg(not(feature = "parallel"))]
pub fn parallel_tree_reduce<T, F>(items: &[T], op: F) -> Option<T>
where
    T: Clone,
    F: Fn(T, T) -> T,
{
    items.iter().cloned().reduce(op)
}
779
/// Number of `batch_size`-sized batches needed to cover `len` items.
///
/// This is ceiling division written so it cannot overflow; the previous
/// `len + batch_size - 1` overflowed for very large `batch_size`.
/// `batch_size` must be non-zero.
fn batch_count(len: usize, batch_size: usize) -> usize {
    len / batch_size + usize::from(len % batch_size != 0)
}

/// Processes `items` in batches of `batch_size` on rayon's pool and
/// concatenates the per-batch outputs in input order.
///
/// After each batch finishes, `progress_callback(completed, total_batches)`
/// is called. `completed` counts finished batches, so every value in
/// `1..=total_batches` is reported exactly once. The callback may run
/// concurrently from several threads, so calls can arrive out of order.
///
/// # Panics
///
/// Panics if `batch_size` is zero.
#[cfg(feature = "parallel")]
pub fn parallel_batch_process<T, U, F, P>(
    items: &[T],
    batch_size: usize,
    processor: F,
    progress_callback: P,
) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&[T]) -> Vec<U> + Sync,
    P: Fn(usize, usize) + Sync,
{
    use rayon::prelude::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    assert!(batch_size > 0, "batch_size must be non-zero");
    let total_batches = batch_count(items.len(), batch_size);
    // Batches finish out of order in parallel, so count completions instead
    // of reporting the batch index.
    let completed = AtomicUsize::new(0);

    let per_batch: Vec<Vec<U>> = items
        .par_chunks(batch_size)
        .map(|chunk| {
            let out = processor(chunk);
            let done = completed.fetch_add(1, Ordering::SeqCst) + 1;
            progress_callback(done, total_batches);
            out
        })
        .collect();

    per_batch.into_iter().flatten().collect()
}

/// Sequential fallback for `parallel_batch_process`: processes `items` in
/// batches of `batch_size`, in order, concatenating the outputs.
///
/// After batch `k` (1-based) finishes, `progress_callback(k, total_batches)`
/// is called.
///
/// # Panics
///
/// Panics if `batch_size` is zero.
#[cfg(not(feature = "parallel"))]
pub fn parallel_batch_process<T, U, F, P>(
    items: &[T],
    batch_size: usize,
    processor: F,
    progress_callback: P,
) -> Vec<U>
where
    F: Fn(&[T]) -> Vec<U>,
    P: Fn(usize, usize),
{
    assert!(batch_size > 0, "batch_size must be non-zero");
    let total_batches = batch_count(items.len(), batch_size);
    let mut results = Vec::new();

    for (batch_idx, chunk) in items.chunks(batch_size).enumerate() {
        results.extend(processor(chunk));
        progress_callback(batch_idx + 1, total_batches);
    }

    results
}