use std::{
sync::Arc,
thread,
time::Duration,
};
use super::thread_pool::ThreadPool;
use super::thread_pool_build_error::ThreadPoolBuildError;
use super::thread_pool_config::ThreadPoolConfig;
use super::thread_pool_inner::ThreadPoolInner;
/// Thread name prefix that `ThreadPoolBuilder::default()` starts with.
const DEFAULT_THREAD_NAME_PREFIX: &str = "qubit-thread-pool";
/// Keep-alive duration (60 seconds) that `ThreadPoolBuilder::default()` starts with.
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(60);
/// Builder that collects thread pool settings and turns them into a
/// `ThreadPool` through `build`.
///
/// Every setter consumes the builder and returns it, so calls can be chained.
/// The settings are only checked when `build` runs.
#[derive(Debug, Clone)]
pub struct ThreadPoolBuilder {
/// Core pool size; `build` rejects a value greater than `maximum_pool_size`.
core_pool_size: usize,
/// Maximum pool size; `build` rejects zero.
maximum_pool_size: usize,
/// Queue capacity. `None` is what `unbounded_queue` stores; `Some(0)` is
/// rejected by `build`.
queue_capacity: Option<usize>,
/// Prefix forwarded to the pool configuration.
/// NOTE(review): presumably used to name worker threads; confirm in
/// `ThreadPoolInner`.
thread_name_prefix: String,
/// Optional stack size forwarded to the pool configuration; `Some(0)` is
/// rejected by `build`.
/// NOTE(review): presumably bytes per worker thread; confirm in
/// `ThreadPoolInner`.
stack_size: Option<usize>,
/// Keep-alive duration forwarded to the pool configuration; a zero duration
/// is rejected by `build`.
keep_alive: Duration,
/// Flag forwarded to the pool configuration.
/// NOTE(review): presumably lets idle core threads time out after
/// `keep_alive`; confirm in `ThreadPoolInner`.
allow_core_thread_timeout: bool,
/// When `true`, `build` calls `prestart_all_core_threads` on the new pool
/// before returning it.
prestart_core_threads: bool,
}
impl ThreadPoolBuilder {
    /// Sets the core and the maximum pool size to the same value.
    ///
    /// # Parameters
    ///
    /// * `pool_size` - Value stored as both `core_pool_size` and
    ///   `maximum_pool_size`.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn pool_size(self, pool_size: usize) -> Self {
        Self {
            core_pool_size: pool_size,
            maximum_pool_size: pool_size,
            ..self
        }
    }

    /// Sets the core pool size, leaving the maximum pool size untouched.
    ///
    /// `build` fails if the core size ends up larger than the maximum size.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn core_pool_size(self, core_pool_size: usize) -> Self {
        Self {
            core_pool_size,
            ..self
        }
    }

    /// Sets the maximum pool size, leaving the core pool size untouched.
    ///
    /// `build` fails if the maximum size is zero or smaller than the core size.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn maximum_pool_size(self, maximum_pool_size: usize) -> Self {
        Self {
            maximum_pool_size,
            ..self
        }
    }

    /// Stores `Some(capacity)` as the queue capacity.
    ///
    /// `build` fails if `capacity` is zero.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn queue_capacity(self, capacity: usize) -> Self {
        Self {
            queue_capacity: Some(capacity),
            ..self
        }
    }

    /// Clears any configured queue capacity by storing `None`.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn unbounded_queue(self) -> Self {
        Self {
            queue_capacity: None,
            ..self
        }
    }

    /// Replaces the thread name prefix with an owned copy of `prefix`.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn thread_name_prefix(self, prefix: &str) -> Self {
        Self {
            thread_name_prefix: String::from(prefix),
            ..self
        }
    }

    /// Stores `Some(stack_size)` as the stack size setting.
    ///
    /// `build` fails if `stack_size` is zero.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn stack_size(self, stack_size: usize) -> Self {
        Self {
            stack_size: Some(stack_size),
            ..self
        }
    }

    /// Sets the keep-alive duration.
    ///
    /// `build` fails if the duration is zero.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn keep_alive(self, keep_alive: Duration) -> Self {
        Self { keep_alive, ..self }
    }

    /// Sets the `allow_core_thread_timeout` flag that is forwarded to the
    /// pool configuration.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn allow_core_thread_timeout(self, allow: bool) -> Self {
        Self {
            allow_core_thread_timeout: allow,
            ..self
        }
    }

    /// Requests that `build` prestart all core threads before returning the
    /// pool.
    ///
    /// # Returns
    ///
    /// The updated builder.
    #[inline]
    pub fn prestart_core_threads(self) -> Self {
        Self {
            prestart_core_threads: true,
            ..self
        }
    }

    /// Validates the collected settings and creates the thread pool.
    ///
    /// # Returns
    ///
    /// The `ThreadPool` wrapping the newly created shared pool state.
    ///
    /// # Errors
    ///
    /// Returns the first validation failure found by `validate`, or the error
    /// produced by `ThreadPoolBuildError::from_rejected_execution` when
    /// prestarting the core threads is requested and fails.
    pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
        self.validate()?;

        // Take the builder apart so each setting moves into the configuration
        // exactly once; the prestart flag stays behind for use below.
        let Self {
            core_pool_size,
            maximum_pool_size,
            queue_capacity,
            thread_name_prefix,
            stack_size,
            keep_alive,
            allow_core_thread_timeout,
            prestart_core_threads,
        } = self;

        let config = ThreadPoolConfig {
            core_pool_size,
            maximum_pool_size,
            queue_capacity,
            thread_name_prefix,
            stack_size,
            keep_alive,
            allow_core_thread_timeout,
        };
        let shared = Arc::new(ThreadPoolInner::new(config));

        if prestart_core_threads {
            let prestarted = shared.prestart_all_core_threads();
            prestarted.map_err(ThreadPoolBuildError::from_rejected_execution)?;
        }

        Ok(ThreadPool::from_inner(shared))
    }

    /// Checks the settings and reports the first violation.
    ///
    /// Checks run in this order: zero maximum pool size, core size above the
    /// maximum, zero queue capacity, zero stack size, zero keep-alive.
    ///
    /// # Errors
    ///
    /// Returns the `ThreadPoolBuildError` variant matching the first failed
    /// check.
    fn validate(&self) -> Result<(), ThreadPoolBuildError> {
        let core = self.core_pool_size;
        let maximum = self.maximum_pool_size;

        if maximum == 0 {
            Err(ThreadPoolBuildError::ZeroMaximumPoolSize)
        } else if core > maximum {
            Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
                core_pool_size: core,
                maximum_pool_size: maximum,
            })
        } else if matches!(self.queue_capacity, Some(0)) {
            Err(ThreadPoolBuildError::ZeroQueueCapacity)
        } else if matches!(self.stack_size, Some(0)) {
            Err(ThreadPoolBuildError::ZeroStackSize)
        } else if self.keep_alive.is_zero() {
            Err(ThreadPoolBuildError::ZeroKeepAlive)
        } else {
            Ok(())
        }
    }
}
impl Default for ThreadPoolBuilder {
    /// Creates a builder with the default settings.
    ///
    /// The core and maximum pool sizes both come from `default_pool_size()`.
    /// The queue capacity and stack size are `None`, the thread name prefix
    /// and keep-alive come from the module constants, and both boolean flags
    /// are `false`.
    fn default() -> Self {
        let parallelism = default_pool_size();
        Self {
            thread_name_prefix: String::from(DEFAULT_THREAD_NAME_PREFIX),
            keep_alive: DEFAULT_KEEP_ALIVE,
            core_pool_size: parallelism,
            maximum_pool_size: parallelism,
            queue_capacity: None,
            stack_size: None,
            allow_core_thread_timeout: false,
            prestart_core_threads: false,
        }
    }
}
/// Returns the pool size used by the default builder.
///
/// This is the value reported by `thread::available_parallelism()`, or `1`
/// when that query returns an error.
fn default_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}