//! Functions relating to the async executor.
//!
//! ## Running async tasks
//!
//! Most async tasks can be run just by changing your canister entry point to `async`:
//!
//! ```
//! # use ic_cdk::update;
//! # async fn some_other_async_fn() {}
//! #[update]
//! async fn foo() {
//!     some_other_async_fn().await;
//! }
//! ```
//!
//! To run async tasks in the *background*, however, use [`spawn`]:
//!
//! ```
//! # use ic_cdk::{update, futures::spawn};
//! # async fn some_other_async_fn() {}
//! #[update]
//! async fn foo() {
//!     spawn(async { some_other_async_fn().await; });
//!     // do other stuff
//! }
//! ```
//!
//! The spawned future will not be run at the same time as the remaining code, nor will it run immediately. It will start
//! running while `foo` awaits (or after `foo` ends, if it does not await). Unlike some other libraries, `spawn` does not
//! return a join handle; if you want to await multiple results concurrently, use the `futures` crate's [`join_all`] function.
//!
//! "Background" is a tricky subject on the IC. Background tasks can only run in the context of a canister message.
//! If you await a future whose completion you manually trigger in code, such as sending to an async channel,
//! then the code after the await will run in the call context of whatever completed it. This means that global state
//! like [`caller`], [`in_replicated_execution`], and even [`canister_self`] may have changed. (The canister method
//! itself cannot await anything triggered by another canister method, or you will get an error that it 'failed to reply'.)
//! The code will also draw on that call's instruction limit, which can introduce hidden sources of
//! instruction-limit-based traps.
//!
//! Most importantly, a background task that runs in other call contexts must never trap. When it traps, it will cancel
//! (see below) the execution of the call whose context it's in, even though that call didn't do anything wrong, and it
//! may not undo whatever caused it to trap, meaning the canister could end up bricked. Tasks that you expect to complete
//! before the canister method ends are safe, but traps/panics in tasks that are expected to continue running into other
//! calls/timers may produce surprising results and behavioral errors.
//!
//! ## Automatic cancellation
//!
//! Asynchronous tasks can be *canceled*, meaning that a partially completed function will halt at an
//! `await` point, never complete, and drop its local variables as though it had returned. Cancellation
//! is caused by panics and traps: if an async function panics, time will be rewound to the
//! previous await as though the code since then never ran, and then the task will be canceled.
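//!
//! The "drop its local variables" part can be seen with the standard library alone. The following is a
//! minimal sketch, not the executor's actual code path: `Cleanup`, `NeverReady`, `NoopWaker`, and
//! `cancellation_demo` are hypothetical names introduced for illustration. A future is polled once,
//! suspends at an `await`, and is then dropped, which runs the destructor of the local it was holding.
//!
//! ```rust
//! use std::cell::Cell;
//! use std::future::Future;
//! use std::pin::Pin;
//! use std::rc::Rc;
//! use std::sync::Arc;
//! use std::task::{Context, Poll, Wake, Waker};
//!
//! /// Hypothetical cleanup guard: records that its destructor ran.
//! struct Cleanup(Rc<Cell<bool>>);
//!
//! impl Drop for Cleanup {
//!     fn drop(&mut self) {
//!         self.0.set(true);
//!     }
//! }
//!
//! /// Hypothetical future that never completes, standing in for an await that gets canceled.
//! struct NeverReady;
//!
//! impl Future for NeverReady {
//!     type Output = ();
//!     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
//!         Poll::Pending
//!     }
//! }
//!
//! /// A waker that does nothing; this sketch never re-polls the task.
//! struct NoopWaker;
//!
//! impl Wake for NoopWaker {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! /// Returns `(dropped while suspended, dropped after the task was dropped)`.
//! pub fn cancellation_demo() -> (bool, bool) {
//!     let dropped = Rc::new(Cell::new(false));
//!     let flag = dropped.clone();
//!     let mut task: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
//!         let _cleanup = Cleanup(flag);
//!         NeverReady.await;
//!         // Never reached: the task is dropped while suspended above.
//!     });
//!     let waker = Waker::from(Arc::new(NoopWaker));
//!     let mut cx = Context::from_waker(&waker);
//!     // The first poll runs the body up to the await, where it suspends.
//!     assert!(task.as_mut().poll(&mut cx).is_pending());
//!     let while_suspended = dropped.get();
//!     // Dropping the suspended future drops `_cleanup`, as though the function had returned.
//!     drop(task);
//!     (while_suspended, dropped.get())
//! }
//! ```
//!
//! Calling `cancellation_demo()` yields `(false, true)`: the guard is alive while the task is suspended,
//! and its destructor runs once the task is dropped.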
//!
//! Use panics sparingly in async functions after the first await, and beware system functions that trap
//! (which is most of them in the right context). Make atomic transactions between awaits wherever
//! possible, and use [`scopeguard`] or a [`Drop`] impl for any cleanup functions that must run no matter what.
//! If an await cannot be removed from the middle of a transaction, and it must be rolled back if it fails,
//! [`is_recovering_from_trap`] can be used to detect when the task is being automatically canceled.
//!
//! [`scopeguard`]: https://docs.rs/scopeguard
//! [`join_all`]: https://docs.rs/futures/latest/futures/future/fn.join_all.html
//! [`caller`]: crate::api::caller
//! [`in_replicated_execution`]: crate::api::in_replicated_execution
//! [`canister_self`]: crate::api::canister_self

use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

use slotmap::{new_key_type, SlotMap};

/// Spawn an asynchronous task to run in the background. For information about semantics, see
/// [the module docs](self).
///
/// Panics if called outside an executor context.
pub fn spawn<F: 'static + Future<Output = ()>>(future: F) {
    let in_query = match CONTEXT.get() {
        AsyncContext::None => panic!("`spawn` can only be called from an executor context"),
        AsyncContext::Query => true,
        AsyncContext::Update | AsyncContext::Cancel => false,
        AsyncContext::FromTask => unreachable!("FromTask"),
    };
    let pinned_future = Box::pin(future);
    let task = Task {
        future: pinned_future,
        query: in_query,
    };
    // Register the task and queue it for the next `poll_all` pass; it is not polled here.
    let task_id = TASKS.with_borrow_mut(|tasks| tasks.insert(task));
    WAKEUP.with_borrow_mut(|wakeup| wakeup.push_back(task_id));
}

/// Execute an update function in a context that allows calling [`spawn`].
///
/// You do not need to worry about this function unless you are avoiding the attribute macros.
///
/// Background tasks will be polled in the process (and will not be run otherwise).
/// Panics if called inside an existing executor context.
pub fn in_executor_context<R>(f: impl FnOnce() -> R) -> R {
    let _guard = ContextGuard::new(AsyncContext::Update);
    crate::setup();
    let res = f();
    poll_all();
    res
}

/// Execute a composite query function in a context that allows calling [`spawn`].
///
/// You do not need to worry about this function unless you are avoiding the attribute macros.
///
/// Background composite query tasks will be polled in the process (and will not be run otherwise).
/// Panics if called inside an existing executor context.
pub fn in_query_executor_context<R>(f: impl FnOnce() -> R) -> R {
    let _guard = ContextGuard::new(AsyncContext::Query);
    crate::setup();
    let res = f();
    poll_all();
    res
}

pub(crate) fn in_callback_executor_context(f: impl FnOnce()) {
    let _guard = ContextGuard::new(AsyncContext::FromTask);
    f();
    poll_all();
}

pub(crate) fn in_callback_cancellation_context(f: impl FnOnce()) {
    let _guard = ContextGuard::new(AsyncContext::Cancel);
    f();
}

/// Tells you whether the current async fn is being canceled due to a trap/panic.
///
/// In a destructor, `is_recovering_from_trap` serves the same purpose as
/// [`std::thread::panicking`]: it tells you whether the destructor is executing *because* of a trap,
/// as opposed to just because the scope was exited, so you could e.g. implement mutex poisoning.
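///
/// For comparison, the standard-library pattern that this mirrors can be sketched as follows. This is
/// an illustration only, under the assumption of an unwinding panic strategy: `PoisonFlag`, `Guard`, and
/// `poisoned_after` are hypothetical names, and the destructor checks `std::thread::panicking()` where
/// canister code would check `is_recovering_from_trap()` instead.
///
/// ```rust
/// use std::panic::{catch_unwind, AssertUnwindSafe};
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// /// Hypothetical lock state: set once a holder fails partway through.
/// pub struct PoisonFlag {
///     poisoned: AtomicBool,
/// }
///
/// /// Hypothetical guard that poisons the flag only if dropped because of a panic.
/// struct Guard<'a> {
///     flag: &'a PoisonFlag,
/// }
///
/// impl Drop for Guard<'_> {
///     fn drop(&mut self) {
///         // On the IC, this condition would be `is_recovering_from_trap()`.
///         if std::thread::panicking() {
///             self.flag.poisoned.store(true, Ordering::SeqCst);
///         }
///     }
/// }
///
/// /// Runs a guarded scope, optionally panicking inside it, and reports whether the flag was poisoned.
/// pub fn poisoned_after(panic_inside: bool) -> bool {
///     let flag = PoisonFlag {
///         poisoned: AtomicBool::new(false),
///     };
///     let _ = catch_unwind(AssertUnwindSafe(|| {
///         let _guard = Guard { flag: &flag };
///         if panic_inside {
///             panic!("simulated failure");
///         }
///     }));
///     flag.poisoned.load(Ordering::SeqCst)
/// }
/// ```
///
/// Here `poisoned_after(true)` returns `true` and `poisoned_after(false)` returns `false`: the guard's
/// destructor runs in both cases, but only marks the flag when it runs because of the failure.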
///
/// For information about when and how this occurs, see [the module docs](self).
pub fn is_recovering_from_trap() -> bool {
    matches!(CONTEXT.get(), AsyncContext::Cancel)
}

fn poll_all() {
    let in_query = match CONTEXT.get() {
        AsyncContext::Query => true,
        AsyncContext::Update | AsyncContext::Cancel => false,
        AsyncContext::None => panic!("tasks can only be polled in an executor context"),
        AsyncContext::FromTask => unreachable!("FromTask"),
    };
    // Tasks that were woken but cannot be polled in the current context.
    let mut ineligible = vec![];
    while let Some(task_id) = WAKEUP.with_borrow_mut(|queue| queue.pop_front()) {
        // Temporarily remove the task from the table. We need to execute it while `TASKS` is not
        // borrowed, because it may schedule more tasks.
        let Some(mut task) = TASKS.with_borrow_mut(|tasks| tasks.get_mut(task_id).map(mem::take))
        else {
            // The task is dropped on the first callback that panics, but the last callback is the one that sets the flag.
            // So if multiple calls are sent concurrently, the waker will be asked to wake a future that no longer exists.
            // This should be the only possible case in which this happens.
            // It also should not happen, because the CallFuture handles this itself, but FuturesUnordered
            // introduces some chaos.
            crate::trap("Call already trapped");
        };
        if in_query && !task.query {
            // A non-query task cannot be polled from a query context: put it back and re-queue it afterwards.
            TASKS.with_borrow_mut(|tasks| tasks[task_id] = task);
            ineligible.push(task_id);
            continue;
        }
        let waker = Waker::from(Arc::new(TaskWaker {
            task_id,
            query: task.query,
        }));
        let poll = task.future.as_mut().poll(&mut Context::from_waker(&waker));
        match poll {
            Poll::Pending => {
                // more to do, put the task back in the table
                TASKS.with_borrow_mut(|tasks| {
                    if let Some(t) = tasks.get_mut(task_id) {
                        *t = task;
                    }
                });
            }
            Poll::Ready(()) => {
                // task complete, remove its entry from the table fully
                TASKS.with_borrow_mut(|tasks| tasks.remove(task_id));
            }
        }
    }
    if !ineligible.is_empty() {
        WAKEUP.with_borrow_mut(|wakeup| wakeup.extend(ineligible));
    }
}

new_key_type! {
    struct TaskId;
}

thread_local! {
    // All live tasks, keyed by id.
    static TASKS: RefCell<SlotMap<TaskId, Task>> = <_>::default();
    // Ids of tasks that have been woken and are waiting to be polled.
    static WAKEUP: RefCell<VecDeque<TaskId>> = <_>::default();
    // The executor context that the current code is running in.
    static CONTEXT: Cell<AsyncContext> = <_>::default();
}

#[derive(Default, Copy, Clone)]
enum AsyncContext {
    #[default]
    None,
    Update,
    Query,
    FromTask,
    Cancel,
}

struct Task {
    future: Pin<Box<dyn Future<Output = ()>>>,
    // Whether the task was spawned from a query context.
    query: bool,
}

// The default task is a never-completing placeholder, left in the table by `mem::take` while the
// real task is being polled.
impl Default for Task {
    fn default() -> Self {
        Self {
            future: Box::pin(std::future::pending()),
            query: false,
        }
    }
}

struct ContextGuard(());

impl ContextGuard {
    fn new(context: AsyncContext) -> Self {
        CONTEXT.with(|context_var| {
            assert!(
                matches!(context_var.get(), AsyncContext::None),
                "in_*_context called within an existing async context"
            );
            context_var.set(context);
            Self(())
        })
    }
}

impl Drop for ContextGuard {
    fn drop(&mut self) {
        CONTEXT.set(AsyncContext::None);
    }
}

/// Waker implementation for executing futures produced by `call`/`call_raw`/etc.
///
/// *Almost* a do-nothing waker: `wake` only queues the task to be polled by the next `poll_all` pass, except that it
/// attempts to clean up tasks whose execution has trapped - see [`is_recovering_from_trap`].
#[derive(Clone)]
struct TaskWaker {
    task_id: TaskId,
    query: bool,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        if matches!(CONTEXT.get(), AsyncContext::Cancel) {
            // This task is recovering from a trap. We cancel it to run destructors.
            let _task = TASKS.with_borrow_mut(|tasks| tasks.remove(self.task_id));
            // _task must be dropped *outside* with_borrow_mut - its destructor may (inadvisably) schedule tasks
        } else {
            WAKEUP.with_borrow_mut(|wakeup| wakeup.push_back(self.task_id));
            // A callback context takes on the query/update kind of the task that it wakes.
            CONTEXT.with(|context| {
                if matches!(context.get(), AsyncContext::FromTask) {
                    if self.query {
                        context.set(AsyncContext::Query)
                    } else {
                        context.set(AsyncContext::Update)
                    }
                }
            })
        }
    }
}