use {interval, Interval, Builder, wheel};
use worker::Worker;
use wheel::{Token, Wheel};
use futures::{Future, Stream, Async, Poll};
use futures::task::{self, Task};
use std::{fmt, io};
use std::error::Error;
use std::time::{Duration, Instant};
/// Handle used to create `Sleep`, `Timeout`, `TimeoutStream` and `Interval`
/// values.
///
/// `Clone` is derived, so a cloned handle carries a clone of the same
/// `Worker` value.
#[derive(Clone)]
pub struct Timer {
// Worker through which timeouts are registered, moved and cancelled.
worker: Worker,
}
/// A future that becomes ready once its deadline (`when`) has been reached.
#[must_use = "futures do nothing unless polled"]
pub struct Sleep {
// Timer whose worker this sleep registers its timeout with.
timer: Timer,
// Deadline: the instant of construction plus the requested duration.
when: Instant,
// Task and timeout token recorded by `poll` after a timeout has been
// registered with the worker; `None` until that happens.
handle: Option<(Task, Token)>,
}
/// Pairs a future with a `Sleep`; polling yields a `TimeoutError` if the
/// sleep completes (or fails) while the future is still not ready.
#[must_use = "futures do nothing unless polled"]
pub struct Timeout<T> {
// Wrapped future; taken (left as `None`) when a timeout error is returned.
future: Option<T>,
// Countdown that bounds how long `future` may stay not ready.
sleep: Sleep,
}
/// Pairs a stream with a `Sleep` that is restarted every time the stream
/// yields an item; polling yields a `TimeoutError` if the sleep completes
/// (or fails) while the stream is still not ready.
pub struct TimeoutStream<T> {
// Wrapped stream; taken (left as `None`) when a timeout error is returned.
stream: Option<T>,
// Length of each countdown; used to rebuild `sleep` after every item.
duration: Duration,
// Current countdown, replaced whenever the stream yields an item.
sleep: Sleep,
}
/// Errors reported by the timer itself.
#[derive(Debug, Clone)]
pub enum TimerError {
/// The requested timeout is too long: `Sleep::poll` returns this when the
/// time left until the deadline exceeds the worker's maximum timeout.
TooLong,
/// The timer is out of capacity.
// NOTE(review): no code visible in this file constructs this variant —
// confirm where (or whether) it is produced.
NoCapacity,
}
/// Error produced by `Timeout` and `TimeoutStream`.
///
/// Both variants hand back the wrapped future or stream (`T`) so the caller
/// regains ownership of it.
#[derive(Clone)]
pub enum TimeoutError<T> {
/// Polling the underlying `Sleep` failed with the given `TimerError`.
Timer(T, TimerError),
/// The `Sleep` completed before the wrapped value became ready.
TimedOut(T),
}
/// Builds a `Timer` from `builder`.
///
/// A `Wheel` is created from the builder and handed, together with the
/// builder, to `Worker::spawn`; the resulting worker backs the returned timer.
pub fn build(builder: Builder) -> Timer {
    let spawned = Worker::spawn(Wheel::new(&builder), &builder);
    Timer { worker: spawned }
}
impl Timer {
    /// Returns a `Sleep` whose deadline is `duration` from now.
    pub fn sleep(&self, duration: Duration) -> Sleep {
        Sleep::new(self.clone(), duration)
    }

    /// Wraps `future` so that polling it fails with a `TimeoutError` if it is
    /// still not ready once `duration` has elapsed.
    ///
    /// The future's error type must be constructible from
    /// `TimeoutError<F>`; the timed-out future is handed back inside that
    /// error.
    pub fn timeout<F, E>(&self, future: F, duration: Duration) -> Timeout<F>
        where F: Future<Error = E>,
              E: From<TimeoutError<F>>,
    {
        Timeout {
            future: Some(future),
            sleep: self.sleep(duration),
        }
    }

    /// Wraps `stream` so that polling it fails with a `TimeoutError` if no
    /// item arrives within `duration`. The countdown restarts after every
    /// item the stream yields.
    pub fn timeout_stream<T, E>(&self, stream: T, duration: Duration) -> TimeoutStream<T>
        where T: Stream<Error = E>,
              E: From<TimeoutError<T>>,
    {
        TimeoutStream {
            stream: Some(stream),
            duration: duration,
            sleep: self.sleep(duration),
        }
    }

    /// Creates an `Interval` with period `dur` whose initial sleep is also
    /// `dur` long.
    pub fn interval(&self, dur: Duration) -> Interval {
        interval::new(self.sleep(dur), dur)
    }

