extern crate crossbeam;
extern crate rand;
mod arena;
mod job;
mod worker;
pub mod iter;
pub use iter::{BorrowSpliterator, BorrowSpliteratorMut, IntoSpliterator, Split, Spliterator};
use crossbeam::sync::chase_lev::deque;
use rand::{Rng, SeedableRng, thread_rng, XorShiftRng};
use self::arena::Arena;
use self::worker::{SharedWorkerData, Worker};
use std::io::Error;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
// NOTE(review): this constant is not referenced anywhere in the visible part of
// this file. Presumably it is an initial size used by a child module (reachable
// there via `super::`) — confirm it is still needed.
const INITIAL_CAPACITY: usize = 256;
/// Bookkeeping the pool keeps for one spawned worker thread.
struct WorkerHandle {
/// Join handle of the worker thread. Wrapped in `Option` so that `Pool`'s
/// `Drop` impl can `take()` it out of a `&mut` borrow in order to join it.
thread: Option<thread::JoinHandle<()>>,
}
/// Handle through which jobs are submitted to the pool.
///
/// `'pool` is the lifetime of the borrow of the worker the jobs go through;
/// `'scope` is the lifetime that submitted closures must outlive.
pub struct Spawner<'pool, 'scope> {
/// Worker that every submission from this spawner is handed to.
worker: &'pool Worker,
/// Counter that is incremented before each submission when non-null.
/// It is null for spawners created by `Pool::spawner`.
/// NOTE(review): presumably the enclosing scope uses this counter to wait
/// for outstanding jobs — that logic is not visible here; confirm.
counter: *const AtomicUsize,
/// Zero-sized marker: the raw pointer keeps the type `!Send`/`!Sync`, and
/// `*mut &'scope mut ()` makes the type invariant in `'scope`.
_marker: PhantomData<*mut &'scope mut ()>,
}
impl<'pool, 'scope> Spawner<'pool, 'scope> {
/// Submits `f` as a job that receives a spawner of its own, so the job can
/// submit further work under the same scope counter.
///
/// When this spawner carries a counter it is incremented before the job is
/// handed to the worker.
pub fn recurse<F>(&self, f: F)
    where F: 'scope + Send + FnOnce(&Spawner<'pool, 'scope>)
{
    use std::mem;

    // Raw pointers are not `Send`; this wrapper lets the counter pointer be
    // moved into the job closure.
    struct SendCounter(*const AtomicUsize);
    unsafe impl Send for SendCounter {}

    let raw_counter = self.counter;
    if !raw_counter.is_null() {
        // NOTE(review): relies on a non-null counter pointing at a live
        // `AtomicUsize` for as long as jobs are outstanding — confirm against
        // the code that creates scoped spawners.
        unsafe { (*raw_counter).fetch_add(1, Ordering::AcqRel) };
    }

    let counter = SendCounter(raw_counter);
    unsafe {
        self.worker.submit_internal(raw_counter, move |worker| {
            let SendCounter(count_ptr) = counter;
            let spawner = make_spawner(worker, count_ptr);
            // The transmute only rewrites the lifetime parameters of the
            // reference so it matches the type `f` was declared with.
            f(mem::transmute(&spawner))
        });
    }
}
/// Submits `f` as a plain job that gets no spawner of its own.
///
/// When this spawner carries a counter it is incremented before the job is
/// handed to the worker.
pub fn submit<F>(&self, f: F) where F: 'scope + Send + FnOnce() {
    let counter = self.counter;
    unsafe {
        // NOTE(review): relies on a non-null counter pointing at a live
        // `AtomicUsize` — confirm against the code that creates scoped spawners.
        if !counter.is_null() {
            (*counter).fetch_add(1, Ordering::AcqRel);
        }
        // The worker argument passed to the job is deliberately ignored.
        self.worker.submit_internal(counter, move |_| f());
    }
}
/// Runs `f` with a spawner whose scope lifetime is the shorter `'new`, and
/// returns whatever `f` returns.
///
/// This simply delegates to the underlying worker's `scope`. The tests in this
/// file rely on every job submitted through the scoped spawner having finished
/// by the time this call returns; the mechanism lives in `Worker::scope`, which
/// is not shown here.
pub fn scope<'new, F, R>(&'new self, f: F) -> R
where 'scope: 'new,
F: 'new + FnOnce(&Spawner<'pool, 'new>) -> R,
R: 'new
{
self.worker.scope(f)
}
/// Runs `oper_a` and `oper_b` inside a fresh scope and returns both results.
///
/// `oper_a` is submitted as a job via `recurse`, while `oper_b` is invoked
/// directly inside the scope closure.
// `R_A` / `R_B` are deliberately not camel case so they pair visibly with `A` / `B`.
#[allow(non_camel_case_types)]
pub fn join<A, B, R_A, R_B>(&self, oper_a: A, oper_b: B) -> (R_A, R_B)
    where A: Send + for<'new> FnOnce(&Spawner<'pool, 'new>) -> R_A,
          B: Send + for<'new> FnOnce(&Spawner<'pool, 'new>) -> R_B,
          R_A: Send,
          R_B: Send,
{
    // Result slots, filled in from inside the scope.
    let mut result_a = None;
    let mut result_b = None;

    self.scope(|outer| {
        outer.recurse(|inner| result_a = Some(oper_a(inner)));
        result_b = Some(oper_b(outer));
    });

    // Both slots are expected to be filled once the scope has returned.
    let first = result_a.unwrap();
    let second = result_b.unwrap();
    (first, second)
}
}
/// Assembles a `Spawner` from a worker reference and a (possibly null) counter
/// pointer. The scope lifetime is chosen freely by the caller.
fn make_spawner<'pool, 'scope>(
    worker: &'pool Worker,
    counter: *const AtomicUsize,
) -> Spawner<'pool, 'scope> {
    Spawner {
        _marker: PhantomData,
        counter: counter,
        worker: worker,
    }
}
/// Body of every spawned worker thread: keep calling `run_next` until the
/// worker reports that it should shut down, then call `clear` once.
fn worker_main(worker: Worker) {
    loop {
        if worker.should_shutdown() {
            break;
        }
        unsafe { worker.run_next(true) };
    }
    worker.clear();
}
/// A pool of worker threads plus one local worker used by the owning thread.
///
/// Dropping the pool notifies the workers to shut down and joins their threads.
pub struct Pool {
/// Handles of the spawned worker threads, joined when the pool is dropped.
workers: Vec<WorkerHandle>,
/// Worker with index 0; scopes and spawners created from the pool go through it.
local_worker: Worker,
}
impl Pool {
/// Builds a pool with `n` spawned worker threads plus a local worker
/// (index 0) that is driven through the `Pool` value itself.
///
/// # Errors
/// Returns the `std::io::Error` reported by `thread::Builder::spawn` when a
/// worker thread cannot be started. Worker threads that were already started
/// are notified to shut down and joined (through `Pool`'s `Drop`) before the
/// error is returned, instead of being left detached and running.
fn new(n: usize) -> Result<Self, Error> {
    // One deque and one arena per worker: `n` spawned workers + the local one.
    let slots = n + 1;
    let mut poppers = Vec::with_capacity(slots);
    let mut stealers = Vec::with_capacity(slots);
    for _ in 0..slots {
        let (p, s) = deque();
        poppers.push(p);
        stealers.push(s);
    }
    let mut poppers = poppers.into_iter();

    let mut rng = thread_rng();
    let arenas = (0..slots).map(|_| Arena::new()).collect::<Vec<_>>();
    let shared_data = Arc::new(SharedWorkerData::new(stealers, arenas));

    // `slots >= 1`, so the first popper always exists.
    let local_worker = Worker::new(
        shared_data.clone(),
        0,
        XorShiftRng::from_seed(rng.gen::<[u32; 4]>()),
        poppers.next().unwrap(),
    );

    // Construct the pool *before* spawning threads: if a later spawn fails,
    // the early return drops `pool`, and its `Drop` impl shuts down and joins
    // the workers that are already running.
    let mut pool = Pool {
        workers: Vec::with_capacity(n),
        local_worker: local_worker,
    };

    // Exactly `n` poppers remain; spawned workers get indices 1..=n, while
    // their thread names are numbered from 0.
    for (i, pop) in poppers.enumerate() {
        let worker = Worker::new(
            shared_data.clone(),
            i + 1,
            XorShiftRng::from_seed(rng.gen::<[u32; 4]>()),
            pop,
        );
        let builder = thread::Builder::new().name(format!("worker_{}", i));
        let handle = try!(builder.spawn(move || worker_main(worker)));
        pool.workers.push(WorkerHandle {
            thread: Some(handle),
        });
    }

    Ok(pool)
}
/// Runs `f` with a scoped spawner backed by the pool's local worker and
/// returns whatever `f` returns.
///
/// The pool is borrowed mutably for `'pool`. The tests in this file rely on
/// all jobs submitted through the spawner having finished by the time this
/// call returns; that guarantee comes from `Worker::scope`, which is not shown
/// here.
pub fn scope<'pool, 'new, F, R>(&'pool mut self, f: F) -> R
where F: 'new + FnOnce(&Spawner<'pool, 'new>) -> R,
R: 'new,
{
self.local_worker.scope(f)
}
/// Submits `f` as a job that receives its own spawner, using the pool's
/// counter-less `'static` spawner. Because no scope bounds the job, `f` must
/// be `'static`.
pub fn recurse<'a, F>(&'a mut self, f: F)
    where F: 'static + Send + FnOnce(&Spawner<'a, 'static>)
{
    let spawner = self.spawner();
    spawner.recurse(f);
}
/// Submits `f` as a plain job using the pool's counter-less `'static`
/// spawner. Because no scope bounds the job, `f` must be `'static`.
pub fn submit<'a, F>(&'a mut self, f: F)
    where F: 'static + Send + FnOnce()
{
    let spawner = self.spawner();
    spawner.submit(f);
}
pub fn spawner<'a>(&'a mut self) -> Spawner<'a, 'static> {
use std::ptr;
make_spawner(&self.local_worker, ptr::null_mut())
}
}
impl Drop for Pool {
fn drop(&mut self) {
self.local_worker.shared_data().notify_shutdown();
self.local_worker.clear();
for worker in &mut self.workers {
let handle = worker.thread.take().unwrap();
if handle.join().is_err() {
panic!("Propagating worker thread panic");
}
}
}
}
/// Creates a `Pool` with `n` spawned worker threads, in addition to the local
/// worker that the pool itself drives.
///
/// # Errors
/// Returns the I/O error from `Pool::new` if a worker thread cannot be spawned.
pub fn make_pool(n: usize) -> Result<Pool, Error> {
Pool::new(n)
}
/// Test helper: runs `f` against freshly built pools with 0 through 31
/// spawned worker threads, dropping each pool before building the next.
#[cfg(test)]
fn pool_harness<F>(f: F) where F: Fn(&mut Pool) {
    for thread_count in 0..32 {
        let built = Pool::new(thread_count);
        let mut pool = built.unwrap();
        f(&mut pool);
    }
}
#[cfg(test)]
mod tests {
    use super::{pool_harness, Pool};

