use std::sync::atomic::{AtomicUsize, Ordering};
pub(super) struct AtomicCounters {
value: AtomicUsize,
}
#[derive(Copy, Clone)]
pub(super) struct Counters {
word: usize,
}
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub(super) struct JobsEventCounter(usize);
impl JobsEventCounter {
pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
#[inline]
pub(super) fn as_usize(self) -> usize {
self.0
}
#[inline]
pub(super) fn is_sleepy(self) -> bool {
(self.as_usize() & 1) == 0
}
#[inline]
pub(super) fn is_active(self) -> bool {
!self.is_sleepy()
}
}
#[cfg(target_pointer_width = "64")]
const THREADS_BITS: usize = 16;
#[cfg(target_pointer_width = "32")]
const THREADS_BITS: usize = 8;
#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
const JEC_SHIFT: usize = 2 * THREADS_BITS;
pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
const ONE_SLEEPING: usize = 1;
const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;
const ONE_JEC: usize = 1 << JEC_SHIFT;
impl AtomicCounters {
#[inline]
pub(super) fn new() -> AtomicCounters {
AtomicCounters {
value: AtomicUsize::new(0),
}
}
#[inline]
pub(super) fn load(&self, ordering: Ordering) -> Counters {
Counters::new(self.value.load(ordering))
}
#[inline]
fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
self.value
.compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
.is_ok()
}
#[inline]
pub(super) fn add_inactive_thread(&self) {
self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
}
pub(super) fn increment_jobs_event_counter_if(
&self,
increment_when: impl Fn(JobsEventCounter) -> bool,
) -> Counters {
loop {
let old_value = self.load(Ordering::SeqCst);
if increment_when(old_value.jobs_counter()) {
let new_value = old_value.increment_jobs_counter();
if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
return new_value;
}
} else {
return old_value;
}
}
}
#[inline]
pub(super) fn sub_inactive_thread(&self) -> usize {
let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
debug_assert!(
old_value.inactive_threads() > 0,
"sub_inactive_thread: old_value {:?} has no inactive threads",
old_value,
);
debug_assert!(
old_value.sleeping_threads() <= old_value.inactive_threads(),
"sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
old_value,
old_value.sleeping_threads(),
old_value.inactive_threads(),
);
let sleeping_threads = old_value.sleeping_threads();
std::cmp::min(sleeping_threads, 2)
}
#[inline]
pub(super) fn sub_sleeping_thread(&self) {
let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
debug_assert!(
old_value.sleeping_threads() > 0,
"sub_sleeping_thread: old_value {:?} had no sleeping threads",
old_value,
);
debug_assert!(
old_value.sleeping_threads() <= old_value.inactive_threads(),
"sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
old_value,
old_value.sleeping_threads(),
old_value.inactive_threads(),
);
}
#[inline]
pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
debug_assert!(
old_value.inactive_threads() > 0,
"try_add_sleeping_thread: old_value {:?} has no inactive threads",
old_value,
);
debug_assert!(
old_value.sleeping_threads() < THREADS_MAX,
"try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
old_value,
);
let mut new_value = old_value;
new_value.word += ONE_SLEEPING;
self.try_exchange(old_value, new_value, Ordering::SeqCst)
}
}
#[inline]
fn select_thread(word: usize, shift: usize) -> usize {
((word >> shift) as usize) & THREADS_MAX
}
#[inline]
fn select_jec(word: usize) -> usize {
(word >> JEC_SHIFT) as usize
}
impl Counters {
#[inline]
fn new(word: usize) -> Counters {
Counters { word }
}
#[inline]
fn increment_jobs_counter(self) -> Counters {
Counters {
word: self.word.wrapping_add(ONE_JEC),
}
}
#[inline]
pub(super) fn jobs_counter(self) -> JobsEventCounter {
JobsEventCounter(select_jec(self.word))
}
#[inline]
pub(super) fn inactive_threads(self) -> usize {
select_thread(self.word, INACTIVE_SHIFT)
}
#[inline]
pub(super) fn awake_but_idle_threads(self) -> usize {
debug_assert!(
self.sleeping_threads() <= self.inactive_threads(),
"sleeping threads: {} > raw idle threads {}",
self.sleeping_threads(),
self.inactive_threads()
);
self.inactive_threads() - self.sleeping_threads()
}
#[inline]
pub(super) fn sleeping_threads(self) -> usize {
select_thread(self.word, SLEEPING_SHIFT)
}
}
impl std::fmt::Debug for Counters {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let word = format!("{:016x}", self.word);
fmt.debug_struct("Counters")
.field("word", &word)
.field("jobs", &self.jobs_counter().0)
.field("inactive", &self.inactive_threads())
.field("sleeping", &self.sleeping_threads())
.finish()
}
}