use std::{
sync::{
Arc,
Mutex,
atomic::{
AtomicUsize,
Ordering,
},
},
thread,
time::Duration,
};
use crossbeam_deque::{
Injector,
Steal,
};
use qubit_lock::{
Monitor,
MonitorGuard,
WaitTimeoutStatus,
};
use super::pool_job::PoolJob;
use super::thread_pool_build_error::ThreadPoolBuildError;
use super::thread_pool_config::ThreadPoolConfig;
use super::thread_pool_lifecycle::ThreadPoolLifecycle;
use super::thread_pool_state::ThreadPoolState;
use super::thread_pool_stats::ThreadPoolStats;
use qubit_executor::service::{
RejectedExecution,
ShutdownReport,
};
/// A per-worker FIFO job queue backed by a `crossbeam_deque::Injector`.
///
/// NOTE(review): `Injector` exposes a single steal end, so `pop_front` and
/// `steal_back` are equivalent here despite their deque-style names.
struct WorkerQueue {
    // Stable index identifying the worker this queue belongs to.
    worker_index: usize,
    // Shared job queue; any thread may push or steal.
    jobs: Injector<PoolJob>,
}

impl WorkerQueue {
    /// Creates an empty queue owned by worker `worker_index`.
    fn new(worker_index: usize) -> Self {
        Self {
            worker_index,
            jobs: Injector::new(),
        }
    }

    /// Index of the owning worker.
    #[inline]
    fn worker_index(&self) -> usize {
        self.worker_index
    }

    /// Enqueues `job`.
    fn push_back(&self, job: PoolJob) {
        self.jobs.push(job);
    }

    /// Takes the next job, if any (owner-side name for `steal_one`).
    fn pop_front(&self) -> Option<PoolJob> {
        self.steal_one()
    }

    /// Takes the next job, if any (thief-side name for `steal_one`).
    fn steal_back(&self) -> Option<PoolJob> {
        self.steal_one()
    }

    /// Empties the queue, returning every job in steal order.
    fn drain(&self) -> Vec<PoolJob> {
        std::iter::from_fn(|| self.steal_one()).collect()
    }

    /// Steals a single job, spinning through transient `Steal::Retry` results.
    fn steal_one(&self) -> Option<PoolJob> {
        loop {
            match self.jobs.steal() {
                Steal::Success(job) => break Some(job),
                Steal::Empty => break None,
                Steal::Retry => {}
            }
        }
    }
}
/// Shared core of the thread pool: the monitored mutable state plus the
/// per-worker local queues and the parameters used when spawning threads.
pub(crate) struct ThreadPoolInner {
    // All mutable pool bookkeeping, guarded by a monitor so workers can
    // block on and be woken by state changes.
    state_monitor: Monitor<ThreadPoolState>,
    // One entry per live worker; guarded by its own mutex, taken
    // independently of the state monitor.
    worker_queues: Mutex<Vec<Arc<WorkerQueue>>>,
    // Round-robin cursor for choosing a worker queue to enqueue into or to
    // start stealing from.
    next_enqueue_worker: AtomicUsize,
    // Worker threads are named "<prefix>-<index>".
    thread_name_prefix: String,
    // Optional per-thread stack size forwarded to `thread::Builder`.
    stack_size: Option<usize>,
}
impl ThreadPoolInner {
    /// Creates the pool core from `config`.
    ///
    /// The thread-name prefix and stack size are pulled out of the config
    /// first (they are needed when spawning threads, outside the state
    /// lock); the rest of the configuration is handed to
    /// `ThreadPoolState::new`.
    pub(crate) fn new(config: ThreadPoolConfig) -> Self {
        let mut config = config;
        // `take` moves the string out without cloning, leaving a default in
        // the config that `ThreadPoolState::new` receives.
        let thread_name_prefix = std::mem::take(&mut config.thread_name_prefix);
        let stack_size = config.stack_size;
        Self {
            state_monitor: Monitor::new(ThreadPoolState::new(config)),
            worker_queues: Mutex::new(Vec::new()),
            next_enqueue_worker: AtomicUsize::new(0),
            thread_name_prefix,
            stack_size,
        }
    }