    /// Pools of every size used by the harness can be built and dropped.
    #[test]
    fn creation_destruction() {
        pool_harness(|_| {});
    }

    /// One job per element; each job adds its own index to its element.
    #[test]
    fn split_work() {
        pool_harness(|pool| {
            let mut values = vec![0; 1024];
            pool.scope(|spawner| {
                for (idx, slot) in values.iter_mut().enumerate() {
                    spawner.submit(move || {
                        *slot += idx;
                    });
                }
            });
            assert_eq!(values, (0..1024).collect::<Vec<_>>());
        });
    }

    /// A scope opened inside another scope finishes its jobs before returning.
    #[test]
    fn multilevel_scoping() {
        pool_harness(|pool| {
            pool.scope(|outer| {
                let mut counters = vec![0; 256];
                outer.scope(|inner| {
                    for cell in &mut counters {
                        inner.submit(move || *cell += 1)
                    }
                });
                assert_eq!(counters, vec![1; 256]);
            });
        });
    }

    /// `join` hands back the results of both closures, in order.
    #[test]
    fn join() {
        pool_harness(|pool| {
            let (left, right) = (1, 2);
            let (got_left, got_right) = pool.spawner().join(|_| left, |_| right);
            assert_eq!(left, got_left);
            assert_eq!(right, got_right);
        });
    }

