#![cfg_attr(test, deny(warnings))]
#![deny(missing_docs)]
extern crate variance;
extern crate crossbeam;
#[macro_use]
extern crate scopeguard;
use variance::InvariantLifetime as Id;
use crossbeam::sync::MsQueue;
use std::{thread, mem};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Condvar};
/// A handle to a pool of worker threads.
///
/// Both fields are `Arc`s, so cloning a `Pool` yields another handle to the
/// same job queue and the same worker bookkeeping rather than a new pool.
#[derive(Clone, Default)]
pub struct Pool {
// Tracks worker threads: `expand` calls `submit` on it for every thread it
// starts, and `shutdown` joins it.
wait: Arc<WaitGroup>,
// State shared by every handle and worker: the message queue, the thread
// configuration and the counter used to number threads.
inner: Arc<PoolInner>
}
impl Pool {
#[inline]
pub fn new(size: usize) -> Pool {
let pool = Pool::empty();
for _ in 0..size { pool.expand(); }
pool
}
#[inline]
pub fn with_thread_config(size: usize, thread_config: ThreadConfig) -> Pool {
let pool = Pool {
inner: Arc::new(PoolInner::with_thread_config(thread_config)),
..Pool::default()
};
for _ in 0..size { pool.expand(); }
pool
}
#[inline]
pub fn empty() -> Pool {
Pool::default()
}
#[inline]
pub fn workers(&self) -> usize {
self.wait.waiting()
}
#[inline]
pub fn spawn<F: FnOnce() + Send + 'static>(&self, job: F) {
Scope::forever(self.clone()).execute(job)
}
#[inline]
pub fn scoped<'scope, F, R>(&self, scheduler: F) -> R
where F: FnOnce(&Scope<'scope>) -> R {
Scope::forever(self.clone()).zoom(scheduler)
}
#[inline]
pub fn shutdown(&self) {
self.inner.queue.push(PoolMessage::Quit);
self.wait.join()
}
#[inline]
pub fn expand(&self) {
let pool = self.clone();
pool.wait.submit();
let thread_number = self.inner.thread_counter.fetch_add(1, Ordering::SeqCst);
let mut builder = thread::Builder::new();
if let Some(ref prefix) = self.inner.thread_config.prefix {
let name = format!("{}{}", prefix, thread_number);
builder = builder.name(name);
}
if let Some(stack_size) = self.inner.thread_config.stack_size {
builder = builder.stack_size(stack_size);
}
builder.spawn(move || pool.run_thread()).unwrap();
}
fn run_thread(self) {
let mut thread_sentinel = ThreadSentinel(Some(self.clone()));
loop {
match self.inner.queue.pop() {
PoolMessage::Quit => {
self.inner.queue.push(PoolMessage::Quit);
thread_sentinel.cancel();
break
},
PoolMessage::Task(job, wait) => {
let sentinel = Sentinel(self.clone(), Some(wait.clone()));
job.run();
sentinel.cancel();
}
}
}
}
}
// State shared between all `Pool` handles and worker threads (held behind an
// `Arc` in `Pool`).
struct PoolInner {
// Messages consumed by the worker loop: either a job or `Quit`.
queue: MsQueue<PoolMessage>,
// Optional name prefix and stack size applied when spawning workers.
thread_config: ThreadConfig,
// Source of the number appended to the name prefix; bumped once per
// `Pool::expand` call.
thread_counter: AtomicUsize
}
impl PoolInner {
fn with_thread_config(thread_config: ThreadConfig) -> Self {
PoolInner { thread_config: thread_config, ..Self::default() }
}
}
impl Default for PoolInner {
    // Empty queue, default thread configuration, thread numbering from 1.
    fn default() -> Self {
        let queue = MsQueue::new();
        let thread_config = ThreadConfig::default();
        // The first thread spawned by the pool is numbered 1, not 0.
        let thread_counter = AtomicUsize::new(1);
        PoolInner {
            queue: queue,
            thread_config: thread_config,
            thread_counter: thread_counter
        }
    }
}
/// Settings applied to every worker thread a pool spawns.
///
/// Built fluently: `ThreadConfig::new().prefix("pool-").stack_size(4096)`.
#[derive(Default)]
pub struct ThreadConfig {
    // Thread name prefix; the pool appends a running number to it.
    prefix: Option<String>,
    // Requested stack size passed to the thread builder.
    stack_size: Option<usize>,
}

