1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3
4pub use paralight::iter;
5use paralight::iter::GenericThreadPool;
6
7pub use self::task::*;
8pub use self::thread_pool::*;
9
10mod join_point;
12
13mod task;
15
16mod thread_pool;
18
19mod util;
21
22pub fn split_per_item() -> impl GenericThreadPool {
30 struct SplitPerItem;
31
32 unsafe impl GenericThreadPool for SplitPerItem {
33 fn upper_bounded_pipeline<Output: Send, Accum>(
34 self,
35 input_len: usize,
36 init: impl Fn() -> Accum + Sync,
37 process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
38 finalize: impl Fn(Accum) -> Output + Sync,
39 reduce: impl Fn(Output, Output) -> Output,
40 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
41 ) -> Output {
42 ThreadPool::with_current(|f| {
43 f.split_per_item().upper_bounded_pipeline(
44 input_len,
45 init,
46 process_item,
47 finalize,
48 reduce,
49 cleanup,
50 )
51 })
52 }
53
54 fn iter_pipeline<Output: Send>(
55 self,
56 input_len: usize,
57 accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
58 reduce: impl paralight::iter::Accumulator<Output, Output>,
59 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
60 ) -> Output {
61 ThreadPool::with_current(|f| {
62 f.split_per_item()
63 .iter_pipeline(input_len, accum, reduce, cleanup)
64 })
65 }
66 }
67
68 SplitPerItem
69}
70
71pub fn split_per(chunk_size: usize) -> impl GenericThreadPool {
74 struct ThreadPerChunk(usize);
75
76 unsafe impl GenericThreadPool for ThreadPerChunk {
77 fn upper_bounded_pipeline<Output: Send, Accum>(
78 self,
79 input_len: usize,
80 init: impl Fn() -> Accum + Sync,
81 process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
82 finalize: impl Fn(Accum) -> Output + Sync,
83 reduce: impl Fn(Output, Output) -> Output,
84 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
85 ) -> Output {
86 ThreadPool::with_current(|f| {
87 f.split_by(self.0).upper_bounded_pipeline(
88 input_len,
89 init,
90 process_item,
91 finalize,
92 reduce,
93 cleanup,
94 )
95 })
96 }
97
98 fn iter_pipeline<Output: Send>(
99 self,
100 input_len: usize,
101 accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
102 reduce: impl paralight::iter::Accumulator<Output, Output>,
103 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
104 ) -> Output {
105 ThreadPool::with_current(|f| {
106 f.split_by(self.0)
107 .iter_pipeline(input_len, accum, reduce, cleanup)
108 })
109 }
110 }
111
112 ThreadPerChunk(chunk_size)
113}
114
115pub fn split_by(chunks: usize) -> impl GenericThreadPool {
119 struct Chunks(usize);
120
121 unsafe impl GenericThreadPool for Chunks {
122 fn upper_bounded_pipeline<Output: Send, Accum>(
123 self,
124 input_len: usize,
125 init: impl Fn() -> Accum + Sync,
126 process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
127 finalize: impl Fn(Accum) -> Output + Sync,
128 reduce: impl Fn(Output, Output) -> Output,
129 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
130 ) -> Output {
131 ThreadPool::with_current(|f| {
132 f.split_by(self.0).upper_bounded_pipeline(
133 input_len,
134 init,
135 process_item,
136 finalize,
137 reduce,
138 cleanup,
139 )
140 })
141 }
142
143 fn iter_pipeline<Output: Send>(
144 self,
145 input_len: usize,
146 accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
147 reduce: impl paralight::iter::Accumulator<Output, Output>,
148 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
149 ) -> Output {
150 ThreadPool::with_current(|f| {
151 f.split_by(self.0)
152 .iter_pipeline(input_len, accum, reduce, cleanup)
153 })
154 }
155 }
156
157 Chunks(chunks)
158}
159
160pub fn split_by_threads() -> impl GenericThreadPool {
165 struct SplitByThreads;
166
167 unsafe impl GenericThreadPool for SplitByThreads {
168 fn upper_bounded_pipeline<Output: Send, Accum>(
169 self,
170 input_len: usize,
171 init: impl Fn() -> Accum + Sync,
172 process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
173 finalize: impl Fn(Accum) -> Output + Sync,
174 reduce: impl Fn(Output, Output) -> Output,
175 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
176 ) -> Output {
177 ThreadPool::with_current(|f| {
178 f.split_by_threads().upper_bounded_pipeline(
179 input_len,
180 init,
181 process_item,
182 finalize,
183 reduce,
184 cleanup,
185 )
186 })
187 }
188
189 fn iter_pipeline<Output: Send>(
190 self,
191 input_len: usize,
192 accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
193 reduce: impl paralight::iter::Accumulator<Output, Output>,
194 cleanup: &(impl paralight::iter::SourceCleanup + Sync),
195 ) -> Output {
196 ThreadPool::with_current(|f| {
197 f.split_by_threads()
198 .iter_pipeline(input_len, accum, reduce, cleanup)
199 })
200 }
201 }
202
203 SplitByThreads
204}
205
206pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
209where
210 A: FnOnce() -> RA + Send,
211 B: FnOnce() -> RB + Send,
212 RA: Send,
213 RB: Send,
214{
215 ThreadPool::with_current(|pool| pool.join(oper_a, oper_b))
216}
217
218pub fn num_threads() -> usize {
220 ThreadPool::with_current(|pool| pool.num_threads())
221}
222
223pub fn spawn<T: 'static + Send>(f: impl 'static + Send + FnOnce() -> T) -> Task<T> {
226 ThreadPool::with_current(|pool| pool.spawn(f))
227}
228
#[cfg(test)]
/// Smoke tests that drive the free functions through paralight's parallel iterators.
mod tests {
    use crate::iter::*;