    /// `join` closures may mutably borrow disjoint halves of local data.
    #[test]
    fn join_scoping() {
        pool_harness(|pool| {
            let mut values = vec![0; 256];
            {
                let (front, back) = values.split_at_mut(128);
                pool.spawner().join(
                    |_| for item in front { *item += 1 },
                    |_| for item in back { *item += 1 },
                );
            }
            assert_eq!(values, vec![1; 256])
        });
    }

    /// A `'static` job submitted directly on the pool owns its data.
    #[test]
    fn outlives_pool() {
        let template = vec![0; 256];
        pool_harness(|pool| {
            let mut owned = template.clone();
            pool.submit(move || {
                for item in &mut owned {
                    *item += 1
                }
            });
        });
    }

    /// `scope` passes the closure's return value through.
    #[test]
    fn scope_return() {
        pool_harness(|pool| {
            let returned = pool.scope(|_| 0);
            assert_eq!(returned, 0);
        });
    }

    /// A panicking job surfaces as a panic on the thread that owns the pool.
    #[test]
    #[should_panic]
    fn job_panic() {
        let mut pool = Pool::new(1).unwrap();
        pool.submit(|| panic!("Eep!"));
    }

    /// Jobs whose closure captures a large array are still executed.
    #[test]
    fn submit_large_jobs() {
        pool_harness(|pool| {
            let mut values = vec![0; 1024];
            pool.scope(|scope| {
                let target = &mut values;
                let padding = [0; 4096];
                scope.submit(move || {
                    // Mention `padding` so the closure captures it.
                    let _ = padding;
                    for item in target.iter_mut() {
                        *item += 1
                    }
                });
            });
            assert_eq!(values, vec![1; 1024]);
        });
    }
}