agnostic_lite/wasm/
after.rs1use std::sync::{atomic::Ordering, Arc};
2
3use atomic_time::AtomicOptionDuration;
4use futures_util::{FutureExt, StreamExt};
5use wasm::channel::{
6 mpsc::{unbounded, UnboundedSender},
7 oneshot::{channel, Sender},
8};
9
10use crate::{
11 spawner::{AfterHandle, handle::JoinError},
12 time::AsyncSleep,
13 AfterHandleError, AfterHandleSignals, AsyncAfterSpawner, Canceled,
14};
15
16use super::{super::RuntimeLite, *};
17
18pub(crate) struct Resetter {
19 duration: Arc<AtomicOptionDuration>,
20 tx: UnboundedSender<()>,
21}
22
23impl Resetter {
24 pub(crate) fn new(duration: Arc<AtomicOptionDuration>, tx: UnboundedSender<()>) -> Self {
25 Self { duration, tx }
26 }
27
28 pub(crate) fn reset(&self, duration: Duration) {
29 self.duration.store(Some(duration), Ordering::Release);
30 }
31}
32
33macro_rules! spawn_after {
34 ($spawn:ident, $sleep:ident($trait:ident) -> ($instant:ident, $future:ident)) => {{
35 let (tx, rx) = channel::<()>();
36 let (abort_tx, abort_rx) = channel::<()>();
37 let signals = Arc::new(AfterHandleSignals::new());
38 let (reset_tx, mut reset_rx) = unbounded::<()>();
39 let duration = Arc::new(AtomicOptionDuration::none());
40 let resetter = Resetter::new(duration.clone(), reset_tx);
41 let s1 = signals.clone();
42 let h = WasmRuntime::$spawn(async move {
43 let delay = WasmRuntime::$sleep($instant);
44 let future = $future.fuse();
45 futures_util::pin_mut!(delay);
46 futures_util::pin_mut!(rx);
47 futures_util::pin_mut!(abort_rx);
48 futures_util::pin_mut!(future);
49 loop {
50 futures_util::select_biased! {
51 res = abort_rx => {
52 if res.is_ok() {
53 return Err(Canceled);
54 }
55 delay.await;
56 let res = future.await;
57 s1.set_finished();
58 return Ok(res);
59 }
60 res = rx => {
61 if res.is_ok() {
62 return Err(Canceled);
63 }
64
65 delay.await;
66 let res = future.await;
67 s1.set_finished();
68 return Ok(res);
69 }
70 res = reset_rx.next() => {
71 if res.is_none() {
72 delay.await;
73 let res = future.await;
74 s1.set_finished();
75 return Ok(res);
76 }
77
78 if let Some(d) = duration.load(Ordering::Acquire) {
79 if $instant.checked_sub(d).is_some() {
80 s1.set_expired();
81
82 futures_util::select_biased! {
83 res = &mut future => {
84 s1.set_finished();
85 return Ok(res);
86 }
87 canceled = &mut rx => {
88 if canceled.is_ok() {
89 return Err(Canceled);
90 }
91 delay.await;
92 s1.set_expired();
93 let res = future.await;
94 s1.set_finished();
95 return Ok(res);
96 }
97 }
98 }
99
100 match $instant.checked_sub(d) {
101 Some(v) => {
102 $trait::reset(delay.as_mut(), v);
103 },
104 None => {
105 match d.checked_sub($instant.elapsed()) {
106 Some(v) => {
107 $trait::reset(delay.as_mut(), Instant::now() + v);
108 },
109 None => {
110 s1.set_expired();
111
112 futures_util::select_biased! {
113 res = &mut future => {
114 s1.set_finished();
115 return Ok(res);
116 }
117 canceled = &mut rx => {
118 if canceled.is_ok() {
119 return Err(Canceled);
120 }
121 delay.await;
122 s1.set_expired();
123 let res = future.await;
124 s1.set_finished();
125 return Ok(res);
126 }
127 }
128 },
129 }
130 },
131 }
132 }
133 }
134 _ = delay.as_mut().fuse() => {
135 s1.set_expired();
136 futures_util::select_biased! {
137 res = abort_rx => {
138 if res.is_ok() {
139 return Err(Canceled);
140 }
141 let res = future.await;
142 s1.set_finished();
143 return Ok(res);
144 }
145 res = rx => {
146 if res.is_ok() {
147 return Err(Canceled);
148 }
149 let res = future.await;
150 s1.set_finished();
151 return Ok(res);
152 }
153 res = future => {
154 s1.set_finished();
155 return Ok(res);
156 }
157 }
158 }
159 }
160 }
161 });
162
163 WasmAfterHandle {
164 handle: h,
165 resetter,
166 signals,
167 abort_tx,
168 tx,
169 }
170 }};
171}
172
173#[pin_project::pin_project]
175pub struct WasmAfterHandle<O>
176where
177 O: 'static,
178{
179 #[pin]
180 handle: JoinHandle<Result<O, Canceled>>,
181 signals: Arc<AfterHandleSignals>,
182 abort_tx: Sender<()>,
183 resetter: Resetter,
184 tx: Sender<()>,
185}
186
187impl<O: 'static> Future for WasmAfterHandle<O> {
188 type Output = Result<O, AfterHandleError<JoinError>>;
189
190 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191 let this = self.project();
192 match this.handle.poll(cx) {
193 Poll::Ready(v) => match v {
194 Ok(v) => Poll::Ready(v.map_err(|_| AfterHandleError::Canceled)),
195 Err(e) => Poll::Ready(Err(AfterHandleError::Join(e))),
196 },
197 Poll::Pending => Poll::Pending,
198 }
199 }
200}
201
202impl<O> AfterHandle<O> for WasmAfterHandle<O>
203where
204 O: Send + 'static,
205{
206 type JoinError = AfterHandleError<JoinError>;
207
208 async fn cancel(self) -> Option<Result<O, Self::JoinError>> {
209 if AfterHandle::is_finished(&self) {
210 return Some(
211 self
212 .handle
213 .await
214 .map_err(AfterHandleError::Join)
215 .and_then(|v| v.map_err(|_| AfterHandleError::Canceled)),
216 );
217 }
218
219 let _ = self.tx.send(());
220 None
221 }
222
223 fn reset(&self, duration: Duration) {
224 self.resetter.reset(duration);
225 let _ = self.resetter.tx.unbounded_send(());
226 }
227
228 #[inline]
229 fn is_finished(&self) -> bool {
230 self.signals.is_finished()
231 }
232
233 #[inline]
234 fn is_expired(&self) -> bool {
235 self.signals.is_expired()
236 }
237
238 #[inline]
239 fn abort(self) {
240 let _ = self.abort_tx.send(());
241 }
242}
243
244impl AsyncAfterSpawner for WasmSpawner {
245 type Instant = Instant;
246 type JoinHandle<F>
247 = WasmAfterHandle<F>
248 where
249 F: Send + 'static;
250
251 fn spawn_after<F>(duration: core::time::Duration, future: F) -> Self::JoinHandle<F::Output>
252 where
253 F::Output: Send + 'static,
254 F: Future + Send + 'static,
255 {
256 Self::spawn_after_at(Instant::now() + duration, future)
257 }
258
259 fn spawn_after_at<F>(instant: Instant, future: F) -> Self::JoinHandle<F::Output>
260 where
261 F::Output: Send + 'static,
262 F: Future + Send + 'static,
263 {
264 spawn_after!(spawn, sleep_until(AsyncSleep) -> (instant, future))
265 }
266}
267
268
269#[cfg(all(test, target_arch = "wasm32"))]
270mod tests {
271 use super::*;
272 use wasm_bindgen_test::*;
273
274 #[wasm_bindgen_test]
275 async fn test_after_handle() {
276 crate::tests::spawn_after_unittest::<WasmRuntime>().await;
277 }
278
279 #[wasm_bindgen_test]
280 async fn test_after_drop() {
281 crate::tests::spawn_after_drop_unittest::<WasmRuntime>().await;
282 }
283
284 #[wasm_bindgen_test]
285 async fn test_after_cancel() {
286 crate::tests::spawn_after_cancel_unittest::<WasmRuntime>().await;
287 }
288
289 #[wasm_bindgen_test]
290 async fn test_after_abort() {
291 crate::tests::spawn_after_abort_unittest::<WasmRuntime>().await;
292 }
293
294 #[wasm_bindgen_test]
295 async fn test_after_reset_to_pass() {
296 crate::tests::spawn_after_reset_to_pass_unittest::<WasmRuntime>().await;
297 }
298
299 #[wasm_bindgen_test]
300 async fn test_after_reset_to_future() {
301 crate::tests::spawn_after_reset_to_future_unittest::<WasmRuntime>().await;
302 }
303}