use crate::default_fns::{map_count, reduce_sum, reduce_unit};
use crate::runner::{DefaultRunner, ParallelRunner};
use crate::{ChunkSize, IterationOrder, NumThreads, ParThreadPool, RunnerWithPool, Sum};
use crate::{ParCollectInto, ParIter, generic_values::fallible_iterators::ResultOfIter};
use core::cmp::Ordering;
pub trait ParIterResult<R = DefaultRunner>
where
R: ParallelRunner,
{
type Item;
type Err;
type RegularItem: IntoResult<Self::Item, Self::Err>;
type RegularParIter: ParIter<R, Item = Self::RegularItem>;
fn con_iter_len(&self) -> Option<usize>;
fn into_regular_par(self) -> Self::RegularParIter;
fn from_regular_par(regular_par: Self::RegularParIter) -> Self;
fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self
where
Self: Sized,
{
Self::from_regular_par(self.into_regular_par().num_threads(num_threads))
}
fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self
where
Self: Sized,
{
Self::from_regular_par(self.into_regular_par().chunk_size(chunk_size))
}
fn iteration_order(self, order: IterationOrder) -> Self
where
Self: Sized,
{
Self::from_regular_par(self.into_regular_par().iteration_order(order))
}
fn with_runner<Q: ParallelRunner>(
self,
orchestrator: Q,
) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err>;
fn with_pool<P: ParThreadPool>(
self,
pool: P,
) -> impl ParIterResult<RunnerWithPool<P, R::Executor>, Item = Self::Item, Err = Self::Err>
where
Self: Sized,
{
let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
self.with_runner(runner)
}
fn map<Out, Map>(self, map: Map) -> impl ParIterResult<R, Item = Out, Err = Self::Err>
where
Self: Sized,
Map: Fn(Self::Item) -> Out + Sync + Clone,
Out: Send,
{
let par = self.into_regular_par();
let map = par.map(move |x| x.into_result().map(map.clone()));
map.into_fallible_result()
}
fn filter<Filter>(
self,
filter: Filter,
) -> impl ParIterResult<R, Item = Self::Item, Err = Self::Err>
where
Self: Sized,
Filter: Fn(&Self::Item) -> bool + Sync + Clone,
Self::Item: Send,
{
let par = self.into_regular_par();
let filter_map = par.filter_map(move |x| match x.into_result() {
Ok(x) => match filter(&x) {
true => Some(Ok(x)),
false => None,
},
Err(e) => Some(Err(e)),
});
filter_map.into_fallible_result()
}
fn flat_map<IOut, FlatMap>(
self,
flat_map: FlatMap,
) -> impl ParIterResult<R, Item = IOut::Item, Err = Self::Err>
where
Self: Sized,
IOut: IntoIterator,
IOut::Item: Send,
FlatMap: Fn(Self::Item) -> IOut + Sync + Clone,
{
let par = self.into_regular_par();
let map = par.flat_map(move |x| match x.into_result() {
Ok(x) => ResultOfIter::ok(flat_map(x).into_iter()),
Err(e) => ResultOfIter::err(e),
});
map.into_fallible_result()
}
fn filter_map<Out, FilterMap>(
self,
filter_map: FilterMap,
) -> impl ParIterResult<R, Item = Out, Err = Self::Err>
where
Self: Sized,
FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
Out: Send,
{
let par = self.into_regular_par();
let filter_map = par.filter_map(move |x| match x.into_result() {
Ok(x) => filter_map(x).map(|x| Ok(x)),
Err(e) => Some(Err(e)),
});
filter_map.into_fallible_result()
}
fn inspect<Operation>(
self,
operation: Operation,
) -> impl ParIterResult<R, Item = Self::Item, Err = Self::Err>
where
Self: Sized,
Operation: Fn(&Self::Item) + Sync + Clone,
Self::Item: Send,
{
let map = move |x| {
operation(&x);
x
};
self.map(map)
}
fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
where
C: ParCollectInto<Self::Item>,
Self::Item: Send,
Self::Err: Send;
fn collect<C>(self) -> Result<C, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
C: ParCollectInto<Self::Item>,
{
let output = C::empty(self.con_iter_len());
self.collect_into(output)
}
fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
where
Self::Item: Send,
Self::Err: Send,
Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
fn all<Predicate>(self, predicate: Predicate) -> Result<bool, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
Predicate: Fn(&Self::Item) -> bool + Sync,
{
let violates = |x: &Self::Item| !predicate(x);
self.find(violates).map(|x| x.is_none())
}
fn any<Predicate>(self, predicate: Predicate) -> Result<bool, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
Predicate: Fn(&Self::Item) -> bool + Sync,
{
self.find(predicate).map(|x| x.is_some())
}
fn count(self) -> Result<usize, Self::Err>
where
Self: Sized,
Self::Err: Send,
{
self.map(map_count)
.reduce(reduce_sum)
.map(|x| x.unwrap_or(0))
}
fn for_each<Operation>(self, operation: Operation) -> Result<(), Self::Err>
where
Self: Sized,
Self::Err: Send,
Operation: Fn(Self::Item) + Sync,
{
let map = |x| operation(x);
self.map(map).reduce(reduce_unit).map(|_| ())
}
fn max(self) -> Result<Option<Self::Item>, Self::Err>
where
Self: Sized,
Self::Err: Send,
Self::Item: Ord + Send,
{
self.reduce(Ord::max)
}
fn max_by<Compare>(self, compare: Compare) -> Result<Option<Self::Item>, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
{
let reduce = |x, y| match compare(&x, &y) {
Ordering::Greater | Ordering::Equal => x,
Ordering::Less => y,
};
self.reduce(reduce)
}
fn max_by_key<Key, GetKey>(self, key: GetKey) -> Result<Option<Self::Item>, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
Key: Ord,
GetKey: Fn(&Self::Item) -> Key + Sync,
{
let reduce = |x, y| match key(&x).cmp(&key(&y)) {
Ordering::Greater | Ordering::Equal => x,
Ordering::Less => y,
};
self.reduce(reduce)
}
fn min(self) -> Result<Option<Self::Item>, Self::Err>
where
Self: Sized,
Self::Item: Ord + Send,
Self::Err: Send,
{
self.reduce(Ord::min)
}
fn min_by<Compare>(self, compare: Compare) -> Result<Option<Self::Item>, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
{
let reduce = |x, y| match compare(&x, &y) {
Ordering::Less | Ordering::Equal => x,
Ordering::Greater => y,
};
self.reduce(reduce)
}
fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Result<Option<Self::Item>, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
Key: Ord,
GetKey: Fn(&Self::Item) -> Key + Sync,
{
let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
Ordering::Less | Ordering::Equal => x,
Ordering::Greater => y,
};
self.reduce(reduce)
}
fn sum<Out>(self) -> Result<Out, Self::Err>
where
Self: Sized,
Self::Item: Sum<Out>,
Self::Err: Send,
Out: Send,
{
self.map(Self::Item::map)
.reduce(Self::Item::reduce)
.map(|x| x.unwrap_or(Self::Item::zero()))
}
fn first(self) -> Result<Option<Self::Item>, Self::Err>
where
Self::Item: Send,
Self::Err: Send;
fn find<Predicate>(self, predicate: Predicate) -> Result<Option<Self::Item>, Self::Err>
where
Self: Sized,
Self::Item: Send,
Self::Err: Send,
Predicate: Fn(&Self::Item) -> bool + Sync,
{
self.filter(&predicate).first()
}
}
pub trait IntoResult<T, E> {
fn into_result(self) -> Result<T, E>;
}
impl<T, E> IntoResult<T, E> for Result<T, E> {
#[inline(always)]
fn into_result(self) -> Result<T, E> {
self
}
}