// rosomaxa/utils/parallel.rs

1#[cfg(test)]
2#[path = "../../tests/unit/utils/parallel_test.rs"]
3mod parallel_test;
4
5pub use self::actual::cartesian_product;
6pub use self::actual::fold_reduce;
7pub use self::actual::map_reduce;
8pub use self::actual::parallel_collect;
9pub use self::actual::parallel_foreach_mut;
10pub use self::actual::parallel_into_collect;
11pub use self::actual::ThreadPool;
12
13#[cfg(not(target_arch = "wasm32"))]
14mod actual {
15    use rayon::prelude::*;
16    use rayon::{ThreadPool as RayonThreadPool, ThreadPoolBuilder};
17
18    /// Represents a thread pool wrapper.
19    pub struct ThreadPool {
20        inner: RayonThreadPool,
21    }
22
23    impl ThreadPool {
24        /// Creates a new instance of `ThreadPool`
25        pub fn new(num_threads: usize) -> Self {
26            Self {
27                inner: ThreadPoolBuilder::new().num_threads(num_threads).build().expect("cannot build a thread pool"),
28            }
29        }
30
31        /// Executes given operation on thread pool.
32        pub fn execute<OP, R>(&self, op: OP) -> R
33        where
34            OP: FnOnce() -> R + Send,
35            R: Send,
36        {
37            self.inner.install(op)
38        }
39    }
40
41    /// Creates a cartesian product returning a parallel iterator.
42    pub fn cartesian_product<'a, A, B>(a: &'a [A], b: &'a [B]) -> impl IntoParallelIterator<Item = (&'a A, &'a B)>
43    where
44        A: Send + Sync + 'a,
45        B: Send + Sync + 'a,
46    {
47        a.par_iter().flat_map(|a| b.par_iter().map(move |b| (a, b)))
48    }
49
50    /// Maps collection and collects results into vector in parallel.
51    pub fn parallel_collect<T, S, FM, R>(source: S, map_op: FM) -> Vec<R>
52    where
53        T: Send + Sync,
54        S: IntoParallelIterator<Item = T>,
55        FM: Fn(T) -> R + Sync + Send,
56        R: Send,
57    {
58        source.into_par_iter().map(map_op).collect()
59    }
60
61    /// Maps collection and collects results into vector in parallel.
62    pub fn parallel_into_collect<T, F, R>(source: Vec<T>, map_op: F) -> Vec<R>
63    where
64        T: Send + Sync,
65        F: Fn(T) -> R + Sync + Send,
66        R: Send,
67    {
68        source.into_par_iter().map(map_op).collect()
69    }
70
71    /// Performs map reduce operations in parallel.
72    pub fn map_reduce<'a, T, S, FM, FR, FD, R>(source: &'a S, map_op: FM, default_op: FD, reduce_op: FR) -> R
73    where
74        T: Send + Sync,
75        S: IntoParallelRefIterator<'a, Item = T> + ?Sized,
76        FM: Fn(T) -> R + Sync + Send,
77        FR: Fn(R, R) -> R + Sync + Send,
78        FD: Fn() -> R + Sync + Send,
79        R: Send,
80    {
81        source.par_iter().map(map_op).reduce(default_op, reduce_op)
82    }
83
84    /// Performs fold and then reduce operations in parallel.
85    pub fn fold_reduce<T, S, FI, FF, FR, R>(source: S, identity: FI, fold: FF, reduce: FR) -> R
86    where
87        T: Send + Sync,
88        S: IntoParallelIterator<Item = T>,
89        FI: Fn() -> R + Clone + Sync + Send,
90        FF: Fn(R, T) -> R + Sync + Send,
91        FR: Fn(R, R) -> R + Sync + Send,
92        R: Send,
93    {
94        source.into_par_iter().fold(identity.clone(), fold).reduce(identity, reduce)
95    }
96
97    /// Performs mutable foreach in parallel.
98    pub fn parallel_foreach_mut<T, F>(source: &mut [T], action: F)
99    where
100        T: Send + Sync,
101        F: Fn(&mut T) + Send + Sync,
102    {
103        source.par_iter_mut().for_each(action)
104    }
105}
106
107#[cfg(target_arch = "wasm32")]
mod actual {
    /// Represents a thread pool wrapper.
    ///
    /// On `wasm32` there are no OS threads, so this is a no-op shim that
    /// mirrors the API of the native (rayon-backed) implementation.
    pub struct ThreadPool;

