agnostic_lite/wasm/
after.rs

1use std::sync::{atomic::Ordering, Arc};
2
3use atomic_time::AtomicOptionDuration;
4use futures_util::{FutureExt, StreamExt};
5use wasm::channel::{
6  mpsc::{unbounded, UnboundedSender},
7  oneshot::{channel, Sender},
8};
9
10use crate::{
11  spawner::{AfterHandle, handle::JoinError},
12  time::AsyncSleep,
13  AfterHandleError, AfterHandleSignals, AsyncAfterSpawner, Canceled,
14};
15
16use super::{super::RuntimeLite, *};
17
18pub(crate) struct Resetter {
19  duration: Arc<AtomicOptionDuration>,
20  tx: UnboundedSender<()>,
21}
22
23impl Resetter {
24  pub(crate) fn new(duration: Arc<AtomicOptionDuration>, tx: UnboundedSender<()>) -> Self {
25    Self { duration, tx }
26  }
27
28  pub(crate) fn reset(&self, duration: Duration) {
29    self.duration.store(Some(duration), Ordering::Release);
30  }
31}
32
33macro_rules! spawn_after {
34  ($spawn:ident, $sleep:ident($trait:ident) -> ($instant:ident, $future:ident)) => {{
35    let (tx, rx) = channel::<()>();
36    let (abort_tx, abort_rx) = channel::<()>();
37    let signals = Arc::new(AfterHandleSignals::new());
38    let (reset_tx, mut reset_rx) = unbounded::<()>();
39    let duration = Arc::new(AtomicOptionDuration::none());
40    let resetter = Resetter::new(duration.clone(), reset_tx);
41    let s1 = signals.clone();
42    let h = WasmRuntime::$spawn(async move {
43      let delay = WasmRuntime::$sleep($instant);
44      let future = $future.fuse();
45      futures_util::pin_mut!(delay);
46      futures_util::pin_mut!(rx);
47      futures_util::pin_mut!(abort_rx);
48      futures_util::pin_mut!(future);
49      loop {
50        futures_util::select_biased! {
51          res = abort_rx => {
52            if res.is_ok() {
53              return Err(Canceled);
54            }
55            delay.await;
56            let res = future.await;
57            s1.set_finished();
58            return Ok(res);
59          }
60          res = rx => {
61            if res.is_ok() {
62              return Err(Canceled);
63            }
64
65            delay.await;
66            let res = future.await;
67            s1.set_finished();
68            return Ok(res);
69          }
70          res = reset_rx.next() => {
71            if res.is_none() {
72              delay.await;
73              let res = future.await;
74              s1.set_finished();
75              return Ok(res);
76            }
77
78            if let Some(d) = duration.load(Ordering::Acquire) {
79              if $instant.checked_sub(d).is_some() {
80                s1.set_expired();
81
82                futures_util::select_biased! {
83                  res = &mut future => {
84                    s1.set_finished();
85                    return Ok(res);
86                  }
87                  canceled = &mut rx => {
88                    if canceled.is_ok() {
89                      return Err(Canceled);
90                    }
91                    delay.await;
92                    s1.set_expired();
93                    let res = future.await;
94                    s1.set_finished();
95                    return Ok(res);
96                  }
97                }
98              }
99
100              match $instant.checked_sub(d) {
101                Some(v) => {
102                  $trait::reset(delay.as_mut(), v);
103                },
104                None => {
105                  match d.checked_sub($instant.elapsed()) {
106                    Some(v) => {
107                      $trait::reset(delay.as_mut(), Instant::now() + v);
108                    },
109                    None => {
110                      s1.set_expired();
111
112                      futures_util::select_biased! {
113                        res = &mut future => {
114                          s1.set_finished();
115                          return Ok(res);
116                        }
117                        canceled = &mut rx => {
118                          if canceled.is_ok() {
119                            return Err(Canceled);
120                          }
121                          delay.await;
122                          s1.set_expired();
123                          let res = future.await;
124                          s1.set_finished();
125                          return Ok(res);
126                        }
127                      }
128                    },
129                  }
130                },
131              }
132            }
133          }
134          _ = delay.as_mut().fuse() => {
135            s1.set_expired();
136            futures_util::select_biased! {
137              res = abort_rx => {
138                if res.is_ok() {
139                  return Err(Canceled);
140                }
141                let res = future.await;
142                s1.set_finished();
143                return Ok(res);
144              }
145              res = rx => {
146                if res.is_ok() {
147                  return Err(Canceled);
148                }
149                let res = future.await;
150                s1.set_finished();
151                return Ok(res);
152              }
153              res = future => {
154                s1.set_finished();
155                return Ok(res);
156              }
157            }
158          }
159        }
160      }
161    });
162
163    WasmAfterHandle {
164      handle: h,
165      resetter,
166      signals,
167      abort_tx,
168      tx,
169    }
170  }};
171}
172
173/// The handle return by [`RuntimeLite::spawn_after`] or [`RuntimeLite::spawn_after_at`]
174#[pin_project::pin_project]
175pub struct WasmAfterHandle<O>
176where
177  O: 'static,
178{
179  #[pin]
180  handle: JoinHandle<Result<O, Canceled>>,
181  signals: Arc<AfterHandleSignals>,
182  abort_tx: Sender<()>,
183  resetter: Resetter,
184  tx: Sender<()>,
185}
186
187impl<O: 'static> Future for WasmAfterHandle<O> {
188  type Output = Result<O, AfterHandleError<JoinError>>;
189
190  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191    let this = self.project();
192    match this.handle.poll(cx) {
193      Poll::Ready(v) => match v {
194        Ok(v) => Poll::Ready(v.map_err(|_| AfterHandleError::Canceled)),
195        Err(e) => Poll::Ready(Err(AfterHandleError::Join(e))),
196      },
197      Poll::Pending => Poll::Pending,
198    }
199  }
200}
201
202impl<O> AfterHandle<O> for WasmAfterHandle<O>
203where
204  O: Send + 'static,
205{
206  type JoinError = AfterHandleError<JoinError>;
207
208  async fn cancel(self) -> Option<Result<O, Self::JoinError>> {
209    if AfterHandle::is_finished(&self) {
210      return Some(
211        self
212          .handle
213          .await
214          .map_err(AfterHandleError::Join)
215          .and_then(|v| v.map_err(|_| AfterHandleError::Canceled)),
216      );
217    }
218
219    let _ = self.tx.send(());
220    None
221  }
222
223  fn reset(&self, duration: Duration) {
224    self.resetter.reset(duration);
225    let _ = self.resetter.tx.unbounded_send(());
226  }
227
228  #[inline]
229  fn is_finished(&self) -> bool {
230    self.signals.is_finished()
231  }
232
233  #[inline]
234  fn is_expired(&self) -> bool {
235    self.signals.is_expired()
236  }
237
238  #[inline]
239  fn abort(self) {
240    let _ = self.abort_tx.send(());
241  }
242}
243
244impl AsyncAfterSpawner for WasmSpawner {
245  type Instant = Instant;
246  type JoinHandle<F>
247    = WasmAfterHandle<F>
248  where
249    F: Send + 'static;
250
251  fn spawn_after<F>(duration: core::time::Duration, future: F) -> Self::JoinHandle<F::Output>
252  where
253    F::Output: Send + 'static,
254    F: Future + Send + 'static,
255  {
256    Self::spawn_after_at(Instant::now() + duration, future)
257  }
258
259  fn spawn_after_at<F>(instant: Instant, future: F) -> Self::JoinHandle<F::Output>
260  where
261    F::Output: Send + 'static,
262    F: Future + Send + 'static,
263  {
264    spawn_after!(spawn, sleep_until(AsyncSleep) -> (instant, future))
265  }
266}
267
268
269#[cfg(all(test, target_arch = "wasm32"))]
270mod tests {
271  use super::*;
272  use wasm_bindgen_test::*;
273
274  #[wasm_bindgen_test]
275  async fn test_after_handle() {
276    crate::tests::spawn_after_unittest::<WasmRuntime>().await;
277  }
278
279  #[wasm_bindgen_test]
280  async fn test_after_drop() {
281    crate::tests::spawn_after_drop_unittest::<WasmRuntime>().await;
282  }
283
284  #[wasm_bindgen_test]
285  async fn test_after_cancel() {
286    crate::tests::spawn_after_cancel_unittest::<WasmRuntime>().await;
287  }
288
289  #[wasm_bindgen_test]
290  async fn test_after_abort() {
291    crate::tests::spawn_after_abort_unittest::<WasmRuntime>().await;
292  }
293
294  #[wasm_bindgen_test]
295  async fn test_after_reset_to_pass() {
296    crate::tests::spawn_after_reset_to_pass_unittest::<WasmRuntime>().await;
297  }
298
299  #[wasm_bindgen_test]
300  async fn test_after_reset_to_future() {
301    crate::tests::spawn_after_reset_to_future_unittest::<WasmRuntime>().await;
302  }
303}