#![doc(test(attr(allow(unused_variables))))]
extern crate num_cpus;
extern crate coco;
extern crate synchronoise;
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::SeqCst;
use std::sync::mpsc::channel;
use std::thread;
use coco::deque;
use synchronoise::{SignalEvent, SignalKind};
/// Extension trait that adds multi-threaded `fold` and `map` adaptors to iterators.
///
/// The blanket implementation in this file covers every `Iterator + Send + 'static` whose items
/// are `Send + 'static`.
pub trait Polyester<T>
{
/// Folds the items of this iterator using several worker threads and returns the combined
/// accumulator.
///
/// * `seed` - starting accumulator. In the implementation in this file it is cloned once per
///   worker thread, so it should act as an identity value for the fold (e.g. `0` for a sum);
///   otherwise it is folded in once per worker.
/// * `inner_fold` - run on the worker threads to fold one item into a worker's accumulator.
/// * `outer_fold` - run on the calling thread to merge two workers' accumulators. Partial
///   results are merged in the order they arrive, so it should not depend on ordering.
fn par_fold<Acc, InnerFold, OuterFold>(
self,
seed: Acc,
inner_fold: InnerFold,
outer_fold: OuterFold,
) -> Acc
where
Acc: Clone + Send + 'static,
InnerFold: Fn(Acc, T) -> Acc + Send + Sync + 'static,
OuterFold: Fn(Acc, Acc) -> Acc;
/// Wraps this iterator in a `ParMap` that applies `map` to every item on worker threads.
///
/// In the implementation in this file no work is started until the returned iterator is first
/// polled, and results are yielded in the order the workers produce them, which need not match
/// the order of the source iterator.
fn par_map<Map, Out>(self, map: Map) -> ParMap<Self, Map, Out>
where
Self: Sized,
Map: Fn(T) -> Out + Send + Sync + 'static,
Out: Send + 'static;
}
impl<T, I> Polyester<T> for I
where
I: Iterator<Item=T> + Send + 'static,
T: Send + 'static,
{
fn par_fold<Acc, InnerFold, OuterFold>(
self,
seed: Acc,
inner_fold: InnerFold,
outer_fold: OuterFold,
) -> Acc
where
T: Send + 'static,
Acc: Clone + Send + 'static,
InnerFold: Fn(Acc, T) -> Acc + Send + Sync + 'static,
OuterFold: Fn(Acc, Acc) -> Acc
{
let num_jobs = get_thread_count();
if num_jobs == 1 {
return self.fold(seed, inner_fold);
}
let hopper = Hopper::new(self, num_jobs);
let inner_fold = Arc::new(inner_fold);
let (report, recv) = channel();
for id in 0..num_jobs {
let hopper = hopper.clone();
let acc = seed.clone();
let inner_fold = inner_fold.clone();
let report = report.clone();
std::thread::spawn(move || {
let mut acc = acc;
loop {
let item = hopper.get_item(id);
if let Some(item) = item {
acc = inner_fold(acc, item);
} else {
break;
}
}
report.send(acc).unwrap();
});
}
drop(report);
let mut acc: Option<Acc> = None;
for res in recv.iter() {
if acc.is_none() {
acc = Some(res);
} else {
acc = acc.map(|acc| outer_fold(acc, res));
}
}
acc.unwrap_or(seed)
}
fn par_map<Map, Out>(self, map: Map) -> ParMap<I, Map, Out>
where
Self: Sized,
T: Send + 'static,
Map: Fn(T) -> Out + Send + Sync + 'static,
Out: Send + 'static
{
ParMap {
iter: Some(self),
map: Some(map),
recv: None,
}
}
}
/// Iterator adaptor created by `Polyester::par_map`.
///
/// It holds the source iterator and the mapping closure until the first call to `next`, at which
/// point both are moved out and replaced by the receiving end of a results channel.
pub struct ParMap<Iter, Map, T>
{
// Source iterator; taken (left as `None`) by the first call to `next`.
iter: Option<Iter>,
// Mapping closure; taken together with `iter` by the first call to `next`.
map: Option<Map>,
// Receiving side of the channel the worker threads send mapped values into;
// `None` until the first call to `next`.
recv: Option<mpsc::IntoIter<T>>,
}
/// Yields the mapped values as the worker threads produce them.
impl<Iter, Map, T> Iterator for ParMap<Iter, Map, T>
where
    Iter: Iterator + Send + 'static,
    Iter::Item: Send + 'static,
    Map: Fn(Iter::Item) -> T + Send + Sync + 'static,
    T: Send + 'static,
{
    type Item = T;

    /// Returns the next mapped value, or `None` once every worker has finished.
    ///
    /// The first call moves the source iterator and closure out of `self`, hands the iterator to
    /// a `Hopper`, and spawns `get_thread_count()` workers that send their results over a
    /// channel. Later calls only read from that channel, so values arrive in completion order
    /// rather than in source order.
    fn next(&mut self) -> Option<Self::Item> {
        if let (Some(iter), Some(map)) = (self.iter.take(), self.map.take()) {
            let num_jobs = get_thread_count();
            let hopper = Hopper::new(iter, num_jobs);
            let map = Arc::new(map);
            let (report, recv) = channel();

            for id in 0..num_jobs {
                let hopper = hopper.clone();
                let report = report.clone();
                let map = map.clone();
                std::thread::spawn(move || {
                    while let Some(item) = hopper.get_item(id) {
                        // `send` fails only when the receiver is gone, i.e. the `ParMap` was
                        // dropped before being exhausted (e.g. after `.take(n)`). Nobody wants
                        // further results then, so stop quietly instead of panicking.
                        if report.send(map(item)).is_err() {
                            break;
                        }
                    }
                });
            }

            // The original `report` is dropped at the end of this block, so the channel closes
            // as soon as the last worker exits.
            self.recv = Some(recv.into_iter());
        }

        self.recv.as_mut().and_then(|r| r.next())
    }
}
/// Shared work distributor: a background thread pushes the items of an iterator into one queue
/// per slot, and worker threads pull items back out through `get_item`.
struct Hopper<T> {
// Stealing side of each slot's queue. Slot `id` is tried first by worker `id`, but `get_item`
// also steals from every other slot.
cache: Vec<deque::Stealer<T>>,
// One event per slot; a worker blocks on its own slot's event when it found nothing to steal.
signals: Vec<SignalEvent>,
// Set to `true` by the filling thread once the source iterator has been fully consumed.
done: AtomicBool,
}
impl<T> Hopper<T> {
    /// Creates a hopper with `slots` queues and spawns a background thread that drains `iter`
    /// into them in round-robin order.
    ///
    /// Once `iter` is exhausted the background thread sets `done` and signals every slot so that
    /// blocked workers wake up and can observe the end of input.
    ///
    /// NOTE(review): if `iter` panics, the background thread unwinds before `done` is set, so
    /// workers blocked in `get_item` would presumably wait forever — confirm and decide whether
    /// a drop guard is wanted.
    fn new<I>(iter: I, slots: usize) -> Arc<Hopper<T>>
    where
        I: Iterator<Item=T> + Send + 'static,
        T: Send + 'static,
    {
        let mut fillers = Vec::with_capacity(slots);
        let mut cache = Vec::with_capacity(slots);
        let mut signals = Vec::<SignalEvent>::with_capacity(slots);
        for _ in 0..slots {
            let (filler, stealer) = deque::new();
            fillers.push(filler);
            cache.push(stealer);
            // Events start out signalled, so each worker's first wait does not block.
            signals.push(SignalEvent::new(true, SignalKind::Auto));
        }

        let ret = Arc::new(Hopper {
            cache: cache,
            signals: signals,
            done: AtomicBool::new(false),
        });

        let hopper = ret.clone();
        thread::spawn(move || {
            let mut current_slot = 0;
            let mut rounds = 0usize;
            for item in iter {
                fillers[current_slot].push(item);
                current_slot = (current_slot + 1) % slots;
                if current_slot == 0 {
                    rounds += 1;
                }
                // Throttle wake-ups: only signal while `rounds` is a multiple of `2 * slots`.
                // `current_slot` has already advanced, so the slot signalled is the one that
                // will be filled next.
                if (rounds % (slots * 2)) == 0 {
                    hopper.signals[current_slot].signal();
                }
            }
            // Publish end-of-input first, then wake everyone so no worker stays blocked.
            hopper.done.store(true, SeqCst);
            for signal in &hopper.signals {
                signal.signal();
            }
        });

        ret
    }

