matrix_sdk_common/
executor.rs1#[cfg(not(target_family = "wasm"))]
23mod sys {
24 pub use tokio::{
25 runtime::{Handle, Runtime},
26 task::{spawn, AbortHandle, JoinError, JoinHandle},
27 };
28}
29
30#[cfg(target_family = "wasm")]
31mod sys {
32 use std::{
33 future::Future,
34 pin::Pin,
35 task::{Context, Poll},
36 };
37
38 pub use futures_util::future::AbortHandle;
39 use futures_util::{
40 future::{Abortable, RemoteHandle},
41 FutureExt,
42 };
43
44 #[derive(Debug)]
47 pub enum JoinError {
48 Cancelled,
49 Panic,
50 }
51
52 impl JoinError {
53 pub fn is_cancelled(&self) -> bool {
59 matches!(self, JoinError::Cancelled)
60 }
61 }
62
63 impl std::fmt::Display for JoinError {
64 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match &self {
66 JoinError::Cancelled => write!(fmt, "task was cancelled"),
67 JoinError::Panic => write!(fmt, "task panicked"),
68 }
69 }
70 }
71
72 #[derive(Debug)]
75 pub struct JoinHandle<T> {
76 remote_handle: Option<RemoteHandle<T>>,
77 abort_handle: AbortHandle,
78 }
79
80 impl<T> JoinHandle<T> {
81 pub fn abort(&self) {
83 self.abort_handle.abort();
84 }
85
86 pub fn abort_handle(&self) -> AbortHandle {
89 self.abort_handle.clone()
90 }
91
92 pub fn is_finished(&self) -> bool {
94 self.abort_handle.is_aborted()
95 }
96 }
97
98 impl<T> Drop for JoinHandle<T> {
99 fn drop(&mut self) {
100 if let Some(h) = self.remote_handle.take() {
102 h.forget();
103 }
104 }
105 }
106
107 impl<T: 'static> Future for JoinHandle<T> {
108 type Output = Result<T, JoinError>;
109
110 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111 if self.abort_handle.is_aborted() {
112 Poll::Ready(Err(JoinError::Cancelled))
114 } else if let Some(handle) = self.remote_handle.as_mut() {
115 Pin::new(handle).poll(cx).map(Ok)
116 } else {
117 Poll::Ready(Err(JoinError::Panic))
118 }
119 }
120 }
121
122 pub fn spawn<F, T>(future: F) -> JoinHandle<T>
125 where
126 F: Future<Output = T> + 'static,
127 {
128 let (future, remote_handle) = future.remote_handle();
129 let (abort_handle, abort_registration) = AbortHandle::new_pair();
130 let future = Abortable::new(future, abort_registration);
131
132 wasm_bindgen_futures::spawn_local(async {
133 let _ = future.await;
136 });
137
138 JoinHandle { remote_handle: Some(remote_handle), abort_handle }
139 }
140}
141
142pub use sys::*;
143
144#[cfg(test)]
145mod tests {
146 use assert_matches::assert_matches;
147 use matrix_sdk_test_macros::async_test;
148
149 use super::spawn;
150
151 #[async_test]
152 async fn test_spawn() {
153 let future = async { 42 };
154 let join_handle = spawn(future);
155
156 assert_matches!(join_handle.await, Ok(42));
157 }
158
159 #[async_test]
160 async fn test_abort() {
161 let future = async { 42 };
162 let join_handle = spawn(future);
163
164 join_handle.abort();
165
166 assert!(join_handle.await.is_err());
167 }
168}