use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use super::thread_pool_lifecycle::ThreadPoolLifecycle;
// `u8` tags used to keep a `ThreadPoolLifecycle` inside an `AtomicU8`.
// `encode_lifecycle` and `decode_lifecycle` translate between the two forms.
/// Tag for `ThreadPoolLifecycle::Running`.
const LIFECYCLE_RUNNING: u8 = 0;
/// Tag for `ThreadPoolLifecycle::Shutdown`.
const LIFECYCLE_SHUTDOWN: u8 = 1;
/// Tag for `ThreadPoolLifecycle::Stopping`.
const LIFECYCLE_STOPPING: u8 = 2;
/// Converts a [`ThreadPoolLifecycle`] into the `u8` tag stored in the atomic
/// lifecycle state.
///
/// The match deliberately has no wildcard arm: adding a lifecycle variant
/// fails to compile until it is assigned a tag here.
fn encode_lifecycle(lifecycle: ThreadPoolLifecycle) -> u8 {
    match lifecycle {
        ThreadPoolLifecycle::Stopping => LIFECYCLE_STOPPING,
        ThreadPoolLifecycle::Shutdown => LIFECYCLE_SHUTDOWN,
        ThreadPoolLifecycle::Running => LIFECYCLE_RUNNING,
    }
}
fn decode_lifecycle(tag: u8) -> ThreadPoolLifecycle {
match tag {
LIFECYCLE_RUNNING => ThreadPoolLifecycle::Running,
LIFECYCLE_SHUTDOWN => ThreadPoolLifecycle::Shutdown,
LIFECYCLE_STOPPING => ThreadPoolLifecycle::Stopping,
_ => panic!("invalid thread pool lifecycle tag: {tag}"),
}
}
/// Atomic holder for the thread pool lifecycle, stored as an encoded `u8` tag.
pub(super) struct LifecycleStateMachine {
/// Encoded lifecycle tag (one of the `LIFECYCLE_*` constants); `new_running`
/// initialises it to the running tag.
state: AtomicU8,
}
impl LifecycleStateMachine {
    /// Creates a state machine whose initial lifecycle is `Running`.
    pub(super) fn new_running() -> Self {
        let initial = encode_lifecycle(ThreadPoolLifecycle::Running);
        Self {
            state: AtomicU8::new(initial),
        }
    }

    /// Reads the current lifecycle with acquire ordering.
    ///
    /// # Panics
    ///
    /// Panics (via `decode_lifecycle`) if the stored tag is not a known
    /// `LIFECYCLE_*` value.
    pub(super) fn load(&self) -> ThreadPoolLifecycle {
        let tag = self.state.load(Ordering::Acquire);
        decode_lifecycle(tag)
    }

    /// Attempts the `Running -> Shutdown` transition.
    ///
    /// Returns `true` only for the call that performed the swap; returns
    /// `false` when the state was anything other than `Running`.
    pub(super) fn transition_running_to_shutdown(&self) -> bool {
        let swapped = self.state.compare_exchange(
            LIFECYCLE_RUNNING,
            LIFECYCLE_SHUTDOWN,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
        swapped.is_ok()
    }

    /// Moves the state to `Stopping` from whatever it currently is.
    ///
    /// Returns `true` for the call that installed `Stopping`, and `false`
    /// when the state was already `Stopping`.
    pub(super) fn transition_to_stopping(&self) -> bool {
        let mut observed = self.state.load(Ordering::Acquire);
        while observed != LIFECYCLE_STOPPING {
            match self.state.compare_exchange(
                observed,
                LIFECYCLE_STOPPING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                // Lost a race with another transition: retry from the value
                // that the failed exchange reported.
                Err(latest) => observed = latest,
            }
        }
        false
    }
}
/// Atomic open/closed gate that `InflightSubmitCounter::try_enter` consults
/// before and after counting a submission.
pub(super) struct SubmissionAdmission {
/// `true` while the gate is open. `close` flips it to `false`; no method in
/// this file sets it back to `true`.
open: AtomicBool,
}
impl SubmissionAdmission {
    /// Creates a gate that starts in the open state.
    pub(super) fn new_open() -> Self {
        let open = AtomicBool::new(true);
        Self { open }
    }

