agnostic_lite/wasm/
interval.rs

1use core::{
2  pin::Pin,
3  task::{Context, Poll},
4  time::Duration,
5};
6use std::time::Instant;
7
8use futures_util::{stream::Stream, FutureExt};
9
10use crate::time::{AsyncLocalInterval, AsyncLocalIntervalExt, AsyncSleep, AsyncSleepExt};
11
12use super::WasmSleep;
13
14pin_project_lite::pin_project! {
15  /// The [`AsyncInterval`] implementation for wasm runtime.
16  ///
17  /// **Note:** `WasmInterval` is not accurate below second level.
18  pub struct WasmInterval {
19    #[pin]
20    inner: Pin<Box<WasmSleep>>,
21    first: bool,
22  }
23}
24
25impl Stream for WasmInterval {
26  type Item = Instant;
27
28  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29    if self.first {
30      self.first = false;
31      return Poll::Ready(Some(self.inner.ddl - self.inner.duration));
32    }
33
34    let mut this = self.project();
35    match this.inner.poll_unpin(cx) {
36      Poll::Ready(ins) => {
37        let duration = this.inner.duration;
38        Pin::new(&mut **this.inner).reset(Instant::now() + duration);
39        Poll::Ready(Some(ins))
40      }
41      Poll::Pending => Poll::Pending,
42    }
43  }
44}
45
46impl AsyncLocalInterval for WasmInterval {
47  type Instant = Instant;
48
49  fn reset(&mut self, interval: Duration) {
50    Pin::new(&mut *self.inner).reset(Instant::now() + interval);
51  }
52
53  fn reset_at(&mut self, instant: Instant) {
54    Pin::new(&mut *self.inner).reset(instant);
55  }
56
57  fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
58    if self.first {
59      self.first = false;
60      return Poll::Ready(self.inner.ddl - self.inner.duration);
61    }
62
63    let duration = self.inner.duration;
64    let mut this = Pin::new(&mut *self.inner);
65    match this.poll_unpin(cx) {
66      Poll::Ready(ins) => {
67        this.reset(Instant::now() + duration);
68        Poll::Ready(ins)
69      }
70      Poll::Pending => Poll::Pending,
71    }
72  }
73}
74
75impl AsyncLocalIntervalExt for WasmInterval {
76  fn interval_local(period: Duration) -> Self
77  where
78    Self: Sized,
79  {
80    Self {
81      inner: Box::pin(WasmSleep::sleep(period)),
82      first: true,
83    }
84  }
85
86  fn interval_local_at(start: Instant, period: Duration) -> Self
87  where
88    Self: Sized,
89  {
90    Self {
91      inner: Box::pin(WasmSleep::sleep_until(start + period)),
92      first: true,
93    }
94  }
95}
96
97#[cfg(test)]
98mod tests {
99  use futures::StreamExt;
100
101  use super::WasmInterval;
102  use crate::time::{AsyncInterval, AsyncIntervalExt};
103  use std::time::{Duration, Instant};
104
105  const INTERVAL: Duration = Duration::from_millis(100);
106  const BOUND: Duration = Duration::from_millis(50);
107  const IMMEDIATE: Duration = Duration::from_millis(1);
108
109  #[test]
110  fn test_interval() {
111    futures::executor::block_on(async {
112      let start = Instant::now();
113      let interval = WasmInterval::interval(INTERVAL);
114      let mut interval = interval.take(4);
115      // The first tick is immediate
116      let ins = interval.next().await.unwrap();
117      let elapsed = start.elapsed();
118      assert!(ins <= start + IMMEDIATE);
119      assert!(elapsed <= IMMEDIATE + BOUND);
120
121      let ins = interval.next().await.unwrap();
122      let elapsed = start.elapsed();
123      assert!(ins >= start + INTERVAL - BOUND);
124      assert!(elapsed >= INTERVAL - BOUND && elapsed <= INTERVAL + BOUND);
125
126      let ins = interval.next().await.unwrap();
127      let elapsed = start.elapsed();
128      assert!(ins >= start + INTERVAL * 2 - BOUND);
129      assert!(elapsed >= INTERVAL * 2 - BOUND && elapsed <= INTERVAL * 2 + BOUND);
130
131      let ins = interval.next().await.unwrap();
132      let elapsed = start.elapsed();
133      assert!(ins >= start + INTERVAL * 3 - BOUND);
134      assert!(elapsed >= INTERVAL * 3 - BOUND && elapsed <= INTERVAL * 3 + BOUND);
135
136      assert!(interval.next().await.is_none());
137    });
138  }
139
140  #[test]
141  fn test_interval_at() {
142    futures::executor::block_on(async {
143      let start = Instant::now();
144      let interval = WasmInterval::interval_at(Instant::now(), INTERVAL);
145      let mut interval = interval.take(4);
146      // The first tick is immediate
147      let ins = interval.next().await.unwrap();
148      let elapsed = start.elapsed();
149      assert!(ins <= start + IMMEDIATE);
150      assert!(elapsed <= IMMEDIATE + BOUND);
151
152      let ins = interval.next().await.unwrap();
153      let elapsed = start.elapsed();
154      assert!(ins >= start + INTERVAL - BOUND);
155      assert!(elapsed >= INTERVAL - BOUND && elapsed <= INTERVAL + BOUND);
156
157      let ins = interval.next().await.unwrap();
158      let elapsed = start.elapsed();
159      assert!(ins >= start + INTERVAL * 2 - BOUND);
160      assert!(elapsed >= INTERVAL * 2 - BOUND && elapsed <= INTERVAL * 2 + BOUND);
161
162      let ins = interval.next().await.unwrap();
163      let elapsed = start.elapsed();
164      assert!(ins >= start + INTERVAL * 3 - BOUND);
165      assert!(elapsed >= INTERVAL * 3 - BOUND && elapsed <= INTERVAL * 3 + BOUND);
166
167      assert!(interval.next().await.is_none());
168    });
169  }
170
171  #[test]
172  fn test_interval_reset() {
173    futures::executor::block_on(async {
174      let start = Instant::now();
175      let mut interval = WasmInterval::interval(INTERVAL);
176      // The first tick is immediate
177      let ins = interval.next().await.unwrap();
178      let elapsed = start.elapsed();
179      assert!(ins <= start + IMMEDIATE);
180      assert!(elapsed <= IMMEDIATE + BOUND);
181
182      let ins = interval.next().await.unwrap();
183      let elapsed = start.elapsed();
184      assert!(ins >= start + INTERVAL - BOUND);
185      assert!(elapsed >= INTERVAL - BOUND && elapsed <= INTERVAL + BOUND);
186
187      // Reset the next tick to 2x
188      interval.reset(INTERVAL * 2);
189      let ins = interval.next().await.unwrap();
190      let elapsed = start.elapsed();
191      // interval + 2x interval, so 3 here
192      assert!(ins >= start + INTERVAL * 3 - BOUND);
193      assert!(elapsed >= INTERVAL * 3 - BOUND && elapsed <= INTERVAL * 3 + BOUND);
194
195      let ins = interval.next().await.unwrap();
196      let elapsed = start.elapsed();
197      // interval + 2x interval + interval, so 4 here
198      assert!(ins >= start + INTERVAL * 4 - BOUND);
199      assert!(elapsed >= INTERVAL * 4 - BOUND && elapsed <= INTERVAL * 4 + BOUND);
200    });
201  }
202
203  #[test]
204  fn test_interval_reset_at() {
205    futures::executor::block_on(async {
206      let start = Instant::now();
207      let mut interval = WasmInterval::interval(INTERVAL);
208      // The first tick is immediate
209      let ins = interval.next().await.unwrap();
210      let elapsed = start.elapsed();
211      assert!(ins <= start + IMMEDIATE);
212      assert!(elapsed <= IMMEDIATE + BOUND);
213
214      let ins = interval.next().await.unwrap();
215      let elapsed = start.elapsed();
216      assert!(ins >= start + INTERVAL);
217      assert!(elapsed >= INTERVAL && elapsed <= INTERVAL + BOUND);
218
219      // Reset the next tick to 2x
220      interval.reset_at(start + INTERVAL * 3);
221      let ins = interval.next().await.unwrap();
222      let elapsed = start.elapsed();
223      // interval + 2x interval, so 3 here
224      assert!(ins >= start + INTERVAL * 3);
225      assert!(elapsed >= INTERVAL * 3 - BOUND && elapsed <= INTERVAL * 3 + BOUND);
226
227      let ins = interval.next().await.unwrap();
228      let elapsed = start.elapsed();
229      // interval + 2x interval + interval, so 4 here
230      assert!(ins >= start + INTERVAL * 4 - BOUND);
231      assert!(elapsed >= INTERVAL * 4 - BOUND && elapsed <= INTERVAL * 4 + BOUND);
232    });
233  }
234}