use latch::{LockLatch, SpinLatch};
#[allow(unused_imports)]
use log::Event::*;
use job::StackJob;
use std::sync::Arc;
use std::error::Error;
use std::fmt;
use thread_pool::{self, Registry, WorkerThread};
use std::mem;
use unwind;
/// Reasons why setting up a thread pool can fail.
///
/// Produced by `Configuration` validation (zero threads requested) and by
/// `initialize` (the global pool's thread count differs from the request).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum InitError {
    /// The configuration asked for zero threads; at least one is required.
    NumberOfThreadsZero,

    /// A specific number of threads was requested, but the global registry
    /// reports a different number of threads.
    GlobalPoolAlreadyInitialized,
}
/// User-facing, full-sentence messages for each initialization failure.
impl fmt::Display for InitError {
    /// Writes the message for this error variant to `f`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            InitError::NumberOfThreadsZero => {
                write!(f, "The number of threads was set to zero but must be greater than zero.")
            }
            InitError::GlobalPoolAlreadyInitialized => {
                // The trailing backslash joins the two source lines and skips
                // the leading whitespace of the continuation line.
                write!(f, "The global thread pool has already been initialized with a different \
                           configuration. Only one valid configuration is allowed.")
            }
        }
    }
}
/// Lets `InitError` be used wherever a `std::error::Error` is expected.
impl Error for InitError {
    /// Returns a short, lowercase summary of the failure (the longer,
    /// sentence-style text comes from the `Display` implementation).
    fn description(&self) -> &str {
        match *self {
            InitError::NumberOfThreadsZero => "number of threads set to zero",
            InitError::GlobalPoolAlreadyInitialized => {
                "global thread pool has already been initialized"
            }
        }
    }
}
/// Settings used when creating a thread pool.
///
/// The default value carries no explicit thread count, which is the same
/// state that `Configuration::new()` produces.
#[derive(Clone, Debug, Default)]
pub struct Configuration {
    /// Requested number of threads; `None` until `set_num_threads` is called.
    num_threads: Option<usize>,
}
impl Configuration {
    /// Creates a configuration with no explicit thread count.
    pub fn new() -> Configuration {
        Configuration { num_threads: None }
    }

    /// Returns the requested thread count, or `None` if none has been set.
    pub fn num_threads(&self) -> Option<usize> {
        self.num_threads
    }

    /// Consumes the configuration and returns it with the thread count set
    /// to `num_threads`, so calls can be chained after `new()`.
    ///
    /// The value is stored as given; zero is only rejected later, by
    /// `validate`.
    pub fn set_num_threads(mut self, num_threads: usize) -> Configuration {
        self.num_threads = Some(num_threads);
        self
    }

