git_features/parallel/
serial.rs1use crate::parallel::Reduce;
2
#[cfg(not(feature = "parallel"))]
mod not_parallel {
    /// Runs `left` to completion and then `right`, both on the calling thread, and returns
    /// their results as a tuple.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        (left(), right())
    }

    /// A stand-in for a thread scope that executes everything "spawned" on it right on the
    /// calling thread.
    ///
    /// The marker field makes the type invariant over `'env` without storing any data.
    /// As that marker is the only field and is itself `Sync`, `Scope` is `Sync` through the
    /// auto trait alone, which is why no `unsafe impl Sync` is required here.
    pub struct Scope<'env> {
        _marker: std::marker::PhantomData<&'env mut &'env ()>,
    }

    /// A builder for "threads" started on a [`Scope`], obtained through [`Scope::builder()`].
    ///
    /// In this serial implementation there is nothing to configure, so all settings are ignored.
    pub struct ThreadBuilder<'a, 'env> {
        scope: &'a Scope<'env>,
    }

    impl<'a, 'env> ThreadBuilder<'a, 'env> {
        /// Accepts a thread name and discards it, returning the builder unchanged.
        pub fn name(self, _new: String) -> Self {
            self
        }

        /// Runs `f` immediately on the calling thread by delegating to [`Scope::spawn()`].
        ///
        /// The returned handle already holds the result of `f`. This function always returns
        /// `Ok`; the `io::Result` wrapper is never populated with an error here.
        pub fn spawn<F, T>(&self, f: F) -> std::io::Result<ScopedJoinHandle<'a, T>>
        where
            F: FnOnce(&Scope<'env>) -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            Ok(self.scope.spawn(f))
        }
    }

    impl<'env> Scope<'env> {
        /// Returns a [`ThreadBuilder`] that borrows this scope.
        pub fn builder(&self) -> ThreadBuilder<'_, 'env> {
            ThreadBuilder { scope: self }
        }

        /// Calls `f` with this scope right away, on the calling thread, and stores its return
        /// value in the returned handle.
        ///
        /// By the time this function returns, `f` has already finished.
        pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce(&Scope<'env>) -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            ScopedJoinHandle {
                result: f(self),
                _marker: Default::default(),
            }
        }
    }

    /// Creates a fresh [`Scope`] and calls `f` with it on the calling thread, returning its result.
    ///
    /// This function always returns `Ok`. A panic inside `f` is not caught and unwinds through
    /// this function instead of being reported as `Err`.
    pub fn threads<'env, F, R>(f: F) -> std::thread::Result<R>
    where
        F: FnOnce(&Scope<'env>) -> R,
    {
        Ok(f(&Scope {
            _marker: Default::default(),
        }))
    }

    /// The handle returned by [`Scope::spawn()`], holding the already computed result of the
    /// closure that was "spawned".
    pub struct ScopedJoinHandle<'scope, T> {
        /// The value produced by the closure passed to `spawn`.
        result: T,
        _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
    }

    impl<T> ScopedJoinHandle<'_, T> {
        /// Returns the stored result. This never blocks and always returns `Ok`.
        pub fn join(self) -> std::thread::Result<T> {
            Ok(self.result)
        }
    }

    /// Processes the items of `input` one after another on the calling thread.
    ///
    /// * `_thread_limit` is ignored.
    /// * `new_thread_state` is called exactly once, with `0`, to create the state that is
    ///   passed to every `consume` call.
    /// * `consume(item, state)` is called for each item in order. The first `Err` it returns
    ///   stops the iteration and is returned as is.
    /// * `periodic` is called after each successful `consume` call. If it returns `None`, the
    ///   remaining items are skipped. The returned duration itself is not used.
    /// * `state_to_rval` converts the state into the single element of the returned `Vec`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `consume`. Items before the failing one have
    /// already been processed at that point.
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        mut new_thread_state: impl FnMut(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let mut state = new_thread_state(0);
        for item in input {
            consume(item, &mut state)?;
            // `None` is the signal to stop early; what was processed so far is still returned.
            if periodic().is_none() {
                break;
            }
        }
        Ok(vec![state_to_rval(state)])
    }
}
103
104#[cfg(not(feature = "parallel"))]
105pub use not_parallel::{in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
106
107pub fn in_parallel<I, S, O, R>(
118 input: impl Iterator<Item = I>,
119 _thread_limit: Option<usize>,
120 new_thread_state: impl Fn(usize) -> S,
121 consume: impl Fn(I, &mut S) -> O,
122 mut reducer: R,
123) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
124where
125 R: Reduce<Input = O>,
126{
127 let mut state = new_thread_state(0);
128 for item in input {
129 drop(reducer.feed(consume(item, &mut state))?);
130 }
131 reducer.finalize()
132}