#![doc = include_str!("../README.md")]
use std::sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc,
};
mod parallel_map;
pub use self::parallel_map::{ParallelMap, ParallelMapBuilder};
mod readahead;
pub use self::readahead::{Readahead, ReadaheadBuilder};
mod parallel_filter;
pub use self::parallel_filter::{ParallelFilter, ParallelFilterBuilder};
pub mod profile;
pub use self::profile::{
ProfileEgress, ProfileIngress, Profiler, TotalTimeProfiler, TotalTimeStats,
};
use std::thread::Scope;
pub trait IteratorExt {
fn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O>
where
Self: Sized,
Self: Iterator,
F: 'static + Send + Clone,
Self::Item: Send + 'static,
F: FnMut(Self::Item) -> O,
O: Send + 'static,
{
ParallelMapBuilder::new(self).with(f)
}
fn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O>
where
Self: Sized,
Self: Iterator,
F: 'static + Send + Clone,
F: FnMut(Self::Item) -> O,
Self::Item: Send + 'static,
O: Send + 'static,
OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,
{
of(ParallelMapBuilder::new(self)).with(f)
}
fn parallel_map_scoped<'env, 'scope, F, O>(
self,
scope: &'scope Scope<'scope, 'env>,
f: F,
) -> ParallelMap<Self, O>
where
Self: Sized,
Self: Iterator,
F: 'env + Send + Clone,
Self::Item: Send + 'env,
F: FnMut(Self::Item) -> O,
O: Send + 'env,
{
ParallelMapBuilder::new(self).with_scoped(scope, f)
}
fn parallel_map_scoped_custom<'env, 'scope, F, O, OF>(
self,
scope: &'scope Scope<'scope, 'env>,
of: OF,
f: F,
) -> ParallelMap<Self, O>
where
Self: Sized,
Self: Iterator,
F: 'env + Send + Clone,
Self::Item: Send + 'env,
F: FnMut(Self::Item) -> O,
O: Send + 'env,
OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,
{
of(ParallelMapBuilder::new(self)).with_scoped(scope, f)
}
fn parallel_filter<F>(self, f: F) -> ParallelFilter<Self>
where
Self: Sized,
Self: Iterator,
F: 'static + Send + Clone,
Self::Item: Send + 'static,
F: FnMut(&Self::Item) -> bool,
{
ParallelFilterBuilder::new(self).with(f)
}
fn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self>
where
Self: Sized,
Self: Iterator,
F: 'static + Send + Clone,
Self::Item: Send + 'static,
F: FnMut(&Self::Item) -> bool,
OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,
{
of(ParallelFilterBuilder::new(self)).with(f)
}
fn parallel_filter_scoped<'env, 'scope, F>(
self,
scope: &'scope Scope<'scope, 'env>,
f: F,
) -> ParallelFilter<Self>
where
Self: Sized,
Self: Iterator,
F: 'env + Send + Clone,
Self::Item: Send + 'env,
F: FnMut(&Self::Item) -> bool,
{
ParallelFilterBuilder::new(self).with_scoped(scope, f)
}
fn parallel_filter_scoped_custom<'env, 'scope, F, OF>(
self,
scope: &'scope Scope<'scope, 'env>,
of: OF,
f: F,
) -> ParallelFilter<Self>
where
Self: Sized,
Self: Iterator,
F: 'env + Send + Clone,
Self::Item: Send + 'env,
F: FnMut(&Self::Item) -> bool,
OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,
{
of(ParallelFilterBuilder::new(self)).with_scoped(scope, f)
}
fn readahead(self) -> Readahead<Self>
where
Self: Iterator + Send + 'static,
Self: Sized,
Self::Item: Send + 'static,
{
ReadaheadBuilder::new(self).with()
}
fn readahead_custom<OF>(self, of: OF) -> Readahead<Self>
where
Self: Iterator,
Self: Sized + Send + 'static,
Self::Item: Send + 'static,
OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
{
of(ReadaheadBuilder::new(self)).with()
}
fn readahead_scoped<'env, 'scope>(self, scope: &'scope Scope<'scope, 'env>) -> Readahead<Self>
where
Self: Sized + Send,
Self: Iterator + 'scope + 'env,
Self::Item: Send + 'env + 'scope + Send,
{
ReadaheadBuilder::new(self).with_scoped(scope)
}
fn readahead_scoped_custom<'env, 'scope, OF>(
self,
scope: &'scope Scope<'scope, 'env>,
of: OF,
) -> Readahead<Self>
where
Self: Sized + Send,
Self: Iterator + 'scope + 'env,
Self::Item: Send + 'env + 'scope + Send,
OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
{
of(ReadaheadBuilder::new(self)).with_scoped(scope)
}
fn profile_egress<P: profile::Profiler>(self, profiler: P) -> ProfileEgress<Self, P>
where
Self: Iterator,
Self: Sized,
{
ProfileEgress::new(self, profiler)
}
fn profile_ingress<P: profile::Profiler>(self, profiler: P) -> ProfileIngress<Self, P>
where
Self: Iterator,
Self: Sized,
{
ProfileIngress::new(self, profiler)
}
fn readahead_profiled<TxP, RxP>(
self,
tx_profiler: TxP,
rx_profiler: RxP,
) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
where
TxP: profile::Profiler + Send + 'static,
RxP: profile::Profiler,
Self: Iterator,
Self: Sized,
Self: Send + 'static,
Self::Item: Send + 'static,
{
self.profile_egress(tx_profiler)
.readahead()
.profile_ingress(rx_profiler)
}
fn readahead_scoped_profiled<'env, 'scope, TxP, RxP>(
self,
scope: &'scope Scope<'scope, 'env>,
tx_profiler: TxP,
rx_profiler: RxP,
) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
where
Self: Sized + Send,
Self: Iterator + 'scope + 'env,
Self::Item: Send + 'env + 'scope + Send,
TxP: profile::Profiler + Send + 'static,
RxP: profile::Profiler,
{
self.profile_egress(tx_profiler)
.readahead_scoped(scope)
.profile_ingress(rx_profiler)
}
}
impl<I> IteratorExt for I where I: Iterator {}
struct DropIndicator {
canceled: bool,
indicator: Arc<AtomicBool>,
}
impl DropIndicator {
fn new(indicator: Arc<AtomicBool>) -> Self {
Self {
canceled: false,
indicator,
}
}
fn cancel(mut self) {
self.canceled = true;
}
}
impl Drop for DropIndicator {
fn drop(&mut self) {
if !self.canceled {
self.indicator.store(true, SeqCst);
}
}
}
#[cfg(test)]
mod tests;