#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::fmt;
use std::mem;
use std::panic;
use std::process;
use std::sync::mpsc;
use std::thread;
/// A builder that collects closures and later runs them in parallel.
///
/// Queued closures may borrow data that lives for `'a`, and every closure yields a value of
/// type `T`.
#[must_use]
pub struct Parallel<'a, T> {
    /// Closures queued so far, in insertion order.
    closures: Vec<Box<dyn FnOnce() -> T + Send + 'a>>,
}

// Written by hand instead of `#[derive(Default)]`: the derive would add a `T: Default` bound,
// yet an empty builder never has to construct a `T`.
impl<'a, T> Default for Parallel<'a, T> {
    /// Returns a builder with no closures queued.
    fn default() -> Self {
        Parallel {
            closures: Vec::new(),
        }
    }
}
impl<'a, T> Parallel<'a, T> {
    /// Creates a builder with no closures queued.
    pub fn new() -> Parallel<'a, T> {
        Self {
            closures: Vec::new(),
        }
    }

    /// Queues `f` for execution and hands the builder back for chaining.
    ///
    /// Nothing is executed here; the closure is only stored until [`run`](Parallel::run) is
    /// called.
    pub fn add<F>(mut self, f: F) -> Parallel<'a, T>
    where
        F: FnOnce() -> T + Send + 'a,
        T: Send + 'a,
    {
        let boxed: Box<dyn FnOnce() -> T + Send + 'a> = Box::new(f);
        self.closures.push(boxed);
        self
    }

    /// Queues one closure per item produced by `iter`.
    ///
    /// Each queued closure owns its own clone of `f` together with one item, and calls
    /// `f(item)` when it is eventually run.
    pub fn each<A, I, F>(mut self, iter: I, f: F) -> Parallel<'a, T>
    where
        I: IntoIterator<Item = A>,
        F: FnOnce(A) -> T + Clone + Send + 'a,
        A: Send + 'a,
        T: Send + 'a,
    {
        for item in iter {
            let task = f.clone();
            self.closures.push(Box::new(move || task(item)));
        }
        self
    }

    /// Runs every queued closure and returns their results in insertion order.
    ///
    /// The first closure runs on the calling thread; each remaining closure runs on its own
    /// freshly spawned thread. All spawned threads are joined before this method returns.
    ///
    /// # Panics
    ///
    /// If any closure panics, the panic is resumed on the calling thread once every thread has
    /// been joined. When several closures panic, the payload observed last (the first closure is
    /// checked first, then the threads in spawn order) is the one that is resumed.
    ///
    /// A panic raised by this method's own bookkeeping before the threads are joined aborts the
    /// process instead of unwinding.
    pub fn run(self) -> Vec<T>
    where
        T: Send + 'a,
    {
        let mut pending = self.closures.into_iter();

        // The first closure is kept for the calling thread; with nothing queued we are done.
        let first = match pending.next() {
            Some(first) => first,
            None => return Vec::new(),
        };

        // From here until the joins finish, unwinding out of this function must not happen.
        let guard = NoPanic;

        // Spawn one thread per remaining closure, pairing each join handle with the receiving
        // end of the channel that carries that thread's result back.
        let (handles, receivers): (Vec<_>, Vec<_>) = pending
            .map(|job| {
                let (tx, rx) = mpsc::channel();
                let scoped: Box<dyn FnOnce() + Send + 'a> =
                    Box::new(move || tx.send(job()).unwrap());
                // SAFETY: only the lifetime bound of the trait object changes. Every spawned
                // thread is joined below before this function returns, and `NoPanic` aborts the
                // process if the function unwinds before those joins complete, so the closure
                // never runs after `'a` has ended.
                let erased: Box<dyn FnOnce() + Send + 'static> =
                    unsafe { mem::transmute(scoped) };
                (thread::spawn(erased), rx)
            })
            .unzip();

        // Execute the first closure right here, capturing a panic instead of unwinding.
        let (mut results, mut last_panic) =
            match panic::catch_unwind(panic::AssertUnwindSafe(first)) {
                Ok(value) => (vec![value], None),
                Err(payload) => (Vec::new(), Some(payload)),
            };

        // Wait for every thread; a later panic payload replaces an earlier one.
        for handle in handles {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => last_panic = Some(payload),
            }
        }

        // All threads are finished, so unwinding is allowed again.
        drop(guard);

        if let Some(payload) = last_panic {
            panic::resume_unwind(payload);
        }

        // No closure panicked, so each channel holds exactly one value.
        results.extend(receivers.into_iter().map(|rx| rx.recv().unwrap()));
        results
    }
}
/// Debug output shows only how many closures are queued, since closures themselves are opaque.
impl<T> fmt::Debug for Parallel<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let queued = self.closures.len();
        let mut out = f.debug_struct("Parallel");
        out.field("len", &queued);
        out.finish()
    }
}
/// Guard value that aborts the process if it is dropped while the current thread is panicking.
struct NoPanic;

impl Drop for NoPanic {
    fn drop(&mut self) {
        // A normal drop is a no-op; only a drop during unwinding escalates to an abort.
        if !thread::panicking() {
            return;
        }
        process::abort();
    }
}