use config::MAX_WORKERS;
use worker;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
use std::{fmt, usize};
/// Stack of worker indices whose head is kept in a single atomic word.
///
/// Entries are linked through each worker entry's "next sleeper" slot, so the
/// stack itself only stores the packed head state and is updated with
/// compare-and-swap loops instead of a lock.
#[derive(Debug)]
pub(crate) struct Stack {
/// Packed `State`: the head index in the low 16 bits and an ABA guard
/// counter in the remaining high bits.
state: AtomicUsize,
}
/// Snapshot of the stack's packed atomic word.
///
/// The low `STACK_MASK` bits hold the head (a worker index, `EMPTY`, or
/// `TERMINATED`); the bits above `ABA_GUARD_SHIFT` hold a counter that is
/// advanced on every head update so that two snapshots with the same head but
/// taken across an intervening update compare unequal.
#[derive(Eq, PartialEq, Clone, Copy)]
pub struct State(usize);
const STACK_MASK: usize = ((1 << 16) - 1);
pub(crate) const EMPTY: usize = MAX_WORKERS;
pub(crate) const TERMINATED: usize = EMPTY + 1;
const ABA_GUARD_SHIFT: usize = 16;
#[cfg(target_pointer_width = "64")]
const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
#[cfg(target_pointer_width = "32")]
const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
impl Stack {
    /// Creates a stack whose head is `EMPTY` and whose ABA guard is zero.
    pub fn new() -> Stack {
        let state = AtomicUsize::new(State::new().into());
        Stack { state }
    }

    /// Pushes the worker at `entries[idx]` onto the stack.
    ///
    /// The entry must already be flagged as pushed (checked in debug builds).
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if the stack head is `TERMINATED`; nothing is pushed
    /// in that case.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for `entries`.
    pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> {
        let mut state: State = self.state.load(Acquire).into();

        debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed());

        loop {
            let head = state.head();

            if head == TERMINATED {
                return Err(());
            }

            // Link the entry in front of the current head before publishing it.
            entries[idx].set_next_sleeper(head);

            let mut next = state;
            next.set_head(idx);

            // `compare_exchange(.., AcqRel, Acquire)` is the documented
            // replacement for the deprecated `compare_and_swap(.., AcqRel)`.
            match self
                .state
                .compare_exchange(state.into(), next.into(), AcqRel, Acquire)
            {
                Ok(_) => return Ok(()),
                // Lost the race: retry against the value that is there now.
                Err(actual) => state = actual.into(),
            }
        }
    }

    /// Pops entries until one whose lifecycle is below `max_lifecycle` is found.
    ///
    /// Entries whose lifecycle is `>= max_lifecycle` are still removed from
    /// the stack but are skipped rather than returned.
    ///
    /// When `terminate` is `true`, the head is set to `TERMINATED` instead of
    /// `EMPTY` as soon as the stack runs out of entries, which makes every
    /// later `push` fail. In that mode `max_lifecycle` must be
    /// `worker::Lifecycle::Signaled` (checked in debug builds).
    ///
    /// Returns the popped index together with the value obtained from
    /// `fetch_unset_pushed` on that entry, or `None` if no entry qualified.
    pub fn pop(
        &self,
        entries: &[worker::Entry],
        max_lifecycle: worker::Lifecycle,
        terminate: bool,
    ) -> Option<(usize, worker::State)> {
        // Head value to install once the last entry has been removed.
        let terminal = if terminate { TERMINATED } else { EMPTY };

        debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled);

        let mut state: State = self.state.load(Acquire).into();

        loop {
            let head = state.head();

            if head == EMPTY {
                let mut next = state;
                next.set_head(terminal);

                // NOTE(review): `set_head` always advances the ABA guard, so
                // `next` differs from `state` even when the head stays
                // `EMPTY`; this early return is not expected to trigger and
                // the CAS below also runs for an empty stack. Callers may rely
                // on that read-modify-write for ordering -- confirm before
                // short-circuiting it.
                if next == state {
                    debug_assert!(terminal == EMPTY);
                    return None;
                }

                match self
                    .state
                    .compare_exchange(state.into(), next.into(), AcqRel, Acquire)
                {
                    Ok(_) => return None,
                    Err(actual) => {
                        state = actual.into();
                        continue;
                    }
                }
            } else if head == TERMINATED {
                return None;
            }

            debug_assert!(head < MAX_WORKERS);

            let mut next = state;
            let next_head = entries[head].next_sleeper();

            // `TERMINATED` is only ever stored in the stack head, never in an
            // entry's link.
            debug_assert!(next_head != TERMINATED);

            if next_head == EMPTY {
                next.set_head(terminal);
            } else {
                next.set_head(next_head);
            }

            match self
                .state
                .compare_exchange(state.into(), next.into(), AcqRel, Acquire)
            {
                Ok(_) => {
                    let popped = entries[head].fetch_unset_pushed(AcqRel);

                    if popped.lifecycle() >= max_lifecycle {
                        // This worker is skipped. The CAS just installed
                        // `next`, so continue from it rather than from the
                        // stale pre-CAS snapshot, which would only make the
                        // following CAS fail once before catching up.
                        state = next;
                        continue;
                    }

                    return Some((head, popped));
                }
                Err(actual) => state = actual.into(),
            }
        }
    }
}
impl State {
#[inline]
fn new() -> State {
State(EMPTY)
}
#[inline]
fn head(&self) -> usize {
self.0 & STACK_MASK
}
#[inline]
fn set_head(&mut self, val: usize) {
let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
}
}
/// Reinterprets a raw word (as loaded from the atomic) as a `State`.
impl From<usize> for State {
    fn from(bits: usize) -> State {
        // No validation: every bit pattern is accepted as-is.
        State(bits)
    }
}
impl From<State> for usize {
fn from(src: State) -> Self {
src.0
}
}
/// Debug output shows only the decoded head; the ABA guard is omitted.
impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("stack::State");

        match self.head() {
            // A real worker index is printed numerically.
            idx if idx < MAX_WORKERS => {
                out.field("head", &idx);
            }
            // The two sentinels are printed by name.
            sentinel if sentinel == EMPTY => {
                out.field("head", &"EMPTY");
            }
            sentinel if sentinel == TERMINATED => {
                out.field("head", &"TERMINATED");
            }
            // Any other value produces no `head` field at all.
            _ => {}
        }

        out.finish()
    }
}