use park::DefaultPark;
use worker::WorkerId;
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed};
use std::time::{Duration, Instant};
/// Per-backup-thread bookkeeping: a worker hand-off slot, an atomic flag
/// word, a link to another backup entry, and the parker the thread blocks on.
#[derive(Debug)]
pub(crate) struct Backup {
/// Worker ID being handed off. Written by `worker_handoff`, cleared by
/// `start`, and taken by `wait_for_handoff`.
// NOTE(review): accessed through `UnsafeCell` without a lock; exclusivity is
// presumably guaranteed by the `state` protocol — confirm against callers.
handoff: UnsafeCell<Option<WorkerId>>,
/// Bit set of `PUSHED`, `RUNNING` and `TERMINATED`, updated atomically.
state: AtomicUsize,
/// `BackupId` read by `next_sleeper` and written by `set_next_sleeper`.
// NOTE(review): the name suggests an intrusive list of sleeping backups —
// confirm against the owner of this structure.
next_sleeper: UnsafeCell<BackupId>,
/// Parker used to block in `wait_for_handoff`; notified by
/// `worker_handoff` and `signal_stop` when the `RUNNING` flag was set.
park: DefaultPark,
}
/// Newtype around a `usize` that identifies a `Backup` entry.
// NOTE(review): presumably an index into a collection of `Backup` values held
// elsewhere — confirm against the code that constructs these IDs.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub(crate) struct BackupId(pub(crate) usize);
/// Outcome of `Backup::wait_for_handoff`.
#[derive(Debug)]
pub(crate) enum Handoff {
/// The `PUSHED` flag was cleared without `TERMINATED` being set; carries
/// the worker ID taken from the hand-off slot.
Worker(WorkerId),
/// The timeout elapsed while still pushed and the `RUNNING` flag was
/// successfully cleared.
Idle,
/// The `PUSHED` flag was cleared and `TERMINATED` was set.
Terminated,
}
/// Plain-value snapshot of the `Backup::state` word: a bit set made of the
/// `PUSHED`, `RUNNING` and `TERMINATED` flags.
#[derive(Clone, Copy, Eq, PartialEq)]
struct State(usize);
/// Flag bit 0. Set by `Backup::set_pushed`; cleared by a worker hand-off and
/// toggled off by `Backup::signal_stop`.
// NOTE(review): presumably means "this backup is on the sleeper stack" — confirm.
pub const PUSHED: usize = 1;

/// Flag bit 1. Set by a worker hand-off; cleared by `Backup::release` and when
/// `Backup::wait_for_handoff` times out.
pub const RUNNING: usize = 1 << 1;

/// Flag bit 2. Set by `Backup::signal_stop`.
pub const TERMINATED: usize = 1 << 2;
impl Backup {
    /// Creates a backup entry with an empty hand-off slot, a state word with
    /// no flags set, `BackupId(0)` as the next sleeper, and a new
    /// `DefaultPark`.
    pub fn new() -> Backup {
        Backup {
            handoff: UnsafeCell::new(None),
            state: AtomicUsize::new(State::new().into()),
            next_sleeper: UnsafeCell::new(BackupId(0)),
            park: DefaultPark::new(),
        }
    }

