use crate::parallel::Reduce;
#[cfg(not(feature = "parallel"))]
mod not_parallel {
    use std::{
        marker::PhantomData,
        sync::atomic::{AtomicBool, AtomicIsize},
    };

    /// Runs `left` to completion and then `right`, both on the calling thread, and returns
    /// their results as a pair.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        let first = left();
        let second = right();
        (first, second)
    }

    /// A scope on which closures can be "spawned", shaped like `std::thread::Scope`.
    ///
    /// It stores no data: both fields are zero-sized markers that only tie the `'scope` and
    /// `'env` lifetimes to the type.
    pub struct Scope<'scope, 'env: 'scope> {
        _scope: PhantomData<&'scope mut &'scope ()>,
        _env: PhantomData<&'env mut &'env ()>,
    }

    /// A builder with the same call surface as a thread builder, but without any configuration
    /// of its own.
    pub struct ThreadBuilder;

    /// Returns a fresh [`ThreadBuilder`].
    pub fn build_thread() -> ThreadBuilder {
        ThreadBuilder
    }

    // SAFETY: `Scope` consists solely of zero-sized `PhantomData` markers, so a shared reference
    // to it gives access to no state that could be touched concurrently.
    #[allow(unsafe_code)]
    unsafe impl Sync for Scope<'_, '_> {}

    impl ThreadBuilder {
        /// Accepts a thread name and discards it, handing the builder back unchanged.
        pub fn name(self, _new: String) -> Self {
            self
        }

        /// Passes `f` to [`Scope::spawn()`] on `scope` and wraps the resulting handle in `Ok`.
        ///
        /// This implementation has no path that produces an `Err`.
        pub fn spawn_scoped<'scope, 'env, F, T>(
            &self,
            scope: &'scope Scope<'scope, 'env>,
            f: F,
        ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            let handle = scope.spawn(f);
            Ok(handle)
        }
    }

    impl<'scope> Scope<'scope, '_> {
        /// Calls `f` right away on the current thread and returns a handle that already holds
        /// its return value.
        pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            // The closure has run to completion by the time the handle exists.
            let result = f();
            ScopedJoinHandle {
                result,
                _marker: PhantomData,
            }
        }
    }

    /// Creates a [`Scope`], lends it to `f`, and returns whatever `f` returns.
    pub fn threads<'env, F, R>(f: F) -> R
    where
        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
    {
        let scope = Scope {
            _scope: PhantomData,
            _env: PhantomData,
        };
        f(&scope)
    }

    /// The handle returned by [`Scope::spawn()`], carrying the value the closure produced.
    pub struct ScopedJoinHandle<'scope, T> {
        result: T,
        _marker: PhantomData<&'scope mut &'scope ()>,
    }

    impl<T> ScopedJoinHandle<'_, T> {
        /// Hands out the stored result, always as `Ok`.
        pub fn join(self) -> std::thread::Result<T> {
            let Self { result, .. } = self;
            Ok(result)
        }

        /// Always `true`, as the result is stored in the handle when it is constructed.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    /// Processes the items of `input` one after another on the current thread.
    ///
    /// * `_thread_limit` is not used by this implementation.
    /// * `new_thread_state` is called exactly once, with `0`, to create the state shared by all
    ///   `consume` calls.
    /// * `consume` receives each item along with that state and two atomics that are created
    ///   here with their default values (`0` and `false`). Its first error aborts processing
    ///   and is returned.
    /// * `periodic` is called after each successfully consumed item; once it returns `None`
    ///   the remaining items are skipped.
    /// * `state_to_rval` turns the state into the single element of the returned vector.
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        new_thread_state: impl FnOnce(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let mut state = new_thread_state(0);
        let should_interrupt = AtomicBool::new(false);
        let threads_left = AtomicIsize::new(0);

        for item in input.iter_mut() {
            consume(item, &mut state, &threads_left, &should_interrupt)?;
            match periodic() {
                Some(_) => {}
                None => break,
            }
        }

        let rval = state_to_rval(state);
        Ok(vec![rval])
    }
}
#[cfg(not(feature = "parallel"))]
pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope};
/// Consumes `input` item by item on the current thread and aggregates the outputs with `reducer`.
///
/// * `_thread_limit` is not used by this implementation.
/// * `new_thread_state` is called exactly once, with `0`, to create the state handed to every
///   `consume` call.
/// * `consume` turns an item and the mutable state into an output, which is fed to `reducer`
///   right away. Whatever a successful `feed` produces is discarded.
///
/// The first error returned by `reducer.feed()` stops the iteration and is returned. Otherwise
/// the result of `reducer.finalize()` is returned.
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    let mut state = new_thread_state(0);
    input
        .into_iter()
        .try_for_each(|item| reducer.feed(consume(item, &mut state)).map(drop))?;
    reducer.finalize()
}
/// Consumes `input` item by item on the current thread, aggregates the outputs with `reducer`,
/// and feeds one last output derived from the final state.
///
/// * `_thread_limit` is not used by this implementation.
/// * `new_thread_state` is called exactly once, with `0`, to create the state handed to every
///   `consume` call.
/// * `consume` turns an item and the mutable state into an output, which is fed to `reducer`
///   right away. Whatever a successful `feed` produces is discarded.
/// * `finalize` is called once all items were consumed and converts the state into a last
///   output, which is fed to `reducer` as well.
///
/// The first error returned by `reducer.feed()` is returned immediately; if it occurs while
/// iterating, `finalize` is not called. Otherwise the result of `reducer.finalize()` is returned.
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_with_finalize<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    finalize: impl FnOnce(S) -> O + Send + Clone,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    let mut state = new_thread_state(0);
    input
        .into_iter()
        .try_for_each(|item| reducer.feed(consume(item, &mut state)).map(drop))?;

    let last_output = finalize(state);
    reducer.feed(last_output)?;
    reducer.finalize()
}