1#[cfg(test)]
2#[path = "../../tests/unit/utils/parallel_test.rs"]
3mod parallel_test;
4
5pub use self::actual::cartesian_product;
6pub use self::actual::fold_reduce;
7pub use self::actual::map_reduce;
8pub use self::actual::parallel_collect;
9pub use self::actual::parallel_foreach_mut;
10pub use self::actual::parallel_into_collect;
11pub use self::actual::ThreadPool;
12
13#[cfg(not(target_arch = "wasm32"))]
14mod actual {
15 use rayon::prelude::*;
16 use rayon::{ThreadPool as RayonThreadPool, ThreadPoolBuilder};
17
18 pub struct ThreadPool {
20 inner: RayonThreadPool,
21 }
22
23 impl ThreadPool {
24 pub fn new(num_threads: usize) -> Self {
26 Self {
27 inner: ThreadPoolBuilder::new().num_threads(num_threads).build().expect("cannot build a thread pool"),
28 }
29 }
30
31 pub fn execute<OP, R>(&self, op: OP) -> R
33 where
34 OP: FnOnce() -> R + Send,
35 R: Send,
36 {
37 self.inner.install(op)
38 }
39 }
40
41 pub fn cartesian_product<'a, A, B>(a: &'a [A], b: &'a [B]) -> impl IntoParallelIterator<Item = (&'a A, &'a B)>
43 where
44 A: Send + Sync + 'a,
45 B: Send + Sync + 'a,
46 {
47 a.par_iter().flat_map(|a| b.par_iter().map(move |b| (a, b)))
48 }
49
50 pub fn parallel_collect<T, S, FM, R>(source: S, map_op: FM) -> Vec<R>
52 where
53 T: Send + Sync,
54 S: IntoParallelIterator<Item = T>,
55 FM: Fn(T) -> R + Sync + Send,
56 R: Send,
57 {
58 source.into_par_iter().map(map_op).collect()
59 }
60
61 pub fn parallel_into_collect<T, F, R>(source: Vec<T>, map_op: F) -> Vec<R>
63 where
64 T: Send + Sync,
65 F: Fn(T) -> R + Sync + Send,
66 R: Send,
67 {
68 source.into_par_iter().map(map_op).collect()
69 }
70
71 pub fn map_reduce<'a, T, S, FM, FR, FD, R>(source: &'a S, map_op: FM, default_op: FD, reduce_op: FR) -> R
73 where
74 T: Send + Sync,
75 S: IntoParallelRefIterator<'a, Item = T> + ?Sized,
76 FM: Fn(T) -> R + Sync + Send,
77 FR: Fn(R, R) -> R + Sync + Send,
78 FD: Fn() -> R + Sync + Send,
79 R: Send,
80 {
81 source.par_iter().map(map_op).reduce(default_op, reduce_op)
82 }
83
84 pub fn fold_reduce<T, S, FI, FF, FR, R>(source: S, identity: FI, fold: FF, reduce: FR) -> R
86 where
87 T: Send + Sync,
88 S: IntoParallelIterator<Item = T>,
89 FI: Fn() -> R + Clone + Sync + Send,
90 FF: Fn(R, T) -> R + Sync + Send,
91 FR: Fn(R, R) -> R + Sync + Send,
92 R: Send,
93 {
94 source.into_par_iter().fold(identity.clone(), fold).reduce(identity, reduce)
95 }
96
97 pub fn parallel_foreach_mut<T, F>(source: &mut [T], action: F)
99 where
100 T: Send + Sync,
101 F: Fn(&mut T) + Send + Sync,
102 {
103 source.par_iter_mut().for_each(action)
104 }
105}
106
107#[cfg(target_arch = "wasm32")]
108mod actual {
/// Placeholder thread pool used by the `wasm32` build: it owns no threads.
pub struct ThreadPool;

impl ThreadPool {
    /// Creates the placeholder pool; the requested thread count is ignored.
    pub fn new(_num_threads: usize) -> Self {
        ThreadPool
    }

    /// Calls `op` directly on the current thread and returns its result.
    pub fn execute<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce() -> R + Send,
        R: Send,
    {
        let run = op;
        run()
    }
}
127
/// Lazily yields every `(&A, &B)` pair formed from the two slices.
///
/// Pairs come out grouped by the element of `a`: each element of `a`, in order, is
/// combined with all elements of `b`, in order. An empty slice on either side gives an
/// empty iterator.
pub fn cartesian_product<'a, A, B>(a: &'a [A], b: &'a [B]) -> impl Iterator<Item = (&'a A, &'a B)>
where
    A: Send + Sync + 'a,
    B: Send + Sync + 'a,
{
    // `b` is a shared slice reference (`Copy`), so moving it into the closure only
    // copies the reference.
    a.iter().flat_map(move |left| {
        let columns = b.iter();
        columns.map(move |right| (left, right))
    })
}
136
/// Sequential counterpart of the rayon-backed `parallel_collect`: applies `map_op` to
/// every item of `source`, in iteration order, and gathers the results into a `Vec`.
///
/// `source` may be anything that implements `IntoIterator`, mirroring the native
/// version which accepts any `IntoParallelIterator`. This keeps calls that pass an
/// owned collection, a range or the result of `cartesian_product` compiling on
/// `wasm32` as well. Passing a slice reference still works and hands `&T` items to
/// `map_op`, exactly as before.
///
/// The `Send`/`Sync` bounds are not needed for sequential execution; they are kept so
/// that the accepted closures match the native signature.
pub fn parallel_collect<T, S, FM, R>(source: S, map_op: FM) -> Vec<R>
where
    T: Send + Sync,
    S: IntoIterator<Item = T>,
    FM: Fn(T) -> R + Sync + Send,
    R: Send,
{
    source.into_iter().map(map_op).collect()
}
146
/// Consumes `source`, applies `map_op` to each owned element in order and returns the
/// mapped values as a new `Vec`.
pub fn parallel_into_collect<T, F, R>(source: Vec<T>, map_op: F) -> Vec<R>
where
    T: Send + Sync,
    F: Fn(T) -> R + Sync + Send,
    R: Send,
{
    let mut mapped = Vec::with_capacity(source.len());

    for item in source {
        mapped.push(map_op(item));
    }

    mapped
}
156
/// Maps each item of `source` with `map_op` and folds the mapped values, left to right,
/// with `reduce_op`.
///
/// The accumulator starts from a single call to `default_op`, so an empty `source`
/// returns that default value unchanged.
pub fn map_reduce<T, S, FM, FR, FD, R>(source: S, map_op: FM, default_op: FD, reduce_op: FR) -> R
where
    T: Send + Sync,
    S: IntoIterator<Item = T>,
    FM: Fn(T) -> R + Sync + Send,
    FR: Fn(R, R) -> R + Sync + Send,
    FD: Fn() -> R + Sync + Send,
    R: Send,
{
    let items = source.into_iter();
    let mut accumulated = default_op();

    for item in items {
        accumulated = reduce_op(accumulated, map_op(item));
    }

    accumulated
}
169
/// Folds all items of `source` into a single value with `fold`, starting from
/// `identity()`, then merges that value into another fresh `identity()` with `reduce`.
///
/// `identity` is therefore called exactly twice, and `reduce` exactly once with the
/// fresh identity as its first argument and the folded value as its second.
pub fn fold_reduce<T, S, FI, FF, FR, R>(source: S, identity: FI, fold: FF, mut reduce: FR) -> R
where
    T: Send + Sync,
    S: IntoIterator<Item = T>,
    FI: Fn() -> R + Sync + Send,
    FF: FnMut(R, T) -> R + Sync + Send,
    FR: FnMut(R, R) -> R + Sync + Send,
    R: Send,
{
    // Left-hand operand of the final merge; created before iteration starts.
    let seed = identity();

    let items = source.into_iter();
    let mut step = fold;
    let mut folded = identity();
    for item in items {
        folded = step(folded, item);
    }

    reduce(seed, folded)
}
183
/// Calls `action` with a mutable reference to each element of `source`, front to back.
pub fn parallel_foreach_mut<T, F>(source: &mut [T], action: F)
where
    T: Send + Sync,
    F: Fn(&mut T) + Send + Sync,
{
    for item in source.iter_mut() {
        action(item);
    }
}
192}