    /// Checks that the configuration is usable.
    ///
    /// Fails with `InitError::NumberOfThreadsZero` when a thread count of
    /// exactly zero was requested; an unset count or any non-zero count
    /// passes.
    fn validate(&self) -> Result<(), InitError> {
        match self.num_threads {
            Some(0) => Err(InitError::NumberOfThreadsZero),
            _ => Ok(()),
        }
    }
}
/// Sets up the global thread pool according to `config`.
///
/// # Errors
///
/// * `InitError::NumberOfThreadsZero` if `config` requests zero threads.
/// * `InitError::GlobalPoolAlreadyInitialized` if `config` requests a
///   specific number of threads and the registry returned by
///   `thread_pool::get_registry_with_config` reports a different number.
///   When no thread count was requested this check is skipped.
pub fn initialize(config: Configuration) -> Result<(), InitError> {
    config.validate()?;

    // Copy the requested count out first: `config` is moved into the
    // registry lookup on the next line.
    let num_threads = config.num_threads;
    let registry = thread_pool::get_registry_with_config(config);

    if let Some(value) = num_threads {
        if value != registry.num_threads() {
            return Err(InitError::GlobalPoolAlreadyInitialized);
        }
    }

    // NOTE(review): presumably blocks until the worker threads are up and
    // running; confirm against `Registry::wait_until_primed`.
    registry.wait_until_primed();
    Ok(())
}
/// Public entry point that expands the crate's `dump_stats!` macro.
///
/// NOTE(review): the macro is defined outside this file; presumably it
/// prints the statistics collected through the logging facility. Confirm
/// against the macro definition.
pub fn dump_stats() {
dump_stats!();
}
/// Runs the two closures and returns both results as `(RA, RB)`.
///
/// When called from outside a worker thread (`WorkerThread::current()` is
/// null) both closures are handed to `join_inject`. Otherwise `oper_b` is
/// wrapped in a stack-allocated job and pushed onto the current worker,
/// `oper_a` is run directly, and afterwards the job for `oper_b` is either
/// popped back and run inline or, if the pop yields nothing, this worker
/// calls `steal_until` on the job's latch and then takes the job's result.
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
    where A: FnOnce() -> RA + Send,
          B: FnOnce() -> RB + Send,
          RA: Send,
          RB: Send,
{
    unsafe {
        let worker_thread = WorkerThread::current();

        // Not running on a pool worker: take the injection path instead.
        if worker_thread.is_null() {
            return join_inject(oper_a, oper_b);
        }

        log!(Join { worker: (*worker_thread).index() });

        // Package B as a job that lives on this stack frame and publish it
        // through the worker before starting on A.
        let job_b = StackJob::new(oper_b, SpinLatch::new());
        (*worker_thread).push(job_b.as_job_ref());

        // Run A ourselves.
        // NOTE(review): `unwind::finally` presumably returns a guard that
        // runs the closure when dropped, i.e. if `oper_a` panics. In that
        // case the closure pops job B back, or spins on B's latch when the
        // pop yields nothing, so that `job_b` is no longer in use while this
        // frame unwinds. On the normal path the guard is forgotten so the
        // closure never runs. Confirm against `unwind::finally`.
        let result_a = {
            let guard = unwind::finally(&job_b.latch, |latch| {
                if (*WorkerThread::current()).pop().is_none() {
                    latch.spin();
                }
            });
            let value = oper_a();
            mem::forget(guard);
            value
        };

        let result_b = if (*worker_thread).pop().is_some() {
            // The job popped back is run right here on this thread.
            log!(PoppedJob { worker: (*worker_thread).index() });
            job_b.run_inline()
        } else {
            // Nothing to pop: keep this worker in `steal_until` until B's
            // latch is set, then collect the stored result.
            log!(LostJob { worker: (*worker_thread).index() });
            (*worker_thread).steal_until(&job_b.latch);
            job_b.into_result()
        };

        (result_a, result_b)
    }
}
/// Path taken by `join` when the caller is not a worker thread: both
/// closures are wrapped in stack-allocated jobs, injected into the registry
/// returned by `thread_pool::get_registry()`, and the caller waits on each
/// job's latch before collecting the results.
///
/// Marked `#[cold]` because `join` only reaches it through its
/// null-worker early return.
///
/// # Safety
///
/// NOTE(review): the jobs are locals of this function and references to them
/// are handed to the registry; the function waits on both latches before
/// returning. Confirm the exact contract against `StackJob::as_job_ref` and
/// `Registry::inject`.
#[cold]
unsafe fn join_inject<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
    where A: FnOnce() -> RA + Send,
          B: FnOnce() -> RB + Send,
          RA: Send,
          RB: Send,
{
    let job_a = StackJob::new(oper_a, LockLatch::new());
    let job_b = StackJob::new(oper_b, LockLatch::new());

    thread_pool::get_registry().inject(&[job_a.as_job_ref(), job_b.as_job_ref()]);

    // Wait on A's latch first, then B's, before touching either result.
    job_a.latch.wait();
    job_b.latch.wait();

    let result_a = job_a.into_result();
    let result_b = job_b.into_result();
    (result_a, result_b)
}
/// A thread pool with its own registry, created through `ThreadPool::new`.
pub struct ThreadPool {
    /// Registry that receives the jobs submitted through `install` and is
    /// told to terminate when the pool is dropped.
    registry: Arc<Registry>,
}
impl ThreadPool {
    /// Creates a pool from `configuration`.
    ///
    /// The registry is built by `Registry::new` from the configuration's
    /// requested thread count (which may be `None`).
    ///
    /// # Errors
    ///
    /// Returns `InitError::NumberOfThreadsZero` if the configuration
    /// requests zero threads.
    pub fn new(configuration: Configuration) -> Result<ThreadPool, InitError> {
        configuration.validate()?;
        Ok(ThreadPool {
            registry: Registry::new(configuration.num_threads),
        })
    }

    /// Submits `op` to this pool's registry, waits on the job's latch and
    /// returns the value produced by `op`.
    pub fn install<OP, R>(&self, op: OP) -> R
        where OP: FnOnce() -> R + Send
    {
        // NOTE(review): the job is a local of this function and a reference
        // to it is handed to the registry; the latch is waited on before the
        // job is consumed. Confirm the safety contract against
        // `StackJob::as_job_ref` and `Registry::inject`.
        unsafe {
            let job_a = StackJob::new(op, LockLatch::new());
            self.registry.inject(&[job_a.as_job_ref()]);
            job_a.latch.wait();
            job_a.into_result()
        }
    }
}
/// Ties the registry's shutdown to the lifetime of the `ThreadPool` value.
impl Drop for ThreadPool {
    /// Calls `terminate` on the pool's registry when the pool goes away.
    fn drop(&mut self) {
        self.registry.terminate();
    }
}