    /// Returns whether the gate is open, read with acquire ordering.
    pub(super) fn is_open(&self) -> bool {
        self.open.load(Ordering::Acquire)
    }

    /// Closes the gate.
    ///
    /// Returns `true` only for the call that flipped the flag from open to
    /// closed; returns `false` if the gate was already closed.
    pub(super) fn close(&self) -> bool {
        let flipped = self.open.compare_exchange(
            true,
            false,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
        flipped.is_ok()
    }
}
/// Result of `InflightSubmitCounter::try_enter`.
pub(super) enum SubmitEnterOutcome {
/// Admission was still open after the in-flight count was incremented; the
/// increment is kept.
Entered,
/// Admission was closed, so the caller holds no in-flight slot.
/// `became_zero_after_rollback` is `true` only when this call had already
/// incremented the counter and its rollback decrement took it from 1 to 0;
/// it is `false` when the call was rejected before incrementing.
Rejected { became_zero_after_rollback: bool },
}
/// Atomic counter of submissions that have passed `try_enter` and have not yet
/// called `leave`.
pub(super) struct InflightSubmitCounter {
/// Number of in-flight submissions; starts at zero in `new`.
count: AtomicUsize,
}
impl InflightSubmitCounter {
    /// Creates a counter with no submissions in flight.
    pub(super) fn new() -> Self {
        Self {
            count: AtomicUsize::new(0),
        }
    }

    /// Returns the current number of in-flight submissions.
    ///
    /// A `SeqCst` fence precedes the read and pairs with the fence in
    /// [`Self::try_enter`]. For a thread that closes admission and then calls
    /// `load`, at least one of the following holds: this read observes the
    /// submitter's increment, or the submitter observes the closed gate and
    /// rolls back. With acquire/release alone both reads may be stale
    /// (store buffering), so a closer could see zero while a submitter was
    /// admitted.
    pub(super) fn load(&self) -> usize {
        // NOTE(review): assumes the closing path calls `close()` before
        // `load()`; confirm against the caller.
        std::sync::atomic::fence(Ordering::SeqCst);
        self.count.load(Ordering::Acquire)
    }

    /// Tries to register one in-flight submission while `admission` is open.
    ///
    /// Returns [`SubmitEnterOutcome::Entered`] when the gate was open both
    /// before and after the increment; the caller must later call
    /// [`Self::leave`]. Otherwise returns [`SubmitEnterOutcome::Rejected`];
    /// `became_zero_after_rollback` is `true` only when this call had
    /// incremented the counter and undoing that increment brought it to zero.
    pub(super) fn try_enter(&self, admission: &SubmissionAdmission) -> SubmitEnterOutcome {
        // Fast path: the gate is already closed, so the counter is untouched.
        if !admission.is_open() {
            return SubmitEnterOutcome::Rejected {
                became_zero_after_rollback: false,
            };
        }

        self.count.fetch_add(1, Ordering::AcqRel);
        // Order the increment before the re-check of the gate. Together with
        // the fence in `load`, this rules out the outcome where this thread
        // still sees the gate open while a closer still sees a zero count.
        std::sync::atomic::fence(Ordering::SeqCst);
        if admission.is_open() {
            return SubmitEnterOutcome::Entered;
        }

        // The gate closed between the two checks: undo the increment through
        // the same path as a normal exit.
        SubmitEnterOutcome::Rejected {
            became_zero_after_rollback: self.leave(),
        }
    }

    /// Releases one in-flight slot.
    ///
    /// Returns `true` when this call took the counter from 1 to 0. Debug
    /// builds assert that the counter was non-zero before the decrement.
    pub(super) fn leave(&self) -> bool {
        let previous = self.count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(
            previous > 0,
            "thread pool inflight submit counter underflow"
        );
        previous == 1
    }
}