//! Serial (single-threaded) stand-ins for the parallelization primitives, compiled when the `parallel` feature is disabled.
use crate::parallel::Reduce;
#[cfg(not(feature = "parallel"))]
mod not_parallel {
use std::sync::atomic::{AtomicBool, AtomicIsize};
/// Runs `left` and then `right`, one after another, returning their output when both are done.
pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
(left(), right())
}
/// A scope for spawning threads.
///
/// Carries no data of its own — both fields are `PhantomData` markers that tie the
/// `'scope`/`'env` lifetimes together so the API shape matches a scoped-thread implementation.
pub struct Scope<'scope, 'env: 'scope> {
_scope: std::marker::PhantomData<&'scope mut &'scope ()>,
_env: std::marker::PhantomData<&'env mut &'env ()>,
}
/// A stand-in for a thread builder; all configuration is ignored in this serial version.
pub struct ThreadBuilder;
/// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
pub fn build_thread() -> ThreadBuilder {
ThreadBuilder
}
#[allow(unsafe_code)]
// SAFETY: `Scope` consists solely of `PhantomData` fields and holds no state at all,
// so sharing references to it across threads cannot cause data races.
unsafe impl Sync for Scope<'_, '_> {}
impl ThreadBuilder {
/// Set the name of the thread-to-be — a no-op here, as no OS thread is ever created.
pub fn name(self, _new: String) -> Self {
self
}
/// 'Spawn' `f` into `scope` by running it immediately on the calling thread.
///
/// Always returns `Ok`, since no fallible OS thread creation takes place.
pub fn spawn_scoped<'scope, 'env, F, T>(
&self,
scope: &'scope Scope<'scope, 'env>,
f: F,
) -> std::io::Result<ScopedJoinHandle<'scope, T>>
where
F: FnOnce() -> T + 'scope,
T: 'scope,
{
Ok(scope.spawn(f))
}
}
impl<'scope, 'env> Scope<'scope, 'env> {
/// Provided with this scope, let `f` start new threads that live within it.
///
/// In this serial implementation `f` is executed right away on the calling thread
/// and its result is stored in the returned handle.
pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
F: FnOnce() -> T + 'scope,
T: 'scope,
{
ScopedJoinHandle {
result: f(),
_marker: Default::default(),
}
}
}
/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
/// Note that this implementation will run the spawned functions immediately.
pub fn threads<'env, F, R>(f: F) -> R
where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
{
f(&Scope {
_scope: Default::default(),
_env: Default::default(),
})
}
/// A handle that can be used to join its scoped thread.
///
/// This struct is created by the [`Scope::spawn`] method and the
/// [`ScopedThreadBuilder::spawn`] method.
pub struct ScopedJoinHandle<'scope, T> {
/// Holds the result of the inner closure.
result: T,
_marker: std::marker::PhantomData<&'scope mut &'scope ()>,
}
impl<T> ScopedJoinHandle<'_, T> {
/// Return the closure's result — infallible here, since the closure already ran in `spawn()`.
pub fn join(self) -> std::thread::Result<T> {
Ok(self.result)
}
/// Always `true`: work is executed eagerly at spawn time, so it is finished by the time a handle exists.
pub fn is_finished(&self) -> bool {
true
}
}
/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
/// for file-io as it won't make use of sorted inputs well.
// TODO: better docs
pub fn in_parallel_with_slice<I, S, R, E>(
input: &mut [I],
_thread_limit: Option<usize>,
new_thread_state: impl FnOnce(usize) -> S + Clone,
mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
mut periodic: impl FnMut() -> Option<std::time::Duration>,
state_to_rval: impl FnOnce(S) -> R + Clone,
) -> Result<Vec<R>, E> {
// Single 'thread', so a single state for thread index 0.
let mut state = new_thread_state(0);
// Dummy interrupt/thread-count signals, present only to satisfy `consume`'s signature.
let should_interrupt = &AtomicBool::default();
let threads_left = &AtomicIsize::default();
for item in input {
consume(item, &mut state, threads_left, should_interrupt)?;
// NOTE(review): `periodic()` returning `None` aborts processing of the remaining
// input — presumably mirroring a shutdown/interrupt request in the parallel
// version of this function; confirm against that implementation.
if periodic().is_none() {
break;
}
}
// One result, as everything ran on one 'thread'.
Ok(vec![state_to_rval(state)])
}
}
#[cfg(not(feature = "parallel"))]
pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
/// whose task is to aggregate these outputs into the final result returned by this function.
///
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
/// * For `reducer`, see the [`Reduce`] trait
/// * `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
///   similar to the parallel version.
///
/// **This serial version performs all calculations on the current thread.**
// NOTE(review): unlike `in_parallel_with_finalize` below, this function is not gated on
// `#[cfg(not(feature = "parallel"))]` — confirm that a `parallel` build does not export a
// conflicting definition of `in_parallel`.
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    // There is exactly one 'thread', so a single state for thread index 0.
    let mut thread_state = new_thread_state(0);
    for item in input {
        let output = consume(item, &mut thread_state);
        // `feed` may yield an intermediate value we have no use for — discard it,
        // but propagate its error.
        drop(reducer.feed(output)?);
    }
    reducer.finalize()
}
/// Read items from `input` and `consume` them in multiple threads,
/// whose output is collected by a `reducer`. Its task is to
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
/// Call `finalize` to finish the computation, once per thread, if there was no error sending results earlier.
///
/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
///   created by `new_thread_state(…)`.
/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
/// * For `reducer`, see the [`Reduce`] trait
///
/// **This serial version performs all calculations on the current thread.**
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_with_finalize<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    finalize: impl FnOnce(S) -> O + Send + Clone,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    // A single 'thread': one state for thread index 0.
    let mut thread_state = new_thread_state(0);
    for item in input {
        let output = consume(item, &mut thread_state);
        // Discard `feed`'s intermediate value, but propagate its error.
        drop(reducer.feed(output)?);
    }
    // Let `finalize` distill one last output from the accumulated state before wrapping up.
    let last_output = finalize(thread_state);
    reducer.feed(last_output)?;
    reducer.finalize()
}