    /// Creates an `Interval` with period `dur` whose initial sleep ends at
    /// `at`.
    ///
    /// If `at` is not in the future, the initial sleep is zero-length so the
    /// interval can fire right away instead of being delayed by a full period.
    pub fn interval_at(&self, at: Instant, dur: Duration) -> Interval {
        let now = Instant::now();

        // `at - now` is only valid when `at` is strictly later than `now`; a
        // start time in the past (or exactly now) means "start immediately".
        let initial_delay = if at > now {
            at - now
        } else {
            Duration::from_millis(0)
        };

        interval::new(self.sleep(initial_delay), dur)
    }
}
impl Default for Timer {
    /// Builds a timer from the builder returned by `wheel()`, without changing
    /// any of its settings.
    fn default() -> Timer {
        let builder = wheel();
        builder.build()
    }
}
impl Sleep {
    /// Creates a sleep on `timer` whose deadline is `duration` from now. No
    /// timeout is registered here; `handle` starts out as `None`.
    fn new(timer: Timer, duration: Duration) -> Sleep {
        let deadline = Instant::now() + duration;
        Sleep {
            timer: timer,
            when: deadline,
            handle: None,
        }
    }

    /// Returns `true` once the current time has reached the deadline minus
    /// the worker's tolerance.
    pub fn is_expired(&self) -> bool {
        let now = Instant::now();
        let threshold = self.when - *self.timer.worker.tolerance();
        now >= threshold
    }

    /// Returns the time left until the deadline, or a zero duration if the
    /// deadline has already been reached.
    pub fn remaining(&self) -> Duration {
        let now = Instant::now();
        if self.when > now {
            self.when - now
        } else {
            Duration::from_millis(0)
        }
    }

    /// Returns the timer this sleep belongs to.
    pub fn timer(&self) -> &Timer {
        &self.timer
    }
}
impl Future for Sleep {
    type Item = ();
    type Error = TimerError;

    /// Polls the sleep.
    ///
    /// Returns `Ready` once `is_expired` reports `true`. Otherwise a timeout
    /// for the current task is registered with the timer's worker (or an
    /// existing one is moved to the current task) and `NotReady` is returned.
    ///
    /// # Errors
    ///
    /// Returns `TimerError::TooLong` when, before the first registration, the
    /// time left until the deadline exceeds the worker's maximum timeout.
    fn poll(&mut self) -> Poll<(), TimerError> {
        if self.is_expired() {
            return Ok(Async::Ready(()));
        }

        let handle = match self.handle {
            None => {
                // The clock is read again here, after `is_expired`, so the
                // deadline may have passed in between. Only subtract when the
                // deadline is still ahead: `Instant - Instant` with a later
                // right-hand side panics on toolchains where it does not
                // saturate.
                let now = Instant::now();
                if self.when > now && (self.when - now) > *self.timer.worker.max_timeout() {
                    return Err(TimerError::TooLong);
                }

                let task = task::park();

                match self.timer.worker.set_timeout(self.when, task.clone()) {
                    Ok(token) => (task, token),
                    Err(task) => {
                        // The worker handed the task back instead of a token.
                        // Unpark it so this future is polled again and the
                        // registration can be retried.
                        task.unpark();
                        return Ok(Async::NotReady);
                    }
                }
            }
            Some((ref task, token)) => {
                if task.is_current() {
                    // A timeout is already registered for this very task.
                    return Ok(Async::NotReady);
                }

                // Polled from a different task: point the existing timeout at
                // the task that is polling now.
                let task = task::park();

                match self.timer.worker.move_timeout(token, self.when, task.clone()) {
                    Ok(_) => (task, token),
                    Err(task) => {
                        // Same handling as a failed registration; the stored
                        // handle is left untouched.
                        task.unpark();
                        return Ok(Async::NotReady);
                    }
                }
            }
        };

        self.handle = Some(handle);
        Ok(Async::NotReady)
    }
}
impl Drop for Sleep {
    /// Cancels the timeout on the worker if one was ever registered; a sleep
    /// without a stored handle has nothing to cancel.
    fn drop(&mut self) {
        match self.handle {
            Some((_, token)) => {
                self.timer.worker.cancel_timeout(token, self.when);
            }
            None => {}
        }
    }
}
impl<T> Timeout<T> {
    /// Borrows the wrapped future.
    ///
    /// # Panics
    ///
    /// Panics if the future has already been taken out (which `poll` does
    /// when it returns a timeout error).
    pub fn get_ref(&self) -> &T {
        let inner = self.future.as_ref();
        inner.expect("the future has already been consumed")
    }

    /// Mutably borrows the wrapped future.
    ///
    /// # Panics
    ///
    /// Panics if the future has already been taken out.
    pub fn get_mut(&mut self) -> &mut T {
        let inner = self.future.as_mut();
        inner.expect("the future has already been consumed")
    }

