use std::{
collections::VecDeque,
time::Duration,
};
use qubit_executor::service::ExecutorServiceLifecycle;
use super::thread_pool_config::ThreadPoolConfig;
use crate::{
PoolJob,
ThreadPoolStats,
};
pub(crate) struct ThreadPoolState {
pub(super) lifecycle: ExecutorServiceLifecycle,
pub(super) queue: VecDeque<PoolJob>,
pub(super) queued_tasks: usize,
pub(super) queue_capacity: Option<usize>,
pub(super) running_tasks: usize,
pub(super) cancelling_tasks: usize,
pub(super) live_workers: usize,
pub(super) idle_workers: usize,
pub(super) submitted_tasks: usize,
pub(super) completed_tasks: usize,
pub(super) cancelled_tasks: usize,
pub(super) core_pool_size: usize,
pub(super) maximum_pool_size: usize,
pub(super) keep_alive: Duration,
pub(super) allow_core_thread_timeout: bool,
pub(super) next_worker_index: usize,
}
impl ThreadPoolState {
pub(super) fn new(config: ThreadPoolConfig) -> Self {
Self {
lifecycle: ExecutorServiceLifecycle::Running,
queue: VecDeque::new(),
queued_tasks: 0,
queue_capacity: config.queue_capacity,
running_tasks: 0,
cancelling_tasks: 0,
live_workers: 0,
idle_workers: 0,
submitted_tasks: 0,
completed_tasks: 0,
cancelled_tasks: 0,
core_pool_size: config.core_pool_size,
maximum_pool_size: config.maximum_pool_size,
keep_alive: config.keep_alive,
allow_core_thread_timeout: config.allow_core_thread_timeout,
next_worker_index: 0,
}
}
pub(super) fn is_saturated(&self) -> bool {
self.queue_capacity
.is_some_and(|capacity| self.queued_tasks >= capacity)
}
pub(super) fn is_terminated(&self) -> bool {
self.lifecycle != ExecutorServiceLifecycle::Running
&& self.queued_tasks == 0
&& self.running_tasks == 0
&& self.cancelling_tasks == 0
&& self.live_workers == 0
}
pub(super) fn is_idle(&self) -> bool {
self.queued_tasks == 0 && self.running_tasks == 0 && self.cancelling_tasks == 0
}
pub(super) fn worker_wait_is_timed(&self) -> bool {
self.allow_core_thread_timeout || self.live_workers > self.core_pool_size
}
pub(super) fn idle_worker_can_retire(&self) -> bool {
self.live_workers > self.maximum_pool_size
|| (self.worker_wait_is_timed()
&& (self.live_workers > self.core_pool_size || self.allow_core_thread_timeout))
}
pub(super) fn stats(&self) -> ThreadPoolStats {
ThreadPoolStats {
lifecycle: if self.is_terminated() {
ExecutorServiceLifecycle::Terminated
} else {
self.lifecycle
},
core_pool_size: self.core_pool_size,
maximum_pool_size: self.maximum_pool_size,
live_workers: self.live_workers,
idle_workers: self.idle_workers,
queued_tasks: self.queued_tasks,
running_tasks: self.running_tasks,
submitted_tasks: self.submitted_tasks,
completed_tasks: self.completed_tasks,
cancelled_tasks: self.cancelled_tasks,
terminated: self.is_terminated(),
}
}
}