use crate::loom::sync::atomic::AtomicUsize;
use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::usize;
pub(super) struct State {
val: AtomicUsize,
}
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
type UpdateResult = Result<Snapshot, Snapshot>;
const RUNNING: usize = 0b0001;
const COMPLETE: usize = 0b0010;
const LIFECYCLE_MASK: usize = 0b11;
const NOTIFIED: usize = 0b100;
#[allow(clippy::unusual_byte_groupings)] const JOIN_INTEREST: usize = 0b1_000;
#[allow(clippy::unusual_byte_groupings)] const JOIN_WAKER: usize = 0b10_000;
#[allow(clippy::unusual_byte_groupings)] const CANCELLED: usize = 0b100_000;
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
const REF_COUNT_MASK: usize = !STATE_MASK;
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
#[must_use]
pub(super) enum TransitionToRunning {
Success,
Cancelled,
Failed,
Dealloc,
}
#[must_use]
pub(super) enum TransitionToIdle {
Ok,
OkNotified,
OkDealloc,
Cancelled,
}
#[must_use]
pub(super) enum TransitionToNotifiedByVal {
DoNothing,
Submit,
Dealloc,
}
#[must_use]
pub(super) enum TransitionToNotifiedByRef {
DoNothing,
Submit,
}
impl State {
pub(super) fn new() -> State {
State {
val: AtomicUsize::new(INITIAL_STATE),
}
}
pub(super) fn load(&self) -> Snapshot {
Snapshot(self.val.load(Acquire))
}
pub(super) fn transition_to_running(&self) -> TransitionToRunning {
self.fetch_update_action(|mut next| {
let action;
assert!(next.is_notified());
if !next.is_idle() {
next.ref_dec();
if next.ref_count() == 0 {
action = TransitionToRunning::Dealloc;
} else {
action = TransitionToRunning::Failed;
}
} else {
next.set_running();
next.unset_notified();
if next.is_cancelled() {
action = TransitionToRunning::Cancelled;
} else {
action = TransitionToRunning::Success;
}
}
(action, Some(next))
})
}
pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
self.fetch_update_action(|curr| {
assert!(curr.is_running());
if curr.is_cancelled() {
return (TransitionToIdle::Cancelled, None);
}
let mut next = curr;
let action;
next.unset_running();
if !next.is_notified() {
next.ref_dec();
if next.ref_count() == 0 {
action = TransitionToIdle::OkDealloc;
} else {
action = TransitionToIdle::Ok;
}
} else {
next.ref_inc();
action = TransitionToIdle::OkNotified;
}
(action, Some(next))
})
}
pub(super) fn transition_to_complete(&self) -> Snapshot {
const DELTA: usize = RUNNING | COMPLETE;
let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
assert!(prev.is_running());
assert!(!prev.is_complete());
Snapshot(prev.0 ^ DELTA)
}
pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
assert!(
prev.ref_count() >= count,
"current: {}, sub: {}",
prev.ref_count(),
count
);
prev.ref_count() == count
}
pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
self.fetch_update_action(|mut snapshot| {
let action;
if snapshot.is_running() {
snapshot.set_notified();
snapshot.ref_dec();
assert!(snapshot.ref_count() > 0);
action = TransitionToNotifiedByVal::DoNothing;
} else if snapshot.is_complete() || snapshot.is_notified() {
snapshot.ref_dec();
if snapshot.ref_count() == 0 {
action = TransitionToNotifiedByVal::Dealloc;
} else {
action = TransitionToNotifiedByVal::DoNothing;
}
} else {
snapshot.set_notified();
snapshot.ref_inc();
action = TransitionToNotifiedByVal::Submit;
}
(action, Some(snapshot))
})
}
pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
self.fetch_update_action(|mut snapshot| {
if snapshot.is_complete() || snapshot.is_notified() {
(TransitionToNotifiedByRef::DoNothing, None)
} else if snapshot.is_running() {
snapshot.set_notified();
(TransitionToNotifiedByRef::DoNothing, Some(snapshot))
} else {
snapshot.set_notified();
snapshot.ref_inc();
(TransitionToNotifiedByRef::Submit, Some(snapshot))
}
})
}
pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
self.fetch_update_action(|mut snapshot| {
if snapshot.is_cancelled() || snapshot.is_complete() {
(false, None)
} else if snapshot.is_running() {
snapshot.set_notified();
snapshot.set_cancelled();
(false, Some(snapshot))
} else {
snapshot.set_cancelled();
if !snapshot.is_notified() {
snapshot.set_notified();
snapshot.ref_inc();
(true, Some(snapshot))
} else {
(false, Some(snapshot))
}
}
})
}
pub(super) fn transition_to_shutdown(&self) -> bool {
let mut prev = Snapshot(0);
let _ = self.fetch_update(|mut snapshot| {
prev = snapshot;
if snapshot.is_idle() {
snapshot.set_running();
}
snapshot.set_cancelled();
Some(snapshot)
});
prev.is_idle()
}
pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
use std::sync::atomic::Ordering::Relaxed;
self.val
.compare_exchange_weak(
INITIAL_STATE,
(INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
Release,
Relaxed,
)
.map(|_| ())
.map_err(|_| ())
}
pub(super) fn unset_join_interested(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
if curr.is_complete() {
return None;
}
let mut next = curr;
next.unset_join_interested();
Some(next)
})
}
pub(super) fn set_join_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
assert!(!curr.has_join_waker());
if curr.is_complete() {
return None;
}
let mut next = curr;
next.set_join_waker();
Some(next)
})
}
pub(super) fn unset_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
assert!(curr.has_join_waker());
if curr.is_complete() {
return None;
}
let mut next = curr;
next.unset_join_waker();
Some(next)
})
}
pub(super) fn ref_inc(&self) {
use std::process;
use std::sync::atomic::Ordering::Relaxed;
let prev = self.val.fetch_add(REF_ONE, Relaxed);
if prev > isize::MAX as usize {
process::abort();
}
}
pub(super) fn ref_dec(&self) -> bool {
let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
assert!(prev.ref_count() >= 1);
prev.ref_count() == 1
}
pub(super) fn ref_dec_twice(&self) -> bool {
let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
assert!(prev.ref_count() >= 2);
prev.ref_count() == 2
}
fn fetch_update_action<F, T>(&self, mut f: F) -> T
where
F: FnMut(Snapshot) -> (T, Option<Snapshot>),
{
let mut curr = self.load();
loop {
let (output, next) = f(curr);
let next = match next {
Some(next) => next,
None => return output,
};
let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
match res {
Ok(_) => return output,
Err(actual) => curr = Snapshot(actual),
}
}
}
fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
where
F: FnMut(Snapshot) -> Option<Snapshot>,
{
let mut curr = self.load();
loop {
let next = match f(curr) {
Some(next) => next,
None => return Err(curr),
};
let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
match res {
Ok(_) => return Ok(next),
Err(actual) => curr = Snapshot(actual),
}
}
}
}
impl Snapshot {
pub(super) fn is_idle(self) -> bool {
self.0 & (RUNNING | COMPLETE) == 0
}
pub(super) fn is_notified(self) -> bool {
self.0 & NOTIFIED == NOTIFIED
}
fn unset_notified(&mut self) {
self.0 &= !NOTIFIED
}
fn set_notified(&mut self) {
self.0 |= NOTIFIED
}
pub(super) fn is_running(self) -> bool {
self.0 & RUNNING == RUNNING
}
fn set_running(&mut self) {
self.0 |= RUNNING;
}
fn unset_running(&mut self) {
self.0 &= !RUNNING;
}
pub(super) fn is_cancelled(self) -> bool {
self.0 & CANCELLED == CANCELLED
}
fn set_cancelled(&mut self) {
self.0 |= CANCELLED;
}
pub(super) fn is_complete(self) -> bool {
self.0 & COMPLETE == COMPLETE
}
pub(super) fn is_join_interested(self) -> bool {
self.0 & JOIN_INTEREST == JOIN_INTEREST
}
fn unset_join_interested(&mut self) {
self.0 &= !JOIN_INTEREST
}
pub(super) fn has_join_waker(self) -> bool {
self.0 & JOIN_WAKER == JOIN_WAKER
}
fn set_join_waker(&mut self) {
self.0 |= JOIN_WAKER;
}
fn unset_join_waker(&mut self) {
self.0 &= !JOIN_WAKER
}
pub(super) fn ref_count(self) -> usize {
(self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
}
fn ref_inc(&mut self) {
assert!(self.0 <= isize::MAX as usize);
self.0 += REF_ONE;
}
pub(super) fn ref_dec(&mut self) {
assert!(self.ref_count() > 0);
self.0 -= REF_ONE
}
}
impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let snapshot = self.load();
snapshot.fmt(fmt)
}
}
impl fmt::Debug for Snapshot {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Snapshot")
.field("is_running", &self.is_running())
.field("is_complete", &self.is_complete())
.field("is_notified", &self.is_notified())
.field("is_cancelled", &self.is_cancelled())
.field("is_join_interested", &self.is_join_interested())
.field("has_join_waker", &self.has_join_waker())
.field("ref_count", &self.ref_count())
.finish()
}
}