    /// Adds two equal-length slices element-wise into a third using `split_by_threads`.
    #[test]
    fn test_add() {
        const LEN: usize = 10_000;

        let lhs: Vec<u64> = (0..LEN as u64).collect();
        let rhs: Vec<u64> = (0..LEN as u64).collect();
        let want: Vec<u64> = (0..LEN as u64).map(|x| 2 * x).collect();
        let mut sums = vec![0; LEN];

        let sums_view = sums.as_mut_slice();
        let lhs_view = lhs.as_slice();
        let rhs_view = rhs.as_slice();

        (
            std::hint::black_box(sums_view.par_iter_mut()),
            std::hint::black_box(lhs_view).par_iter(),
            std::hint::black_box(rhs_view).par_iter(),
        )
            .zip_eq()
            .with_thread_pool(crate::split_by_threads())
            .for_each(|(slot, &a, &b)| *slot = a + b);

        assert_eq!(sums, want);
    }

    /// Sums the integers `0..10_000` in parallel and checks the closed-form total.
    #[test]
    fn test_sum() {
        let values: Vec<u64> = (0..10_000u64).collect();
        let view = values.as_slice();

        let total = view
            .par_iter()
            .with_thread_pool(crate::split_by_threads())
            .sum::<u64>();

        assert_eq!(total, 49995000);
    }

    /// Writes `x * x - 1` for `x` in `1..=5` into a fixed-size array.
    #[test]
    fn test_for_each() {
        let mut squares_minus_one = [0; 5];

        (squares_minus_one.par_iter_mut(), (1..=5).into_par_iter())
            .zip_eq()
            .with_thread_pool(crate::split_by_threads())
            .for_each(|(slot, x)| *slot = x * x - 1);

        assert_eq!([0, 3, 8, 15, 24], squares_minus_one);
    }

    /// Spawns many short tasks in pairs and joins each pair.
    #[test]
    fn execute_many() {
        let task_a = crate::spawn(|| 2);
        let task_b = crate::spawn(|| 2);
        assert_eq!(task_a.join(), task_b.join());

        for _ in 0..1000 {
            // Two sleeps of different (nanosecond-scale) length per round.
            let short_nap = crate::spawn(|| std::thread::sleep(std::time::Duration::from_nanos(10)));
            let long_nap = crate::spawn(|| std::thread::sleep(std::time::Duration::from_nanos(200)));
            assert_eq!(short_nap.join(), long_nap.join());
        }
    }
}