    /// Locks the pool state, returning the monitor guard.
    #[inline]
    pub(crate) fn lock_state(&self) -> MonitorGuard<'_, ThreadPoolState> {
        self.state_monitor.lock()
    }

    /// Runs `f` with shared access to the pool state.
    #[inline]
    pub(crate) fn read_state<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&ThreadPoolState) -> R,
    {
        self.state_monitor.read(f)
    }

    /// Runs `f` with exclusive access to the pool state.
    #[inline]
    pub(crate) fn write_state<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&mut ThreadPoolState) -> R,
    {
        self.state_monitor.write(f)
    }

    /// Submits `job` for execution.
    ///
    /// Admission policy, in order:
    /// 1. below the core size, spawn a new core worker that runs `job` as
    ///    its first task;
    /// 2. while the queue is not saturated, enqueue the job — on a
    ///    worker-local queue when the pool is capacity-bounded and no worker
    ///    is idle, otherwise on the shared global queue;
    /// 3. if the queue is saturated but the pool may still grow, spawn a
    ///    non-core worker that runs `job` first;
    /// 4. otherwise reject.
    ///
    /// # Errors
    /// `RejectedExecution::Shutdown` once the pool stops accepting work,
    /// `RejectedExecution::Saturated` when both queue and pool are full, or
    /// a spawn failure bubbled up from `spawn_worker_locked`.
    pub(crate) fn submit(self: &Arc<Self>, job: PoolJob) -> Result<(), RejectedExecution> {
        let mut state = self.lock_state();
        if !state.lifecycle.is_running() {
            return Err(RejectedExecution::Shutdown);
        }
        if state.live_workers < state.core_pool_size {
            // Hand the job to a fresh core worker as its first task.
            self.spawn_worker_locked(&mut state, Some(job))?;
            state.submitted_tasks += 1;
            return Ok(());
        }
        if !state.is_saturated() {
            state.submitted_tasks += 1;
            state.queued_tasks += 1;
            // Decide before releasing the lock whether a wake-up is needed.
            let should_wake_one_idle_worker = state.idle_workers > 0;
            // Use a worker-local queue only for bounded pools with live but
            // fully busy workers: nobody needs waking, and a busy worker
            // polls its own queue before the global one.
            let use_local_queue =
                state.queue_capacity.is_some() && state.idle_workers == 0 && state.live_workers > 0;
            if !use_local_queue {
                state.queue.push_back(job);
            } else {
                let mut pending_job = Some(job);
                self.try_enqueue_worker_job_locked(&mut pending_job);
                // Fall back to the global queue if no worker queue took it.
                if let Some(job) = pending_job {
                    state.queue.push_back(job);
                }
            }
            // With no workers alive the queued job would never run, so one
            // must be spawned here; on failure, undo the enqueue above.
            if state.live_workers == 0
                && let Err(error) = self.spawn_worker_locked(&mut state, None)
            {
                // `live_workers == 0` forced the global-queue path, so the
                // job just pushed sits at the back of `state.queue`.
                if let Some(job) = state.queue.pop_back() {
                    state.submitted_tasks = state
                        .submitted_tasks
                        .checked_sub(1)
                        .expect("thread pool submitted task counter underflow");
                    state.queued_tasks = state
                        .queued_tasks
                        .checked_sub(1)
                        .expect("thread pool queued task counter underflow");
                    // Run the cancellation callback outside the state lock.
                    drop(state);
                    job.cancel();
                }
                return Err(error);
            }
            drop(state);
            // Notify after releasing the lock so the woken worker can take
            // it immediately.
            if should_wake_one_idle_worker {
                self.state_monitor.notify_one();
            }
            return Ok(());
        }
        if state.live_workers < state.maximum_pool_size {
            // Queue saturated but the pool may still grow: spawn a non-core
            // worker carrying the job.
            self.spawn_worker_locked(&mut state, Some(job))?;
            state.submitted_tasks += 1;
            Ok(())
        } else {
            Err(RejectedExecution::Saturated)
        }
    }

    /// Tries to hand the job in `job` to one worker-local queue, chosen
    /// round-robin via `next_enqueue_worker`.
    ///
    /// Takes the job out of `job` and returns `true` on success; leaves it
    /// in place and returns `false` when no worker queues exist.
    ///
    /// NOTE(review): the `_locked` suffix appears to mean "state monitor
    /// held by the caller" — this method takes the `worker_queues` mutex
    /// itself. Confirm against the naming convention elsewhere in the crate.
    fn try_enqueue_worker_job_locked(&self, job: &mut Option<PoolJob>) -> bool {
        let queues = self
            .worker_queues
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if queues.is_empty() {
            return false;
        }
        // Relaxed suffices: the counter only spreads load, it carries no
        // synchronization.
        let start = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed);
        let slot = start % queues.len();
        if let Some(job) = job.take() {
            queues[slot].push_back(job);
            true
        } else {
            false
        }
    }

    /// Starts one idle core worker if the pool is below its core size.
    ///
    /// Returns `Ok(true)` when a worker was started and `Ok(false)` when the
    /// core pool is already full.
    ///
    /// # Errors
    /// `RejectedExecution::Shutdown` if the pool is no longer running, or a
    /// spawn failure from `spawn_worker_locked`.
    pub(crate) fn prestart_core_thread(self: &Arc<Self>) -> Result<bool, RejectedExecution> {
        let mut state = self.lock_state();
        if !state.lifecycle.is_running() {
            return Err(RejectedExecution::Shutdown);
        }
        if state.live_workers >= state.core_pool_size {
            return Ok(false);
        }
        self.spawn_worker_locked(&mut state, None)?;
        Ok(true)
    }

    /// Starts idle core workers until the core pool is full, returning how
    /// many were started.
    pub(crate) fn prestart_all_core_threads(self: &Arc<Self>) -> Result<usize, RejectedExecution> {
        let mut started = 0;
        // The state lock is re-acquired per iteration, so submissions and
        // shutdowns can interleave with prestarting.
        while self.prestart_core_thread()? {
            started += 1;
        }
        Ok(started)
    }

    /// Spawns a worker thread; the caller must hold the state lock.
    ///
    /// Counters (`live_workers`, plus `running_tasks` when `first_task` is
    /// set) are reserved *before* spawning and rolled back if the OS-level
    /// spawn fails, so the state never under-counts an existing thread.
    ///
    /// # Errors
    /// `RejectedExecution::WorkerSpawnFailed` wrapping the `io::Error` from
    /// `thread::Builder::spawn`.
    fn spawn_worker_locked(
        self: &Arc<Self>,
        state: &mut ThreadPoolState,
        first_task: Option<PoolJob>,
    ) -> Result<(), RejectedExecution> {
        // Remember whether this spawn reserves a running-task slot; the
        // closure below consumes `first_task`, so it cannot be inspected on
        // the failure path.
        let has_first_task = first_task.is_some();
        let index = state.next_worker_index;
        state.next_worker_index += 1;
        state.live_workers += 1;
        if has_first_task {
            // The first task begins running immediately on the new thread.
            state.running_tasks += 1;
        }
        let worker_queue = self.register_worker_queue_locked(index);
        let worker_inner = Arc::clone(self);
        let mut builder =
            thread::Builder::new().name(format!("{}-{index}", self.thread_name_prefix));
        if let Some(stack_size) = self.stack_size {
            builder = builder.stack_size(stack_size);
        }
        match builder.spawn(move || run_worker(worker_inner, worker_queue, first_task)) {
            Ok(_) => Ok(()),
            Err(source) => {
                // Roll back exactly the reservations made above.
                self.remove_worker_queue_locked(index);
                state.live_workers = state
                    .live_workers
                    .checked_sub(1)
                    .expect("thread pool live worker counter underflow");
                // Fix: only undo the running-task reservation this spawn
                // made. The previous unconditional `running_tasks > 0`
                // decrement corrupted the counter whenever a task-less spawn
                // (e.g. prestart) failed while other workers were running.
                if has_first_task {
                    state.running_tasks = state
                        .running_tasks
                        .checked_sub(1)
                        .expect("thread pool running task counter underflow");
                }
                // This failed spawn may have been the last obstacle to
                // termination.
                self.notify_if_terminated(state);
                Err(RejectedExecution::WorkerSpawnFailed {
                    source: Arc::new(source),
                })
            }
        }
    }

    /// Creates and registers a local queue for worker `worker_index`.
    fn register_worker_queue_locked(&self, worker_index: usize) -> Arc<WorkerQueue> {
        let queue = Arc::new(WorkerQueue::new(worker_index));
        self.worker_queues
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(Arc::clone(&queue));
        queue
    }

    /// Unregisters the queue of worker `worker_index` and returns any jobs
    /// still parked on it so the caller can requeue them.
    fn remove_worker_queue_locked(&self, worker_index: usize) -> Vec<PoolJob> {
        let queue = {
            let mut queues = self
                .worker_queues
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            queues
                .iter()
                .position(|queue| queue.worker_index() == worker_index)
                .map(|position| queues.remove(position))
        };
        // Drain after releasing the `worker_queues` mutex.
        queue.map_or_else(Vec::new, |queue| queue.drain())
    }

    /// Takes the next queued job for this worker, preferring its own local
    /// queue, then stealing from siblings (bounded pools only), then the
    /// shared global queue. Adjusts the queued/running counters for
    /// whichever source supplies the job.
    fn try_take_queued_job_locked(
        &self,
        state: &mut ThreadPoolState,
        worker_queue: &WorkerQueue,
    ) -> Option<PoolJob> {
        // Local queues are only populated when a queue capacity is
        // configured (mirrors `use_local_queue` in `submit`).
        if state.queue_capacity.is_some() {
            let worker_index = worker_queue.worker_index();
            let own_job = worker_queue.pop_front();
            if let Some(job) = own_job {
                state.queued_tasks = state
                    .queued_tasks
                    .checked_sub(1)
                    .expect("thread pool queued task counter underflow");
                state.running_tasks += 1;
                return Some(job);
            }
            if let Some(job) = self.try_steal_job_locked(worker_index) {
                state.queued_tasks = state
                    .queued_tasks
                    .checked_sub(1)
                    .expect("thread pool queued task counter underflow");
                state.running_tasks += 1;
                return Some(job);
            }
        }
        if let Some(job) = state.queue.pop_front() {
            state.queued_tasks = state
                .queued_tasks
                .checked_sub(1)
                .expect("thread pool queued task counter underflow");
            state.running_tasks += 1;
            return Some(job);
        }
        None
    }

    /// Steals one job from another worker's queue, scanning the queues
    /// round-robin from a rotating start point and skipping the caller's own
    /// queue. Returns `None` when no other queue has work.
    fn try_steal_job_locked(&self, worker_index: usize) -> Option<PoolJob> {
        let queues = self
            .worker_queues
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let queue_count = queues.len();
        // With one queue (the caller's own) there is nothing to steal.
        if queue_count <= 1 {
            return None;
        }
        // Rotate the starting victim so steals spread across workers.
        let start = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
        for offset in 0..queue_count {
            let victim = &queues[(start + offset) % queue_count];
            if victim.worker_index() == worker_index {
                continue;
            }
            if let Some(job) = victim.steal_back() {
                return Some(job);
            }
        }
        None
    }

    /// Drains every worker-local queue, returning all parked jobs.
    fn drain_all_worker_queued_jobs_locked(&self) -> Vec<PoolJob> {
        // Clone the Arcs first so the `worker_queues` mutex is not held
        // while draining.
        let queues = self
            .worker_queues
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .cloned()
            .collect::<Vec<_>>();
        let mut jobs = Vec::new();
        for queue in queues {
            jobs.extend(queue.drain());
        }
        jobs
    }

    /// Initiates a graceful shutdown: no new submissions are accepted, but
    /// already-queued jobs keep running. Wakes all workers so they observe
    /// the lifecycle change.
    pub(crate) fn shutdown(&self) {
        let mut state = self.lock_state();
        if state.lifecycle.is_running() {
            state.lifecycle = ThreadPoolLifecycle::Shutdown;
        }
        self.state_monitor.notify_all();
        self.notify_if_terminated(&state);
    }

    /// Stops the pool immediately, cancelling every queued job.
    ///
    /// Returns a `ShutdownReport` built from the queued/running counts taken
    /// while the lock was held. Cancellation callbacks run after the state
    /// lock is released.
    pub(crate) fn shutdown_now(&self) -> ShutdownReport {
        let (jobs, report) = {
            let mut state = self.lock_state();
            if state.lifecycle.is_running() || state.lifecycle.is_shutdown() {
                state.lifecycle = ThreadPoolLifecycle::Stopping;
            }
            let queued = state.queued_tasks;
            let running = state.running_tasks;
            // Collect from both the global queue and all worker-local
            // queues; together they must account for every queued task.
            let mut jobs = state.queue.drain(..).collect::<Vec<_>>();
            jobs.extend(self.drain_all_worker_queued_jobs_locked());
            debug_assert_eq!(jobs.len(), queued);
            state.queued_tasks = 0;
            state.cancelled_tasks += queued;
            self.state_monitor.notify_all();
            self.notify_if_terminated(&state);
            (jobs, ShutdownReport::new(queued, running, queued))
        };
        // Cancel outside the lock; callbacks may be arbitrarily slow.
        for job in jobs {
            job.cancel();
        }
        report
    }

    /// Whether the pool has stopped accepting new work.
    pub(crate) fn is_shutdown(&self) -> bool {
        self.read_state(|state| !state.lifecycle.is_running())
    }

    /// Whether the pool has fully terminated.
    pub(crate) fn is_terminated(&self) -> bool {
        self.read_state(ThreadPoolState::is_terminated)
    }

    /// Blocks the caller until the pool is terminated.
    pub(crate) fn wait_for_termination(&self) {
        self.state_monitor
            .wait_until(|state| state.is_terminated(), |_| ());
    }

    /// Snapshot of the pool's counters.
    pub(crate) fn stats(&self) -> ThreadPoolStats {
        self.read_state(ThreadPoolStats::new)
    }

    /// Updates the core pool size, then wakes all workers so they
    /// re-evaluate their idle/retirement conditions against the new value.
    ///
    /// # Errors
    /// `ThreadPoolBuildError::CorePoolSizeExceedsMaximum` when the new core
    /// size is larger than the current maximum.
    pub(crate) fn set_core_pool_size(
        self: &Arc<Self>,
        core_pool_size: usize,
    ) -> Result<(), ThreadPoolBuildError> {
        let err = self.write_state(|state| {
            if core_pool_size > state.maximum_pool_size {
                Some(state.maximum_pool_size)
            } else {
                state.core_pool_size = core_pool_size;
                None
            }
        });
        if let Some(maximum_pool_size) = err {
            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
                core_pool_size,
                maximum_pool_size,
            });
        }
        self.state_monitor.notify_all();
        Ok(())
    }

    /// Updates the maximum pool size, then wakes all workers so surplus
    /// workers can notice and retire.
    ///
    /// # Errors
    /// `ThreadPoolBuildError::ZeroMaximumPoolSize` for zero, or
    /// `CorePoolSizeExceedsMaximum` when the new maximum is below the
    /// current core size.
    pub(crate) fn set_maximum_pool_size(
        self: &Arc<Self>,
        maximum_pool_size: usize,
    ) -> Result<(), ThreadPoolBuildError> {
        if maximum_pool_size == 0 {
            return Err(ThreadPoolBuildError::ZeroMaximumPoolSize);
        }
        let exceeds = self.write_state(|state| {
            if state.core_pool_size > maximum_pool_size {
                Some(state.core_pool_size)
            } else {
                state.maximum_pool_size = maximum_pool_size;
                None
            }
        });
        if let Some(core_pool_size) = exceeds {
            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
                core_pool_size,
                maximum_pool_size,
            });
        }
        self.state_monitor.notify_all();
        Ok(())
    }

    /// Updates the idle-worker keep-alive duration and wakes all workers so
    /// current waits pick up the new value on their next iteration.
    ///
    /// # Errors
    /// `ThreadPoolBuildError::ZeroKeepAlive` for a zero duration.
    pub(crate) fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
        if keep_alive.is_zero() {
            return Err(ThreadPoolBuildError::ZeroKeepAlive);
        }
        self.write_state(|state| state.keep_alive = keep_alive);
        self.state_monitor.notify_all();
        Ok(())
    }

    /// Toggles whether core workers may time out and retire, waking all
    /// workers so they switch between timed and untimed waits.
    pub(crate) fn allow_core_thread_timeout(&self, allow: bool) {
        self.write_state(|state| state.allow_core_thread_timeout = allow);
        self.state_monitor.notify_all();
    }

    /// Wakes every waiter (e.g. `wait_for_termination`) once the state
    /// reports termination.
    fn notify_if_terminated(&self, state: &ThreadPoolState) {
        if state.is_terminated() {
            self.state_monitor.notify_all();
        }
    }
}
/// Worker-thread entry point: runs the optional first task, then loops
/// taking jobs from `wait_for_job` until it signals the thread should exit.
///
/// Every executed job is followed by `finish_running_job`, which updates the
/// pool's running/completed counters.
fn run_worker(
    inner: Arc<ThreadPoolInner>,
    worker_queue: Arc<WorkerQueue>,
    first_task: Option<PoolJob>,
) {
    // A worker spawned with a first task executes it before touching the
    // queues.
    if let Some(job) = first_task {
        job.run();
        finish_running_job(&inner);
    }
    // `wait_for_job` returning `None` means this worker should retire.
    while let Some(job) = wait_for_job(&inner, &worker_queue) {
        job.run();
        finish_running_job(&inner);
    }
}
/// Blocks until a job is available for this worker, or decides the worker
/// should exit. Returns `None` when the worker thread must terminate
/// (surplus worker, idle timeout, or pool shutdown/stop).
fn wait_for_job(inner: &ThreadPoolInner, worker_queue: &WorkerQueue) -> Option<PoolJob> {
    let worker_index = worker_queue.worker_index();
    let mut state = inner.lock_state();
    loop {
        match state.lifecycle {
            ThreadPoolLifecycle::Running => {
                if let Some(job) = inner.try_take_queued_job_locked(&mut state, worker_queue) {
                    return Some(job);
                }
                // The maximum pool size may have been lowered since this
                // worker started; shed the surplus worker.
                // NOTE(review): the `live_workers > 0` clause is implied by
                // the first comparison (usize: live > max >= 0).
                if state.live_workers > state.maximum_pool_size && state.live_workers > 0 {
                    unregister_exiting_worker(inner, &mut state, worker_index);
                    return None;
                }
                if state.worker_wait_is_timed() {
                    let keep_alive = state.keep_alive;
                    state.idle_workers += 1;
                    // `wait_timeout` consumes the guard and hands back a
                    // fresh one together with the timeout status.
                    let (next_state, status) = state.wait_timeout(keep_alive);
                    state = next_state;
                    state.idle_workers = state
                        .idle_workers
                        .checked_sub(1)
                        .expect("thread pool idle worker counter underflow");
                    // Retire only when the keep-alive elapsed with nothing
                    // queued and the pool can spare this worker.
                    if status == WaitTimeoutStatus::TimedOut
                        && state.queued_tasks == 0
                        && state.idle_worker_can_retire()
                    {
                        unregister_exiting_worker(inner, &mut state, worker_index);
                        return None;
                    }
                } else {
                    // Untimed wait: block until notified, then loop back and
                    // re-check lifecycle and queues.
                    state.idle_workers += 1;
                    state = state.wait();
                    state.idle_workers = state
                        .idle_workers
                        .checked_sub(1)
                        .expect("thread pool idle worker counter underflow");
                }
            }
            ThreadPoolLifecycle::Shutdown => {
                // Graceful shutdown: keep draining queued jobs, exit once
                // none remain.
                if let Some(job) = inner.try_take_queued_job_locked(&mut state, worker_queue) {
                    return Some(job);
                }
                unregister_exiting_worker(inner, &mut state, worker_index);
                return None;
            }
            ThreadPoolLifecycle::Stopping => {
                // Immediate stop: exit without draining the queues.
                unregister_exiting_worker(inner, &mut state, worker_index);
                return None;
            }
        }
    }
}
/// Bookkeeping after a job finishes: moves one task from `running_tasks` to
/// `completed_tasks` and wakes termination waiters if this was the last
/// outstanding work.
fn finish_running_job(inner: &ThreadPoolInner) {
    let mut state = inner.lock_state();
    let remaining = state
        .running_tasks
        .checked_sub(1)
        .expect("thread pool running task counter underflow");
    state.running_tasks = remaining;
    state.completed_tasks += 1;
    inner.notify_if_terminated(&state);
}
/// Removes an exiting worker from the pool's bookkeeping: its local queue is
/// unregistered, any jobs still parked there are re-homed onto the global
/// queue, the live-worker count drops, and termination waiters are notified
/// if this was the last worker.
fn unregister_exiting_worker(
    inner: &ThreadPoolInner,
    state: &mut ThreadPoolState,
    worker_index: usize,
) {
    // Jobs left on the worker's local queue must not be lost.
    for job in inner.remove_worker_queue_locked(worker_index) {
        state.queue.push_back(job);
    }
    let remaining = state
        .live_workers
        .checked_sub(1)
        .expect("thread pool live worker counter underflow");
    state.live_workers = remaining;
    inner.notify_if_terminated(state);
}