use crate::loom::sync::atomic::AtomicU64;
use crate::sync::AtomicWaker;
use crate::time::driver::{Handle, Inner};
use crate::time::{Duration, Error, Instant};
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::{self, Poll};
use std::u64;
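/// Internal state shared between the user-facing timer handle and the driver.
///
/// The entry acts as a node in two intrusive structures: an atomic "process"
/// stack (via `next_atomic`) used to hand changed entries to the driver, and
/// the driver's own linked lists (via `next_stack`/`prev_stack`).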
#[derive(Debug)]
pub(crate) struct Entry {
    /// Deadline and requested duration for this timer entry.
    time: CachePadded<UnsafeCell<Time>>,
    /// Timer internals; weak so the driver can shut down while entries live.
    inner: Weak<Inner>,
    /// Packed state: the tick to fire at, plus the `ELAPSED`/`ERROR` flags.
    state: AtomicU64,
    /// Task to notify once the deadline is reached.
    waker: AtomicWaker,
    /// True while the entry is pushed on the "process" stack.
    pub(super) queued: AtomicBool,
    /// Next entry in the "process" stack; access is coordinated by `queued`.
    pub(super) next_atomic: UnsafeCell<*mut Entry>,
    /// Expiration tick relative to the driver's start; `None` when unset.
    when: UnsafeCell<Option<u64>>,
    /// Next entry in the wheel's linked list; only accessed by the driver.
    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
    /// Previous entry in the wheel's linked list; only accessed by the driver.
    pub(super) prev_stack: UnsafeCell<*const Entry>,
}
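/// Deadline and requested duration tracked for a timer entry.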
#[derive(Debug)]
pub(crate) struct Time {
pub(crate) deadline: Instant,
pub(crate) duration: Duration,
}
/// Flag set in `state` once a timer entry has elapsed (or been canceled).
const ELAPSED: u64 = 1 << 63;
/// Sentinel state (all bits set, so `ELAPSED` is included) marking an error.
const ERROR: u64 = u64::MAX;
impl Entry {
    pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
        let inner = handle.inner().unwrap();
        // Try to register another active timeout with the driver; if that
        // fails, build the entry directly in the error state.
        let entry = if inner.increment().is_err() {
            Entry::new2(deadline, duration, Weak::new(), ERROR)
        } else {
            let when = inner.normalize_deadline(deadline);
            // A deadline that has already passed starts out elapsed.
            let state = if when <= inner.elapsed() {
                ELAPSED
            } else {
                when
            };
            Entry::new2(deadline, duration, Arc::downgrade(&inner), state)
        };
let entry = Arc::new(entry);
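        // Register the entry with the driver; if that fails, transition the
        // entry to the error state.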
if inner.queue(&entry).is_err() {
entry.error();
}
entry
}
pub(crate) fn time_ref(&self) -> &Time {
unsafe { &*self.time.0.get() }
}
    #[allow(clippy::mut_from_ref)]
    pub(crate) unsafe fn time_mut(&self) -> &mut Time {
        // Caller must guarantee exclusive access to `time`.
        &mut *self.time.0.get()
    }
    pub(crate) fn when_internal(&self) -> Option<u64> {
        unsafe { *self.when.get() }
    }
    pub(crate) fn set_when_internal(&self, when: Option<u64>) {
        unsafe {
            *self.when.get() = when;
        }
    }
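    /// Returns the raw `state` value, or `None` if the entry has already
    /// elapsed (or errored).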
pub(crate) fn load_state(&self) -> Option<u64> {
let state = self.state.load(SeqCst);
if is_elapsed(state) {
None
} else {
Some(state)
}
}
pub(crate) fn is_elapsed(&self) -> bool {
let state = self.state.load(SeqCst);
is_elapsed(state)
}
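    /// Marks the entry as elapsed if its deadline is at or before `when`,
    /// waking the stored task on that transition.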
pub(crate) fn fire(&self, when: u64) {
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) || curr > when {
return;
}
let next = ELAPSED | curr;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
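            // Lost the race to another state transition; retry with the
            // freshly observed value.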
curr = actual;
}
self.waker.wake();
}
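    /// Forces the entry into the error state and wakes the stored task,
    /// unless it has already elapsed.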
pub(crate) fn error(&self) {
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) {
return;
}
let next = ERROR;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
self.waker.wake();
}
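    /// Marks the entry as elapsed; if this was an actual transition and the
    /// driver is still alive, requeues the entry so the driver can observe
    /// the cancellation.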
pub(crate) fn cancel(entry: &Arc<Entry>) {
let state = entry.state.fetch_or(ELAPSED, SeqCst);
if is_elapsed(state) {
return;
}
let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};
let _ = inner.queue(entry);
}
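    /// Polls for expiration: returns `Ready(Ok(()))` once the entry has
    /// elapsed, `Ready(Err(_))` if the driver errored, and `Pending`
    /// otherwise after registering the task's waker.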
pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let mut curr = self.state.load(SeqCst);
if is_elapsed(curr) {
return Poll::Ready(if curr == ERROR {
Err(Error::shutdown())
} else {
Ok(())
});
}
self.waker.register_by_ref(cx.waker());
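        // Re-check the state after registering: the entry may have fired
        // between the first load and the waker registration.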
curr = self.state.load(SeqCst);
if is_elapsed(curr) {
return Poll::Ready(if curr == ERROR {
Err(Error::shutdown())
} else {
Ok(())
});
}
Poll::Pending
}
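    /// Re-arms the entry for its current deadline, marking it elapsed
    /// immediately if that deadline has already passed, and requeues it with
    /// the driver when the state actually changes.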
pub(crate) fn reset(entry: &mut Arc<Entry>) {
let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};
let deadline = entry.time_ref().deadline;
let when = inner.normalize_deadline(deadline);
let elapsed = inner.elapsed();
let mut curr = entry.state.load(SeqCst);
let mut notify;
loop {
if curr == ERROR || curr == when {
return;
}
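            // If the new tick has already passed, go straight to ELAPSED
            // (notifying only on an actual transition); otherwise store the
            // new tick and always notify the driver.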
let next;
if when <= elapsed {
next = ELAPSED;
notify = !is_elapsed(curr);
} else {
next = when;
notify = true;
}
let actual = entry.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
if notify {
let _ = inner.queue(entry);
}
}
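    /// Creates an entry with the given initial state, not yet linked into any
    /// driver queue.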
fn new2(deadline: Instant, duration: Duration, inner: Weak<Inner>, state: u64) -> Self {
Self {
time: CachePadded(UnsafeCell::new(Time { deadline, duration })),
inner,
waker: AtomicWaker::new(),
state: AtomicU64::new(state),
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
fn upgrade_inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
}
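/// Returns `true` if the `ELAPSED` bit is set in `state`.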
fn is_elapsed(state: u64) -> bool {
state & ELAPSED == ELAPSED
}
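// Releases this entry's slot in the driver's active-timeout count, if the
// driver is still alive.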
impl Drop for Entry {
fn drop(&mut self) {
let inner = match self.upgrade_inner() {
Some(inner) => inner,
None => return,
};
inner.decrement();
}
}
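// These impls rely on the convention (noted on the fields above) that each
// `UnsafeCell` field has a single logical owner at any point in time.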
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
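/// Aligns the wrapped value to a cache line to avoid false sharing with
/// neighboring fields.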
#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
#[derive(Debug)]
struct CachePadded<T>(T);