use std::sync::{Arc, Mutex};
use std::thread::spawn;
use PipelineBuilder;
use panic_guard::PanicGuard;
/// Consumer-side state of a running pipeline.
///
/// `I` is the iterator the consumer pulls items from; `PipelineIter::new`
/// stores the iterator of the output channel's receiving end here.
pub(crate) struct PipelineIter<I: Iterator> {
    /// Iterator the consumer reads from. It is taken (leaving `None`) and
    /// dropped by `propagate_panics` before the worker threads are joined.
    output: Option<I>,
    /// Join handles of the spawned worker threads, joined when panics are
    /// propagated.
    worker_threads: Vec<std::thread::JoinHandle<()>>,
}
impl<I> PipelineIter<I>
where
    I: Iterator,
{
    /// Starts the pipeline described by `builder` and returns an iterator over
    /// its results.
    ///
    /// `num_threads` worker threads are spawned. Each worker repeatedly pulls a
    /// value from the shared input, applies `callable` to it and sends the
    /// result into a bounded channel of capacity `out_buffer`. A worker stops
    /// when the shared input is exhausted or when a send fails.
    ///
    /// Every worker's sender is wrapped in a `PanicGuard` (defined in
    /// `panic_guard`, not visible here).
    /// NOTE(review): presumably the guard is what reports a worker panic to the
    /// consumer -- confirm against `panic_guard`.
    pub fn new<F, Out>(builder: PipelineBuilder<I>, callable: F) -> impl Iterator<Item = Out>
    where
        I: Iterator + Send + 'static,
        I::Item: Send + 'static,
        Out: Send + 'static,
        F: Fn(I::Item) -> Out + Send + Sync + 'static,
    {
        let PipelineBuilder { input, num_threads, out_buffer } = builder;
        let shared_input = SharedIterator::wrap(input);
        let (sender, receiver) = crossbeam_channel::bounded(out_buffer);
        let shared_callable = Arc::new(callable);

        let mut pipeline = PipelineIter {
            output: Some(receiver.into_iter()),
            worker_threads: Vec::with_capacity(num_threads),
        };

        for _ in 0..num_threads {
            // Each worker gets its own handles onto the shared input, the
            // output channel and the callable.
            let source = shared_input.clone();
            let guarded_sender = PanicGuard::new(sender.clone());
            let transform = shared_callable.clone();

            let worker = move || {
                for item in source {
                    let produced = transform(item);
                    // A failed send ends this worker's loop.
                    if guarded_sender.send(produced).is_err() {
                        break;
                    }
                }
            };
            pipeline.worker_threads.push(spawn(worker));
        }

        pipeline
    }

    /// Releases the output iterator, then joins the workers and re-raises the
    /// first worker panic found on the calling thread.
    fn propagate_panics(&mut self) {
        // Drop the receiving side before joining: workers leave their loop as
        // soon as a send reports an error.
        let receiver = self.output.take();
        std::mem::drop(receiver);
        propagate_panics(&mut self.worker_threads)
    }
}
/// Joins every thread in `threads` and panics on the calling thread if one of
/// them panicked.
///
/// `threads` is left empty, so calling this again is a no-op.
///
/// # Panics
///
/// Panics with `Worker thread panicked with message: [..]` for the first
/// joined thread (in vector order) that ended in a panic. Handles that were
/// not yet joined at that point are dropped without being joined.
pub(crate) fn propagate_panics(threads: &mut Vec<std::thread::JoinHandle<()>>) {
    // Take ownership of the handles; `join` consumes them.
    let handles = std::mem::replace(threads, Vec::new());
    for handle in handles {
        if let Err(payload) = handle.join() {
            let orig_msg = panic_msg_from(payload.as_ref());
            panic!("Worker thread panicked with message: [{}]", orig_msg);
        }
    }
}
use std::any::Any;
/// Extracts a human-readable message from a panic payload.
///
/// A `&'static str` payload is tried first, then a `String` payload. Any other
/// payload type yields a fixed placeholder text.
fn panic_msg_from<'a>(panic_data: &'a dyn Any) -> &'a str {
    panic_data
        .downcast_ref::<&'static str>()
        .map(|literal| *literal)
        .or_else(|| panic_data.downcast_ref::<String>().map(String::as_str))
        .unwrap_or("<Unrecoverable panic message.>")
}
impl<I> std::iter::Iterator for PipelineIter<I>
where
    I: Iterator,
    I::Item: ResultTrait,
{
    type Item = <I::Item as ResultTrait>::Ok;

    /// Yields the next successful value from the output.
    ///
    /// Returns `None` once the output has been released. When the output is
    /// exhausted, or the next item turns out to be an `Err`, the worker
    /// threads are joined via `propagate_panics` (which panics if a worker
    /// panicked) and `None` is returned.
    /// NOTE(review): an `Err` item presumably signals a worker panic reported
    /// through `PanicGuard` -- confirm against `panic_guard`.
    fn next(&mut self) -> Option<Self::Item> {
        // No output left means the pipeline has already been shut down.
        let received = self.output.as_mut()?.next();

        match received.map(|message| message.result()) {
            Some(Ok(value)) => Some(value),
            // Channel drained or an error item received: shut down and
            // surface any worker panic.
            _ => {
                self.propagate_panics();
                None
            }
        }
    }
}
/// A cloneable handle that lets several threads pull from one iterator.
///
/// All clones share the same fused inner iterator behind an `Arc<Mutex<..>>`,
/// so each item of the wrapped iterator is handed to exactly one `next` call.
struct SharedIterator<I: Iterator> {
    iterator: Arc<Mutex<std::iter::Fuse<I>>>,
}

