moduvex_runtime/time/
interval.rs1use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::time::{Duration, Instant};
11
12use super::{with_timer_wheel, TimerId};
13
14pub struct Interval {
16 period: Duration,
18 next_deadline: Instant,
20 missed: u64,
22}
23
24impl Interval {
25 pub(crate) fn new(period: Duration) -> Self {
26 assert!(!period.is_zero(), "interval period must be non-zero");
27 Self {
28 period,
29 next_deadline: Instant::now() + period,
30 missed: 0,
31 }
32 }
33
34 pub fn tick(&mut self) -> TickFuture<'_> {
39 TickFuture {
40 interval: self,
41 timer_id: None,
42 }
43 }
44}
45
46pub struct TickFuture<'a> {
48 interval: &'a mut Interval,
49 timer_id: Option<TimerId>,
50}
51
52impl<'a> Future for TickFuture<'a> {
53 type Output = Instant;
54
55 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56 let now = Instant::now();
57
58 if now >= self.interval.next_deadline {
60 if let Some(id) = self.timer_id.take() {
62 with_timer_wheel(|w| {
63 w.cancel(id);
64 });
65 }
66
67 let fired_at = self.interval.next_deadline;
68
69 let elapsed = now.duration_since(fired_at);
71 let extra_ticks = (elapsed.as_nanos() / self.interval.period.as_nanos()) as u64;
72 self.interval.missed += extra_ticks;
73 let advance = extra_ticks.saturating_add(1).min(u32::MAX as u64) as u32;
75 let skip = self
76 .interval
77 .period
78 .checked_mul(advance)
79 .unwrap_or(Duration::MAX);
80 self.interval.next_deadline = fired_at + skip;
81
82 return Poll::Ready(fired_at);
83 }
84
85 if let Some(old_id) = self.timer_id.take() {
87 with_timer_wheel(|w| {
88 w.cancel(old_id);
89 });
90 }
91 let deadline = self.interval.next_deadline;
92 let id = with_timer_wheel(|w| w.insert(deadline, cx.waker().clone()));
93 self.timer_id = Some(id);
94
95 Poll::Pending
96 }
97}
98
99impl<'a> Drop for TickFuture<'a> {
100 fn drop(&mut self) {
101 if let Some(id) = self.timer_id.take() {
102 with_timer_wheel(|w| {
103 w.cancel(id);
104 });
105 }
106 }
107}
108
109pub fn interval(period: Duration) -> Interval {
130 Interval::new(period)
131}
132
133#[cfg(test)]
136mod tests {
137 use super::*;
138 use crate::executor::block_on_with_spawn;
139
140 #[test]
141 fn interval_fires_multiple_times() {
142 block_on_with_spawn(async {
143 let mut ticker = interval(Duration::from_millis(50));
144 let before = Instant::now();
145
146 ticker.tick().await;
147 ticker.tick().await;
148 ticker.tick().await;
149
150 let elapsed = before.elapsed();
151 assert!(
153 elapsed >= Duration::from_millis(120),
154 "interval fired too fast: {:?}",
155 elapsed
156 );
157 assert!(
158 elapsed < Duration::from_millis(1000),
159 "interval took too long: {:?}",
160 elapsed
161 );
162 });
163 }
164
165 #[test]
166 #[should_panic(expected = "non-zero")]
167 fn interval_zero_period_panics() {
168 let _ = interval(Duration::ZERO);
169 }
170
171 #[test]
172 fn interval_tracks_missed_ticks() {
173 let period = Duration::from_millis(20);
176 let mut ticker = interval(period);
177
178 let wait_until = Instant::now() + period * 3;
180 while Instant::now() < wait_until {
181 std::hint::spin_loop();
182 }
183
184 block_on_with_spawn(async move {
186 let now = Instant::now();
187 ticker.tick().await;
188 let elapsed = now.elapsed();
189 assert!(
191 elapsed < Duration::from_millis(50),
192 "missed tick must resolve immediately, took {:?}",
193 elapsed
194 );
195 });
196 }
197
198 #[test]
201 fn interval_first_tick_after_one_period() {
202 block_on_with_spawn(async {
203 let period = Duration::from_millis(30);
204 let mut ticker = interval(period);
205 let before = Instant::now();
206 ticker.tick().await;
207 assert!(before.elapsed() >= Duration::from_millis(25));
208 });
209 }
210
211 #[test]
212 fn interval_drop_tick_future_cleans_timer() {
213 block_on_with_spawn(async {
214 let mut ticker = interval(Duration::from_secs(10));
215 {
216 let tick_fut = ticker.tick();
217 drop(tick_fut); }
219 });
221 }
222
223 #[test]
224 fn interval_concurrent_5_tickers() {
225 use crate::executor::spawn;
226 block_on_with_spawn(async {
227 let before = Instant::now();
228 let mut handles = Vec::new();
229 for _ in 0..5 {
230 handles.push(spawn(async {
231 let mut t = interval(Duration::from_millis(40));
232 t.tick().await;
233 }));
234 }
235 for h in handles {
236 h.await.unwrap();
237 }
238 assert!(before.elapsed() < Duration::from_millis(500));
240 });
241 }
242
243 #[test]
244 fn interval_missed_tick_returns_fast() {
245 let period = Duration::from_millis(10);
246 let mut ticker = interval(period);
247 let wait = Instant::now() + period * 4;
249 while Instant::now() < wait {
250 std::hint::spin_loop();
251 }
252 block_on_with_spawn(async move {
253 let now = Instant::now();
254 ticker.tick().await;
255 assert!(
256 now.elapsed() < Duration::from_millis(50),
257 "missed tick must return fast"
258 );
259 });
260 }
261
262 #[test]
263 fn interval_two_ticks_cumulative_time() {
264 block_on_with_spawn(async {
265 let period = Duration::from_millis(20);
266 let mut ticker = interval(period);
267 let before = Instant::now();
268 ticker.tick().await;
269 ticker.tick().await;
270 assert!(before.elapsed() >= Duration::from_millis(30));
272 });
273 }
274
275 #[test]
276 fn interval_tick_returns_instant() {
277 block_on_with_spawn(async {
278 let period = Duration::from_millis(20);
279 let mut ticker = interval(period);
280 let fired_at = ticker.tick().await;
281 assert!(fired_at <= Instant::now());
283 });
284 }
285
286 #[test]
287 fn interval_three_sequential_ticks() {
288 block_on_with_spawn(async {
289 let period = Duration::from_millis(20);
290 let mut ticker = interval(period);
291 for _ in 0..3 {
292 ticker.tick().await;
293 }
294 });
296 }
297
298 #[test]
299 fn interval_period_1ms_fires_fast() {
300 block_on_with_spawn(async {
301 let mut ticker = interval(Duration::from_millis(1));
302 let before = Instant::now();
303 ticker.tick().await;
304 assert!(before.elapsed() < Duration::from_millis(200));
306 });
307 }
308}