Skip to main content

linera_base/
task.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*!
5Abstractions over tasks that can be used natively or on the Web.
6 */
7
8use futures::{future, Future, FutureExt as _};
9
/// Marker trait that means `Send` on native targets and imposes no bound on
/// web (where there's only one thread).
///
/// Put this in generic bounds that must be `Send` on native but should still
/// compile on web without that requirement. Together with [`run_detached`],
/// it allows one function body to serve both targets.
#[cfg(not(web))]
pub trait MaybeSend: Send {}

// Native: every `Send` type is automatically `MaybeSend`.
#[cfg(not(web))]
impl<T> MaybeSend for T where T: Send {}

/// Marker trait that means `Send` on native targets and imposes no bound on
/// web (where there's only one thread).
#[cfg(web)]
pub trait MaybeSend {}

// Web: every type is `MaybeSend`.
#[cfg(web)]
impl<T> MaybeSend for T {}
25
26/// Spawns `future` on the runtime and awaits its completion.
27///
28/// Dropping the returned future does *not* cancel the spawned task — it runs
29/// to completion in the background. Use this when the spawned work (e.g. a
30/// storage write paired with its in-memory finalization) must not be torn
31/// apart mid-flight by caller cancellation.
32pub async fn run_detached<F, R>(future: F) -> R
33where
34    F: Future<Output = R> + MaybeSend + 'static,
35    R: MaybeSend + 'static,
36{
37    // On native, `tokio::task::spawn` returns a `JoinHandle` that already
38    // detaches on drop. On web, `wasm_bindgen_futures::spawn_local` is
39    // fire-and-forget, so we deliver the output through a oneshot channel.
40    #[cfg(not(web))]
41    {
42        tokio::task::spawn(future)
43            .await
44            .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))
45    }
46    #[cfg(web)]
47    {
48        let (tx, rx) = futures::channel::oneshot::channel();
49        wasm_bindgen_futures::spawn_local(async move {
50            if tx.send(future.await).is_err() {
51                tracing::debug!("run_detached: receiver dropped before result was delivered");
52            }
53        });
54        rx.await
55            .expect("spawned task dropped without sending its result")
56    }
57}
58
/// The type of a future awaiting another task.
///
/// Awaiting a `Task` (through its `IntoFuture` implementation) yields the
/// output of the wrapped future.
///
/// On drop, the remote task will be asynchronously cancelled, but will remain
/// alive until it reaches a yield point.
///
/// To wait for the task to be fully cancelled, use [`Task::cancel`].
pub struct Task<R> {
    // Abort switch for the wrapped future; triggered by `Task::cancel`.
    abort_handle: future::AbortHandle,
    // Receiving end for the task's result: `Ok(value)` when the wrapped
    // future completed, `Err(Aborted)` when it was aborted first.
    output: future::RemoteHandle<Result<R, future::Aborted>>,
}
69
70impl<R: 'static> Task<R> {
71    fn spawn_<F: Future<Output = R>, T>(
72        future: F,
73        spawn: impl FnOnce(future::Remote<future::Abortable<F>>) -> T,
74    ) -> Self {
75        let (abortable_future, abort_handle) = future::abortable(future);
76        let (task, output) = abortable_future.remote_handle();
77        let _ = spawn(task);
78        Self {
79            abort_handle,
80            output,
81        }
82    }
83
84    /// Spawns a new task, potentially on the current thread.
85    #[cfg(not(web))]
86    pub fn spawn<F: Future<Output = R> + Send + 'static>(future: F) -> Self
87    where
88        R: Send,
89    {
90        Self::spawn_(future, tokio::task::spawn)
91    }
92
93    /// Spawns a new task on the current thread.
94    #[cfg(web)]
95    pub fn spawn<F: Future<Output = R> + 'static>(future: F) -> Self {
96        Self::spawn_(future, wasm_bindgen_futures::spawn_local)
97    }
98
99    /// Creates a [`Task`] that is immediately ready.
100    pub fn ready(value: R) -> Self {
101        Self::spawn_(async { value }, |fut| {
102            fut.now_or_never().expect("the future is ready")
103        })
104    }
105
106    /// Cancels the task, resolving only when the wrapped future is completely dropped.
107    pub async fn cancel(self) {
108        self.abort_handle.abort();
109        let _ = self.output.await;
110    }
111
112    /// Forgets the task. The task will continue to run to completion in the
113    /// background, but will no longer be joinable or cancelable.
114    pub fn forget(self) {
115        self.output.forget();
116    }
117}
118
119impl<R: 'static> std::future::IntoFuture for Task<R> {
120    type Output = R;
121    type IntoFuture = future::Map<
122        future::RemoteHandle<Result<R, future::Aborted>>,
123        fn(Result<R, future::Aborted>) -> R,
124    >;
125
126    fn into_future(self) -> Self::IntoFuture {
127        self.output
128            .map(|result| result.expect("we have the only AbortHandle"))
129    }
130}