use atomic::AtomicU64;
use timer::{HandlePriv, Inner};
use Error;
use crossbeam_utils::CachePadded;
use futures::task::AtomicTask;
use futures::Poll;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use std::u64;
#[derive(Debug)]
pub(crate) struct Entry {
time: CachePadded<UnsafeCell<Time>>,
inner: Option<Weak<Inner>>,
state: AtomicU64,
task: AtomicTask,
pub(super) queued: AtomicBool,
pub(super) next_atomic: UnsafeCell<*mut Entry>,
when: UnsafeCell<Option<u64>>,
pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
pub(super) prev_stack: UnsafeCell<*const Entry>,
}
#[derive(Debug)]
pub(crate) struct Time {
pub(crate) deadline: Instant,
pub(crate) duration: Duration,
}
const ELAPSED: u64 = 1 << 63;
const ERROR: u64 = u64::MAX;
impl Entry {
pub fn new(deadline: Instant, duration: Duration) -> Entry {
Entry {
time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
inner: None,
task: AtomicTask::new(),
state: AtomicU64::new(0),
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
pub fn time_ref(&self) -> &Time {
unsafe { &*self.time.get() }
}
pub fn time_mut(&self) -> &mut Time {
unsafe { &mut *self.time.get() }
}
pub fn is_registered(&self) -> bool {
self.inner.is_some()
}
pub fn register(me: &mut Arc<Self>) {
let handle = match HandlePriv::try_current() {
Ok(handle) => handle,
Err(_) => {
Arc::get_mut(me).unwrap().transition_to_error();
return;
}
};
Entry::register_with(me, handle)
}
pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
assert!(!me.is_registered(), "only register an entry once");
let deadline = me.time_ref().deadline;
let inner = match handle.inner() {
Some(inner) => inner,
None => {
Arc::get_mut(me).unwrap().transition_to_error();
return;
}
};
if inner.increment().is_err() {
Arc::get_mut(me).unwrap().transition_to_error();
return;
}
Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
let when = inner.normalize_deadline(deadline);
if when <= inner.elapsed() {
me.state.store(ELAPSED, Relaxed);
return;
} else {
me.state.store(when, Relaxed);
}
if inner.queue(me).is_err() {
me.error();
}
}
fn transition_to_error(&mut self) {
self.inner = Some(Weak::new());
self.state = AtomicU64::new(ERROR);
}
pub fn when_internal(&self) -> Option<u64> {
unsafe { (*self.when.get()) }
}
pub fn set_when_internal(&self, when: Option<u64>) {
unsafe {
(*self.when.get()) = when;
}
}
pub fn load_state(&self) -> Option<u64> {
let state = self.state.load(SeqCst);
if is_elapsed(state) {
None
} else {
Some(state)
}
}
pub fn is_elapsed(&self) -> bool {
let state = self.state.load(SeqCst);
is_elapsed(state)
}
pub fn fire(&self, when: u64) {
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) || curr > when {
return;
}
let next = ELAPSED | curr;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
self.task.notify();
}
pub fn error(&self) {
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) {
return;
}
let next = ERROR;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
self.task.notify();
}
pub fn cancel(entry: &Arc<Entry>) {
let state = entry.state.fetch_or(ELAPSED, SeqCst);
if is_elapsed(state) {
return;
}
let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};
let _ = inner.queue(entry);
}
pub fn poll_elapsed(&self) -> Poll<(), Error> {
use futures::Async::NotReady;
let mut curr = self.state.load(SeqCst);
if is_elapsed(curr) {
if curr == ERROR {
return Err(Error::shutdown());
} else {
return Ok(().into());
}
}
self.task.register();
curr = self.state.load(SeqCst).into();
if is_elapsed(curr) {
if curr == ERROR {
return Err(Error::shutdown());
} else {
return Ok(().into());
}
}
Ok(NotReady)
}
pub fn reset(entry: &mut Arc<Entry>) {
if !entry.is_registered() {
return;
}
let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};
let deadline = entry.time_ref().deadline;
let when = inner.normalize_deadline(deadline);
let elapsed = inner.elapsed();
let mut curr = entry.state.load(SeqCst);
let mut notify;
loop {
if curr == ERROR || curr == when {
return;
}
let next;
if when <= elapsed {
next = ELAPSED;
notify = !is_elapsed(curr);
} else {
next = when;
notify = true;
}
let actual = entry.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
if notify {
let _ = inner.queue(entry);
}
}
fn upgrade_inner(&self) -> Option<Arc<Inner>> {
self.inner.as_ref().and_then(|inner| inner.upgrade())
}
}
fn is_elapsed(state: u64) -> bool {
state & ELAPSED == ELAPSED
}
impl Drop for Entry {
fn drop(&mut self) {
let inner = match self.upgrade_inner() {
Some(inner) => inner,
None => return,
};
inner.decrement();
}
}
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}