impl ThreadConfig {
    /// Creates a configuration with no name prefix and no explicit stack
    /// size.
    pub fn new() -> ThreadConfig {
        ThreadConfig::default()
    }

    /// Sets the prefix used to name worker threads, replacing any earlier
    /// prefix and leaving the other settings untouched.
    pub fn prefix<S: Into<String>>(mut self, prefix: S) -> ThreadConfig {
        self.prefix = Some(prefix.into());
        self
    }

    /// Sets the stack size requested for worker threads, replacing any
    /// earlier value and leaving the other settings untouched.
    pub fn stack_size(mut self, stack_size: usize) -> ThreadConfig {
        self.stack_size = Some(stack_size);
        self
    }
}
/// A handle for queueing jobs whose borrows must outlive `'scope`.
///
/// Each scope carries its own `WaitGroup`, which counts the jobs queued
/// through it.
pub struct Scope<'scope> {
// Pool whose queue receives the jobs.
pool: Pool,
// Counts jobs queued through this scope; `join` waits on it.
wait: Arc<WaitGroup>,
// Lifetime marker only; carries no data used by the methods in this file.
_scope: Id<'scope>
}
impl<'scope> Scope<'scope> {
/// Creates a `'static` scope on top of `pool`, with a fresh wait group.
///
/// Jobs queued directly on the returned scope must be `'static`; use
/// `zoom` to obtain a shorter-lived scope that can borrow local data.
#[inline]
pub fn forever(pool: Pool) -> Scope<'static> {
Scope {
pool: pool,
wait: Arc::new(WaitGroup::new()),
_scope: Id::default()
}
}
/// Queues `job` on the pool and counts it in this scope's wait group.
///
/// Returns without waiting for the job; call `join` (or rely on `zoom`)
/// to wait for it.
pub fn execute<F>(&self, job: F)
where F: FnOnce() + Send + 'scope {
// Count the job before it becomes visible to workers, so the wait group
// cannot be observed at zero while the job is still queued or running.
self.wait.submit();
let task = unsafe {
// SAFETY: this only erases the `'scope` bound so the boxed job can be
// stored in the queue. NOTE(review): soundness relies on the job having
// finished before `'scope` ends; within this file that is arranged by
// `zoom`, which joins the scope it hands out before returning. Confirm
// no other path creates a non-`'static` scope.
mem::transmute::<Box<Task + Send + 'scope>,
Box<Task + Send + 'static>>(Box::new(job))
};
self.pool.inner.queue.push(PoolMessage::Task(task, self.wait.clone()));
}
/// Queues a job that itself receives a scope, so it can queue further jobs.
///
/// The scope passed to `job` shares this scope's wait group, so jobs it
/// queues are waited on by the same `join`.
pub fn recurse<F>(&self, job: F)
where F: FnOnce(&Self) + Send + 'scope {
// The private `clone` copies the pool handle and wait group; the copy is
// moved into the job, which is itself bound by `'scope` via `execute`.
let this = unsafe { self.clone() };
self.execute(move || job(&this));
}
/// Runs `scheduler` with a new scope of lifetime `'smaller` and returns
/// its result.
///
/// The new scope has its own wait group. It is joined when this method
/// exits, so jobs queued on it by the scheduler have finished by then.
/// `join` panics if one of those jobs panicked.
pub fn zoom<'smaller, F, R>(&self, scheduler: F) -> R
where F: FnOnce(&Scope<'smaller>) -> R,
'scope: 'smaller {
let scope = unsafe { self.refine::<'smaller>() };
// `defer!` comes from the scopeguard crate; it is expected to run the
// join when this function exits, including while unwinding from a
// scheduler panic (the `*_panic_waits_for_*` tests depend on that).
defer!(scope.join());
scheduler(&scope)
}
/// Blocks until every job counted in this scope's wait group has finished.
///
/// Panics if the wait group was poisoned, i.e. a job panicked.
#[inline]
pub fn join(&self) {
self.wait.join()
}
// Copies this scope, sharing both the pool handle and the wait group.
// Marked `unsafe`: the copy has the same `'scope` but is not tied to the
// borrow of `self`, so the caller must not let it outlive the join that
// protects `'scope`.
#[inline]
unsafe fn clone(&self) -> Self {
Scope {
pool: self.pool.clone(),
wait: self.wait.clone(),
_scope: Id::default()
}
}
// Creates a scope on the same pool with a new, empty wait group and the
// shorter lifetime `'other`. Marked `unsafe`: the caller is responsible for
// joining the returned scope before `'other` ends (as `zoom` does).
#[inline]
unsafe fn refine<'other>(&self) -> Scope<'other> where 'scope: 'other {
Scope {
pool: self.pool.clone(),
wait: Arc::new(WaitGroup::new()),
_scope: Id::default()
}
}
}
// Messages carried by the pool's queue and consumed by `Pool::run_thread`.
enum PoolMessage {
// Tells a worker to stop; the worker re-queues it before exiting.
Quit,
// A job to run, plus the wait group to complete (or poison) once it ends.
Task(Box<Task + Send>, Arc<WaitGroup>)
}
/// A counter of outstanding jobs that threads can block on until it reaches
/// zero.
///
/// Jobs are registered with [`WaitGroup::submit`] and retired with either
/// [`WaitGroup::complete`] or [`WaitGroup::poison`]. [`WaitGroup::join`]
/// blocks until no job is outstanding and panics if any job was retired via
/// `poison`.
pub struct WaitGroup {
    // Number of submitted jobs not yet completed or poisoned.
    pending: AtomicUsize,
    // Set once by `poison`; never cleared.
    poisoned: AtomicBool,
    // Guards the condition variable; protects no data of its own.
    lock: Mutex<()>,
    // Signalled when `pending` drops to zero.
    cond: Condvar
}

impl Default for WaitGroup {
    /// Creates an empty, un-poisoned wait group.
    fn default() -> Self {
        WaitGroup {
            pending: AtomicUsize::new(0),
            poisoned: AtomicBool::new(false),
            lock: Mutex::new(()),
            cond: Condvar::new()
        }
    }
}

impl WaitGroup {
    /// Creates an empty, un-poisoned wait group.
    #[inline]
    pub fn new() -> Self {
        WaitGroup::default()
    }

    /// Returns the number of jobs currently outstanding.
    #[inline]
    pub fn waiting(&self) -> usize {
        self.pending.load(Ordering::SeqCst)
    }

    /// Registers one more outstanding job.
    #[inline]
    pub fn submit(&self) {
        self.pending.fetch_add(1, Ordering::SeqCst);
    }

    /// Retires one job; wakes joiners if it was the last one outstanding.
    ///
    /// Must be paired with an earlier `submit`.
    #[inline]
    pub fn complete(&self) {
        self.retire_one()
    }

    /// Marks the group as poisoned and retires one job; wakes joiners if it
    /// was the last one outstanding.
    ///
    /// Must be paired with an earlier `submit`. Once poisoned, every
    /// subsequent `join` panics.
    #[inline]
    pub fn poison(&self) {
        // Set the flag before decrementing so a joiner that observes zero
        // pending jobs also observes the poison.
        self.poisoned.store(true, Ordering::SeqCst);
        self.retire_one()
    }

    /// Blocks until no job is outstanding.
    ///
    /// # Panics
    ///
    /// Panics with `"WaitGroup explicitly poisoned!"` if any job was retired
    /// through `poison`.
    #[inline]
    pub fn join(&self) {
        {
            let mut guard = self.lock.lock().unwrap();
            while self.pending.load(Ordering::SeqCst) > 0 {
                guard = self.cond.wait(guard).unwrap();
            }
            // The guard is released here, *before* the poison check below.
            // Panicking while holding it would poison the mutex and turn
            // every later `complete`/`poison`/`join` into a `PoisonError`
            // panic.
        }
        if self.poisoned.load(Ordering::SeqCst) {
            panic!("WaitGroup explicitly poisoned!")
        }
    }

    // Decrements the pending count and, when it was the last job, notifies
    // all joiners. The lock is taken around the notification so a joiner
    // cannot check the count and go to sleep in between.
    #[inline]
    fn retire_one(&self) {
        let previous = self.pending.fetch_sub(1, Ordering::SeqCst);
        if previous == 1 {
            let _guard = self.lock.lock().unwrap();
            self.cond.notify_all()
        }
    }
}
// Guard created around each job in `Pool::run_thread`. Field 1 is the job's
// wait group: `cancel` completes it, while dropping the guard un-cancelled
// poisons it. Field 0 holds a pool handle and is not read in this file.
struct Sentinel(Pool, Option<Arc<WaitGroup>>);
impl Sentinel {
    // Normal completion: marks the job complete on its wait group and
    // disarms the destructor by emptying the slot.
    fn cancel(mut self) {
        if let Some(wait) = self.1.take() {
            wait.complete();
        }
    }
}
impl Drop for Sentinel {
    // Reached with the wait group still present only when `cancel` was never
    // called (the job unwound); in that case the group is poisoned.
    fn drop(&mut self) {
        if let Some(wait) = self.1.take() {
            wait.poison();
        }
    }
}
// Guard held by each worker thread for its whole lifetime. `cancel` marks the
// worker complete on the pool's wait group; dropping it un-cancelled starts a
// replacement worker and poisons that wait group.
struct ThreadSentinel(Option<Pool>);
impl ThreadSentinel {
    // Regular worker exit: retires this worker on the pool's wait group and
    // disarms the destructor by emptying the slot.
    fn cancel(&mut self) {
        if let Some(pool) = self.0.take() {
            pool.wait.complete();
        }
    }
}
impl Drop for ThreadSentinel {
    // Reached with the pool still present only when the worker is going away
    // without `cancel` having run (it unwound).
    fn drop(&mut self) {
        if let Some(pool) = self.0.take() {
            // Start the replacement first so the worker count does not dip
            // to zero before this worker is retired.
            pool.expand();
            pool.wait.poison();
        }
    }
}
// A unit of work that can be run exactly once from a `Box`, which lets boxed
// closures be stored in the queue and invoked by value.
trait Task {
    // Consumes the boxed task and runs it.
    fn run(self: Box<Self>);
}

// Every `FnOnce()` closure is a task: unbox it, then call it.
impl<F> Task for F
where F: FnOnce() {
    fn run(self: Box<Self>) {
        let job = *self;
        job()
    }
}
// Tests for the pool, scopes and thread configuration. Several of them use
// `#[should_panic]` to check that panics in jobs or schedulers reach the
// caller.
#[cfg(test)]
mod test {
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use std::thread::sleep;
use {Pool, Scope, ThreadConfig};
// Jobs may mutably borrow stack data; all of them are done once `scoped`
// returns.
#[test]
fn test_simple_use() {
let pool = Pool::new(4);
let mut buf = [0, 0, 0, 0];
pool.scoped(|scope| {
for i in &mut buf {
scope.execute(move || *i += 1);
}
});
assert_eq!(&buf, &[1, 1, 1, 1]);
}
// A nested `zoom` joins its own jobs before returning, so `inner` is already
// written when the outer scheduler continues.
#[test]
fn test_zoom() {
let pool = Pool::new(4);
let mut outer = 0;
pool.scoped(|scope| {
let mut inner = 0;
scope.zoom(|scope2| scope2.execute(|| inner = 1));
assert_eq!(inner, 1);
outer = 1;
});
assert_eq!(outer, 1);
}
// A job queued from inside a `recurse` job is also waited for by the
// enclosing `scoped` call.
#[test]
fn test_recurse() {
let pool = Pool::new(12);
let mut buf = [0, 0, 0, 0];
pool.scoped(|next| {
next.recurse(|next| {
buf[0] = 1;
next.execute(|| {
buf[1] = 1;
});
});
});
assert_eq!(&buf, &[1, 1, 0, 0]);
}
// `spawn` only queues the job: the test returns even though the job never
// finishes.
#[test]
fn test_spawn_doesnt_hang() {
let pool = Pool::new(1);
pool.spawn(move || loop {});
}
// `zoom` on a `Scope::forever` scope waits for the job before returning.
#[test]
fn test_forever_zoom() {
let pool = Pool::new(16);
let forever = Scope::forever(pool.clone());
let ran = AtomicBool::new(false);
forever.zoom(|scope| scope.execute(|| ran.store(true, Ordering::SeqCst)));
assert!(ran.load(Ordering::SeqCst));
}
// Shutting down an idle pool returns once all workers have exited.
#[test]
fn test_shutdown() {
let pool = Pool::new(4);
pool.shutdown();
}
// A panic in the scheduler itself propagates out of `scoped`.
#[test]
#[should_panic]
fn test_scheduler_panic() {
let pool = Pool::new(4);
pool.scoped(|_| panic!());
}
// A panic in a job surfaces as a panic from `scoped`.
#[test]
#[should_panic]
fn test_scoped_execute_panic() {
let pool = Pool::new(4);
pool.scoped(|scope| scope.execute(|| panic!()));
}
// Panicking while a pool is alive must still unwind the test thread.
#[test]
#[should_panic]
fn test_pool_panic() {
let _pool = Pool::new(1);
panic!();
}
// A job panic inside a nested `zoom` propagates through the outer `scoped`.
#[test]
#[should_panic]
fn test_zoomed_scoped_execute_panic() {
let pool = Pool::new(4);
pool.scoped(|scope| scope.zoom(|scope2| scope2.execute(|| panic!())));
}
// A panic inside the job passed to `recurse` surfaces from `scoped`.
#[test]
#[should_panic]
fn test_recurse_scheduler_panic() {
let pool = Pool::new(4);
pool.scoped(|scope| scope.recurse(|_| panic!()));
}
// A panic in a job queued from within a `recurse` job surfaces from `scoped`.
#[test]
#[should_panic]
fn test_recurse_execute_panic() {
let pool = Pool::new(4);
pool.scoped(|scope| scope.recurse(|scope2| scope2.execute(|| panic!())));
}
// When dropped, asserts that the shared drop counter equals `expected`.
struct Canary<'a> {
drops: DropCounter<'a>,
expected: usize
}
// Increments the referenced counter every time a clone of it is dropped.
#[derive(Clone)]
struct DropCounter<'a>(&'a AtomicUsize);
impl<'a> Drop for DropCounter<'a> {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
impl<'a> Drop for Canary<'a> {
fn drop(&mut self) {
// Runs before the canary's own `drops` field is dropped, so that drop is
// not part of the count checked here.
let drops = self.drops.0.load(Ordering::SeqCst);
assert_eq!(drops, self.expected);
}
}
// Even when some jobs panic, `scoped` must not return (or unwind) before
// every queued job has dropped its counter; the canary checks the total.
#[test]
#[should_panic]
fn test_scoped_panic_waits_for_all_tasks() {
let tasks = 50;
let panicking_task_fraction = 10;
let panicking_tasks = tasks / panicking_task_fraction;
// One counter clone per regular job plus one per panicking job.
let expected_drops = tasks + panicking_tasks;
let counter = Box::new(AtomicUsize::new(0));
let drops = DropCounter(&*counter);
let _canary = Canary {
drops: drops.clone(),
expected: expected_drops
};
let pool = Pool::new(12);
pool.scoped(|scope| {
for task in 0..tasks {
let drop_counter = drops.clone();
scope.execute(move || {
sleep(Duration::from_millis(10));
drop::<DropCounter>(drop_counter);
});
// Every `panicking_task_fraction`-th iteration also queues a job that
// panics while owning a counter clone.
if task % panicking_task_fraction == 0 {
let drop_counter = drops.clone();
scope.execute(move || {
let _drops = drop_counter;
panic!();
});
}
}
});
}
// When the scheduler panics after queueing jobs, the jobs must still have
// finished by the time the canary is dropped during unwinding.
#[test]
#[should_panic]
fn test_scheduler_panic_waits_for_tasks() {
let tasks = 50;
let counter = Box::new(AtomicUsize::new(0));
let drops = DropCounter(&*counter);
let _canary = Canary {
drops: drops.clone(),
expected: tasks
};
let pool = Pool::new(12);
pool.scoped(|scope| {
for _ in 0..tasks {
let drop_counter = drops.clone();
scope.execute(move || {
sleep(Duration::from_millis(25));
drop::<DropCounter>(drop_counter);
});
}
panic!();
});
}
// Without a configured prefix, worker threads are unnamed.
#[test]
fn test_no_thread_config() {
let pool = Pool::new(1);
pool.scoped(|scope| {
scope.execute(|| {
assert!(::std::thread::current().name().is_none());
});
});
}
// With a prefix, the first worker is named `<prefix>1`.
#[test]
fn test_with_thread_config() {
let config = ThreadConfig::new().prefix("pool-");
let pool = Pool::with_thread_config(1, config);
pool.scoped(|scope| {
scope.execute(|| {
assert_eq!(::std::thread::current().name().unwrap(), "pool-1");
});
});
}
}