mod promise;
use std::sync::{Arc, Mutex};
use std::thread::spawn;
use PipelineBuilder;
use unordered::{propagate_panics};
/// Consumer side of the pipeline: yields the results produced by the worker
/// threads, one promise at a time.
///
/// `I` is the stream of promise receivers; each receiver resolves to one `T`.
pub(crate) struct PipelineIter<I, T>
where
    I: Iterator<Item = promise::Receiver<T>>,
{
    /// Stream of pending promises. Set to `None` once the pipeline has been
    /// shut down (see `propagate_panics`), after which `next` yields nothing.
    output: Option<I>,
    /// Join handles of the spawned worker threads.
    worker_threads: Vec<std::thread::JoinHandle<()>>,
}
impl<I, T> PipelineIter<I, T>
where
    I: Iterator<Item = promise::Receiver<T>>,
{
    /// Shuts down the output stream and hands the worker handles to
    /// `unordered::propagate_panics`.
    ///
    /// NOTE(review): the helper is defined elsewhere; presumably it joins the
    /// workers and re-raises any panic one of them hit — confirm against
    /// `unordered`.
    fn propagate_panics(&mut self) {
        // Release the promise stream first. `take` leaves `None` behind, so
        // subsequent calls to `next` return `None` immediately.
        drop(self.output.take());
        propagate_panics(&mut self.worker_threads);
    }
}
/// Starts a pipeline that applies `callable` to every item of the builder's
/// input on `num_threads` worker threads and returns an iterator over the
/// results.
///
/// Each worker pulls `(item, promise)` pairs from a shared `PromiseIterator`,
/// computes the result and fulfils the promise. The matching promise
/// receivers travel through a bounded channel to the returned iterator.
///
/// The channel capacity is `out_buffer`, raised to `num_threads` when it is
/// smaller than that.
pub fn new<I, F, Out>(builder: PipelineBuilder<I>, callable: F) -> impl Iterator<Item = Out>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
    Out: Send + 'static,
    F: Fn(I::Item) -> Out + Send + Sync + 'static,
{
    let PipelineBuilder { input, num_threads, out_buffer } = builder;

    // The promise queue is never smaller than the number of workers.
    let queue_len = std::cmp::max(out_buffer, num_threads);
    let (promise_tx, promise_rx) = crossbeam_channel::bounded(queue_len);

    let shared_input = PromiseIterator::wrap(input, promise_tx);
    let shared_callable = Arc::new(callable);

    let mut pipeline = PipelineIter {
        output: Some(promise_rx.into_iter()),
        worker_threads: Vec::with_capacity(num_threads),
    };

    for _ in 0..num_threads {
        // Every worker gets its own handle to the shared input and callable.
        let jobs = shared_input.clone();
        let work = Arc::clone(&shared_callable);
        let handle = spawn(move || {
            for (item, reply) in jobs {
                let result = work(item);
                reply.send(result);
            }
        });
        pipeline.worker_threads.push(handle);
    }

    pipeline
}
impl<I, T> std::iter::Iterator for PipelineIter<I,T>
where I: Iterator<Item=promise::Receiver<T>>
{
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
let next_promise = match self.output {
None => return None,
Some(ref mut iterator) => iterator,
}.next();
let next_promise = match next_promise {
None => {
self.propagate_panics();
return None;
},
Some(p) => p,
};
let value = match next_promise.receive() {
Ok(value) => value,
Err(_) => {
self.propagate_panics();
panic!("self.propagate_panics() should have panicked.");
},
};
Some(value)
}
}
/// Shared, cloneable view of the input iterator that pairs every input item
/// with a promise for its output.
struct PromiseIterator<I, Out>
where
    I: Iterator,
{
    /// The fused input iterator, shared between all clones behind a mutex.
    iterator: Arc<Mutex<std::iter::Fuse<I>>>,
    /// Channel on which the receiving half of each new promise is published.
    promise_sink: crossbeam_channel::Sender<promise::Receiver<Out>>,
}
impl<I, Out> PromiseIterator<I, Out>
where
    I: Iterator,
{
    /// Wraps `iterator` so it can be shared between threads, publishing the
    /// receiver of each item's promise on `chan`.
    ///
    /// The iterator is fused before it is placed behind the mutex.
    fn wrap(iterator: I, chan: crossbeam_channel::Sender<promise::Receiver<Out>>) -> Self {
        let shared = Arc::new(Mutex::new(iterator.fuse()));
        Self {
            iterator: shared,
            promise_sink: chan,
        }
    }
}
impl<I, Out> Iterator for PromiseIterator<I, Out>
where
    I: Iterator,
{
    type Item = (I::Item, promise::Sender<Out>);

    /// Takes the next input item and pairs it with the sending half of a
    /// fresh promise; the receiving half is pushed onto `promise_sink`.
    ///
    /// Returns `None` when the input is exhausted or when the sink rejects
    /// the receiver (in which case the item that was just taken is dropped).
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the shared input iterator is poisoned.
    fn next(&mut self) -> Option<Self::Item> {
        // The guard stays alive until this function returns, so taking an
        // item and publishing its promise happen under one lock acquisition.
        let mut source = self.iterator.lock().expect(
            "There shoudln't be poisioning in PromiseIterator"
        );
        let item = source.next()?;

        let (fulfil, pending) = promise::new();
        if self.promise_sink.send(pending).is_ok() {
            Some((item, fulfil))
        } else {
            None
        }
    }
}
impl<I, Out> Clone for PromiseIterator<I, Out>
where I: Iterator
{
fn clone(&self) -> Self {
PromiseIterator {
iterator: self.iterator.clone(),
promise_sink: self.promise_sink.clone(),
}
}
}