tokio_timerfd/
interval.rs1use std::io::Error as IoError;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::{Duration, Instant};
5
6use crate::{ClockId, TimerFd};
7use futures_core::{ready, Stream};
8use timerfd::{SetTimeFlags, TimerState};
9use tokio::io::{AsyncRead, ReadBuf};
10
11pub struct Interval {
13 timerfd: TimerFd,
14 at: Instant,
15 duration: Duration,
16 initialized: bool,
17}
18
19impl Interval {
20 pub fn new(at: Instant, duration: Duration) -> Result<Interval, IoError> {
28 let timerfd = TimerFd::new(ClockId::Monotonic)?;
29 assert!(
30 duration > Duration::new(0, 0),
31 "`duration` must be non-zero."
32 );
33 Ok(Interval {
34 timerfd,
35 at,
36 duration,
37 initialized: false,
38 })
39 }
40
41 pub fn new_interval(duration: Duration) -> Result<Interval, IoError> {
51 Self::new(Instant::now() + duration, duration)
52 }
53}
54
55impl Stream for Interval {
56 type Item = Result<(), IoError>;
57
58 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59 if !self.initialized {
60 let now = Instant::now();
61 let first_duration = if self.at > now {
62 self.at - now
63 } else {
64 Duration::from_nanos(1)
66 };
67 let duration = self.duration;
68 self.as_mut().timerfd.set_state(
69 TimerState::Periodic {
70 current: first_duration,
71 interval: duration,
72 },
73 SetTimeFlags::Default,
74 );
75 self.initialized = true;
76 }
77 let mut buf = [0u8; 8];
78 let mut buf = ReadBuf::new(&mut buf);
79 ready!(Pin::new(&mut self.as_mut().timerfd).poll_read(cx, &mut buf)?);
80 Poll::Ready(Some(Ok(())))
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87 use futures::stream::StreamExt;
88 use std::time::Instant;
89
90 #[tokio::test]
91 async fn interval_works() {
92 let mut interval = Interval::new_interval(Duration::from_micros(1)).unwrap();
93
94 let start = Instant::now();
95 for _ in 0..5 {
96 interval.next().await.unwrap().unwrap();
97 }
98 let elapsed = start.elapsed();
99 println!("{:?}", elapsed);
100 assert!(elapsed < Duration::from_millis(1));
101 }
102
103 #[tokio::test]
104 async fn long_interval_works() {
105 let mut interval = Interval::new_interval(Duration::from_secs(1)).unwrap();
106
107 let start = Instant::now();
108 for _ in 0..5 {
109 let before = Instant::now();
110 interval.next().await.unwrap().unwrap();
111 let elapsed = before.elapsed();
112 assert!(elapsed.as_secs_f64() > 0.99 && elapsed.as_secs_f64() < 1.1);
113 }
114 let elapsed = start.elapsed();
115 println!("long interval elapsed: {:?}", elapsed);
116 assert!(elapsed < Duration::from_secs(6));
117 }
118}