godot_core/task/
async_runtime.rs

1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use std::cell::RefCell;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::panic::AssertUnwindSafe;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll, Wake, Waker};
15use std::thread::{self, LocalKey, ThreadId};
16
17use crate::builtin::{Callable, Variant};
18use crate::private::handle_panic;
19
20// ----------------------------------------------------------------------------------------------------------------------------------------------
21// Public interface
22
23/// Create a new async background task.
24///
25/// This function allows creating a new async task in which Godot signals can be awaited, like it is possible in GDScript. The
26/// [`TaskHandle`] that is returned provides synchronous introspection into the current state of the task.
27///
28/// Signals can be converted to futures in the following ways:
29///
30/// | Signal type | Simple future                | Fallible future (handles freed object) |
31/// |-------------|------------------------------|----------------------------------------|
32/// | Untyped     | [`Signal::to_future()`]      | [`Signal::to_fallible_future()`]       |
33/// | Typed       | [`TypedSignal::to_future()`] | [`TypedSignal::to_fallible_future()`]  |
34///
35/// [`Signal::to_future()`]: crate::builtin::Signal::to_future
36/// [`Signal::to_fallible_future()`]: crate::builtin::Signal::to_fallible_future
37/// [`TypedSignal::to_future()`]: crate::registry::signal::TypedSignal::to_future
38/// [`TypedSignal::to_fallible_future()`]: crate::registry::signal::TypedSignal::to_fallible_future
39///
40/// # Panics
41/// If called from any other thread than the main thread.
42///
43/// # Examples
44/// With typed signals:
45///
46/// ```no_run
47/// # use godot::prelude::*;
48/// #[derive(GodotClass)]
49/// #[class(init)]
50/// struct Building {
51///    base: Base<RefCounted>,
52/// }
53///
54/// #[godot_api]
55/// impl Building {
56///    #[signal]
57///    fn constructed(seconds: u32);
58/// }
59///
60/// let house = Building::new_gd();
61/// godot::task::spawn(async move {
62///     println!("Wait for construction...");
63///
64///     // Emitted arguments can be fetched in tuple form.
65///     // If the signal has no parameters, you can skip `let` and just await the future.
66///     let (seconds,) = house.signals().constructed().to_future().await;
67///
68///     println!("Construction complete after {seconds}s.");
69/// });
70/// ```
71///
72/// With untyped signals:
73/// ```no_run
74/// # use godot::builtin::Signal;
75/// # use godot::classes::Node;
76/// # use godot::obj::NewAlloc;
77/// let node = Node::new_alloc();
78/// let signal = Signal::from_object_signal(&node, "signal");
79///
80/// godot::task::spawn(async move {
81///     println!("Starting task...");
82///
83///     // Explicit generic arguments needed, here `()`:
84///     signal.to_future::<()>().await;
85///
86///     println!("Node has changed: {}", node.get_name());
87/// });
88/// ```
89#[doc(alias = "async")]
90pub fn spawn(future: impl Future<Output = ()> + 'static) -> TaskHandle {
91    // Spawning new tasks is only allowed on the main thread for now.
92    // We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
93    // same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
94    // a future will be polled.
95    // By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
96    //
97    // Once thread-safe futures are possible the restriction can be lifted.
98    #[cfg(not(wasm_nothreads))] #[cfg_attr(published_docs, doc(cfg(not(wasm_nothreads))))]
99    assert!(
100        crate::init::is_main_thread(),
101        "godot_task() can only be used on the main thread"
102    );
103
104    let (task_handle, godot_waker) = ASYNC_RUNTIME.with_runtime_mut(move |rt| {
105        let task_handle = rt.add_task(Box::pin(future));
106        let godot_waker = Arc::new(GodotWaker::new(
107            task_handle.index,
108            task_handle.id,
109            thread::current().id(),
110        ));
111
112        (task_handle, godot_waker)
113    });
114
115    poll_future(godot_waker);
116    task_handle
117}
118
119/// Handle for an active background task.
120///
121/// This handle provides introspection into the current state of the task, as well as providing a way to cancel it.
122///
123/// The associated task will **not** be canceled if this handle is dropped.
124pub struct TaskHandle {
125    index: usize,
126    id: u64,
127    _no_send_sync: PhantomData<*const ()>,
128}
129
130impl TaskHandle {
131    fn new(index: usize, id: u64) -> Self {
132        Self {
133            index,
134            id,
135            _no_send_sync: PhantomData,
136        }
137    }
138
139    /// Cancels the task if it is still pending and does nothing if it is already completed.
140    pub fn cancel(self) {
141        ASYNC_RUNTIME.with_runtime_mut(|rt| {
142            let Some(task) = rt.tasks.get(self.index) else {
143                // Getting the task from the runtime might return None if the runtime has already been deinitialized. In this case, we just
144                // ignore the cancel request, as the entire runtime has already been canceled.
145                return;
146            };
147
148            let alive = match task.value {
149                FutureSlotState::Empty => {
150                    panic!("Future slot is empty when canceling it! This is a bug!")
151                }
152                FutureSlotState::Gone => false,
153                FutureSlotState::Pending(_) => task.id == self.id,
154                FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
155            };
156
157            if !alive {
158                return;
159            }
160
161            rt.clear_task(self.index);
162        })
163    }
164
165    /// Synchronously checks if the task is still pending or has already completed.
166    pub fn is_pending(&self) -> bool {
167        ASYNC_RUNTIME.with_runtime(|rt| {
168            let slot = rt
169                .tasks
170                .get(self.index)
171                .unwrap_or_else(|| unreachable!("missing future slot at index {}", self.index));
172
173            if slot.id != self.id {
174                return false;
175            }
176
177            matches!(
178                slot.value,
179                FutureSlotState::Pending(_) | FutureSlotState::Polling
180            )
181        })
182    }
183}
184
185// ----------------------------------------------------------------------------------------------------------------------------------------------
186// Async Runtime
187
/// Panic message used whenever the runtime is accessed after [`cleanup()`] has removed it.
const ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE: &str = "The async runtime is being accessed after it has been deinitialized. \
     This should not be possible and is most likely a bug.";
