#[cfg(feature = "parallel")]
mod stepped {
    use crate::parallel::num_threads;

    /// An iterator that runs `consume` on background threads and feeds every result to a reducer,
    /// yielding the reducer's answer one step per call to `next()`.
    ///
    /// Dropping the value disconnects the result channel and then joins every spawned thread.
    pub struct Stepwise<Reduce: super::Reduce> {
        /// Receiving end of the bounded channel the worker threads send their results to.
        receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
        /// Handles of all worker threads plus the thread that feeds them input; joined on drop.
        threads: Vec<std::thread::JoinHandle<()>>,
        /// The reducer, fed on the thread that drives the iterator.
        ///
        /// It is `Some` until `finalize()` takes it; an `Option` is needed because fields cannot be
        /// moved out of a type that implements `Drop`.
        reducer: Option<Reduce>,
    }

    impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
        /// Disconnect the result channel, then wait for all threads to finish.
        ///
        /// # Panics
        ///
        /// If one or more threads panicked, the payload of the last one joined is re-raised on the
        /// current thread, unless the current thread is already unwinding.
        fn drop(&mut self) {
            // Swap in a receiver without any sender so the real receiver is dropped right now.
            // From then on `send_result.send(..)` fails in the workers, which makes them leave their loops.
            let (_, sink) = std::sync::mpsc::channel();
            drop(std::mem::replace(&mut self.receive_result, sink));

            let mut last_err = None;
            for handle in std::mem::take(&mut self.threads) {
                if let Err(err) = handle.join() {
                    last_err = Some(err);
                }
            }
            if let Some(thread_err) = last_err {
                // Starting another unwind from a destructor while this thread is already unwinding
                // would abort the whole process, so the worker's panic is only re-raised otherwise.
                if !std::thread::panicking() {
                    std::panic::resume_unwind(thread_err);
                }
            }
        }
    }

    impl<Reduce: super::Reduce> Stepwise<Reduce> {
        /// Spawn the threads that process `input` and return an iterator over the reduction steps.
        ///
        /// * `input` is moved to a dedicated thread that forwards its items to the workers.
        /// * `thread_limit` is passed to `num_threads()` to determine how many worker threads to spawn.
        /// * `new_thread_state` is called once on each worker thread with that worker's index.
        /// * `consume` is called on a worker thread for each input item along with that worker's state;
        ///   its return value is sent to the thread driving the iterator.
        /// * `reducer` receives every value produced by `consume` through `feed()`.
        ///
        /// Both the input and the result channel are bounded to the number of worker threads.
        pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
            input: InputIter,
            thread_limit: Option<usize>,
            new_thread_state: ThreadStateFn,
            consume: ConsumeFn,
            reducer: Reduce,
        ) -> Self
        where
            InputIter: Iterator<Item = I> + Send + 'static,
            ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
            ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
            Reduce: super::Reduce<Input = O> + 'static,
            I: Send + 'static,
            O: Send + 'static,
        {
            let num_threads = num_threads(thread_limit);
            // One additional slot for the thread that feeds `input` to the workers.
            let mut threads = Vec::with_capacity(num_threads + 1);
            let receive_result = {
                let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
                let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
                for thread_id in 0..num_threads {
                    let handle = std::thread::spawn({
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let consume = consume.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            for item in receive_input {
                                // Sending fails once the receiving side is gone, so stop working then.
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    break;
                                }
                            }
                        }
                    });
                    threads.push(handle);
                }
                threads.push(std::thread::spawn(move || {
                    for item in input {
                        // Sending fails once no worker holds a receiver anymore.
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                }));
                // The original `send_result` and `receive_input` are dropped at the end of this scope,
                // leaving only the clones owned by the worker threads.
                receive_result
            };
            Stepwise {
                threads,
                receive_result,
                reducer: Some(reducer),
            }
        }

        /// Consume the remaining steps, discarding their values, and return the reducer's final output.
        ///
        /// # Errors
        ///
        /// Returns the first error produced by a remaining step without processing any further steps,
        /// or the error returned by the reducer's own `finalize()`.
        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
            for value in self.by_ref() {
                drop(value?);
            }
            self.reducer
                .take()
                .expect("this is the last call before consumption")
                .finalize()
        }
    }

    impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
        type Item = Result<Reduce::FeedProduce, Reduce::Error>;

        /// Block until the next result arrives and return what the reducer makes of it.
        ///
        /// Returns `None` once the result channel is disconnected.
        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
            self.receive_result
                .recv()
                .ok()
                .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
        }
    }

    impl<R: super::Reduce> super::Finalize for Stepwise<R> {
        type Reduce = R;

        /// Same as the inherent `Stepwise::finalize()`.
        fn finalize(
            self,
        ) -> Result<
            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
        > {
            Stepwise::finalize(self)
        }
    }
}
#[cfg(not(feature = "parallel"))]
mod stepped {
pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
input: InputIter,
consume: ConsumeFn,
thread_state: ThreadState,
reducer: Reduce,
}
impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
where
InputIter: Iterator<Item = I> + Send,
ConsumeFn: Fn(I, &mut S) -> O + Send + Sync,
Reduce: super::Reduce<Input = O>,
I: Send,
O: Send,
{
pub fn new<ThreadStateFn>(
input: InputIter,
_thread_limit: Option<usize>,
new_thread_state: ThreadStateFn,
consume: ConsumeFn,
reducer: Reduce,
) -> Self
where
ThreadStateFn: Fn(usize) -> S + Send + Sync,
{
Stepwise {
input,
consume,
thread_state: new_thread_state(0),
reducer,
}
}
pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
for value in self.by_ref() {
drop(value?);
}
self.reducer.finalize()
}
}
impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
where
InputIter: Iterator<Item = I> + Send,
ConsumeFn: Fn(I, &mut ThreadState) -> O + Send + Sync,
Reduce: super::Reduce<Input = O>,
I: Send,
O: Send,
{
type Item = Result<Reduce::FeedProduce, Reduce::Error>;
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
self.input
.next()
.map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
}
}
impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
where
InputIter: Iterator<Item = I> + Send,
ConsumeFn: Fn(I, &mut S) -> O + Send + Sync,
R: super::Reduce<Input = O>,
I: Send,
O: Send,
{
type Reduce = R;
fn finalize(
self,
) -> Result<
<<Self as super::Finalize>::Reduce as super::Reduce>::Output,
<<Self as super::Finalize>::Reduce as super::Reduce>::Error,
> {
Stepwise::finalize(self)
}
}
}
use std::marker::PhantomData;
pub use stepped::Stepwise;
/// Folds a stream of values into a single result, reporting on every value along the way.
pub trait Reduce {
    /// The type of the values passed to [`feed()`][Reduce::feed()].
    type Input;
    /// The type returned for every successfully fed value.
    type FeedProduce;
    /// The type of the final result returned by [`finalize()`][Reduce::finalize()].
    type Output;
    /// The error type shared by [`feed()`][Reduce::feed()] and [`finalize()`][Reduce::finalize()].
    type Error;

