1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use futures::stream::poll_fn;
use futures::{try_ready, Async, Stream};
use mio::unix::EventedFd;
use mio::{Evented, Poll, PollOpt, Ready, Token};
use std::io::Result;
use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant};
use timerfd::{SetTimeFlags, TimerFd as InnerTimerFd, TimerState};
use tokio_reactor::PollEvented;
pub use timerfd::ClockId;
mod delay;
mod delay_queue;
mod interval;
pub use delay::Delay;
pub use delay_queue::DelayQueue;
pub use interval::Interval;
struct Inner(InnerTimerFd);
impl Evented for Inner {
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> Result<()> {
poll.register(&EventedFd(&self.0.as_raw_fd()), token, interest, opts)
}
fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> Result<()> {
poll.reregister(&EventedFd(&self.0.as_raw_fd()), token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> Result<()> {
poll.deregister(&EventedFd(&self.0.as_raw_fd()))
}
}
pub struct TimerFd(PollEvented<Inner>);
impl TimerFd {
pub fn new(clock: ClockId) -> std::io::Result<Self> {
let inner = PollEvented::new(Inner(InnerTimerFd::new_custom(clock, true, true)?));
Ok(TimerFd(inner))
}
fn set_state(&mut self, state: TimerState, flags: SetTimeFlags) {
(self.0).get_mut().0.set_state(state, flags);
}
fn poll_read(&mut self) -> Result<Async<()>> {
let ready = try_ready!(self.0.poll_read_ready(Ready::readable()));
self.0.get_mut().0.read();
self.0.clear_read_ready(ready)?;
Ok(Async::Ready(()))
}
#[deprecated(note = "please use Interval")]
pub fn periodic(mut self, dur: Duration) -> impl Stream<Item = (), Error = std::io::Error> {
self.set_state(
TimerState::Periodic {
current: dur,
interval: dur,
},
SetTimeFlags::Default,
);
poll_fn(move || {
try_ready!(self.poll_read());
Ok(Async::Ready(Some(())))
})
}
}
pub fn sleep(duration: Duration) -> Delay {
Delay::new(Instant::now() + duration).expect("can't create delay")
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
use tokio::prelude::*;
#[test]
fn periodic_works() {
let timer = TimerFd::new(ClockId::Monotonic).unwrap();
tokio::run(future::lazy(|| {
let now = Instant::now();
timer
.periodic(Duration::from_micros(1))
.take(2)
.map_err(|err| println!("{:?}", err))
.for_each(move |_| Ok(()))
.and_then(move |_| {
let elapsed = now.elapsed();
println!("{:?}", elapsed);
assert!(elapsed < Duration::from_millis(1));
Ok(())
})
}));
}
}