use std::fmt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task::AtomicWaker;
use crate::arc_list::Node;
use crate::{ScheduledTimer, TimerHandle};
pub struct Delay {
state: Option<Arc<Node<ScheduledTimer>>>,
when: Instant,
}
impl Delay {
#[inline]
pub fn new(dur: Duration) -> Delay {
Delay::new_at(Instant::now() + dur)
}
#[inline]
pub fn new_at(at: Instant) -> Delay {
Delay::new_handle(at, Default::default())
}
pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
let inner = match handle.inner.upgrade() {
Some(i) => i,
None => {
return Delay {
state: None,
when: at,
}
}
};
let state = Arc::new(Node::new(ScheduledTimer {
at: Mutex::new(Some(at)),
state: AtomicUsize::new(0),
waker: AtomicWaker::new(),
inner: handle.inner,
slot: Mutex::new(None),
}));
if inner.list.push(&state).is_err() {
return Delay {
state: None,
when: at,
};
}
inner.waker.wake();
Delay {
state: Some(state),
when: at,
}
}
#[inline]
pub fn reset(&mut self, dur: Duration) {
self.reset_at(Instant::now() + dur)
}
#[inline]
pub fn reset_at(&mut self, at: Instant) {
self.when = at;
if self._reset(at).is_err() {
self.state = None
}
}
fn _reset(&mut self, at: Instant) -> Result<(), ()> {
let state = match self.state {
Some(ref state) => state,
None => return Err(()),
};
if let Some(timeouts) = state.inner.upgrade() {
let mut bits = state.state.load(SeqCst);
loop {
if bits & 0b10 != 0 {
return Err(());
}
let new = bits.wrapping_add(0b100) & !0b11;
match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
Ok(_) => break,
Err(s) => bits = s,
}
}
*state.at.lock().unwrap() = Some(at);
timeouts.list.push(state)?;
timeouts.waker.wake();
}
Ok(())
}
}
#[inline]
pub fn fires_at(timeout: &Delay) -> Instant {
timeout.when
}
impl Future for Delay {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let state = match self.state {
Some(ref state) => state,
None => {
let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away"));
return Poll::Ready(err);
}
};
if state.state.load(SeqCst) & 1 != 0 {
return Poll::Ready(Ok(()));
}
state.waker.register(&cx.waker());
match state.state.load(SeqCst) {
n if n & 0b01 != 0 => Poll::Ready(Ok(())),
n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"timer has gone away",
))),
_ => Poll::Pending,
}
}
}
impl Drop for Delay {
fn drop(&mut self) {
let state = match self.state {
Some(ref s) => s,
None => return,
};
if let Some(timeouts) = state.inner.upgrade() {
*state.at.lock().unwrap() = None;
if timeouts.list.push(state).is_ok() {
timeouts.waker.wake();
}
}
}
}
impl fmt::Debug for Delay {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Delay").field("when", &self.when).finish()
}
}