async_std/stream/
interval.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3use std::time::{Duration, Instant};
4
5use futures_timer::Delay;
6
7use crate::prelude::*;
8
9/// Creates a new stream that yields at a set interval.
10///
11/// The stream first yields after `dur`, and continues to yield every
12/// `dur` after that. The stream accounts for time elapsed between calls, and
13/// will adjust accordingly to prevent time skews.
14///
15/// Each interval may be slightly longer than the specified duration, but never
16/// less.
17///
18/// Note that intervals are not intended for high resolution timers, but rather
19/// they will likely fire some granularity after the exact instant that they're
20/// otherwise indicated to fire at.
21///
22/// See also: [`task::sleep`].
23///
24/// [`task::sleep`]: ../task/fn.sleep.html
25///
26/// # Examples
27///
28/// Basic example:
29///
30/// ```no_run
31/// use async_std::prelude::*;
32/// use async_std::stream;
33/// use std::time::Duration;
34///
35/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
36/// #
37/// let mut interval = stream::interval(Duration::from_secs(4));
38/// while let Some(_) = interval.next().await {
39///     println!("prints every four seconds");
40/// }
41/// #
42/// # Ok(()) }) }
43/// ```
44#[cfg(feature = "unstable")]
45#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
46pub fn interval(dur: Duration) -> Interval {
47    Interval {
48        delay: Delay::new(dur),
49        interval: dur,
50    }
51}
52
53/// A stream representing notifications at fixed interval
54///
55/// This stream is created by the [`interval`] function. See its
56/// documentation for more.
57///
58/// [`interval`]: fn.interval.html
59#[cfg(feature = "unstable")]
60#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
61#[derive(Debug)]
62pub struct Interval {
63    delay: Delay,
64    interval: Duration,
65}
66
67impl Stream for Interval {
68    type Item = ();
69
70    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71        if Pin::new(&mut self.delay).poll(cx).is_pending() {
72            return Poll::Pending;
73        }
74        let when = Instant::now();
75        let next = next_interval(when, Instant::now(), self.interval);
76        self.delay.reset(next);
77        Poll::Ready(Some(()))
78    }
79}
80
81/// Converts Duration object to raw nanoseconds if possible
82///
83/// This is useful to divide intervals.
84///
85/// While technically for large duration it's impossible to represent any
86/// duration as nanoseconds, the largest duration we can represent is about
87/// 427_000 years. Large enough for any interval we would use or calculate in
88/// async-std.
89fn duration_to_nanos(dur: Duration) -> Option<u64> {
90    dur.as_secs()
91        .checked_mul(1_000_000_000)
92        .and_then(|v| v.checked_add(u64::from(dur.subsec_nanos())))
93}
94
95fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
96    let new = prev + interval;
97    if new > now {
98        return new;
99    }
100
101    let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
102    let interval_ns =
103        duration_to_nanos(interval).expect("interval is less that 427 thousand years");
104    let mult = spent_ns / interval_ns + 1;
105    assert!(
106        mult < (1 << 32),
107        "can't skip more than 4 billion intervals of {:?} \
108         (trying to skip {})",
109        interval,
110        mult
111    );
112    prev + interval * (mult as u32)
113}
114
115#[cfg(test)]
116mod test {
117    use super::next_interval;
118    use std::cmp::Ordering;
119    use std::time::{Duration, Instant};
120
121    struct Timeline(Instant);
122
123    impl Timeline {
124        fn new() -> Timeline {
125            Timeline(Instant::now())
126        }
127        fn at(&self, millis: u64) -> Instant {
128            self.0 + Duration::from_millis(millis)
129        }
130        fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
131            self.0 + Duration::new(sec, nanos)
132        }
133    }
134
135    fn dur(millis: u64) -> Duration {
136        Duration::from_millis(millis)
137    }
138
139    // The math around Instant/Duration isn't 100% precise due to rounding
140    // errors, see #249 for more info
141    fn almost_eq(a: Instant, b: Instant) -> bool {
142        match a.cmp(&b) {
143            Ordering::Equal => true,
144            Ordering::Greater => a - b < Duration::from_millis(1),
145            Ordering::Less => b - a < Duration::from_millis(1),
146        }
147    }
148
149    #[test]
150    fn norm_next() {
151        let tm = Timeline::new();
152        assert!(almost_eq(
153            next_interval(tm.at(1), tm.at(2), dur(10)),
154            tm.at(11)
155        ));
156        assert!(almost_eq(
157            next_interval(tm.at(7777), tm.at(7788), dur(100)),
158            tm.at(7877)
159        ));
160        assert!(almost_eq(
161            next_interval(tm.at(1), tm.at(1000), dur(2100)),
162            tm.at(2101)
163        ));
164    }
165
166    #[test]
167    fn fast_forward() {
168        let tm = Timeline::new();
169        assert!(almost_eq(
170            next_interval(tm.at(1), tm.at(1000), dur(10)),
171            tm.at(1001)
172        ));
173        assert!(almost_eq(
174            next_interval(tm.at(7777), tm.at(8888), dur(100)),
175            tm.at(8977)
176        ));
177        assert!(almost_eq(
178            next_interval(tm.at(1), tm.at(10000), dur(2100)),
179            tm.at(10501)
180        ));
181    }
182
183    /// TODO: this test actually should be successful, but since we can't
184    ///       multiply Duration on anything larger than u32 easily we decided
185    ///       to allow it to fail for now
186    #[test]
187    #[should_panic(expected = "can't skip more than 4 billion intervals")]
188    fn large_skip() {
189        let tm = Timeline::new();
190        assert_eq!(
191            next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
192            tm.at_ns(25, 1)
193        );
194    }
195}