use std::{
panic::{
AssertUnwindSafe,
catch_unwind,
},
sync::Arc,
};
use qubit_executor::service::ExecutorServiceLifecycle;
use qubit_lock::WaitTimeoutStatus;
use super::thread_pool_inner::ThreadPoolInner;
use super::thread_pool_state::ThreadPoolState;
use super::thread_pool_worker_runtime::ThreadPoolWorkerRuntime;
use crate::{
PoolJob,
ThreadPoolHooks,
};
pub(crate) struct ThreadPoolWorker;
impl ThreadPoolWorker {
pub(crate) fn run(
inner: Arc<ThreadPoolInner>,
worker_runtime: ThreadPoolWorkerRuntime,
initial_job: Option<PoolJob>,
) {
let worker_index = worker_runtime.worker_index();
inner.hooks().run_before_worker_start(worker_index);
let has_task_hooks = inner.hooks().has_task_hooks();
if let Some(job) = initial_job {
run_initial_job(&inner, job, has_task_hooks, worker_index);
}
loop {
let job = wait_for_job(&inner, &worker_runtime);
match job {
Some(job) => {
if has_task_hooks {
run_with_task_hooks(job, inner.hooks(), worker_index);
} else {
run_without_hooks(job);
}
finish_running_job(&inner);
}
None => {
inner.hooks().run_after_worker_stop(worker_index);
return;
}
}
}
}
}
fn run_initial_job(
inner: &ThreadPoolInner,
job: PoolJob,
has_task_hooks: bool,
worker_index: usize,
) {
if job.accept() {
if has_task_hooks {
run_with_task_hooks(job, inner.hooks(), worker_index);
} else {
run_without_hooks(job);
}
}
finish_running_job(inner);
}
fn run_without_hooks(job: PoolJob) {
let _ignored = catch_unwind(AssertUnwindSafe(|| job.run()));
}
fn run_with_task_hooks(job: PoolJob, hooks: &ThreadPoolHooks, worker_index: usize) {
hooks.run_before_task(worker_index);
let _ignored = catch_unwind(AssertUnwindSafe(|| job.run()));
hooks.run_after_task(worker_index);
}
fn wait_for_job(
inner: &ThreadPoolInner,
worker_runtime: &ThreadPoolWorkerRuntime,
) -> Option<PoolJob> {
let worker_index = worker_runtime.worker_index();
let mut state = inner.lock_state();
loop {
match state.lifecycle {
ExecutorServiceLifecycle::Running => {
if let Some(job) = inner.try_take_queued_job_locked(&mut state, worker_runtime) {
return Some(job);
}
if state.live_workers > state.maximum_pool_size && state.live_workers > 0 {
unregister_exiting_worker(inner, &mut state, worker_index);
return None;
}
if state.worker_wait_is_timed() {
let keep_alive = state.keep_alive;
state.idle_workers += 1;
let (next_state, status) = state.wait_timeout(keep_alive);
state = next_state;
state.idle_workers = state
.idle_workers
.checked_sub(1)
.expect("thread pool idle worker counter underflow");
if status == WaitTimeoutStatus::TimedOut
&& state.queued_tasks == 0
&& state.idle_worker_can_retire()
{
unregister_exiting_worker(inner, &mut state, worker_index);
return None;
}
} else {
state.idle_workers += 1;
state = state.wait();
state.idle_workers = state
.idle_workers
.checked_sub(1)
.expect("thread pool idle worker counter underflow");
}
}
ExecutorServiceLifecycle::ShuttingDown => {
if let Some(job) = inner.try_take_queued_job_locked(&mut state, worker_runtime) {
return Some(job);
}
unregister_exiting_worker(inner, &mut state, worker_index);
return None;
}
ExecutorServiceLifecycle::Stopping | ExecutorServiceLifecycle::Terminated => {
unregister_exiting_worker(inner, &mut state, worker_index);
return None;
}
}
}
}
fn finish_running_job(inner: &ThreadPoolInner) {
let mut state = inner.lock_state();
state.running_tasks = state
.running_tasks
.checked_sub(1)
.expect("thread pool running task counter underflow");
state.completed_tasks += 1;
inner.notify_if_idle_or_terminated(&state);
}
fn unregister_exiting_worker(
inner: &ThreadPoolInner,
state: &mut ThreadPoolState,
worker_index: usize,
) {
state
.queue
.extend(inner.remove_worker_queue_locked(worker_index));
state.live_workers = state
.live_workers
.checked_sub(1)
.expect("thread pool live worker counter underflow");
inner.notify_if_terminated(state);
}