    /// Returns the next item for worker `id`, or `None` once the input is exhausted and the
    /// queues have been drained.
    ///
    /// The worker's own queue is tried first, then every other queue in ring order starting
    /// with slot `id + 1`. If nothing was found and the input is not finished, the call blocks
    /// on this slot's event and then retries.
    fn get_item(&self, id: usize) -> Option<T> {
        let slots = self.cache.len();
        loop {
            // Read `done` BEFORE scanning the queues. `done` is stored only after the last
            // push, so if it is already `true` here, an empty scan below proves that nothing
            // is left. Reading it after the scan would leave a window in which the filler
            // pushes its final items and sets `done` between the scan and the check, letting
            // every worker return `None` while items are still queued.
            let finished = self.done.load(SeqCst);

            if let Some(item) = self.cache[id].steal() {
                return Some(item);
            }
            for offset in 1..slots {
                if let Some(item) = self.cache[(id + offset) % slots].steal() {
                    return Some(item);
                }
            }

            // If our own queue gained items in the meantime, loop again right away.
            if self.cache[id].len() == 0 {
                if finished {
                    return None;
                }
                // Presumably the event stays set until a waiter consumes it (auto-reset), so a
                // signal sent just before this call is not lost — confirm against synchronoise.
                self.signals[id].wait();
            }
        }
    }
}
// Process-wide worker-thread count set through `set_thread_count`. The initial value 0 means
// "not configured", in which case `get_thread_count` falls back to `num_cpus::get()`.
static THREAD_COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
/// Sets the number of worker threads used by `par_fold` and `par_map`.
///
/// The value is stored in a process-wide static and is read each time `par_fold` is called or a
/// `ParMap` is first polled. Passing `0` restores the default, which is the value returned by
/// `num_cpus::get()`.
pub fn set_thread_count(count: usize) {
THREAD_COUNT.store(count, SeqCst);
}
/// Returns the number of worker threads to spawn.
///
/// A stored value of 0 means no count was configured, so `num_cpus::get()` is used instead.
fn get_thread_count() -> usize {
    match THREAD_COUNT.load(SeqCst) {
        0 => num_cpus::get(),
        configured => configured,
    }
}
#[cfg(test)]
mod tests {
    use super::Polyester;
    use std::time::{Instant, Duration};

