use std::{
future::Future,
pin::Pin,
sync::Arc,
time::Duration,
};
use qubit_function::Callable;
use qubit_executor::{
TaskCompletionPair,
TaskHandle,
TaskRunner,
};
use super::pool_job::PoolJob;
use super::thread_pool_build_error::ThreadPoolBuildError;
use super::thread_pool_builder::ThreadPoolBuilder;
use super::thread_pool_inner::ThreadPoolInner;
use super::thread_pool_stats::ThreadPoolStats;
use qubit_executor::service::{
ExecutorService,
RejectedExecution,
ShutdownReport,
};
pub struct ThreadPool {
inner: Arc<ThreadPoolInner>,
}
impl ThreadPool {
pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
Self { inner }
}
#[inline]
pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
Self::builder().pool_size(pool_size).build()
}
#[inline]
pub fn builder() -> ThreadPoolBuilder {
ThreadPoolBuilder::default()
}
#[inline]
pub fn queued_count(&self) -> usize {
self.inner.read_state(|state| state.queued_tasks)
}
#[inline]
pub fn running_count(&self) -> usize {
self.inner.read_state(|state| state.running_tasks)
}
#[inline]
pub fn live_worker_count(&self) -> usize {
self.inner.read_state(|state| state.live_workers)
}
#[inline]
pub fn core_pool_size(&self) -> usize {
self.inner.read_state(|state| state.core_pool_size)
}
#[inline]
pub fn maximum_pool_size(&self) -> usize {
self.inner.read_state(|state| state.maximum_pool_size)
}
#[inline]
pub fn stats(&self) -> ThreadPoolStats {
self.inner.stats()
}
#[inline]
pub fn prestart_core_thread(&self) -> Result<bool, RejectedExecution> {
self.inner.prestart_core_thread()
}
#[inline]
pub fn prestart_all_core_threads(&self) -> Result<usize, RejectedExecution> {
self.inner.prestart_all_core_threads()
}
pub fn set_core_pool_size(&self, core_pool_size: usize) -> Result<(), ThreadPoolBuildError> {
self.inner.set_core_pool_size(core_pool_size)
}
pub fn set_maximum_pool_size(
&self,
maximum_pool_size: usize,
) -> Result<(), ThreadPoolBuildError> {
self.inner.set_maximum_pool_size(maximum_pool_size)
}
pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
self.inner.set_keep_alive(keep_alive)
}
pub fn allow_core_thread_timeout(&self, allow: bool) {
self.inner.allow_core_thread_timeout(allow);
}
pub fn submit_job(&self, job: PoolJob) -> Result<(), RejectedExecution> {
self.inner.submit(job)
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
self.inner.shutdown();
}
}
impl ExecutorService for ThreadPool {
type Handle<R, E>
= TaskHandle<R, E>
where
R: Send + 'static,
E: Send + 'static;
type Termination<'a>
= Pin<Box<dyn Future<Output = ()> + Send + 'a>>
where
Self: 'a;
fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
where
C: Callable<R, E> + Send + 'static,
R: Send + 'static,
E: Send + 'static,
{
let (handle, completion) = TaskCompletionPair::new().into_parts();
let completion_for_run = completion.clone();
let job = PoolJob::new(
Box::new(move || {
TaskRunner::new(task).run(completion_for_run);
}),
Box::new(move || {
completion.cancel();
}),
);
self.inner.submit(job)?;
Ok(handle)
}
#[inline]
fn shutdown(&self) {
self.inner.shutdown();
}
#[inline]
fn shutdown_now(&self) -> ShutdownReport {
self.inner.shutdown_now()
}
#[inline]
fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
#[inline]
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
fn await_termination(&self) -> Self::Termination<'_> {
Box::pin(async move {
self.inner.wait_for_termination();
})
}
}