// agnostic_lite/wasm/interval.rs
use core::{
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use std::time::Instant;

use futures_util::{stream::Stream, FutureExt};

use crate::time::{AsyncLocalInterval, AsyncLocalIntervalExt, AsyncSleep, AsyncSleepExt};

use super::WasmSleep;

14pin_project_lite::pin_project! {
15 pub struct WasmInterval {
19 #[pin]
20 inner: Pin<Box<WasmSleep>>,
21 first: bool,
22 }
23}
24
25impl Stream for WasmInterval {
26 type Item = Instant;
27
28 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29 if self.first {
30 self.first = false;
31 return Poll::Ready(Some(self.inner.ddl - self.inner.duration));
32 }
33
34 let mut this = self.project();
35 match this.inner.poll_unpin(cx) {
36 Poll::Ready(ins) => {
37 let duration = this.inner.duration;
38 Pin::new(&mut **this.inner).reset(Instant::now() + duration);
39 Poll::Ready(Some(ins))
40 }
41 Poll::Pending => Poll::Pending,
42 }
43 }
44}
45
46impl AsyncLocalInterval for WasmInterval {
47 type Instant = Instant;
48
49 fn reset(&mut self, interval: Duration) {
50 Pin::new(&mut *self.inner).reset(Instant::now() + interval);
51 }
52
53 fn reset_at(&mut self, instant: Instant) {
54 Pin::new(&mut *self.inner).reset(instant);
55 }
56
57 fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
58 if self.first {
59 self.first = false;
60 return Poll::Ready(self.inner.ddl - self.inner.duration);
61 }
62
63 let duration = self.inner.duration;
64 let mut this = Pin::new(&mut *self.inner);
65 match this.poll_unpin(cx) {
66 Poll::Ready(ins) => {
67 this.reset(Instant::now() + duration);
68 Poll::Ready(ins)
69 }
70 Poll::Pending => Poll::Pending,
71 }
72 }
73}
74
75impl AsyncLocalIntervalExt for WasmInterval {
76 fn interval_local(period: Duration) -> Self
77 where
78 Self: Sized,
79 {
80 Self {
81 inner: Box::pin(WasmSleep::sleep(period)),
82 first: true,
83 }
84 }
85
86 fn interval_local_at(start: Instant, period: Duration) -> Self
87 where
88 Self: Sized,
89 {
90 Self {
91 inner: Box::pin(WasmSleep::sleep_until(start + period)),
92 first: true,
93 }
94 }
95}
96
#[cfg(test)]
mod tests {
    use futures::StreamExt;

    use super::WasmInterval;
    use crate::time::{AsyncInterval, AsyncIntervalExt};
    use std::time::{Duration, Instant};

    /// Tick period used by every test.
    const INTERVAL: Duration = Duration::from_millis(100);
    /// Tolerated timing error around an expected tick.
    const BOUND: Duration = Duration::from_millis(50);
    /// Upper bound for a tick that is expected to complete right away.
    const IMMEDIATE: Duration = Duration::from_millis(1);

    /// Checks that the first tick `ins` was reported (almost) immediately after `start`.
    fn assert_first_tick(ins: Instant, start: Instant) {
        let elapsed = start.elapsed();
        assert!(ins <= start + IMMEDIATE);
        assert!(elapsed <= IMMEDIATE + BOUND);
    }

    /// Checks that `elapsed` lies within `BOUND` of `expected`.
    fn assert_near(elapsed: Duration, expected: Duration) {
        assert!(
            elapsed >= expected - BOUND && elapsed <= expected + BOUND,
            "elapsed {:?} is not within {:?} of {:?}",
            elapsed,
            BOUND,
            expected
        );
    }

    #[test]
    fn test_interval() {
        futures::executor::block_on(async {
            let start = Instant::now();
            let interval = WasmInterval::interval(INTERVAL);
            let mut interval = interval.take(4);

            // The first tick completes immediately.
            let ins = interval.next().await.unwrap();
            assert_first_tick(ins, start);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL - BOUND);
            assert_near(elapsed, INTERVAL);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 2 - BOUND);
            assert_near(elapsed, INTERVAL * 2);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 3 - BOUND);
            assert_near(elapsed, INTERVAL * 3);

            // `take(4)` ends the stream after four ticks.
            assert!(interval.next().await.is_none());
        });
    }

    #[test]
    fn test_interval_at() {
        futures::executor::block_on(async {
            let start = Instant::now();
            let interval = WasmInterval::interval_at(Instant::now(), INTERVAL);
            let mut interval = interval.take(4);

            // The first tick completes immediately.
            let ins = interval.next().await.unwrap();
            assert_first_tick(ins, start);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL - BOUND);
            assert_near(elapsed, INTERVAL);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 2 - BOUND);
            assert_near(elapsed, INTERVAL * 2);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 3 - BOUND);
            assert_near(elapsed, INTERVAL * 3);

            // `take(4)` ends the stream after four ticks.
            assert!(interval.next().await.is_none());
        });
    }

    #[test]
    fn test_interval_reset() {
        futures::executor::block_on(async {
            let start = Instant::now();
            let mut interval = WasmInterval::interval(INTERVAL);

            // The first tick completes immediately.
            let ins = interval.next().await.unwrap();
            assert_first_tick(ins, start);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL - BOUND);
            assert_near(elapsed, INTERVAL);

            // Reset at ~1x INTERVAL: the next tick is expected 2x INTERVAL later.
            interval.reset(INTERVAL * 2);
            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 3 - BOUND);
            assert_near(elapsed, INTERVAL * 3);

            // After the reset tick the interval is expected to tick every INTERVAL again.
            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 4 - BOUND);
            assert_near(elapsed, INTERVAL * 4);
        });
    }

    #[test]
    fn test_interval_reset_at() {
        futures::executor::block_on(async {
            let start = Instant::now();
            let mut interval = WasmInterval::interval(INTERVAL);

            // The first tick completes immediately.
            let ins = interval.next().await.unwrap();
            assert_first_tick(ins, start);

            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL);
            assert!(elapsed >= INTERVAL && elapsed <= INTERVAL + BOUND);

            // Move the next tick to an absolute instant, 3x INTERVAL after `start`.
            interval.reset_at(start + INTERVAL * 3);
            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 3);
            assert_near(elapsed, INTERVAL * 3);

            // After the reset tick the interval is expected to tick every INTERVAL again.
            let ins = interval.next().await.unwrap();
            let elapsed = start.elapsed();
            assert!(ins >= start + INTERVAL * 4 - BOUND);
            assert_near(elapsed, INTERVAL * 4);
        });
    }
}