use core::fmt;
use crate::loomish::{
hint,
sync::atomic::{self, AtomicUsize},
};
#[derive(Debug)]
pub struct Schedule {
states: [AtomicState; 3],
}
impl Schedule {
pub fn new() -> Self {
Self {
states: [
AtomicState::new(State::EMPTY_PRIMARY),
AtomicState::new(State::DEAD),
AtomicState::new(State::DEAD),
],
}
}
pub fn register(&self) -> User {
loop {
for (index, phase) in self.states.iter().enumerate() {
let state = phase.load(atomic::Ordering::Relaxed);
if !state.is_primary() {
continue;
}
match phase.compare_exchange(
state,
state.add_user(),
atomic::Ordering::Acquire,
atomic::Ordering::Relaxed,
) {
Ok(_) => return User { index: index as u8 },
Err(_) => continue,
}
}
hint::spin_loop();
}
}
}
impl Default for Schedule {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct User {
index: u8,
}
impl User {
#[must_use]
pub const fn index(&self) -> usize {
self.index as usize
}
pub fn progress<'s>(&mut self, schedule: &'s Schedule) -> Option<Leaving<'s>> {
let curr = &schedule.states[self.index as usize];
let state = curr.load(atomic::Ordering::Acquire);
if state.is_primary() {
let prev = &schedule.states[(self.index as usize + 2) % 3];
let state = prev.load(atomic::Ordering::Acquire);
debug_assert_ne!(state, State::EMPTY_PRIMARY);
if !state.is_dead() {
return None;
}
let next = &schedule.states[(self.index as usize + 1) % 3];
let initialized = next
.compare_exchange(
State::DEAD,
State::EMPTY_PRIMARY.add_user(),
atomic::Ordering::Release,
atomic::Ordering::Relaxed,
)
.is_ok();
if !initialized {
next.update(
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
|state| state.add_user(),
);
}
if initialized {
curr.update(
atomic::Ordering::Release,
atomic::Ordering::Relaxed,
|state| state.with_primary(false),
);
}
let leaving_index = self.index;
self.index = (self.index + 1) % 3;
Some(Leaving {
schedule,
index: leaving_index,
})
} else {
let next = &schedule.states[(self.index as usize + 1) % 3];
next.update(
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
|state| state.add_user(),
);
let leaving_index = self.index;
self.index = (self.index + 1) % 3;
Some(Leaving {
schedule,
index: leaving_index,
})
}
}
pub fn deregister<'s>(self, schedule: &'s Schedule) -> Leaving<'s> {
let curr = &schedule.states[self.index as usize];
let state = curr.load(atomic::Ordering::Acquire);
if !state.is_primary() {
return Leaving {
schedule,
index: self.index,
};
}
let prev = &schedule.states[(self.index as usize + 2) % 3];
let state = prev.load(atomic::Ordering::Acquire);
debug_assert_ne!(state, State::EMPTY_PRIMARY);
if !state.is_dead() {
return Leaving {
schedule,
index: self.index,
};
}
let next = &schedule.states[(self.index as usize + 1) % 3];
let initialized = next
.compare_exchange(
State::DEAD,
State::EMPTY_PRIMARY,
atomic::Ordering::Release,
atomic::Ordering::Relaxed,
)
.is_ok();
if initialized {
curr.update(
atomic::Ordering::Release,
atomic::Ordering::Relaxed,
|state| state.with_primary(false),
);
}
Leaving {
schedule,
index: self.index,
}
}
}
pub struct Leaving<'s> {
schedule: &'s Schedule,
index: u8,
}
impl<'s> Leaving<'s> {
#[must_use]
pub const fn schedule(&self) -> &'s Schedule {
self.schedule
}
#[must_use]
pub const fn index(&self) -> usize {
self.index as usize
}
pub fn leave(self) {
let _ = self;
}
pub fn leave_last(self) -> Option<LastLeaving<'s>> {
let (schedule, index) = (self.schedule, self.index);
core::mem::forget(self);
let state = &schedule.states[index as usize];
match state.try_update(
atomic::Ordering::Release,
atomic::Ordering::Acquire,
|state| Some(state.del_user()).filter(|s| !s.is_dead()),
) {
Ok(_) => None,
Err(_) => Some(LastLeaving { schedule, index }),
}
}
}
impl Drop for Leaving<'_> {
fn drop(&mut self) {
let state = &self.schedule.states[self.index as usize];
state.update(
atomic::Ordering::Release,
atomic::Ordering::Relaxed,
|state| state.del_user(),
);
}
}
pub struct LastLeaving<'s> {
schedule: &'s Schedule,
index: u8,
}
impl<'s> LastLeaving<'s> {
#[must_use]
pub const fn schedule(&self) -> &'s Schedule {
self.schedule
}
#[must_use]
pub const fn index(&self) -> usize {
self.index as usize
}
}
impl Drop for LastLeaving<'_> {
fn drop(&mut self) {
let state = &self.schedule.states[self.index as usize];
state.store(State::DEAD, atomic::Ordering::Release);
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
#[repr(transparent)]
struct State(usize);
impl State {
pub const DEAD: Self = Self(0);
pub const EMPTY_PRIMARY: Self = Self(1);
#[must_use]
pub const fn is_primary(&self) -> bool {
self.0 & 1 != 0
}
#[must_use]
pub const fn is_dead(&self) -> bool {
self.0 == 0
}
#[must_use]
pub const fn users(&self) -> usize {
self.0 / 2
}
#[must_use]
pub const fn with_primary(self, primary: bool) -> Self {
Self(self.0 & !1 | primary as usize)
}
#[must_use]
pub const fn add_user(self) -> Self {
Self(self.0.strict_add(2))
}
#[must_use]
pub const fn del_user(self) -> Self {
Self(self.0.strict_sub(2))
}
}
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("State")
.field("primary", &self.is_primary())
.field("users", &self.users())
.finish()
}
}
#[repr(transparent)]
struct AtomicState(AtomicUsize);
impl AtomicState {
pub fn new(state: State) -> Self {
Self(AtomicUsize::new(state.0))
}
pub fn load(&self, order: atomic::Ordering) -> State {
State(self.0.load(order))
}
pub fn store(&self, val: State, order: atomic::Ordering) {
self.0.store(val.0, order);
}
pub fn compare_exchange(
&self,
current: State,
new: State,
success: atomic::Ordering,
failure: atomic::Ordering,
) -> Result<State, State> {
match self.0.compare_exchange(current.0, new.0, success, failure) {
Ok(state) => Ok(State(state)),
Err(state) => Err(State(state)),
}
}
pub fn compare_exchange_weak(
&self,
current: State,
new: State,
success: atomic::Ordering,
failure: atomic::Ordering,
) -> Result<State, State> {
match self
.0
.compare_exchange_weak(current.0, new.0, success, failure)
{
Ok(state) => Ok(State(state)),
Err(state) => Err(State(state)),
}
}
pub fn try_update<F>(
&self,
set_order: atomic::Ordering,
fetch_order: atomic::Ordering,
mut f: F,
) -> Result<State, State>
where
F: FnMut(State) -> Option<State>,
{
let mut curr = self.load(fetch_order);
loop {
let Some(next) = (f)(curr) else {
break Err(curr);
};
match self.compare_exchange_weak(curr, next, set_order, fetch_order) {
Ok(curr) => break Ok(curr),
Err(actual) => curr = actual,
}
}
}
pub fn update<F>(
&self,
set_order: atomic::Ordering,
fetch_order: atomic::Ordering,
mut f: F,
) -> State
where
F: FnMut(State) -> State,
{
let mut curr = self.load(fetch_order);
loop {
let next = (f)(curr);
match self.compare_exchange_weak(curr, next, set_order, fetch_order) {
Ok(curr) => break curr,
Err(actual) => curr = actual,
}
}
}
}
impl fmt::Debug for AtomicState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.load(atomic::Ordering::Relaxed).fmt(f)
}
}