use std::{
sync::Arc,
time::Duration,
};
use qubit_function::{
Callable,
Runnable,
};
use qubit_executor::task::spi::TaskEndpointPair;
use qubit_executor::{
TaskHandle,
TrackedTask,
};
use super::thread_pool_builder::ThreadPoolBuilder;
use super::thread_pool_inner::ThreadPoolInner;
use crate::{
ExecutorServiceBuilderError,
PoolJob,
ThreadPoolStats,
};
use qubit_executor::service::{
ExecutorService,
ExecutorServiceLifecycle,
StopReport,
SubmissionError,
};
/// Thread pool handle backed by a shared [`ThreadPoolInner`].
///
/// The methods defined in this file forward to the inner state held behind
/// the [`Arc`]; the handle itself carries no other data.
pub struct ThreadPool {
// Shared pool state that the operations in this file delegate to.
inner: Arc<ThreadPoolInner>,
}
impl ThreadPool {
pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
Self { inner }
}
#[inline]
pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
Self::builder().pool_size(pool_size).build()
}
#[inline]
pub fn builder() -> ThreadPoolBuilder {
ThreadPoolBuilder::default()
}
#[inline]
pub fn queued_count(&self) -> usize {
self.inner.queued_count()
}
#[inline]
pub fn running_count(&self) -> usize {
self.inner.running_count()
}
#[inline]
pub fn live_worker_count(&self) -> usize {
self.inner.read_state(|state| state.live_workers)
}
#[inline]
pub fn core_pool_size(&self) -> usize {
self.inner.read_state(|state| state.core_pool_size)
}
#[inline]
pub fn maximum_pool_size(&self) -> usize {
self.inner.read_state(|state| state.maximum_pool_size)
}
#[inline]
pub fn stats(&self) -> ThreadPoolStats {
self.inner.stats()
}
#[inline]
pub fn submit_job(&self, job: PoolJob) -> Result<(), SubmissionError> {
self.inner.submit(job)
}
#[inline]
pub fn join(&self) {
self.inner.wait_until_idle();
}
#[inline]
pub fn prestart_core_thread(&self) -> Result<bool, SubmissionError> {
self.inner.prestart_core_thread()
}
#[inline]
pub fn prestart_all_core_threads(&self) -> Result<usize, SubmissionError> {
self.inner.prestart_all_core_threads()
}
pub fn set_core_pool_size(
&self,
core_pool_size: usize,
) -> Result<(), ExecutorServiceBuilderError> {
self.inner.set_core_pool_size(core_pool_size)
}
pub fn set_maximum_pool_size(
&self,
maximum_pool_size: usize,
) -> Result<(), ExecutorServiceBuilderError> {
self.inner.set_maximum_pool_size(maximum_pool_size)
}
pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ExecutorServiceBuilderError> {
self.inner.set_keep_alive(keep_alive)
}
pub fn allow_core_thread_timeout(&self, allow: bool) {
self.inner.allow_core_thread_timeout(allow);
}
}
/// Requests shutdown of the shared inner pool state when the handle is
/// dropped.
impl Drop for ThreadPool {
    /// Calls `shutdown` on the inner pool state.
    ///
    /// NOTE(review): whether `shutdown` waits for workers to finish is not
    /// visible from this file; confirm against `ThreadPoolInner`.
    fn drop(&mut self) {
        let shared = &self.inner;
        shared.shutdown();
    }
}
/// [`ExecutorService`] implementation that forwards every operation to the
/// shared inner pool state.
impl ExecutorService for ThreadPool {
    /// Handle returned by [`submit_callable`](Self::submit_callable).
    type ResultHandle<R, E>
        = TaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    /// Handle returned by
    /// [`submit_tracked_callable`](Self::submit_tracked_callable).
    type TrackedHandle<R, E>
        = TrackedTask<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    /// Wraps `task` in a detached [`PoolJob`] and submits it; no handle is
    /// returned to the caller.
    ///
    /// # Errors
    ///
    /// Propagates the [`SubmissionError`] returned by the inner `submit`.
    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
    where
        T: Runnable<E> + Send + 'static,
        E: Send + 'static,
    {
        let detached_job = PoolJob::detached(task);
        self.inner.submit(detached_job)
    }

    /// Submits `task` and returns the handle half of a freshly created
    /// [`TaskEndpointPair`].
    ///
    /// The other half of the pair is moved into the submitted [`PoolJob`].
    ///
    /// # Errors
    ///
    /// Propagates the [`SubmissionError`] returned by the inner `submit`; in
    /// that case the handle is dropped and not returned.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let (result_handle, completer) = TaskEndpointPair::new().into_parts();
        self.inner
            .submit(PoolJob::from_task(task, completer))
            .map(|()| result_handle)
    }

    /// Submits `task` and returns the tracked handle half of a freshly
    /// created [`TaskEndpointPair`].
    ///
    /// The other half of the pair is moved into the submitted [`PoolJob`].
    ///
    /// # Errors
    ///
    /// Propagates the [`SubmissionError`] returned by the inner `submit`; in
    /// that case the handle is dropped and not returned.
    fn submit_tracked_callable<C, R, E>(
        &self,
        task: C,
    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let (tracked_handle, completer) = TaskEndpointPair::new().into_tracked_parts();
        self.inner
            .submit(PoolJob::from_task(task, completer))
            .map(|()| tracked_handle)
    }

    /// Calls `shutdown` on the inner pool state.
    #[inline]
    fn shutdown(&self) {
        let inner = &self.inner;
        inner.shutdown();
    }

    /// Calls `stop` on the inner pool state and returns its [`StopReport`].
    #[inline]
    fn stop(&self) -> StopReport {
        let inner = &self.inner;
        inner.stop()
    }

    /// Returns the lifecycle value reported by the inner pool state.
    #[inline]
    fn lifecycle(&self) -> ExecutorServiceLifecycle {
        let inner = &self.inner;
        inner.lifecycle()
    }

    /// Returns the inner pool state's `is_not_running` flag.
    #[inline]
    fn is_not_running(&self) -> bool {
        let inner = &self.inner;
        inner.is_not_running()
    }

    /// Returns the inner pool state's `is_terminated` flag.
    #[inline]
    fn is_terminated(&self) -> bool {
        let inner = &self.inner;
        inner.is_terminated()
    }

    /// Calls `wait_for_termination` on the inner pool state.
    ///
    /// NOTE(review): presumably blocks until the pool has terminated; confirm
    /// against `ThreadPoolInner`.
    fn wait_termination(&self) {
        let inner = &self.inner;
        inner.wait_for_termination();
    }
}