    impl ThreadPool {
        /// Creates a new instance of `ThreadPool`. The thread amount hint is ignored.
        pub fn new(_num_threads: usize) -> Self {
            Self {}
        }

        /// Executes given operation immediately on the current thread (dummy).
        pub fn execute<OP, R>(&self, op: OP) -> R
        where
            OP: FnOnce() -> R + Send,
            R: Send,
        {
            op()
        }
    }

    /// Creates a cartesian product returning a sequential iterator over all `(a, b)` pairs.
    pub fn cartesian_product<'a, A, B>(a: &'a [A], b: &'a [B]) -> impl Iterator<Item = (&'a A, &'a B)>
    where
        A: Send + Sync + 'a,
        B: Send + Sync + 'a,
    {
        a.iter().flat_map(|a| b.iter().map(move |b| (a, b)))
    }

    /// Maps collection and collects results into vector synchronously.
    ///
    /// NOTE: generic over `IntoIterator` (instead of the former `&[T]`) so the
    /// signature shape matches the native implementation; `&[T]`/`&Vec<T>`
    /// callers keep working since `Item` is then `&T`.
    pub fn parallel_collect<T, S, F, R>(source: S, map_op: F) -> Vec<R>
    where
        T: Send + Sync,
        S: IntoIterator<Item = T>,
        F: Fn(T) -> R + Sync + Send,
        R: Send,
    {
        source.into_iter().map(map_op).collect()
    }

    /// Maps an owned collection and collects results into vector synchronously.
    pub fn parallel_into_collect<T, F, R>(source: Vec<T>, map_op: F) -> Vec<R>
    where
        T: Send + Sync,
        F: Fn(T) -> R + Sync + Send,
        R: Send,
    {
        source.into_iter().map(map_op).collect()
    }

    /// Performs map and reduce operations synchronously.
    /// `default_op` provides the seed value for the fold.
    pub fn map_reduce<T, S, FM, FR, FD, R>(source: S, map_op: FM, default_op: FD, reduce_op: FR) -> R
    where
        T: Send + Sync,
        S: IntoIterator<Item = T>,
        FM: Fn(T) -> R + Sync + Send,
        FR: Fn(R, R) -> R + Sync + Send,
        FD: Fn() -> R + Sync + Send,
        R: Send,
    {
        source.into_iter().map(map_op).fold(default_op(), reduce_op)
    }

    /// Performs fold and then reduce operations synchronously.
    ///
    /// NOTE it behaves differently from the parallel implementation: `reduce`
    /// is applied exactly once, combining `identity()` with the single fold
    /// result. Results match the parallel version only when
    /// `reduce(identity(), x) == x` for every `x`.
    pub fn fold_reduce<T, S, FI, FF, FR, R>(source: S, identity: FI, fold: FF, mut reduce: FR) -> R
    where
        T: Send + Sync,
        S: IntoIterator<Item = T>,
        FI: Fn() -> R + Sync + Send,
        FF: FnMut(R, T) -> R + Sync + Send,
        FR: FnMut(R, R) -> R + Sync + Send,
        R: Send,
    {
        reduce(identity(), source.into_iter().fold(identity(), fold))
    }

    /// Performs mutable foreach sequentially (no parallelism on wasm32).
    pub fn parallel_foreach_mut<T, F>(source: &mut [T], action: F)
    where
        T: Send + Sync,
        F: Fn(&mut T) + Send + Sync,
    {
        source.iter_mut().for_each(action)
    }
}