godot_core/task/
async_runtime.rs

1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use std::cell::RefCell;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::panic::AssertUnwindSafe;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll, Wake, Waker};
15use std::thread::{self, LocalKey, ThreadId};
16
17use crate::builtin::{Callable, Variant};
18use crate::private::handle_panic;
19
20// ----------------------------------------------------------------------------------------------------------------------------------------------
21// Public interface
22
23/// Create a new async background task.
24///
25/// This function allows creating a new async task in which Godot signals can be awaited, like it is possible in GDScript. The
26/// [`TaskHandle`] that is returned provides synchronous introspection into the current state of the task.
27///
28/// Signals can be converted to futures in the following ways:
29///
30/// | Signal type | Simple future                | Fallible future (handles freed object) |
31/// |-------------|------------------------------|----------------------------------------|
32/// | Untyped     | [`Signal::to_future()`]      | [`Signal::to_fallible_future()`]       |
33/// | Typed       | [`TypedSignal::to_future()`] | [`TypedSignal::to_fallible_future()`]  |
34///
35/// [`Signal::to_future()`]: crate::builtin::Signal::to_future
36/// [`Signal::to_fallible_future()`]: crate::builtin::Signal::to_fallible_future
37/// [`TypedSignal::to_future()`]: crate::registry::signal::TypedSignal::to_future
38/// [`TypedSignal::to_fallible_future()`]: crate::registry::signal::TypedSignal::to_fallible_future
39///
40/// # Panics
41/// If called from any other thread than the main thread.
42///
43/// # Examples
44/// With typed signals:
45///
46/// ```no_run
47/// # use godot::prelude::*;
48/// #[derive(GodotClass)]
49/// #[class(init)]
50/// struct Building {
51///    base: Base<RefCounted>,
52/// }
53///
54/// #[godot_api]
55/// impl Building {
56///    #[signal]
57///    fn constructed(seconds: u32);
58/// }
59///
60/// let house = Building::new_gd();
61/// godot::task::spawn(async move {
62///     println!("Wait for construction...");
63///
64///     // Emitted arguments can be fetched in tuple form.
65///     // If the signal has no parameters, you can skip `let` and just await the future.
66///     let (seconds,) = house.signals().constructed().to_future().await;
67///
68///     println!("Construction complete after {seconds}s.");
69/// });
70/// ```
71///
72/// With untyped signals:
73/// ```no_run
74/// # use godot::builtin::Signal;
75/// # use godot::classes::Node;
76/// # use godot::obj::NewAlloc;
77/// let node = Node::new_alloc();
78/// let signal = Signal::from_object_signal(&node, "signal");
79///
80/// godot::task::spawn(async move {
81///     println!("Starting task...");
82///
83///     // Explicit generic arguments needed, here `()`:
84///     signal.to_future::<()>().await;
85///
86///     println!("Node has changed: {}", node.get_name());
87/// });
88/// ```
89#[doc(alias = "async")]
90pub fn spawn(future: impl Future<Output = ()> + 'static) -> TaskHandle {
91    // Spawning new tasks is only allowed on the main thread for now.
92    // We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
93    // same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
94    // a future will be polled.
95    // By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
96    //
97    // Once thread-safe futures are possible the restriction can be lifted.
98    assert!(
99        crate::init::is_main_thread(),
100        "godot_task() can only be used on the main thread"
101    );
102
103    let (task_handle, godot_waker) = ASYNC_RUNTIME.with_runtime_mut(move |rt| {
104        let task_handle = rt.add_task(Box::pin(future));
105        let godot_waker = Arc::new(GodotWaker::new(
106            task_handle.index,
107            task_handle.id,
108            thread::current().id(),
109        ));
110
111        (task_handle, godot_waker)
112    });
113
114    poll_future(godot_waker);
115    task_handle
116}
117
118/// Handle for an active background task.
119///
120/// This handle provides introspection into the current state of the task, as well as providing a way to cancel it.
121///
122/// The associated task will **not** be canceled if this handle is dropped.
123pub struct TaskHandle {
124    index: usize,
125    id: u64,
126    _no_send_sync: PhantomData<*const ()>,
127}
128
129impl TaskHandle {
130    fn new(index: usize, id: u64) -> Self {
131        Self {
132            index,
133            id,
134            _no_send_sync: PhantomData,
135        }
136    }
137
138    /// Cancels the task if it is still pending and does nothing if it is already completed.
139    pub fn cancel(self) {
140        ASYNC_RUNTIME.with_runtime_mut(|rt| {
141            let Some(task) = rt.tasks.get(self.index) else {
142                // Getting the task from the runtime might return None if the runtime has already been deinitialized. In this case, we just
143                // ignore the cancel request, as the entire runtime has already been canceled.
144                return;
145            };
146
147            let alive = match task.value {
148                FutureSlotState::Empty => {
149                    panic!("Future slot is empty when canceling it! This is a bug!")
150                }
151                FutureSlotState::Gone => false,
152                FutureSlotState::Pending(_) => task.id == self.id,
153                FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
154            };
155
156            if !alive {
157                return;
158            }
159
160            rt.clear_task(self.index);
161        })
162    }
163
164    /// Synchronously checks if the task is still pending or has already completed.
165    pub fn is_pending(&self) -> bool {
166        ASYNC_RUNTIME.with_runtime(|rt| {
167            let slot = rt
168                .tasks
169                .get(self.index)
170                .unwrap_or_else(|| unreachable!("missing future slot at index {}", self.index));
171
172            if slot.id != self.id {
173                return false;
174            }
175
176            matches!(
177                slot.value,
178                FutureSlotState::Pending(_) | FutureSlotState::Polling
179            )
180        })
181    }
182}
183
184// ----------------------------------------------------------------------------------------------------------------------------------------------
185// Async Runtime
186
187const ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE: &str = "The async runtime is being accessed after it has been deinitialized. This should not be possible and is most likely a bug.";
188
189thread_local! {
190    /// The thread local is only initialized the first time it's used. This means the async runtime won't be allocated until a task is
191    /// spawned.
192    static ASYNC_RUNTIME: RefCell<Option<AsyncRuntime>> = RefCell::new(Some(AsyncRuntime::new()));
193}
194
195/// Will be called during engine shutdown.
196///
197/// We have to drop all the remaining Futures during engine shutdown. This avoids them being dropped at process termination where they would
198/// try to access engine resources, which leads to SEGFAULTs.
199pub(crate) fn cleanup() {
200    ASYNC_RUNTIME.set(None);
201}
202
203#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
204pub fn has_godot_task_panicked(task_handle: TaskHandle) -> bool {
205    ASYNC_RUNTIME.with_runtime(|rt| rt.panicked_tasks.contains(&task_handle.id))
206}
207
208/// The current state of a future inside the async runtime.
209enum FutureSlotState<T> {
210    /// Slot is currently empty.
211    Empty,
212    /// Slot was previously occupied but the future has been canceled or the slot reused.
213    Gone,
214    /// Slot contains a pending future.
215    Pending(T),
216    /// Slot contains a future which is currently being polled.
217    Polling,
218}
219
220/// Wrapper around a future that is being stored in the async runtime.
221///
222/// This wrapper contains additional metadata for the async runtime.
223struct FutureSlot<T> {
224    value: FutureSlotState<T>,
225    id: u64,
226}
227
228impl<T> FutureSlot<T> {
229    /// Create a new slot with a pending future.
230    fn pending(id: u64, value: T) -> Self {
231        Self {
232            value: FutureSlotState::Pending(value),
233            id,
234        }
235    }
236
237    /// Checks if the future slot is either still empty or has become unoccupied due to a future completing.
238    fn is_empty(&self) -> bool {
239        matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
240    }
241
242    /// Drop the future from this slot.
243    ///
244    /// This transitions the slot into the [`FutureSlotState::Gone`] state.
245    fn clear(&mut self) {
246        self.value = FutureSlotState::Gone;
247    }
248
249    /// Attempts to extract the future with the given ID from the slot.
250    ///
251    /// Puts the slot into [`FutureSlotState::Polling`] state after taking the future out. It is expected that the future is either parked
252    /// again or the slot is cleared.
253    /// In cases were the slot state is not [`FutureSlotState::Pending`], a copy of the state is returned but the slot remains untouched.
254    fn take_for_polling(&mut self, id: u64) -> FutureSlotState<T> {
255        match self.value {
256            FutureSlotState::Empty => FutureSlotState::Empty,
257            FutureSlotState::Polling => FutureSlotState::Polling,
258            FutureSlotState::Gone => FutureSlotState::Gone,
259            FutureSlotState::Pending(_) if self.id != id => FutureSlotState::Gone,
260            FutureSlotState::Pending(_) => {
261                std::mem::replace(&mut self.value, FutureSlotState::Polling)
262            }
263        }
264    }
265
266    /// Parks the future in this slot again.
267    ///
268    /// # Panics
269    /// - If the slot is not in state [`FutureSlotState::Polling`].
270    fn park(&mut self, value: T) {
271        match self.value {
272            FutureSlotState::Empty | FutureSlotState::Gone => {
273                panic!("cannot park future in slot which is unoccupied")
274            }
275            FutureSlotState::Pending(_) => {
276                panic!(
277                    "cannot park future in slot, which is already occupied by a different future"
278                )
279            }
280            FutureSlotState::Polling => {
281                self.value = FutureSlotState::Pending(value);
282            }
283        }
284    }
285}
286
287/// The storage for the pending tasks of the async runtime.
288#[derive(Default)]
289struct AsyncRuntime {
290    tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
291    next_task_id: u64,
292    #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
293    panicked_tasks: std::collections::HashSet<u64>,
294}
295
296impl AsyncRuntime {
297    fn new() -> Self {
298        Self {
299            // We only create a new async runtime inside a thread_local, which has lazy initialization on first use.
300            tasks: Vec::with_capacity(16),
301            next_task_id: 0,
302            #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
303            panicked_tasks: std::collections::HashSet::default(),
304        }
305    }
306
307    /// Get the next task ID.
308    fn next_id(&mut self) -> u64 {
309        let id = self.next_task_id;
310        self.next_task_id += 1;
311        id
312    }
313
314    /// Store a new async task in the runtime.
315    ///
316    /// First, a linear search is performed to locate an already existing but currently unoccupied slot in the task buffer. If there is no
317    /// free slot, a new slot is added which may grow the underlying [`Vec`].
318    ///
319    /// The future storage always starts out with a capacity of 10 tasks.
320    fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
321        let id = self.next_id();
322        let index_slot = self
323            .tasks
324            // If we find an available slot, we will assign the new future to it.
325            .iter_mut()
326            .enumerate()
327            .find(|(_, slot)| slot.is_empty());
328
329        let boxed = Box::pin(future);
330
331        let index = match index_slot {
332            Some((index, slot)) => {
333                *slot = FutureSlot::pending(id, boxed);
334                index
335            }
336            None => {
337                self.tasks.push(FutureSlot::pending(id, boxed));
338                self.tasks.len() - 1
339            }
340        };
341
342        TaskHandle::new(index, id)
343    }
344
345    /// Extract a pending task from the storage.
346    ///
347    /// Attempts to extract a future with the given ID from the specified index and leaves the slot in state [`FutureSlotState::Polling`].
348    /// In cases were the slot state is not [`FutureSlotState::Pending`], a copy of the state is returned but the slot remains untouched.
349    fn take_task_for_polling(
350        &mut self,
351        index: usize,
352        id: u64,
353    ) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
354        let slot = self.tasks.get_mut(index);
355        slot.map(|inner| inner.take_for_polling(id))
356            .unwrap_or(FutureSlotState::Empty)
357    }
358
359    /// Remove a future from the storage and free up its slot.
360    ///
361    /// The slot is left in the [`FutureSlotState::Gone`] state.
362    fn clear_task(&mut self, index: usize) {
363        self.tasks[index].clear();
364    }
365
366    /// Move a future back into its slot.
367    ///
368    /// # Panic
369    /// - If the underlying slot is not in the [`FutureSlotState::Polling`] state.
370    fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
371        self.tasks[index].park(future);
372    }
373
374    /// Track that a future caused a panic.
375    ///
376    /// This is only available for itest.
377    #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
378    fn track_panic(&mut self, task_id: u64) {
379        self.panicked_tasks.insert(task_id);
380    }
381}
382
383trait WithRuntime {
384    fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R;
385    fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R;
386}
387
388impl WithRuntime for LocalKey<RefCell<Option<AsyncRuntime>>> {
389    fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R {
390        self.with_borrow(|rt| {
391            let rt_ref = rt.as_ref().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
392
393            f(rt_ref)
394        })
395    }
396
397    fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R {
398        self.with_borrow_mut(|rt| {
399            let rt_ref = rt.as_mut().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
400
401            f(rt_ref)
402        })
403    }
404}
405
406/// Use a godot waker to poll it's associated future.
407///
408/// # Panics
409/// - If called from a thread other than the main-thread.
410fn poll_future(godot_waker: Arc<GodotWaker>) {
411    let current_thread = thread::current().id();
412
413    assert_eq!(
414        godot_waker.thread_id,
415        current_thread,
416        "trying to poll future on a different thread!\n  Current thread: {:?}\n  Future thread: {:?}",
417        current_thread,
418        godot_waker.thread_id,
419    );
420
421    let waker = Waker::from(godot_waker.clone());
422    let mut ctx = Context::from_waker(&waker);
423
424    // Move future out of the runtime while we are polling it to avoid holding a mutable reference for the entire runtime.
425    let future = ASYNC_RUNTIME.with_runtime_mut(|rt| {
426        match rt.take_task_for_polling(godot_waker.runtime_index, godot_waker.task_id) {
427            FutureSlotState::Empty => {
428                panic!("Future slot is empty when waking it! This is a bug!");
429            }
430
431            FutureSlotState::Gone => None,
432
433            FutureSlotState::Polling => {
434                unreachable!("the same GodotWaker has been called recursively");
435            }
436
437            FutureSlotState::Pending(future) => Some(future),
438        }
439    });
440
441    let Some(future) = future else {
442        // Future has been canceled while the waker was already triggered.
443        return;
444    };
445
446    let error_context = || "Godot async task failed".to_string();
447
448    // If Future::poll() panics, the future is immediately dropped and cannot be accessed again,
449    // thus any state that may not have been unwind-safe cannot be observed later.
450    let mut future = AssertUnwindSafe(future);
451
452    let panic_result = handle_panic(error_context, move || {
453        (future.as_mut().poll(&mut ctx), future)
454    });
455
456    let Ok((poll_result, future)) = panic_result else {
457        // Polling the future caused a panic. The task state has to be cleaned up and we want track the panic if the trace feature is enabled.
458        ASYNC_RUNTIME.with_runtime_mut(|rt| {
459            #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
460            rt.track_panic(godot_waker.task_id);
461            rt.clear_task(godot_waker.runtime_index);
462        });
463
464        return;
465    };
466
467    // Update the state of the Future in the runtime.
468    ASYNC_RUNTIME.with_runtime_mut(|rt| match poll_result {
469        // Future is still pending, so we park it again.
470        Poll::Pending => rt.park_task(godot_waker.runtime_index, future.0),
471
472        // Future has resolved, so we remove it from the runtime.
473        Poll::Ready(()) => rt.clear_task(godot_waker.runtime_index),
474    });
475}
476
477/// Implementation of a [`Waker`] to poll futures with the engine.
478struct GodotWaker {
479    runtime_index: usize,
480    task_id: u64,
481    thread_id: ThreadId,
482}
483
484impl GodotWaker {
485    fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
486        Self {
487            runtime_index: index,
488            thread_id,
489            task_id,
490        }
491    }
492}
493
494// Uses a deferred callable to poll the associated future, i.e. at the end of the current frame.
495impl Wake for GodotWaker {
496    fn wake(self: Arc<Self>) {
497        let mut waker = Some(self);
498
499        /// Enforce the passed closure is generic over its lifetime. The compiler gets confused about the livetime of the argument otherwise.
500        /// This appears to be a common issue: https://github.com/rust-lang/rust/issues/89976
501        fn callback_type_hint<F>(f: F) -> F
502        where
503            F: for<'a> FnMut(&'a [&Variant]) -> Result<Variant, ()>,
504        {
505            f
506        }
507
508        #[cfg(not(feature = "experimental-threads"))] #[cfg_attr(published_docs, doc(cfg(not(feature = "experimental-threads"))))]
509        let create_callable = Callable::from_local_fn;
510
511        #[cfg(feature = "experimental-threads")] #[cfg_attr(published_docs, doc(cfg(feature = "experimental-threads")))]
512        let create_callable = Callable::from_sync_fn;
513
514        let callable = create_callable(
515            "GodotWaker::wake",
516            callback_type_hint(move |_args| {
517                poll_future(waker.take().expect("Callable will never be called again"));
518                Ok(Variant::nil())
519            }),
520        );
521
522        // Schedule waker to poll the Future at the end of the frame.
523        callable.call_deferred(&[]);
524    }
525}