matrix_sdk_common/
executor.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Abstraction over an executor so we can spawn tasks under Wasm the same way
//! we usually do.
17
//! On non-Wasm platforms, this re-exports parts of tokio directly. For Wasm,
//! we provide a single-threaded solution that matches the interface that tokio
//! provides as a drop-in replacement.
21
22#[cfg(not(target_family = "wasm"))]
23mod sys {
24    pub use tokio::{
25        runtime::{Handle, Runtime},
26        task::{spawn, AbortHandle, JoinError, JoinHandle},
27    };
28}
29
30#[cfg(target_family = "wasm")]
31mod sys {
32    use std::{
33        future::Future,
34        pin::Pin,
35        task::{Context, Poll},
36    };
37
38    pub use futures_util::future::AbortHandle;
39    use futures_util::{
40        future::{Abortable, RemoteHandle},
41        FutureExt,
42    };
43
    /// A Wasm specific version of `tokio::task::JoinError` designed to work
    /// in the single-threaded environment available in Wasm environments.
    ///
    /// This is the error half of the `Result` produced by awaiting a
    /// [`JoinHandle`].
    #[derive(Debug)]
    pub enum JoinError {
        /// The task's abort handle was triggered, so its output is not
        /// collected.
        Cancelled,
        /// Mirrors tokio's panic case. In this module it is produced when a
        /// [`JoinHandle`] is polled while it no longer holds its remote
        /// handle.
        Panic,
    }
51
52    impl JoinError {
53        /// Returns true if the error was caused by the task being cancelled.
54        ///
55        /// See [the module level docs] for more information on cancellation.
56        ///
57        /// [the module level docs]: crate::task#cancellation
58        pub fn is_cancelled(&self) -> bool {
59            matches!(self, JoinError::Cancelled)
60        }
61    }
62
63    impl std::fmt::Display for JoinError {
64        fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65            match &self {
66                JoinError::Cancelled => write!(fmt, "task was cancelled"),
67                JoinError::Panic => write!(fmt, "task panicked"),
68            }
69        }
70    }
71
    /// A Wasm specific version of `tokio::task::JoinHandle` that
    /// holds handles to locally executing futures.
    #[derive(Debug)]
    pub struct JoinHandle<T> {
        /// Handle through which the spawned future's output is received.
        /// Wrapped in an `Option` so that `Drop` can take it out and call
        /// `forget()` on it.
        remote_handle: Option<RemoteHandle<T>>,
        /// Handle used to abort the spawned future and to query whether an
        /// abort has been requested.
        abort_handle: AbortHandle,
    }
79
80    impl<T> JoinHandle<T> {
81        /// Aborts the spawned future, preventing it from being polled again.
82        pub fn abort(&self) {
83            self.abort_handle.abort();
84        }
85
86        /// Returns the handle to the `AbortHandle` that can be used to
87        /// abort the spawned future.
88        pub fn abort_handle(&self) -> AbortHandle {
89            self.abort_handle.clone()
90        }
91
92        /// Returns true if the spawned future has been aborted.
93        pub fn is_finished(&self) -> bool {
94            self.abort_handle.is_aborted()
95        }
96    }
97
98    impl<T> Drop for JoinHandle<T> {
99        fn drop(&mut self) {
100            // don't abort the spawned future
101            if let Some(h) = self.remote_handle.take() {
102                h.forget();
103            }
104        }
105    }
106
107    impl<T: 'static> Future for JoinHandle<T> {
108        type Output = Result<T, JoinError>;
109
110        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111            if self.abort_handle.is_aborted() {
112                // The future has been aborted. It is not possible to poll it again.
113                Poll::Ready(Err(JoinError::Cancelled))
114            } else if let Some(handle) = self.remote_handle.as_mut() {
115                Pin::new(handle).poll(cx).map(Ok)
116            } else {
117                Poll::Ready(Err(JoinError::Panic))
118            }
119        }
120    }
121
122    /// A Wasm specific version of `tokio::task::spawn` that utilizes
123    /// wasm_bindgen_futures to spawn futures on the local executor.
124    pub fn spawn<F, T>(future: F) -> JoinHandle<T>
125    where
126        F: Future<Output = T> + 'static,
127    {
128        let (future, remote_handle) = future.remote_handle();
129        let (abort_handle, abort_registration) = AbortHandle::new_pair();
130        let future = Abortable::new(future, abort_registration);
131
132        wasm_bindgen_futures::spawn_local(async {
133            // Poll the future, and ignore the result (either it's `Ok(())`, or it's
134            // `Err(Aborted)`).
135            let _ = future.await;
136        });
137
138        JoinHandle { remote_handle: Some(remote_handle), abort_handle }
139    }
140}
141
142pub use sys::*;
143
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;

    use super::spawn;

    /// Awaiting the join handle yields the spawned future's output.
    #[async_test]
    async fn test_spawn() {
        let handle = spawn(async { 42 });
        let outcome = handle.await;

        assert_matches!(outcome, Ok(42));
    }

    /// Awaiting a handle that was aborted right after spawning yields an
    /// error.
    #[async_test]
    async fn test_abort() {
        let handle = spawn(async { 42 });
        handle.abort();

        let outcome = handle.await;
        assert!(outcome.is_err());
    }
}