1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::parallel::Reduce;
#[cfg(not(feature = "parallel"))]
mod not_parallel {
pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
(left(), right())
}
pub struct Scope<'env> {
_marker: std::marker::PhantomData<&'env mut &'env ()>,
}
#[allow(unsafe_code)]
unsafe impl Sync for Scope<'_> {}
impl<'env> Scope<'env> {
pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
F: FnOnce(&Scope<'env>) -> T,
F: Send + 'env,
T: Send + 'env,
{
ScopedJoinHandle {
result: f(self),
_marker: Default::default(),
}
}
}
pub fn threads<'env, F, R>(f: F) -> std::thread::Result<R>
where
F: FnOnce(&Scope<'env>) -> R,
{
Ok(f(&Scope {
_marker: Default::default(),
}))
}
pub struct ScopedJoinHandle<'scope, T> {
result: T,
_marker: std::marker::PhantomData<&'scope mut &'scope ()>,
}
impl<T> ScopedJoinHandle<'_, T> {
pub fn join(self) -> std::thread::Result<T> {
Ok(self.result)
}
}
}
#[cfg(not(feature = "parallel"))]
pub use not_parallel::{join, threads, Scope, ScopedJoinHandle};
pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I>,
_thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S,
consume: impl Fn(I, &mut S) -> O,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
R: Reduce<Input = O>,
{
let mut state = new_thread_state(0);
for item in input {
drop(reducer.feed(consume(item, &mut state))?);
}
reducer.finalize()
}