#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::process;
use std::thread;
/// A builder that collects closures and later runs each of them on its own
/// thread.
///
/// Closures are queued with `add` and `each`; nothing is executed until `run`
/// is called, which is why the type is marked `#[must_use]`. The closures may
/// borrow data that lives for `'a`.
#[derive(Default)]
#[must_use]
pub struct Parallel<'a> {
/// Queued closures, in the order they were added.
closures: Vec<Box<dyn FnOnce() + Send + 'a>>,
/// Zero-sized marker: `&'a mut &'a ()` is invariant in `'a`, so the whole
/// struct is invariant in `'a` as well.
_marker: PhantomData<&'a mut &'a ()>,
}
impl<'a> Parallel<'a> {
    /// Creates a builder with no closures queued.
    pub fn new() -> Parallel<'a> {
        Parallel {
            closures: Vec::new(),
            _marker: PhantomData,
        }
    }

    /// Queues `f` to be executed on its own thread when [`run`](Parallel::run)
    /// is called.
    ///
    /// The closure may borrow anything that lives for `'a`.
    pub fn add<F>(mut self, f: F) -> Parallel<'a>
    where
        F: FnOnce() + Send + 'a,
    {
        self.closures.push(Box::new(f));
        self
    }

    /// Queues one closure per item of `iter`; each queued closure calls a
    /// fresh clone of `f` with that item.
    ///
    /// The iterator is fully consumed here; the closures themselves only
    /// execute once [`run`](Parallel::run) is called.
    pub fn each<T, I, F>(mut self, iter: I, f: F) -> Parallel<'a>
    where
        I: IntoIterator<Item = T>,
        F: FnOnce(T) + Clone + Send + 'a,
        T: Send + 'a,
    {
        let jobs = iter.into_iter().map(|t| {
            // Every item gets its own clone because `f` is only `FnOnce`.
            let f = f.clone();
            Box::new(move || f(t)) as Box<dyn FnOnce() + Send + 'a>
        });
        self.closures.extend(jobs);
        self
    }

    /// Spawns one thread per queued closure and blocks until all of them have
    /// finished.
    ///
    /// # Panics
    ///
    /// If one or more closures panicked, every thread is still joined first
    /// and then the panic payload of the last-queued panicking closure is
    /// resumed on the calling thread.
    ///
    /// If a panic unwinds out of this function before all threads have been
    /// joined (for example because a thread could not be spawned), the process
    /// is aborted.
    pub fn run(self) {
        // Aborts the process if a panic unwinds through this frame while
        // spawned threads may still be borrowing data with lifetime `'a`.
        let guard = NoPanic;

        let mut handles = Vec::with_capacity(self.closures.len());
        for f in self.closures {
            // SAFETY: only the lifetime bound of the trait object changes, so
            // layout and vtable are identical. Pretending the closure is
            // `'static` is sound because every spawned thread is joined below
            // before this function returns, and `guard` aborts the process if
            // unwinding would leave this frame before those joins complete.
            let f: Box<dyn FnOnce() + Send + 'static> = unsafe {
                mem::transmute::<Box<dyn FnOnce() + Send + 'a>, Box<dyn FnOnce() + Send + 'static>>(
                    f,
                )
            };
            handles.push(thread::spawn(f));
        }

        // Join everything, remembering the most recently seen panic payload.
        let mut last_err = None;
        for h in handles {
            if let Err(err) = h.join() {
                last_err = Some(err);
            }
        }

        // All threads are joined, so the guard is no longer needed. Disarm it
        // without running its destructor: the destructor aborts whenever the
        // current thread is panicking, which would wrongly kill the process
        // when `run` is called from a destructor during an unrelated unwind.
        mem::forget(guard);

        if let Some(err) = last_err {
            panic::resume_unwind(err);
        }
    }
}
impl fmt::Debug for Parallel<'_> {
    /// Formats the builder as `Parallel { len: N }`, where `N` is the number
    /// of queued closures (the closures themselves cannot be printed).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let queued = self.closures.len();
        let mut out = f.debug_struct("Parallel");
        out.field("len", &queued);
        out.finish()
    }
}
/// Guard that aborts the whole process if it is dropped while the current
/// thread is panicking.
struct NoPanic;

impl Drop for NoPanic {
    fn drop(&mut self) {
        // Dropped outside of a panic: nothing to do.
        if !thread::panicking() {
            return;
        }
        process::abort();
    }
}