    /// Consumes the pending hand-off of `worker_id` by clearing the hand-off
    /// slot.
    ///
    /// With debug assertions enabled this also checks that the state is
    /// running, not pushed and not terminated, and that the slot currently
    /// holds `worker_id`.
    pub fn start(&self, worker_id: &WorkerId) {
        // The checks are nested inside an outer `debug_assert!` so that the
        // atomic load is only evaluated when debug assertions are enabled.
        debug_assert!({
            let state: State = self.state.load(Relaxed).into();

            debug_assert!(!state.is_pushed());
            debug_assert!(state.is_running());
            debug_assert!(!state.is_terminated());

            true
        });

        // SAFETY: NOTE(review): assumes nothing else touches the hand-off slot
        // while it is read here — presumably guaranteed by the state protocol;
        // confirm against callers.
        debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id));

        // SAFETY: NOTE(review): same exclusivity assumption as above.
        unsafe {
            *self.handoff.get() = None;
        }
    }

    /// Returns `true` if the `RUNNING` flag is set (relaxed load).
    pub fn is_running(&self) -> bool {
        let state: State = self.state.load(Relaxed).into();
        state.is_running()
    }

    /// Hands `worker_id` to this backup.
    ///
    /// Stores the ID in the hand-off slot, then atomically sets `RUNNING` and
    /// clears `PUSHED`.
    ///
    /// Returns `false` if the backup was already `RUNNING`, in which case its
    /// parker is notified. Returns `true` if it was not running and no
    /// notification was sent.
    // NOTE(review): a `true` result presumably tells the caller that it must
    // start a thread for this backup — confirm against the call site.
    pub fn worker_handoff(&self, worker_id: WorkerId) -> bool {
        // SAFETY: NOTE(review): assumes the caller has exclusive access to the
        // hand-off slot until the state transition below publishes it —
        // confirm against callers.
        unsafe {
            debug_assert!((*self.handoff.get()).is_none());
            *self.handoff.get() = Some(worker_id);
        }

        let prev = State::worker_handoff(&self.state);
        debug_assert!(prev.is_pushed());

        if prev.is_running() {
            self.park.notify();
            false
        } else {
            true
        }
    }

    /// Signals termination.
    ///
    /// Toggles `TERMINATED` and `PUSHED` in a single `fetch_xor`. Under the
    /// debug-asserted preconditions (`PUSHED` set, `TERMINATED` clear) this
    /// sets `TERMINATED` and clears `PUSHED`. The parker is notified if the
    /// backup was `RUNNING`.
    pub fn signal_stop(&self) {
        let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into();

        debug_assert!(!prev.is_terminated());
        debug_assert!(prev.is_pushed());

        if prev.is_running() {
            self.park.notify();
        }
    }

    /// Toggles the `RUNNING` flag. The flag is debug-asserted to have been
    /// set, so this clears it.
    pub fn release(&self) {
        let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into();
        debug_assert!(prev.is_running());
    }

    /// Blocks until a worker is handed off, termination is signalled, or the
    /// optional `timeout` elapses.
    ///
    /// Returns:
    /// * `Handoff::Worker(id)` once `PUSHED` is clear and `TERMINATED` is not
    ///   set; `id` is taken out of the hand-off slot.
    /// * `Handoff::Terminated` once `PUSHED` is clear and `TERMINATED` is set.
    /// * `Handoff::Idle` if the deadline passed while still `PUSHED` and the
    ///   `RUNNING` flag was successfully cleared.
    ///
    /// # Panics
    ///
    /// Panics with "no worker handoff" if `PUSHED` is clear, `TERMINATED` is
    /// not set, and the hand-off slot is empty.
    pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff {
        let sleep_until = timeout.map(|dur| Instant::now() + dur);
        let mut state: State = self.state.load(Acquire).into();

        // The state is re-examined after every return from the parker, so a
        // wakeup that did not change the state simply parks again.
        loop {
            if !state.is_pushed() {
                if state.is_terminated() {
                    return Handoff::Terminated;
                }

                // SAFETY: NOTE(review): assumes the slot was written before
                // `PUSHED` was cleared and is not accessed concurrently —
                // confirm against callers.
                let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") };
                return Handoff::Worker(worker_id);
            }

            match sleep_until {
                None => {
                    self.park.park_sync(None);
                    state = self.state.load(Acquire).into();
                }
                Some(when) => {
                    let now = Instant::now();

                    if now < when {
                        self.park.park_sync(Some(when - now));
                        state = self.state.load(Acquire).into();
                    } else {
                        debug_assert!(state.is_running());

                        // Deadline reached: try to leave the running state.
                        let mut next = state;
                        next.unset_running();

                        // `compare_exchange(.., AcqRel, Acquire)` is the
                        // documented equivalent of the deprecated
                        // `compare_and_swap(.., AcqRel)`.
                        match self
                            .state
                            .compare_exchange(state.into(), next.into(), AcqRel, Acquire)
                        {
                            Ok(_) => {
                                debug_assert!(!next.is_running());
                                return Handoff::Idle;
                            }
                            // The state changed underneath us; re-evaluate it.
                            Err(actual) => state = actual.into(),
                        }
                    }
                }
            }
        }
    }

    /// Returns `true` if the `PUSHED` flag is set (relaxed load).
    pub fn is_pushed(&self) -> bool {
        let state: State = self.state.load(Relaxed).into();
        state.is_pushed()
    }

    /// Sets the `PUSHED` flag using the given memory `ordering`. The flag is
    /// debug-asserted to have been clear beforehand.
    pub fn set_pushed(&self, ordering: Ordering) {
        let prev: State = self.state.fetch_or(PUSHED, ordering).into();
        debug_assert!(!prev.is_pushed());
    }

    /// Returns the stored next-sleeper ID.
    #[inline]
    pub fn next_sleeper(&self) -> BackupId {
        // SAFETY: NOTE(review): assumes no concurrent `set_next_sleeper` call —
        // confirm against callers.
        unsafe { *self.next_sleeper.get() }
    }

    /// Overwrites the stored next-sleeper ID with `val`.
    #[inline]
    pub fn set_next_sleeper(&self, val: BackupId) {
        // SAFETY: NOTE(review): assumes the caller has exclusive access to
        // this field — confirm against callers.
        unsafe {
            *self.next_sleeper.get() = val;
        }
    }
}
impl State {
pub fn new() -> State {
State(0)
}
pub fn is_pushed(&self) -> bool {
self.0 & PUSHED == PUSHED
}
fn unset_pushed(&mut self) {
self.0 &= !PUSHED;
}
pub fn is_running(&self) -> bool {
self.0 & RUNNING == RUNNING
}
pub fn set_running(&mut self) {
self.0 |= RUNNING;
}
pub fn unset_running(&mut self) {
self.0 &= !RUNNING;
}
pub fn is_terminated(&self) -> bool {
self.0 & TERMINATED == TERMINATED
}
fn worker_handoff(state: &AtomicUsize) -> State {
let mut curr: State = state.load(Acquire).into();
loop {
let mut next = curr;
next.set_running();
next.unset_pushed();
let actual = state
.compare_and_swap(curr.into(), next.into(), AcqRel)
.into();
if actual == curr {
return curr;
}
curr = actual;
}
}
}
impl From<usize> for State {
    /// Wraps a raw flag word in a `State` without validating its bits.
    fn from(bits: usize) -> Self {
        State(bits)
    }
}
impl From<State> for usize {
fn from(src: State) -> usize {
src.0
}
}
impl fmt::Debug for State {
    /// Formats the state as a `backup::State` struct listing each flag as a
    /// boolean field, rather than printing the raw bit pattern.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let pushed = self.is_pushed();
        let running = self.is_running();
        let terminated = self.is_terminated();

        let mut out = f.debug_struct("backup::State");
        out.field("is_pushed", &pushed);
        out.field("is_running", &running);
        out.field("is_terminated", &terminated);
        out.finish()
    }
}