tokio_timerfd/
interval.rs

1use std::io::Error as IoError;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::{Duration, Instant};
5
6use crate::{ClockId, TimerFd};
7use futures_core::{ready, Stream};
8use timerfd::{SetTimeFlags, TimerState};
9use tokio::io::{AsyncRead, ReadBuf};
10
11/// A stream representing notifications at fixed interval
12pub struct Interval {
13    timerfd: TimerFd,
14    at: Instant,
15    duration: Duration,
16    initialized: bool,
17}
18
19impl Interval {
20    /// Create a new `Interval` that starts at `at` and yields every `duration`
21    /// interval after that.
22    /// The `duration` argument must be a non-zero duration.
23    ///
24    /// # Panics
25    ///
26    /// This function panics if `duration` is zero.
27    pub fn new(at: Instant, duration: Duration) -> Result<Interval, IoError> {
28        let timerfd = TimerFd::new(ClockId::Monotonic)?;
29        assert!(
30            duration > Duration::new(0, 0),
31            "`duration` must be non-zero."
32        );
33        Ok(Interval {
34            timerfd,
35            at,
36            duration,
37            initialized: false,
38        })
39    }
40
41    /// Creates new `Interval` that yields with interval of `duration`.
42    ///
43    /// The function is shortcut for `Interval::new(Instant::now() + duration, duration)`.
44    ///
45    /// The `duration` argument must be a non-zero duration.
46    ///
47    /// # Panics
48    ///
49    /// This function panics if `duration` is zero.
50    pub fn new_interval(duration: Duration) -> Result<Interval, IoError> {
51        Self::new(Instant::now() + duration, duration)
52    }
53}
54
55impl Stream for Interval {
56    type Item = Result<(), IoError>;
57
58    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59        if !self.initialized {
60            let now = Instant::now();
61            let first_duration = if self.at > now {
62                self.at - now
63            } else {
64                /* can't set it to zero as it disables timer */
65                Duration::from_nanos(1)
66            };
67            let duration = self.duration;
68            self.as_mut().timerfd.set_state(
69                TimerState::Periodic {
70                    current: first_duration,
71                    interval: duration,
72                },
73                SetTimeFlags::Default,
74            );
75            self.initialized = true;
76        }
77        let mut buf = [0u8; 8];
78        let mut buf = ReadBuf::new(&mut buf);
79        ready!(Pin::new(&mut self.as_mut().timerfd).poll_read(cx, &mut buf)?);
80        Poll::Ready(Some(Ok(())))
81    }
82}
83
84#[cfg(test)]
85mod tests {
86    use super::*;
87    use futures::stream::StreamExt;
88    use std::time::Instant;
89
90    #[tokio::test]
91    async fn interval_works() {
92        let mut interval = Interval::new_interval(Duration::from_micros(1)).unwrap();
93
94        let start = Instant::now();
95        for _ in 0..5 {
96            interval.next().await.unwrap().unwrap();
97        }
98        let elapsed = start.elapsed();
99        println!("{:?}", elapsed);
100        assert!(elapsed < Duration::from_millis(1));
101    }
102
103    #[tokio::test]
104    async fn long_interval_works() {
105        let mut interval = Interval::new_interval(Duration::from_secs(1)).unwrap();
106
107        let start = Instant::now();
108        for _ in 0..5 {
109            let before = Instant::now();
110            interval.next().await.unwrap().unwrap();
111            let elapsed = before.elapsed();
112            assert!(elapsed.as_secs_f64() > 0.99 && elapsed.as_secs_f64() < 1.1);
113        }
114        let elapsed = start.elapsed();
115        println!("long interval elapsed: {:?}", elapsed);
116        assert!(elapsed < Duration::from_secs(6));
117    }
118}