    /// Take in a single `item` and return the outcome of this step.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;

    /// Consume the reducer and return the final result.
    fn finalize(self) -> Result<Self::Output, Self::Error>;
}
/// A stateless reducer whose fed values are `Result`s which it hands back unchanged.
///
/// It stores no data; the type parameters only fix the `Ok` and `Err` types it works with.
pub struct IdentityWithResult<Input, Error> {
    /// Marker for the `Ok` type of the fed values.
    _input: PhantomData<Input>,
    /// Marker for the `Err` type of the fed values.
    _error: PhantomData<Error>,
}
impl<Input, Error> Default for IdentityWithResult<Input, Error> {
fn default() -> Self {
IdentityWithResult {
_input: Default::default(),
_error: Default::default(),
}
}
}
impl<T, E> Reduce for IdentityWithResult<T, E> {
    /// Fed values are already `Result`s.
    type Input = Result<T, E>;
    /// The `Ok` value of a fed `Result`.
    type FeedProduce = T;
    /// Nothing is accumulated, so there is nothing to return at the end.
    type Output = ();
    /// The `Err` value of a fed `Result`.
    type Error = E;

    /// Return `item` exactly as it was passed in.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
        item
    }

    /// Always succeeds with `()`.
    fn finalize(self) -> Result<Self::Output, Self::Error> {
        Ok(())
    }
}
pub trait Finalize {
type Reduce: self::Reduce;
fn finalize(
self,
) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
}