    /// Splits a duration into whole seconds and the remaining whole milliseconds.
    fn secs_millis(dur: Duration) -> (u64, u32) {
        let millis = dur.subsec_nanos() / 1_000_000;
        (dur.as_secs(), millis)
    }

    /// A parallel sum must agree with the sequential sum; timings are printed for reference.
    #[test]
    fn basic_fold() {
        let start = Instant::now();
        let par = (0..1_000_000).par_fold(0usize, |l,r| l+r, |l,r| l+r);
        let par_finished = Instant::now();
        let seq = (0..1_000_000).fold(0usize, |l,r| l+r);
        let seq_finished = Instant::now();

        let (par_secs, par_millis) = secs_millis(par_finished.duration_since(start));
        let (seq_secs, seq_millis) = secs_millis(seq_finished.duration_since(par_finished));

        println!("");
        println!(" parallel fold: {}.{:03}s", par_secs, par_millis);
        println!(" sequential fold: {}.{:03}s", seq_secs, seq_millis);

        assert_eq!(par, seq);
    }

    /// A parallel map must yield the same multiset of values as the sequential map; both sides
    /// are sorted before comparing because `par_map` does not preserve order.
    #[test]
    fn basic_map() {
        let start = Instant::now();
        let mut par = (0..1_000_000).par_map(|x| x*x).collect::<Vec<usize>>();
        let par_finished = Instant::now();
        let mut seq = (0..1_000_000).map(|x| x*x).collect::<Vec<usize>>();
        let seq_finished = Instant::now();

        par.sort();
        seq.sort();

        let (par_secs, par_millis) = secs_millis(par_finished.duration_since(start));
        let (seq_secs, seq_millis) = secs_millis(seq_finished.duration_since(par_finished));

        println!("");
        println!(" parallel map: {}.{:03}s", par_secs, par_millis);
        println!(" sequential map: {}.{:03}s", seq_secs, seq_millis);

        assert_eq!(par, seq);
    }
}