impl<I: Iterator> SharedIterator<I> {
    /// Fuses `iterator` and places it behind a shared mutex.
    fn wrap(iterator: I) -> Self {
        SharedIterator {
            iterator: Arc::new(Mutex::new(iterator.fuse())),
        }
    }
}

impl<I: Iterator> Clone for SharedIterator<I> {
    /// Returns another handle onto the same underlying iterator; no items are
    /// copied.
    fn clone(&self) -> Self {
        SharedIterator {
            iterator: Arc::clone(&self.iterator),
        }
    }
}

impl<I: Iterator> Iterator for SharedIterator<I> {
    type Item = I::Item;

    /// Locks the shared iterator and pulls its next item.
    ///
    /// The lock is held only for the duration of the inner `next()` call, so
    /// the mutex can only be poisoned when the wrapped iterator itself
    /// panicked on another thread. In that case this handle reports
    /// exhaustion (`None`) instead of panicking again: the thread that hit the
    /// original panic still carries it, and a second panic here would only
    /// add a misleading secondary message.
    fn next(&mut self) -> Option<Self::Item> {
        let mut guard = match self.iterator.lock() {
            Ok(guard) => guard,
            // Poisoned: the inner iterator may be in an inconsistent state, so
            // stop handing out items rather than touching it again.
            Err(_) => return None,
        };
        guard.next()
    }
}
/// Abstraction over values that can be converted into a standard `Result`.
///
/// It allows generic code to name the success and error types of an item
/// without fixing the item to `Result<_, _>` in its bounds.
pub(crate) trait ResultTrait {
    /// Success type returned by `result`.
    type Ok;
    /// Error type returned by `result`.
    type Err;

    /// Consumes `self` and returns it as a `Result`.
    fn result(self) -> Result<Self::Ok, Self::Err>;
}
/// `Result` trivially satisfies `ResultTrait`: the conversion is the identity.
impl<T, E> ResultTrait for Result<T, E> {
    type Ok = T;
    type Err = E;

    /// Returns `self` unchanged.
    #[inline]
    fn result(self) -> Result<Self::Ok, Self::Err> {
        self
    }
}