189
190thread_local! {
191    /// The thread local is only initialized the first time it's used. This means the async runtime won't be allocated until a task is
192    /// spawned.
193    static ASYNC_RUNTIME: RefCell<Option<AsyncRuntime>> = RefCell::new(Some(AsyncRuntime::new()));
194}
195
196/// Will be called during engine shutdown.
197///
198/// We have to drop all the remaining Futures during engine shutdown. This avoids them being dropped at process termination where they would
199/// try to access engine resources, which leads to SEGFAULTs.
200pub(crate) fn cleanup() {
201    ASYNC_RUNTIME.set(None);
202}
203
/// Checks whether polling the task behind `task_handle` has caused a panic.
///
/// Panics are only recorded by the runtime when the `trace` feature is enabled, which is why this function only exists with that feature.
///
/// # Panics
/// If the async runtime has already been deinitialized.
#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
pub fn has_godot_task_panicked(task_handle: TaskHandle) -> bool {
    let task_id = task_handle.id;

    ASYNC_RUNTIME.with_runtime(|rt| rt.panicked_tasks.contains(&task_id))
}
208
/// The current state of a future inside the async runtime.
enum FutureSlotState<T> {
    /// Slot is currently empty.
    Empty,
    /// Slot was previously occupied but the future has been canceled or the slot reused.
    Gone,
    /// Slot contains a pending future.
    Pending(T),
    /// Slot contains a future which is currently being polled.
    Polling,
}

/// Wrapper around a future that is being stored in the async runtime.
///
/// This wrapper contains additional metadata for the async runtime.
struct FutureSlot<T> {
    /// State of the slot, including the future itself while it is parked.
    value: FutureSlotState<T>,
    /// ID of the task that occupies (or most recently occupied) this slot.
    id: u64,
}

impl<T> FutureSlot<T> {
    /// Create a new slot with a pending future.
    fn pending(id: u64, value: T) -> Self {
        Self {
            value: FutureSlotState::Pending(value),
            id,
        }
    }

    /// Checks if the future slot is either still empty or has become unoccupied due to a future completing.
    fn is_empty(&self) -> bool {
        matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
    }

    /// Drop the future from this slot.
    ///
    /// This transitions the slot into the [`FutureSlotState::Gone`] state.
    fn clear(&mut self) {
        self.value = FutureSlotState::Gone;
    }

    /// Attempts to extract the future with the given ID from the slot.
    ///
    /// Puts the slot into [`FutureSlotState::Polling`] state after taking the future out. It is expected that the future is either parked
    /// again or the slot is cleared.
    ///
    /// If the slot is occupied by a task with a different ID, the requested task no longer exists and [`FutureSlotState::Gone`] is
    /// returned, no matter whether the current occupant is pending or being polled. In all other cases where the slot state is not
    /// [`FutureSlotState::Pending`], a copy of the state is returned. The slot remains untouched in both situations.
    fn take_for_polling(&mut self, id: u64) -> FutureSlotState<T> {
        match self.value {
            FutureSlotState::Empty => FutureSlotState::Empty,
            // The ID has to be compared before the remaining states are inspected. Otherwise a stale request for a previous
            // occupant would be told that "its" future is currently being polled while a newer task is running in this slot.
            _ if self.id != id => FutureSlotState::Gone,
            FutureSlotState::Gone => FutureSlotState::Gone,
            FutureSlotState::Polling => FutureSlotState::Polling,
            FutureSlotState::Pending(_) => {
                std::mem::replace(&mut self.value, FutureSlotState::Polling)
            }
        }
    }

