use std::sync::Arc;
use std::sync::mpsc::{self, Sender, Receiver};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::iter::IntoIterator;
use crossbeam::{Scope, ScopedJoinHandle};
/// A single worker result, tagged with the position of the input element it
/// was computed from.
///
/// `data` is `Some(value)` when the closure returned a value and `None` when
/// the packet only signals that the worker for `idx` did not complete.
struct Packet<T> {
    idx: usize,
    data: Option<T>,
}

// Every comparison below looks at `idx` only; the payload never participates,
// which is why none of the impls require any bound on `T`.

impl<T> PartialEq for Packet<T> {
    fn eq(&self, rhs: &Self) -> bool {
        self.idx.eq(&rhs.idx)
    }
}

impl<T> Eq for Packet<T> {}

impl<T> Ord for Packet<T> {
    /// Compares by `idx` in *reverse*: the packet with the smaller index is
    /// the "greater" one, so a max-heap such as `BinaryHeap` hands out the
    /// lowest pending index first.
    fn cmp(&self, rhs: &Self) -> Ordering {
        self.idx.cmp(&rhs.idx).reverse()
    }
}

impl<T> PartialOrd for Packet<T> {
    /// Always `Some`; defers to the total order defined by `Ord`.
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}
pub struct UnorderedParMap<T: Send> {
rx: Receiver<Packet<T>>,
_guards: Vec<ScopedJoinHandle<()>>
}
impl<T: Send> Iterator for UnorderedParMap<T> {
type Item = (usize, T);
fn next(&mut self) -> Option<(usize, T)> {
match self.rx.recv() {
Ok(Packet { data: Some(x), idx }) => Some((idx, x)),
Ok(Packet { data: None, .. }) => {
panic!("simple_parallel::unordered_map: closure panicked")
}
Err(mpsc::RecvError) => None,
}
}
}
/// Guard that reports failure for one work item when it is dropped.
///
/// Unless `all_ok` has been set to `true` by the time the value is dropped,
/// a `Packet` with `data: None` is sent for `idx`.
struct Panicker<T: Send> {
    /// Channel on which the failure packet (if any) is sent.
    tx: Sender<Packet<T>>,
    /// Index of the work item this guard belongs to.
    idx: usize,
    /// Set to `true` to suppress the failure packet.
    all_ok: bool
}

impl<T: Send> Drop for Panicker<T> {
    fn drop(&mut self) {
        if self.all_ok {
            return;
        }

        let failure = Packet { idx: self.idx, data: None };
        // Best effort: a send error (receiver already gone) is ignored.
        let _ = self.tx.send(failure);
    }
}
/// Applies `f` to every element of `iter`, spawning one task per element on
/// `scope`, and returns an iterator over the results in arrival order.
///
/// Each spawned task sends exactly one packet: the closure's return value
/// tagged with the element's position in `iter`, or — if the task ends
/// before reaching the success flag, e.g. because `f` panicked — a packet
/// without a value, sent by the `Panicker` guard when it is dropped.
///
/// The returned iterator yields `(position, value)` pairs and panics when it
/// receives a packet without a value.
pub fn unordered_map<'a, I: IntoIterator, F, T>(scope: &Scope<'a>, iter: I, f: F) -> UnorderedParMap<T>
    where I::Item: Send + 'a,
          F: 'a + Send + Sync + Fn(I::Item) -> T,
          T: Send + 'a
{
    let (sender, receiver) = mpsc::channel();
    // One shared copy of the closure; every task gets its own `Arc` handle.
    let shared_f = Arc::new(f);

    let mut handles = Vec::new();
    for (position, item) in iter.into_iter().enumerate() {
        let worker_tx = sender.clone();
        let worker_f = shared_f.clone();
        let handle = scope.spawn(move || {
            // Armed first, so that unwinding out of `worker_f` still reports
            // this position through the guard's `Drop`.
            let mut guard = Panicker { tx: worker_tx, idx: position, all_ok: false };
            let output = worker_f(item);
            let _ = guard.tx.send(Packet { idx: position, data: Some(output) });
            guard.all_ok = true;
        });
        handles.push(handle);
    }

    // `sender` itself is dropped on return; only the per-task clones keep the
    // channel open, so the receiver sees a disconnect once all of them are gone.
    UnorderedParMap {
        rx: receiver,
        _guards: handles,
    }
}
/// Iterator that yields worker results in index order (0, 1, 2, ...),
/// buffering packets that arrive ahead of their turn.
pub struct ParMap<T: Send> {
    /// Source of packets; only its receiver is used by this type.
    unordered: UnorderedParMap<T>,
    /// Index of the packet that has to be handed out next.
    looking_for: usize,
    /// Packets received so far that have not been handed out yet.
    queue: BinaryHeap<Packet<T>>
}

impl<T: Send> Iterator for ParMap<T> {
    type Item = T;

    /// Returns the value for index `looking_for`, blocking on the channel
    /// until the packet with that index is at the top of the buffer.
    ///
    /// Returns `None` when the channel disconnects before that happens.
    ///
    /// # Panics
    ///
    /// Panics when the packet for the awaited index carries no value.
    fn next(&mut self) -> Option<T> {
        loop {
            let head_is_next = match self.queue.peek() {
                Some(head) => head.idx == self.looking_for,
                None => false,
            };

            if head_is_next {
                let ready = self.queue.pop().unwrap();
                self.looking_for += 1;
                return match ready.data {
                    Some(value) => Some(value),
                    None => panic!("simple_parallel::map: closure panicked"),
                };
            }

            // The awaited packet is not buffered yet: pull one more from the
            // channel and check again.
            match self.unordered.rx.recv() {
                Ok(incoming) => self.queue.push(incoming),
                Err(mpsc::RecvError) => return None,
            }
        }
    }
}
/// Applies `f` to every element of `iter` via `unordered_map` and returns an
/// iterator that yields the results in the order of the input elements.
///
/// The returned `ParMap` starts out waiting for index 0 with an empty buffer.
/// It panics when it reaches an index whose packet carries no value.
pub fn map<'a, I: IntoIterator, F, T>(scope: &Scope<'a>, iter: I, f: F) -> ParMap<T>
    where I::Item: 'a + Send,
          F: 'a + Send + Sync + Fn(I::Item) -> T,
          T: Send + 'a
{
    // Start the workers first; re-ordering happens lazily as the result is iterated.
    let source = unordered_map(scope, iter, f);
    let pending = BinaryHeap::new();

    ParMap {
        unordered: source,
        looking_for: 0,
        queue: pending,
    }
}