    /// Consumes the `Timeout` and returns the wrapped future.
    ///
    /// # Panics
    ///
    /// Panics if the future has already been taken out.
    pub fn into_inner(self) -> T {
        let inner = self.future;
        inner.expect("the future has already been consumed")
    }
}
impl<F, E> Future for Timeout<F>
    where F: Future<Error = E>,
          E: From<TimeoutError<F>>,
{
    type Item = F::Item;
    type Error = E;

    /// Polls the wrapped future first and passes through anything other than
    /// `NotReady`. Only when the future is not ready is the sleep polled; if
    /// the sleep completes or fails, the future is taken out and returned
    /// inside a `TimeoutError` converted into `E`.
    ///
    /// # Panics
    ///
    /// Panics if called after the wrapped future has been taken out.
    fn poll(&mut self) -> Poll<F::Item, E> {
        // The wrapped future gets the first chance to make progress.
        let polled = match self.future {
            Some(ref mut f) => f.poll(),
            None => panic!("cannot call poll once value is consumed"),
        };
        match polled {
            Ok(Async::NotReady) => {}
            other => return other,
        }

        // Still pending: consult the timer.
        let timer_failure = match self.sleep.poll() {
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Ok(Async::Ready(_)) => None,
            Err(e) => Some(e),
        };

        let f = self.future.take().unwrap();
        let failure = match timer_failure {
            None => TimeoutError::TimedOut(f),
            Some(e) => TimeoutError::Timer(f, e),
        };
        Err(failure.into())
    }
}
impl<T> TimeoutStream<T> {
    /// Borrows the wrapped stream.
    ///
    /// # Panics
    ///
    /// Panics if the stream has already been taken out (which `poll` does
    /// when it returns a timeout error).
    pub fn get_ref(&self) -> &T {
        let inner = self.stream.as_ref();
        inner.expect("the stream has already been consumed")
    }

    /// Mutably borrows the wrapped stream.
    ///
    /// # Panics
    ///
    /// Panics if the stream has already been taken out.
    pub fn get_mut(&mut self) -> &mut T {
        let inner = self.stream.as_mut();
        inner.expect("the stream has already been consumed")
    }

    /// Consumes the `TimeoutStream` and returns the wrapped stream.
    ///
    /// # Panics
    ///
    /// Panics if the stream has already been taken out.
    pub fn into_inner(self) -> T {
        let inner = self.stream;
        inner.expect("the stream has already been consumed")
    }
}
impl<T, E> Stream for TimeoutStream<T>
    where T: Stream<Error = E>,
          E: From<TimeoutError<T>>,
{
    type Item = T::Item;
    type Error = E;

    /// Polls the wrapped stream first. An item restarts the countdown and is
    /// passed through; end-of-stream and stream errors are passed through
    /// unchanged. Only when the stream is not ready is the sleep polled; if
    /// the sleep completes or fails, the stream is taken out and returned
    /// inside a `TimeoutError` converted into `E`.
    ///
    /// # Panics
    ///
    /// Panics if called after the wrapped stream has been taken out.
    fn poll(&mut self) -> Poll<Option<T::Item>, E> {
        // The wrapped stream gets the first chance to make progress.
        let polled = match self.stream {
            Some(ref mut s) => s.poll(),
            None => panic!("cannot call poll once value is consumed"),
        };
        match polled {
            Ok(Async::NotReady) => {}
            Ok(Async::Ready(Some(item))) => {
                // An item arrived: start a fresh countdown on the same timer.
                self.sleep = Sleep::new(self.sleep.timer.clone(), self.duration);
                return Ok(Async::Ready(Some(item)));
            }
            other => return other,
        }

        // Still pending: consult the timer.
        let timer_failure = match self.sleep.poll() {
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Ok(Async::Ready(_)) => None,
            Err(e) => Some(e),
        };

        let s = self.stream.take().unwrap();
        let failure = match timer_failure {
            None => TimeoutError::TimedOut(s),
            Some(e) => TimeoutError::Timer(s, e),
        };
        Err(failure.into())
    }
}
impl fmt::Display for TimerError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", Error::description(self))
}
}
impl Error for TimerError {
fn description(&self) -> &str {
match *self {
TimerError::TooLong => "requested timeout too long",
TimerError::NoCapacity => "timer out of capacity",
}
}
}
impl<T> fmt::Display for TimeoutError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", Error::description(self))
}
}
impl<T> fmt::Debug for TimeoutError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", Error::description(self))
}
}
impl<T> Error for TimeoutError<T> {
fn description(&self) -> &str {
use self::TimerError::*;
use self::TimeoutError::*;
match *self {
Timer(_, TooLong) => "requested timeout too long",
Timer(_, NoCapacity) => "timer out of capacity",
TimedOut(_) => "the future timed out",
}
}
}
impl<T> From<TimeoutError<T>> for io::Error {
fn from(src: TimeoutError<T>) -> io::Error {
use self::TimerError::*;
use self::TimeoutError::*;
match src {
Timer(_, TooLong) => io::Error::new(io::ErrorKind::InvalidInput, "requested timeout too long"),
Timer(_, NoCapacity) => io::Error::new(io::ErrorKind::Other, "timer out of capacity"),
TimedOut(_) => io::Error::new(io::ErrorKind::TimedOut, "the future timed out"),
}
}
}
impl From<TimerError> for io::Error {
    /// Wraps the timer error as the payload of an `io::Error` of kind
    /// `Other`, whichever variant it is.
    fn from(src: TimerError) -> io::Error {
        let kind = io::ErrorKind::Other;
        io::Error::new(kind, src)
    }
}
impl From<TimerError> for () {
    /// Discards the error, allowing `()` to be used as an error type.
    fn from(_: TimerError) -> () {
        ()
    }
}
impl<T> From<TimeoutError<T>> for () {
    /// Discards the error (and the value it carries), allowing `()` to be
    /// used as an error type.
    fn from(_: TimeoutError<T>) -> () {
        ()
    }
}