    /// Parks the future in this slot again.
    ///
    /// # Panics
    /// - If the slot is not in state [`FutureSlotState::Polling`].
    fn park(&mut self, value: T) {
        match self.value {
            FutureSlotState::Empty | FutureSlotState::Gone => {
                panic!("cannot park future in slot which is unoccupied")
            }
            FutureSlotState::Pending(_) => {
                panic!(
                    "cannot park future in slot, which is already occupied by a different future"
                )
            }
            FutureSlotState::Polling => {
                self.value = FutureSlotState::Pending(value);
            }
        }
    }
}
287
288/// The storage for the pending tasks of the async runtime.
289#[derive(Default)]
290struct AsyncRuntime {
291    tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
292    next_task_id: u64,
293    #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
294    panicked_tasks: std::collections::HashSet<u64>,
295}
296
297impl AsyncRuntime {
298    fn new() -> Self {
299        Self {
300            // We only create a new async runtime inside a thread_local, which has lazy initialization on first use.
301            tasks: Vec::with_capacity(16),
302            next_task_id: 0,
303            #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
304            panicked_tasks: std::collections::HashSet::default(),
305        }
306    }
307
308    /// Get the next task ID.
309    fn next_id(&mut self) -> u64 {
310        let id = self.next_task_id;
311        self.next_task_id += 1;
312        id
313    }
314
315    /// Store a new async task in the runtime.
316    ///
317    /// First, a linear search is performed to locate an already existing but currently unoccupied slot in the task buffer. If there is no
318    /// free slot, a new slot is added which may grow the underlying [`Vec`].
319    ///
320    /// The future storage always starts out with a capacity of 10 tasks.
321    fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
322        let id = self.next_id();
323        let index_slot = self
324            .tasks
325            // If we find an available slot, we will assign the new future to it.
326            .iter_mut()
327            .enumerate()
328            .find(|(_, slot)| slot.is_empty());
329
330        let boxed = Box::pin(future);
331
332        let index = match index_slot {
333            Some((index, slot)) => {
334                *slot = FutureSlot::pending(id, boxed);
335                index
336            }
337            None => {
338                self.tasks.push(FutureSlot::pending(id, boxed));
339                self.tasks.len() - 1
340            }
341        };
342
343        TaskHandle::new(index, id)
344    }
345
346    /// Extract a pending task from the storage.
347    ///
348    /// Attempts to extract a future with the given ID from the specified index and leaves the slot in state [`FutureSlotState::Polling`].
349    /// In cases were the slot state is not [`FutureSlotState::Pending`], a copy of the state is returned but the slot remains untouched.
350    fn take_task_for_polling(
351        &mut self,
352        index: usize,
353        id: u64,
354    ) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
355        let slot = self.tasks.get_mut(index);
356        slot.map(|inner| inner.take_for_polling(id))
357            .unwrap_or(FutureSlotState::Empty)
358    }
359
360    /// Remove a future from the storage and free up its slot.
361    ///
362    /// The slot is left in the [`FutureSlotState::Gone`] state.
363    fn clear_task(&mut self, index: usize) {
364        self.tasks[index].clear();
365    }
366
367    /// Move a future back into its slot.
368    ///
369    /// # Panic
370    /// - If the underlying slot is not in the [`FutureSlotState::Polling`] state.
371    fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
372        self.tasks[index].park(future);
373    }
374
375    /// Track that a future caused a panic.
376    ///
377    /// This is only available for itest.
378    #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
379    fn track_panic(&mut self, task_id: u64) {
380        self.panicked_tasks.insert(task_id);
381    }
382}
383
384trait WithRuntime {
385    fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R;
386    fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R;
387}
388
389impl WithRuntime for LocalKey<RefCell<Option<AsyncRuntime>>> {
390    fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R {
391        self.with_borrow(|rt| {
392            let rt_ref = rt.as_ref().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
393
394            f(rt_ref)
395        })
396    }
397
398    fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R {
399        self.with_borrow_mut(|rt| {
400            let rt_ref = rt.as_mut().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
401
402            f(rt_ref)
403        })
404    }
405}
406
407/// Use a godot waker to poll it's associated future.
408///
409/// # Panics
410/// - If called from a thread other than the main-thread.
411fn poll_future(godot_waker: Arc<GodotWaker>) {
412    let current_thread = thread::current().id();
413
414    assert_eq!(
415        godot_waker.thread_id,
416        current_thread,
417        "trying to poll future on a different thread!\n  Current thread: {:?}\n  Future thread: {:?}",
418        current_thread,
419        godot_waker.thread_id,
420    );
421
422    let waker = Waker::from(godot_waker.clone());
423    let mut ctx = Context::from_waker(&waker);
424
425    // Move future out of the runtime while we are polling it to avoid holding a mutable reference for the entire runtime.
426    let future = ASYNC_RUNTIME.with_runtime_mut(|rt| {
427        match rt.take_task_for_polling(godot_waker.runtime_index, godot_waker.task_id) {
428            FutureSlotState::Empty => {
429                panic!("Future slot is empty when waking it! This is a bug!");
430            }
431
432            FutureSlotState::Gone => None,
433
434            FutureSlotState::Polling => {
435                unreachable!("the same GodotWaker has been called recursively");
436            }
437
438            FutureSlotState::Pending(future) => Some(future),
439        }
440    });
441
442    let Some(future) = future else {
443        // Future has been canceled while the waker was already triggered.
444        return;
445    };
446
447    let error_context = || "Godot async task failed".to_string();
448
449    // If Future::poll() panics, the future is immediately dropped and cannot be accessed again,
450    // thus any state that may not have been unwind-safe cannot be observed later.
451    let mut future = AssertUnwindSafe(future);
452
453    let panic_result = handle_panic(error_context, move || {
454        (future.as_mut().poll(&mut ctx), future)
455    });
456
457    let Ok((poll_result, future)) = panic_result else {
458        // Polling the future caused a panic. The task state has to be cleaned up and we want track the panic if the trace feature is enabled.
459        ASYNC_RUNTIME.with_runtime_mut(|rt| {
460            #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
461            rt.track_panic(godot_waker.task_id);
462            rt.clear_task(godot_waker.runtime_index);
463        });
464
465        return;
466    };
467
468    // Update the state of the Future in the runtime.
469    ASYNC_RUNTIME.with_runtime_mut(|rt| match poll_result {
470        // Future is still pending, so we park it again.
471        Poll::Pending => rt.park_task(godot_waker.runtime_index, future.0),
472
473        // Future has resolved, so we remove it from the runtime.
474        Poll::Ready(()) => rt.clear_task(godot_waker.runtime_index),
475    });
476}
477
/// Implementation of a [`Waker`] to poll futures with the engine.
///
/// A waker identifies exactly one task of the runtime through its slot index and task ID.
struct GodotWaker {
    /// Index of the task's slot in the runtime storage.
    runtime_index: usize,
    /// ID of the task, used to detect that the slot has been reused by another task.
    task_id: u64,
    /// Thread the waker was created on. Polling is only permitted on this thread.
    thread_id: ThreadId,
}

