git_features/parallel/
serial.rs

1use crate::parallel::Reduce;
2
3#[cfg(not(feature = "parallel"))]
4mod not_parallel {
5    /// Runs `left` and then `right`, one after another, returning their output when both are done.
6    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
7        (left(), right())
8    }
9
10    /// A scope for spawning threads.
11    pub struct Scope<'env> {
12        _marker: std::marker::PhantomData<&'env mut &'env ()>,
13    }
14
15    pub struct ThreadBuilder<'a, 'env> {
16        scope: &'a Scope<'env>,
17    }
18
19    #[allow(unsafe_code)]
20    unsafe impl Sync for Scope<'_> {}
21
22    impl<'a, 'env> ThreadBuilder<'a, 'env> {
23        pub fn name(self, _new: String) -> Self {
24            self
25        }
26        pub fn spawn<F, T>(&self, f: F) -> std::io::Result<ScopedJoinHandle<'a, T>>
27        where
28            F: FnOnce(&Scope<'env>) -> T,
29            F: Send + 'env,
30            T: Send + 'env,
31        {
32            Ok(self.scope.spawn(f))
33        }
34    }
35
36    impl<'env> Scope<'env> {
37        /// Obtain a builder to change settings on the spawned thread.
38        pub fn builder(&self) -> ThreadBuilder<'_, 'env> {
39            ThreadBuilder { scope: self }
40        }
41        pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
42        where
43            F: FnOnce(&Scope<'env>) -> T,
44            F: Send + 'env,
45            T: Send + 'env,
46        {
47            ScopedJoinHandle {
48                result: f(self),
49                _marker: Default::default(),
50            }
51        }
52    }
53
54    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
55    /// Note that this implementation will run the spawned functions immediately.
56    pub fn threads<'env, F, R>(f: F) -> std::thread::Result<R>
57    where
58        F: FnOnce(&Scope<'env>) -> R,
59    {
60        Ok(f(&Scope {
61            _marker: Default::default(),
62        }))
63    }
64
65    /// A handle that can be used to join its scoped thread.
66    ///
67    /// This struct is created by the [`Scope::spawn`] method and the
68    /// [`ScopedThreadBuilder::spawn`] method.
69    pub struct ScopedJoinHandle<'scope, T> {
70        /// Holds the result of the inner closure.
71        result: T,
72        _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
73    }
74
75    impl<T> ScopedJoinHandle<'_, T> {
76        pub fn join(self) -> std::thread::Result<T> {
77            Ok(self.result)
78        }
79    }
80
81    /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
82    /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
83    /// for file-io as it won't make use of sorted inputs well.
84    // TODO: better docs
85    pub fn in_parallel_with_slice<I, S, R, E>(
86        input: &mut [I],
87        _thread_limit: Option<usize>,
88        mut new_thread_state: impl FnMut(usize) -> S + Clone,
89        mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
90        mut periodic: impl FnMut() -> Option<std::time::Duration>,
91        state_to_rval: impl FnOnce(S) -> R + Clone,
92    ) -> Result<Vec<R>, E> {
93        let mut state = new_thread_state(0);
94        for item in input {
95            consume(item, &mut state)?;
96            if periodic().is_none() {
97                break;
98            }
99        }
100        Ok(vec![state_to_rval(state)])
101    }
102}
103
104#[cfg(not(feature = "parallel"))]
105pub use not_parallel::{in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
106
107/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
108/// whose task is to aggregate these outputs into the final result returned by this function.
109///
110/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
111/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
112/// * For `reducer`, see the [`Reduce`] trait
113/// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
114///   similar to the parallel version.
115///
116/// **This serial version performing all calculations on the current thread.**
117pub fn in_parallel<I, S, O, R>(
118    input: impl Iterator<Item = I>,
119    _thread_limit: Option<usize>,
120    new_thread_state: impl Fn(usize) -> S,
121    consume: impl Fn(I, &mut S) -> O,
122    mut reducer: R,
123) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
124where
125    R: Reduce<Input = O>,
126{
127    let mut state = new_thread_state(0);
128    for item in input {
129        drop(reducer.feed(consume(item, &mut state))?);
130    }
131    reducer.finalize()
132}