use callback::Callback;
use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
use pool::{Pool, MAX_BACKUP};
use shutdown::ShutdownTrigger;
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};
use std::any::Any;
use std::cmp::max;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use crossbeam_deque::Injector;
use num_cpus;
use tokio_executor::park::Park;
use tokio_executor::Enter;
/// Collects the settings for a thread pool and constructs a `ThreadPool` from them.
///
/// Values are adjusted through the chained setter methods and take effect when
/// `build` is called.
pub struct Builder {
    /// Optional settings (keep-alive, thread naming, stack size, callbacks);
    /// `build` passes a clone of this to `Pool::new`.
    config: Config,

    /// Number of worker entries that `build` creates.
    pool_size: usize,

    /// Limit handed to `Pool::new` by `build`.
    // NOTE(review): presumably the cap on threads allowed to block
    // concurrently -- confirm against `Pool`.
    max_blocking: usize,

    /// Factory called once per worker id to produce that worker's parker.
    new_park: Box<dyn Fn(&WorkerId) -> BoxPark>,
}
impl Builder {
    /// Returns a builder initialised with the default settings.
    ///
    /// The defaults established here are:
    ///
    /// * `pool_size`: the value reported by `num_cpus::get()`, but never less
    ///   than one.
    /// * `max_blocking`: `100`.
    /// * every optional `Config` entry: `None`.
    /// * parker factory: wraps `DefaultPark::new()` in a `BoxedPark`.
    pub fn new() -> Builder {
        // Clamp to one so a reported CPU count of zero still yields a worker.
        let default_workers = max(1, num_cpus::get());

        // The default factory ignores the worker id entirely.
        let default_park =
            Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark);

        let config = Config {
            keep_alive: None,
            name_prefix: None,
            stack_size: None,
            around_worker: None,
            after_start: None,
            before_stop: None,
            panic_handler: None,
        };

        Builder {
            config,
            pool_size: default_workers,
            max_blocking: 100,
            new_park: default_park,
        }
    }

    /// Sets the number of workers that `build` will create.
    ///
    /// # Panics
    ///
    /// Panics when `val` is zero or greater than `MAX_WORKERS`.
    pub fn pool_size(&mut self, val: usize) -> &mut Self {
        assert!(val > 0, "at least one thread required");
        assert!(val <= MAX_WORKERS, "max value is {}", MAX_WORKERS);

        self.pool_size = val;
        self
    }

    /// Sets the blocking limit that `build` forwards to the pool.
    ///
    /// # Panics
    ///
    /// Panics when `val` is greater than `MAX_BACKUP`.
    pub fn max_blocking(&mut self, val: usize) -> &mut Self {
        assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP);

        self.max_blocking = val;
        self
    }

    /// Stores the keep-alive setting in the pool configuration.
    ///
    /// Passing `None` clears any previously stored duration.
    // NOTE(review): presumably the time an idle worker lingers before
    // shutting down -- confirm against the worker implementation.
    pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
        self.config.keep_alive = val;
        self
    }

    /// Registers `f` as the panic handler in the pool configuration.
    ///
    /// The handler takes a `Box<dyn Any + Send>` and is stored behind an `Arc`.
    // NOTE(review): the argument is presumably the payload of a panic caught
    // on a worker -- confirm where the handler is invoked.
    pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
    {
        let handler = Arc::new(f);
        self.config.panic_handler = Some(handler);
        self
    }

    /// Stores a name prefix in the pool configuration.
    ///
    /// Anything convertible into a `String` is accepted.
    // NOTE(review): presumably used to name spawned worker threads -- confirm.
    pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
        let prefix: String = val.into();
        self.config.name_prefix = Some(prefix);
        self
    }

    /// Stores a stack size in the pool configuration.
    // NOTE(review): presumably in bytes and applied to spawned worker
    // threads -- confirm.
    pub fn stack_size(&mut self, val: usize) -> &mut Self {
        self.config.stack_size = Some(val);
        self
    }

    /// Registers `f` as the `around_worker` callback in the pool configuration.
    ///
    /// The callback takes a `&Worker` and a `&mut Enter`; it is wrapped with
    /// `Callback::new` before being stored.
    pub fn around_worker<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
    {
        let callback = Callback::new(f);
        self.config.around_worker = Some(callback);
        self
    }

    /// Registers `f` as the `after_start` hook in the pool configuration.
    // NOTE(review): presumably run on each worker thread right after it
    // starts -- confirm.
    pub fn after_start<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        let hook = Arc::new(f);
        self.config.after_start = Some(hook);
        self
    }

    /// Registers `f` as the `before_stop` hook in the pool configuration.
    // NOTE(review): presumably run on each worker thread just before it
    // stops -- confirm.
    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        let hook = Arc::new(f);
        self.config.before_stop = Some(hook);
        self
    }

    /// Replaces the parker factory.
    ///
    /// `f` is called with each worker's id and its result is wrapped in a
    /// `BoxedPark` so that differently typed parkers share one boxed type.
    pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self
    where
        F: Fn(&WorkerId) -> P + 'static,
        P: Park + Send + 'static,
        P::Error: Error,
    {
        self.new_park = Box::new(move |worker_id| Box::new(BoxedPark::new(f(worker_id))));
        self
    }

    /// Builds a `ThreadPool` from the current settings.
    ///
    /// The builder is only borrowed: the configuration is cloned, so the same
    /// builder can be used again afterwards.
    pub fn build(&self) -> ThreadPool {
        trace!("build; num-workers={}", self.pool_size);

        // One entry per worker, each owning the parker made for its id together
        // with the matching unpark handle.
        let entries: Vec<worker::Entry> = (0..self.pool_size)
            .map(|index| {
                let id = WorkerId::new(index);
                let park = (self.new_park)(&id);
                let unpark = park.unpark();
                worker::Entry::new(park, unpark)
            })
            .collect();
        let workers: Arc<[worker::Entry]> = entries.into();

        let queue = Arc::new(Injector::new());

        // The trigger shares the worker list and the queue with the pool.
        let trigger = Arc::new(ShutdownTrigger::new(
            Arc::clone(&workers),
            Arc::clone(&queue),
        ));

        // The pool receives only a weak handle to the trigger; the strong one
        // is handed to the `ThreadPool` below.
        let pool = Arc::new(Pool::new(
            workers,
            Arc::downgrade(&trigger),
            self.max_blocking,
            self.config.clone(),
            queue,
        ));

        ThreadPool::new2(pool, trigger)
    }
}
impl fmt::Debug for Builder {
    /// Formats the builder as a `Builder { .. }` debug struct.
    ///
    /// All plain settings (`config`, `pool_size`, `max_blocking`) are printed.
    /// The parker factory is a boxed closure with no `Debug` representation,
    /// so a fixed placeholder string is printed in its place.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Builder")
            .field("config", &self.config)
            .field("pool_size", &self.pool_size)
            // Previously omitted, which hid the configured blocking limit
            // from debug output.
            .field("max_blocking", &self.max_blocking)
            .field("new_park", &"Box<Fn() -> BoxPark>")
            .finish()
    }
}