impl GodotWaker {
    /// Creates a waker for the task `task_id` stored at slot `index`, bound to the thread `thread_id`.
    fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
        let runtime_index = index;

        Self {
            runtime_index,
            task_id,
            thread_id,
        }
    }
}
494
495// Uses a deferred callable to poll the associated future, i.e. at the end of the current frame.
496impl Wake for GodotWaker {
497    fn wake(self: Arc<Self>) {
498        let mut waker = Some(self);
499
500        /// Enforce the passed closure is generic over its lifetime. The compiler gets confused about the livetime of the argument otherwise.
501        /// This appears to be a common issue: https://github.com/rust-lang/rust/issues/89976
502        fn callback_type_hint<F>(f: F) -> F
503        where
504            F: for<'a> FnMut(&'a [&Variant]) -> Result<Variant, ()>,
505        {
506            f
507        }
508
509        #[cfg(not(feature = "experimental-threads"))] #[cfg_attr(published_docs, doc(cfg(not(feature = "experimental-threads"))))]
510        let create_callable = Callable::from_local_fn;
511
512        #[cfg(feature = "experimental-threads")] #[cfg_attr(published_docs, doc(cfg(feature = "experimental-threads")))]
513        let create_callable = Callable::from_sync_fn;
514
515        let callable = create_callable(
516            "GodotWaker::wake",
517            callback_type_hint(move |_args| {
518                poll_future(waker.take().expect("Callable will never be called again"));
519                Ok(Variant::nil())
520            }),
521        );
522
523        // Schedule waker to poll the Future at the end of the frame.
524        callable.call_deferred(&[]);
525    }
526}