agnostic_lite/async_io/
interval.rs

1use core::{
2  task::{Context, Poll},
3  time::Duration,
4};
5use std::time::Instant;
6
7use ::async_io::Timer;
8use futures_util::FutureExt;
9
10use crate::time::{AsyncLocalInterval, AsyncLocalIntervalExt};
11
12/// The [`AsyncInterval`](crate::time::AsyncInterval) implementation for any runtime based on [`async-io`](async_io), e.g. `async-std` and `smol`.
13pub type AsyncIoInterval = Timer;
14
15impl AsyncLocalInterval for Timer {
16  type Instant = Instant;
17
18  fn reset(&mut self, interval: Duration) {
19    self.set_after(interval)
20  }
21
22  fn reset_at(&mut self, deadline: Instant) {
23    self.set_at(deadline);
24  }
25
26  fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
27    self.poll_unpin(cx)
28  }
29}
30
31impl AsyncLocalIntervalExt for Timer {
32  fn interval_local(period: Duration) -> Self
33  where
34    Self: Sized,
35  {
36    Timer::interval(period)
37  }
38
39  fn interval_local_at(start: Instant, period: Duration) -> Self
40  where
41    Self: Sized,
42  {
43    Timer::interval_at(start, period)
44  }
45}
46
#[cfg(test)]
mod tests {
  use futures::StreamExt;

  use super::AsyncIoInterval;
  use crate::time::{AsyncInterval, AsyncIntervalExt};
  use std::time::{Duration, Instant};

  /// Period used by every interval under test.
  const INTERVAL: Duration = Duration::from_millis(100);
  /// Tolerance allowed around each expected tick time.
  const BOUND: Duration = Duration::from_millis(50);
  /// Slack allowed for a tick that is expected to fire immediately.
  const IMMEDIATE: Duration = Duration::from_millis(1);

  /// Earliest instant accepted for a tick expected `periods` intervals after `start`.
  fn earliest(start: Instant, periods: u32) -> Instant {
    start + INTERVAL * periods - BOUND
  }

  /// Whether `elapsed` lies within `BOUND` of `periods` whole intervals.
  fn on_schedule(elapsed: Duration, periods: u32) -> bool {
    let target = INTERVAL * periods;
    let (lowest, highest) = (target - BOUND, target + BOUND);
    lowest <= elapsed && elapsed <= highest
  }

  #[test]
  fn test_interval() {
    futures::executor::block_on(async {
      let start = Instant::now();
      let mut ticks = <AsyncIoInterval as AsyncIntervalExt>::interval(INTERVAL).take(3);

      // Three ticks, one per period.
      for periods in 1..=3 {
        let tick = ticks.next().await.unwrap();
        let since_start = start.elapsed();
        assert!(tick >= earliest(start, periods));
        assert!(on_schedule(since_start, periods));
      }

      // `take(3)` ends the stream after the third tick.
      assert!(ticks.next().await.is_none());
    });
  }

  #[test]
  fn test_interval_at() {
    futures::executor::block_on(async {
      let start = Instant::now();
      let mut ticks =
        <AsyncIoInterval as AsyncIntervalExt>::interval_at(Instant::now(), INTERVAL).take(4);

      // The first tick is immediate
      let tick = ticks.next().await.unwrap();
      let since_start = start.elapsed();
      assert!(tick <= start + IMMEDIATE);
      assert!(since_start <= IMMEDIATE + BOUND);

      // The remaining three ticks follow one period apart.
      for periods in 1..=3 {
        let tick = ticks.next().await.unwrap();
        let since_start = start.elapsed();
        assert!(tick >= earliest(start, periods));
        assert!(on_schedule(since_start, periods));
      }

      // `take(4)` ends the stream after the fourth tick.
      assert!(ticks.next().await.is_none());
    });
  }

  #[test]
  fn test_interval_reset() {
    futures::executor::block_on(async {
      let start = Instant::now();
      let mut timer = <AsyncIoInterval as AsyncIntervalExt>::interval(INTERVAL);

      let tick = timer.next().await.unwrap();
      let since_start = start.elapsed();
      assert!(tick >= earliest(start, 1));
      assert!(on_schedule(since_start, 1));

      // Reset the next tick to 2x
      timer.reset(INTERVAL * 2);

      // First pass: interval + 2x interval, so 3 periods after `start`.
      // Second pass: one more regular interval, so 4 periods after `start`.
      for periods in 3..=4 {
        let tick = timer.next().await.unwrap();
        let since_start = start.elapsed();
        assert!(tick >= earliest(start, periods));
        assert!(on_schedule(since_start, periods));
      }
    });
  }

  #[test]
  fn test_interval_reset_at() {
    futures::executor::block_on(async {
      let start = Instant::now();
      let mut timer = <AsyncIoInterval as AsyncIntervalExt>::interval(INTERVAL);

      // The first tick is checked without the lower tolerance.
      let tick = timer.next().await.unwrap();
      let since_start = start.elapsed();
      assert!(tick >= start + INTERVAL);
      assert!(since_start >= INTERVAL && since_start <= INTERVAL + BOUND);

      // Move the next tick to the absolute deadline `start + 3 * INTERVAL`.
      timer.reset_at(start + INTERVAL * 3);
      let tick = timer.next().await.unwrap();
      let since_start = start.elapsed();
      // The reported instant must not precede the requested deadline.
      assert!(tick >= start + INTERVAL * 3);
      assert!(on_schedule(since_start, 3));

      // One more regular interval after the deadline, so 4 periods after `start`.
      let tick = timer.next().await.unwrap();
      let since_start = start.elapsed();
      assert!(tick >= earliest(start, 4));
      assert!(on_schedule(since_start, 4));
    });
  }
}