1use futures::channel::oneshot;
5use futures::future::{AbortHandle, Abortable, Aborted};
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use thiserror::Error;
10
11#[derive(Error, Debug)]
13pub enum JoinError {
14 #[error("The task is cancelled")]
16 Canceled,
17 #[error("The task is aborted")]
19 Aborted,
20}
21
22pub struct JoinHandle<T> {
25 receiver: oneshot::Receiver<Result<T, Aborted>>,
26 abort_handle: AbortHandle,
27}
28
29impl<T> Future for JoinHandle<T> {
30 type Output = Result<T, JoinError>;
31
32 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33 match Pin::new(&mut self.receiver).poll(cx) {
34 Poll::Ready(Ok(result)) => Poll::Ready(match result {
36 Ok(val) => Ok(val),
37 Err(_) => Err(JoinError::Aborted),
38 }),
39 Poll::Ready(Err(_)) => Poll::Ready(Err(JoinError::Canceled)),
41 Poll::Pending => Poll::Pending,
42 }
43 }
44}
45
46impl<T> JoinHandle<T> {
47 pub fn abort(&self) {
49 self.abort_handle.abort();
50 }
51}
52
53pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
56where
57 F: Future + 'static,
58 F::Output: 'static,
59{
60 let (sender, receiver) = oneshot::channel();
62 let (abort_handle, abort_registration) = AbortHandle::new_pair();
64
65 let abortable_future = Abortable::new(future, abort_registration);
67
68 let wrapped_future = async move {
70 let res = abortable_future.await;
71 let _ = sender.send(res);
73 };
74
75 wasm_bindgen_futures::spawn_local(wrapped_future);
76
77 JoinHandle {
78 receiver,
79 abort_handle,
80 }
81}
82
83pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
86where
87 F: Future + 'static,
88 F::Output: 'static,
89{
90 spawn_local(future)
91}
92
93pub fn spawn_blocking<F, R>(blocking_func: F) -> JoinHandle<R>
96where
97 F: FnOnce() -> R + 'static,
98 R: 'static,
99{
100 spawn_local(async move { blocking_func() })
101}