use std::io;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::time::{Duration, Instant};
use futures::{Future, Poll, Async};
use futures::task::AtomicTask;
use arc_list::Node;
use {TimerHandle, ScheduledTimer};
/// A future tied to a timer that is polled until its scheduled entry is
/// flagged as fired.
///
/// A `Delay` whose `state` is `None` is inert: polling it yields an
/// `io::Error` ("timer has gone away") and resetting it has no effect beyond
/// updating `when`.
pub struct Delay {
// Shared node that is pushed onto the timer's list. `None` when the timer
// handle could not be upgraded, when pushing the node failed, or after a
// failed reset.
state: Option<Arc<Node<ScheduledTimer>>>,
// The instant most recently requested through a constructor or a reset.
when: Instant,
}
impl Delay {
pub fn new(dur: Duration) -> Delay {
Delay::new_at(Instant::now() + dur)
}
pub fn new_at(at: Instant) -> Delay {
Delay::new_handle(at, Default::default())
}
pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
let inner = match handle.inner.upgrade() {
Some(i) => i,
None => return Delay { state: None, when: at },
};
let state = Arc::new(Node::new(ScheduledTimer {
at: Mutex::new(Some(at)),
state: AtomicUsize::new(0),
task: AtomicTask::new(),
inner: handle.inner,
slot: Mutex::new(None),
}));
if inner.list.push(&state).is_err() {
return Delay { state: None, when: at }
}
inner.task.notify();
Delay {
state: Some(state),
when: at,
}
}
pub fn reset(&mut self, dur: Duration) {
self.reset_at(Instant::now() + dur)
}
pub fn reset_at(&mut self, at: Instant) {
self.when = at;
if self._reset(at).is_err() {
self.state = None
}
}
fn _reset(&mut self, at: Instant) -> Result<(), ()> {
let state = match self.state {
Some(ref state) => state,
None => return Err(()),
};
if let Some(timeouts) = state.inner.upgrade() {
let mut bits = state.state.load(SeqCst);
loop {
if bits & 0b10 != 0 {
return Err(())
}
let new = bits.wrapping_add(0b100) & !0b11;
match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
Ok(_) => break,
Err(s) => bits = s,
}
}
*state.at.lock().unwrap() = Some(at);
timeouts.list.push(state)?;
timeouts.task.notify();
}
Ok(())
}
}
/// Returns the instant currently recorded in `timeout`, i.e. the value most
/// recently passed to a constructor or to a reset.
pub fn fires_at(timeout: &Delay) -> Instant {
    let deadline: Instant = timeout.when;
    deadline
}
impl Future for Delay {
    type Item = ();
    type Error = io::Error;

    /// Polls the delay.
    ///
    /// Returns `Ready` once bit `0b01` of the shared state word is set, an
    /// `io::Error` when the delay has no state or bit `0b10` is set, and
    /// `NotReady` otherwise (after registering the current task).
    fn poll(&mut self) -> Poll<(), io::Error> {
        let gone = || io::Error::new(io::ErrorKind::Other, "timer has gone away");

        let node = match self.state.as_ref() {
            Some(node) => node,
            None => return Err(gone()),
        };

        // Fast path: already flagged as fired, no need to register.
        if node.state.load(SeqCst) & 1 != 0 {
            return Ok(Async::Ready(()));
        }

        node.task.register();

        // Re-check after registering so that a state change landing between
        // the first load and `register` is not missed.
        let bits = node.state.load(SeqCst);
        if bits & 0b01 != 0 {
            Ok(Async::Ready(()))
        } else if bits & 0b10 != 0 {
            Err(gone())
        } else {
            Ok(Async::NotReady)
        }
    }
}
impl Drop for Delay {
    /// On drop, clears the node's deadline and hands the node back to the
    /// timer, if both the state and the timer are still present.
    fn drop(&mut self) {
        if let Some(ref node) = self.state {
            if let Some(timer) = node.inner.upgrade() {
                // A deadline of `None` is stored before re-pushing the node;
                // presumably the timer treats that as a cancellation.
                *node.at.lock().unwrap() = None;
                if timer.list.push(node).is_ok() {
                    timer.task.notify();
                }
            }
        }
    }
}