1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! This crates provides [tokio-timer](https://docs.rs/tokio-timer)-like API
//! on top of timerfd. `timerfd` is a Linux-specific API providing timer notifications as
//! file descriptor read events.
//!
//! The advantage of `timerfd` is that it has more granularity than epoll_wait(),
//! which only provides 1 millisecond timeouts. `timerfd` API allows for nanosecond
//! precision, but precise timing of the wakeup is not guaranteed on a normal
//! multitasking system.
//!
//! Despite the name, this crate is *not* a part of the tokio project.
//!
//! * [`Delay`]: A future that completes at a specified instant in time.
//! * [`Interval`] A stream that yields at fixed time intervals.
//! * [`DelayQueue`]: A queue where items are returned once the requested delay
//!   has expired.
//!
//! [`Delay`]: struct.Delay.html
//! [`DelayQueue`]: struct.DelayQueue.html
//! [`Interval`]: struct.Interval.html

use futures::stream::poll_fn;
use futures::{try_ready, Async, Stream};
use mio::unix::EventedFd;
use mio::{Evented, Poll, PollOpt, Ready, Token};
use std::io::Result;
use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant};
use timerfd::{SetTimeFlags, TimerFd as InnerTimerFd, TimerState};
use tokio_reactor::PollEvented;

pub use timerfd::ClockId;

mod delay;
mod delay_queue;
mod interval;

pub use delay::Delay;
pub use delay_queue::DelayQueue;
pub use interval::Interval;

struct Inner(InnerTimerFd);

impl Evented for Inner {
    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> Result<()> {
        poll.register(&EventedFd(&self.0.as_raw_fd()), token, interest, opts)
    }

    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> Result<()> {
        poll.reregister(&EventedFd(&self.0.as_raw_fd()), token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> Result<()> {
        poll.deregister(&EventedFd(&self.0.as_raw_fd()))
    }
}

pub struct TimerFd(PollEvented<Inner>);

impl TimerFd {
    pub fn new(clock: ClockId) -> std::io::Result<Self> {
        let inner = PollEvented::new(Inner(InnerTimerFd::new_custom(clock, true, true)?));
        Ok(TimerFd(inner))
    }

    fn set_state(&mut self, state: TimerState, flags: SetTimeFlags) {
        (self.0).get_mut().0.set_state(state, flags);
    }

    fn poll_read(&mut self) -> Result<Async<()>> {
        let ready = try_ready!(self.0.poll_read_ready(Ready::readable()));
        self.0.get_mut().0.read();
        self.0.clear_read_ready(ready)?;
        Ok(Async::Ready(()))
    }

    #[deprecated(note = "please use Interval")]
    pub fn periodic(mut self, dur: Duration) -> impl Stream<Item = (), Error = std::io::Error> {
        self.set_state(
            TimerState::Periodic {
                current: dur,
                interval: dur,
            },
            SetTimeFlags::Default,
        );
        poll_fn(move || {
            try_ready!(self.poll_read());
            Ok(Async::Ready(Some(())))
        })
    }
}

/// Create a Future that completes in `duration` from now.
pub fn sleep(duration: Duration) -> Delay {
    Delay::new(Instant::now() + duration).expect("can't create delay")
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;
    use tokio::prelude::*;

    #[test]
    fn periodic_works() {
        let timer = TimerFd::new(ClockId::Monotonic).unwrap();
        tokio::run(future::lazy(|| {
            let now = Instant::now();
            timer
                .periodic(Duration::from_micros(1))
                .take(2)
                .map_err(|err| println!("{:?}", err))
                .for_each(move |_| Ok(()))
                .and_then(move |_| {
                    let elapsed = now.elapsed();
                    println!("{:?}", elapsed);
                    assert!(elapsed < Duration::from_millis(1));
                    Ok(())
                })
        }));
    }
}