// async_std/stream/interval.rs
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use futures_timer::Delay;

use crate::prelude::*;
8
/// Creates an [`Interval`] stream with a period of `dur`.
///
/// The returned stream starts with a `Delay` armed for `dur` and keeps `dur`
/// as the period that is used to re-arm the delay after every tick.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub fn interval(dur: Duration) -> Interval {
    // The first tick is due one full period after construction.
    let first_tick = Delay::new(dur);
    Interval {
        delay: first_tick,
        interval: dur,
    }
}
52
/// A stream that yields `()` each time its internal delay elapses.
///
/// Values of this type are produced by the `interval` function.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct Interval {
    /// Timer for the upcoming tick; it is reset after every tick.
    delay: Delay,
    /// Period handed to `next_interval` when the timer is re-armed.
    interval: Duration,
}
66
67impl Stream for Interval {
68 type Item = ();
69
70 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71 if Pin::new(&mut self.delay).poll(cx).is_pending() {
72 return Poll::Pending;
73 }
74 let when = Instant::now();
75 let next = next_interval(when, Instant::now(), self.interval);
76 self.delay.reset(next);
77 Poll::Ready(Some(()))
78 }
79}
80
/// Converts a `Duration` into a whole number of nanoseconds.
///
/// Returns `None` when the result does not fit in a `u64`, which happens for
/// durations longer than roughly 584 years.
fn duration_to_nanos(dur: Duration) -> Option<u64> {
    dur.as_secs()
        .checked_mul(1_000_000_000)
        .and_then(|v| v.checked_add(u64::from(dur.subsec_nanos())))
}

/// Computes the instant of the next tick after `now`.
///
/// `prev` is the instant of the previous tick and `interval` is the period.
/// If `prev + interval` is still in the future it is returned unchanged.
/// Otherwise every tick that has already been missed is skipped and the
/// result is the first instant of the form `prev + k * interval` that lies
/// strictly after `now`.
///
/// A zero `interval` has no such instant; in that case `now` is returned so
/// that the next tick is due immediately.
///
/// # Panics
///
/// Panics if the time elapsed since `prev` or the `interval` does not fit in
/// a `u64` number of nanoseconds, or if 2^32 or more intervals would have to
/// be skipped.
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
    let new = prev + interval;
    if new > now {
        return new;
    }

    let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
    let interval_ns = duration_to_nanos(interval).expect("interval is less than 584 years");
    if interval_ns == 0 {
        // Dividing by a zero period would panic with "attempt to divide by
        // zero"; a zero-length interval simply means "tick right away".
        return now;
    }

    // Number of whole periods to advance so that the result is after `now`.
    let mult = spent_ns / interval_ns + 1;
    assert!(
        mult < (1 << 32),
        "can't skip more than 4 billion intervals of {:?} \
         (trying to skip {})",
        interval,
        mult
    );
    // The assertion above guarantees that `mult` fits in a `u32`.
    prev + interval * (mult as u32)
}
114
#[cfg(test)]
mod test {
    use super::next_interval;
    use std::time::{Duration, Instant};

    /// A fixed reference instant from which test timestamps are derived.
    struct Timeline(Instant);

    impl Timeline {
        /// Anchors the timeline at the current instant.
        fn new() -> Timeline {
            Timeline(Instant::now())
        }

        /// Instant `millis` milliseconds after the anchor.
        fn at(&self, millis: u64) -> Instant {
            let Timeline(origin) = *self;
            origin + dur(millis)
        }

        /// Instant `sec` seconds plus `nanos` nanoseconds after the anchor.
        fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
            let Timeline(origin) = *self;
            origin + Duration::new(sec, nanos)
        }
    }

    /// Shorthand for a duration of `millis` milliseconds.
    fn dur(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// Returns `true` when the two instants are less than one millisecond
    /// apart, in either direction.
    fn almost_eq(a: Instant, b: Instant) -> bool {
        let gap = if a >= b { a - b } else { b - a };
        gap < Duration::from_millis(1)
    }

    #[test]
    fn norm_next() {
        let tm = Timeline::new();
        // (previous tick, now, period, expected next tick), all in milliseconds.
        let cases = [
            (1, 2, 10, 11),
            (7777, 7788, 100, 7877),
            (1, 1000, 2100, 2101),
        ];
        for &(prev, now, every, want) in cases.iter() {
            let got = next_interval(tm.at(prev), tm.at(now), dur(every));
            assert!(almost_eq(got, tm.at(want)));
        }
    }

    #[test]
    fn fast_forward() {
        let tm = Timeline::new();
        // (previous tick, now, period, expected next tick), all in milliseconds.
        let cases = [
            (1, 1000, 10, 1001),
            (7777, 8888, 100, 8977),
            (1, 10000, 2100, 10501),
        ];
        for &(prev, now, every, want) in cases.iter() {
            let got = next_interval(tm.at(prev), tm.at(now), dur(every));
            assert!(almost_eq(got, tm.at(want)));
        }
    }

    #[test]
    #[should_panic(expected = "can't skip more than 4 billion intervals")]
    fn large_skip() {
        let tm = Timeline::new();
        let prev = tm.at_ns(0, 1);
        let now = tm.at_ns(25, 0);
        assert_eq!(
            next_interval(prev, now, Duration::new(0, 2)),
            tm.at_ns(25, 1)
        );
    }
}