crb_core_web/
task.rs

1//! Task module for spawning async tasks
2//! in WASM environment.
3
4use futures::channel::oneshot;
5use futures::future::{AbortHandle, Abortable, Aborted};
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use thiserror::Error;
10
11/// Error type returned when awaiting a spawned task.
12#[derive(Error, Debug)]
13pub enum JoinError {
14    /// The task failed to send a result (e.g., the channel was dropped).
15    #[error("The task is cancelled")]
16    Canceled,
17    /// The task was aborted.
18    #[error("The task is aborted")]
19    Aborted,
20}
21
22/// JoinHandle wraps a oneshot::Receiver for obtaining the task’s result,
23/// along with an AbortHandle to allow the task to be aborted.
24pub struct JoinHandle<T> {
25    receiver: oneshot::Receiver<Result<T, Aborted>>,
26    abort_handle: AbortHandle,
27}
28
29impl<T> Future for JoinHandle<T> {
30    type Output = Result<T, JoinError>;
31
32    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33        match Pin::new(&mut self.receiver).poll(cx) {
34            // The task completed and sent a result.
35            Poll::Ready(Ok(result)) => Poll::Ready(match result {
36                Ok(val) => Ok(val),
37                Err(_) => Err(JoinError::Aborted),
38            }),
39            // The oneshot channel was closed before a value could be sent.
40            Poll::Ready(Err(_)) => Poll::Ready(Err(JoinError::Canceled)),
41            Poll::Pending => Poll::Pending,
42        }
43    }
44}
45
46impl<T> JoinHandle<T> {
47    /// Aborts the spawned task.
48    pub fn abort(&self) {
49        self.abort_handle.abort();
50    }
51}
52
53/// Spawns a future as an abortable task using spawn_local.
54/// This function is equivalent to tokio's spawn_local.
55pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
56where
57    F: Future + 'static,
58    F::Output: 'static,
59{
60    // Create a oneshot channel for sending the result.
61    let (sender, receiver) = oneshot::channel();
62    // Create an AbortHandle and AbortRegistration pair.
63    let (abort_handle, abort_registration) = AbortHandle::new_pair();
64
65    // Wrap the future in Abortable so it can be aborted.
66    let abortable_future = Abortable::new(future, abort_registration);
67
68    // The wrapped future awaits completion or abortion, then sends its result.
69    let wrapped_future = async move {
70        let res = abortable_future.await;
71        // Ignore errors if the receiver has been dropped.
72        let _ = sender.send(res);
73    };
74
75    wasm_bindgen_futures::spawn_local(wrapped_future);
76
77    JoinHandle {
78        receiver,
79        abort_handle,
80    }
81}
82
83/// Spawns a future as an abortable task.
84/// In WASM, spawn and spawn_local are equivalent.
85pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
86where
87    F: Future + 'static,
88    F::Output: 'static,
89{
90    spawn_local(future)
91}
92
93/// Spawns a blocking task by wrapping the blocking function in an async block.
94/// In WASM this is equivalent to spawn_local since true blocking cannot be offloaded to another thread.
95pub fn spawn_blocking<F, R>(blocking_func: F) -> JoinHandle<R>
96where
97    F: FnOnce() -> R + 'static,
98    R: 'static,
99{
100    spawn_local(async move { blocking_func() })
101}