use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
use orengine_utils::backoff::Backoff;
use orengine_utils::cache_padded::CachePaddedAtomicUsize;
use orengine_utils::hints::{likely, unlikely};
/// Unsigned integer exactly half as wide as `usize` (64-bit targets).
#[cfg(target_pointer_width = "64")]
type HalfUsize = u32;
/// Unsigned integer exactly half as wide as `usize` (32-bit targets).
#[cfg(target_pointer_width = "32")]
type HalfUsize = u16;
/// Unsigned integer exactly half as wide as `usize` (16-bit targets).
#[cfg(target_pointer_width = "16")]
type HalfUsize = u8;

/// Number of bits the `all` counter is shifted left inside the packed word.
///
/// It equals the width of [`HalfUsize`], i.e. half of the bits of `usize`, so the upper half of
/// the word holds `all` and the lower half holds `in_current_epoch`.
const ALL_SHIFT: usize = HalfUsize::BITS as usize;
/// Decoded view of the two counters that are stored together in one atomic word.
///
/// NOTE(review): `#[repr(C)]` pins the field order; nothing visible in this file relies on the
/// in-memory layout, so confirm whether it is still required.
#[derive(Copy, Clone)]
#[repr(C)]
pub(crate) struct NumberOfExecutorsInEpochData {
/// Number of currently registered executors (incremented on registration, decremented on
/// deregistration).
pub(crate) all: HalfUsize,
/// Number of registered executors that still have to pass the current epoch; it is reset to
/// `all` whenever a new epoch is prepared.
pub(crate) in_current_epoch: HalfUsize,
}
/// Atomic storage for a packed [`NumberOfExecutorsInEpochData`].
///
/// Both counters share a single atomic `usize`, so they are always read and updated together.
pub(crate) struct NumberOfExecutorsInEpoch {
// Encoded word: `all` in the upper half of the bits, `in_current_epoch` in the lower half.
data: CachePaddedAtomicUsize,
}
impl NumberOfExecutorsInEpoch {
pub(crate) const fn new() -> Self {
Self {
data: CachePaddedAtomicUsize::new(0),
}
}
const fn parse_data(data: usize) -> NumberOfExecutorsInEpochData {
NumberOfExecutorsInEpochData {
all: (data >> ALL_SHIFT) as HalfUsize,
in_current_epoch: data as HalfUsize,
}
}
const fn encode_data(data: NumberOfExecutorsInEpochData) -> usize {
((data.all as usize) << ALL_SHIFT) | data.in_current_epoch as usize
}
pub(crate) fn parsed_data_for_drop(&mut self) -> NumberOfExecutorsInEpochData {
let encoded_data = self.data.get_mut();
Self::parse_data(*encoded_data)
}
pub(crate) fn register_new_executor(&self) {
let backoff = Backoff::new();
loop {
let encoded_data = self.data.load(Relaxed);
let data = Self::parse_data(encoded_data);
if data.in_current_epoch == 0 && data.all > 0 {
backoff.snooze();
continue;
}
let new_data = NumberOfExecutorsInEpochData {
all: data.all + 1,
in_current_epoch: data.in_current_epoch + 1,
};
let new_encoded_data = Self::encode_data(new_data);
if self
.data
.compare_exchange_weak(encoded_data, new_encoded_data, AcqRel, Relaxed)
.is_ok()
{
return;
}
backoff.snooze();
}
}
#[must_use]
pub(crate) fn deregister_executor_and_decrement_counter(&self) -> bool {
let backoff = Backoff::new();
loop {
let encoded_data = self.data.load(Relaxed);
let data = Self::parse_data(encoded_data);
let mut new_in_current_epoch = data.in_current_epoch - 1;
let new_all = data.all - 1;
let should_update_epoch = new_in_current_epoch == 0;
if should_update_epoch {
new_in_current_epoch = new_all;
}
let new_data = NumberOfExecutorsInEpochData {
all: new_all,
in_current_epoch: new_in_current_epoch,
};
let new_encoded_data = Self::encode_data(new_data);
if self
.data
.compare_exchange_weak(encoded_data, new_encoded_data, AcqRel, Relaxed)
.is_ok()
{
return should_update_epoch;
}
backoff.snooze();
}
}
pub(crate) fn prepare_to_update_epoch(&self, all: HalfUsize) {
debug_assert_eq!(
Self::parse_data(self.data.load(Acquire)).in_current_epoch,
0
);
let new_data = NumberOfExecutorsInEpochData {
all,
in_current_epoch: all,
};
let new_encoded_data = Self::encode_data(new_data);
let prev = self.data.swap(new_encoded_data, AcqRel);
assert!(!unlikely(Self::parse_data(prev).in_current_epoch != 0),
"executor_passed_epoch was called in the current epoch more times \
than executors are registered"
);
}
#[must_use]
pub(crate) fn executor_passed_epoch(&self) -> bool {
let encoded_data = self.data.fetch_sub(1, AcqRel) - 1;
let data = Self::parse_data(encoded_data);
debug_assert!(data.in_current_epoch < data.all);
if likely(data.in_current_epoch > 0) {
return false;
}
self.prepare_to_update_epoch(data.all);
true
}
}