godot_core/task/async_runtime.rs
1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use std::cell::RefCell;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::panic::AssertUnwindSafe;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll, Wake, Waker};
15use std::thread::{self, LocalKey, ThreadId};
16
17use crate::builtin::{Callable, Variant};
18use crate::private::handle_panic;
19
20// ----------------------------------------------------------------------------------------------------------------------------------------------
21// Public interface
22
23/// Create a new async background task.
24///
25/// This function allows creating a new async task in which Godot signals can be awaited, like it is possible in GDScript. The
26/// [`TaskHandle`] that is returned provides synchronous introspection into the current state of the task.
27///
28/// Signals can be converted to futures in the following ways:
29///
30/// | Signal type | Simple future | Fallible future (handles freed object) |
31/// |-------------|------------------------------|----------------------------------------|
32/// | Untyped | [`Signal::to_future()`] | [`Signal::to_fallible_future()`] |
33/// | Typed | [`TypedSignal::to_future()`] | [`TypedSignal::to_fallible_future()`] |
34///
35/// [`Signal::to_future()`]: crate::builtin::Signal::to_future
36/// [`Signal::to_fallible_future()`]: crate::builtin::Signal::to_fallible_future
37/// [`TypedSignal::to_future()`]: crate::registry::signal::TypedSignal::to_future
38/// [`TypedSignal::to_fallible_future()`]: crate::registry::signal::TypedSignal::to_fallible_future
39///
40/// # Panics
41/// If called from any other thread than the main thread.
42///
43/// # Examples
44/// With typed signals:
45///
46/// ```no_run
47/// # use godot::prelude::*;
48/// #[derive(GodotClass)]
49/// #[class(init)]
50/// struct Building {
51/// base: Base<RefCounted>,
52/// }
53///
54/// #[godot_api]
55/// impl Building {
56/// #[signal]
57/// fn constructed(seconds: u32);
58/// }
59///
60/// let house = Building::new_gd();
61/// godot::task::spawn(async move {
62/// println!("Wait for construction...");
63///
64/// // Emitted arguments can be fetched in tuple form.
65/// // If the signal has no parameters, you can skip `let` and just await the future.
66/// let (seconds,) = house.signals().constructed().to_future().await;
67///
68/// println!("Construction complete after {seconds}s.");
69/// });
70/// ```
71///
72/// With untyped signals:
73/// ```no_run
74/// # use godot::builtin::Signal;
75/// # use godot::classes::Node;
76/// # use godot::obj::NewAlloc;
77/// let node = Node::new_alloc();
78/// let signal = Signal::from_object_signal(&node, "signal");
79///
80/// godot::task::spawn(async move {
81/// println!("Starting task...");
82///
83/// // Explicit generic arguments needed, here `()`:
84/// signal.to_future::<()>().await;
85///
86/// println!("Node has changed: {}", node.get_name());
87/// });
88/// ```
89#[doc(alias = "async")]
90pub fn spawn(future: impl Future<Output = ()> + 'static) -> TaskHandle {
91 // Spawning new tasks is only allowed on the main thread for now.
92 // We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
93 // same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
94 // a future will be polled.
95 // By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
96 //
97 // Once thread-safe futures are possible the restriction can be lifted.
98 #[cfg(not(wasm_nothreads))] #[cfg_attr(published_docs, doc(cfg(not(wasm_nothreads))))]
99 assert!(
100 crate::init::is_main_thread(),
101 "godot_task() can only be used on the main thread"
102 );
103
104 let (task_handle, godot_waker) = ASYNC_RUNTIME.with_runtime_mut(move |rt| {
105 let task_handle = rt.add_task(Box::pin(future));
106 let godot_waker = Arc::new(GodotWaker::new(
107 task_handle.index,
108 task_handle.id,
109 thread::current().id(),
110 ));
111
112 (task_handle, godot_waker)
113 });
114
115 poll_future(godot_waker);
116 task_handle
117}
118
119/// Handle for an active background task.
120///
121/// This handle provides introspection into the current state of the task, as well as providing a way to cancel it.
122///
123/// The associated task will **not** be canceled if this handle is dropped.
124pub struct TaskHandle {
125 index: usize,
126 id: u64,
127 _no_send_sync: PhantomData<*const ()>,
128}
129
130impl TaskHandle {
131 fn new(index: usize, id: u64) -> Self {
132 Self {
133 index,
134 id,
135 _no_send_sync: PhantomData,
136 }
137 }
138
139 /// Cancels the task if it is still pending and does nothing if it is already completed.
140 pub fn cancel(self) {
141 ASYNC_RUNTIME.with_runtime_mut(|rt| {
142 let Some(task) = rt.tasks.get(self.index) else {
143 // Getting the task from the runtime might return None if the runtime has already been deinitialized. In this case, we just
144 // ignore the cancel request, as the entire runtime has already been canceled.
145 return;
146 };
147
148 let alive = match task.value {
149 FutureSlotState::Empty => {
150 panic!("Future slot is empty when canceling it! This is a bug!")
151 }
152 FutureSlotState::Gone => false,
153 FutureSlotState::Pending(_) => task.id == self.id,
154 FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
155 };
156
157 if !alive {
158 return;
159 }
160
161 rt.clear_task(self.index);
162 })
163 }
164
165 /// Synchronously checks if the task is still pending or has already completed.
166 pub fn is_pending(&self) -> bool {
167 ASYNC_RUNTIME.with_runtime(|rt| {
168 let slot = rt
169 .tasks
170 .get(self.index)
171 .unwrap_or_else(|| unreachable!("missing future slot at index {}", self.index));
172
173 if slot.id != self.id {
174 return false;
175 }
176
177 matches!(
178 slot.value,
179 FutureSlotState::Pending(_) | FutureSlotState::Polling
180 )
181 })
182 }
183}
184
185// ----------------------------------------------------------------------------------------------------------------------------------------------
186// Async Runtime
187
188const ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE: &str = "The async runtime is being accessed after it has been deinitialized. This should not be possible and is most likely a bug.";
189
190thread_local! {
191 /// The thread local is only initialized the first time it's used. This means the async runtime won't be allocated until a task is
192 /// spawned.
193 static ASYNC_RUNTIME: RefCell<Option<AsyncRuntime>> = RefCell::new(Some(AsyncRuntime::new()));
194}
195
196/// Will be called during engine shutdown.
197///
198/// We have to drop all the remaining Futures during engine shutdown. This avoids them being dropped at process termination where they would
199/// try to access engine resources, which leads to SEGFAULTs.
200pub(crate) fn cleanup() {
201 ASYNC_RUNTIME.set(None);
202}
203
204#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
205pub fn has_godot_task_panicked(task_handle: TaskHandle) -> bool {
206 ASYNC_RUNTIME.with_runtime(|rt| rt.panicked_tasks.contains(&task_handle.id))
207}
208
209/// The current state of a future inside the async runtime.
210enum FutureSlotState<T> {
211 /// Slot is currently empty.
212 Empty,
213 /// Slot was previously occupied but the future has been canceled or the slot reused.
214 Gone,
215 /// Slot contains a pending future.
216 Pending(T),
217 /// Slot contains a future which is currently being polled.
218 Polling,
219}
220
221/// Wrapper around a future that is being stored in the async runtime.
222///
223/// This wrapper contains additional metadata for the async runtime.
224struct FutureSlot<T> {
225 value: FutureSlotState<T>,
226 id: u64,
227}
228
229impl<T> FutureSlot<T> {
230 /// Create a new slot with a pending future.
231 fn pending(id: u64, value: T) -> Self {
232 Self {
233 value: FutureSlotState::Pending(value),
234 id,
235 }
236 }
237
238 /// Checks if the future slot is either still empty or has become unoccupied due to a future completing.
239 fn is_empty(&self) -> bool {
240 matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
241 }
242
243 /// Drop the future from this slot.
244 ///
245 /// This transitions the slot into the [`FutureSlotState::Gone`] state.
246 fn clear(&mut self) {
247 self.value = FutureSlotState::Gone;
248 }
249
250 /// Attempts to extract the future with the given ID from the slot.
251 ///
252 /// Puts the slot into [`FutureSlotState::Polling`] state after taking the future out. It is expected that the future is either parked
253 /// again or the slot is cleared.
254 /// In cases were the slot state is not [`FutureSlotState::Pending`], a copy of the state is returned but the slot remains untouched.
255 fn take_for_polling(&mut self, id: u64) -> FutureSlotState<T> {
256 match self.value {
257 FutureSlotState::Empty => FutureSlotState::Empty,
258 FutureSlotState::Polling => FutureSlotState::Polling,
259 FutureSlotState::Gone => FutureSlotState::Gone,
260 FutureSlotState::Pending(_) if self.id != id => FutureSlotState::Gone,
261 FutureSlotState::Pending(_) => {
262 std::mem::replace(&mut self.value, FutureSlotState::Polling)
263 }
264 }
265 }
266
267 /// Parks the future in this slot again.
268 ///
269 /// # Panics
270 /// - If the slot is not in state [`FutureSlotState::Polling`].
271 fn park(&mut self, value: T) {
272 match self.value {
273 FutureSlotState::Empty | FutureSlotState::Gone => {
274 panic!("cannot park future in slot which is unoccupied")
275 }
276 FutureSlotState::Pending(_) => {
277 panic!(
278 "cannot park future in slot, which is already occupied by a different future"
279 )
280 }
281 FutureSlotState::Polling => {
282 self.value = FutureSlotState::Pending(value);
283 }
284 }
285 }
286}
287
288/// The storage for the pending tasks of the async runtime.
289#[derive(Default)]
290struct AsyncRuntime {
291 tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
292 next_task_id: u64,
293 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
294 panicked_tasks: std::collections::HashSet<u64>,
295}
296
297impl AsyncRuntime {
298 fn new() -> Self {
299 Self {
300 // We only create a new async runtime inside a thread_local, which has lazy initialization on first use.
301 tasks: Vec::with_capacity(16),
302 next_task_id: 0,
303 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
304 panicked_tasks: std::collections::HashSet::default(),
305 }
306 }
307
308 /// Get the next task ID.
309 fn next_id(&mut self) -> u64 {
310 let id = self.next_task_id;
311 self.next_task_id += 1;
312 id
313 }
314
315 /// Store a new async task in the runtime.
316 ///
317 /// First, a linear search is performed to locate an already existing but currently unoccupied slot in the task buffer. If there is no
318 /// free slot, a new slot is added which may grow the underlying [`Vec`].
319 ///
320 /// The future storage always starts out with a capacity of 10 tasks.
321 fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
322 let id = self.next_id();
323 let index_slot = self
324 .tasks
325 // If we find an available slot, we will assign the new future to it.
326 .iter_mut()
327 .enumerate()
328 .find(|(_, slot)| slot.is_empty());
329
330 let boxed = Box::pin(future);
331
332 let index = match index_slot {
333 Some((index, slot)) => {
334 *slot = FutureSlot::pending(id, boxed);
335 index
336 }
337 None => {
338 self.tasks.push(FutureSlot::pending(id, boxed));
339 self.tasks.len() - 1
340 }
341 };
342
343 TaskHandle::new(index, id)
344 }
345
346 /// Extract a pending task from the storage.
347 ///
348 /// Attempts to extract a future with the given ID from the specified index and leaves the slot in state [`FutureSlotState::Polling`].
349 /// In cases were the slot state is not [`FutureSlotState::Pending`], a copy of the state is returned but the slot remains untouched.
350 fn take_task_for_polling(
351 &mut self,
352 index: usize,
353 id: u64,
354 ) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
355 let slot = self.tasks.get_mut(index);
356 slot.map(|inner| inner.take_for_polling(id))
357 .unwrap_or(FutureSlotState::Empty)
358 }
359
360 /// Remove a future from the storage and free up its slot.
361 ///
362 /// The slot is left in the [`FutureSlotState::Gone`] state.
363 fn clear_task(&mut self, index: usize) {
364 self.tasks[index].clear();
365 }
366
367 /// Move a future back into its slot.
368 ///
369 /// # Panic
370 /// - If the underlying slot is not in the [`FutureSlotState::Polling`] state.
371 fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
372 self.tasks[index].park(future);
373 }
374
375 /// Track that a future caused a panic.
376 ///
377 /// This is only available for itest.
378 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
379 fn track_panic(&mut self, task_id: u64) {
380 self.panicked_tasks.insert(task_id);
381 }
382}
383
384trait WithRuntime {
385 fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R;
386 fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R;
387}
388
389impl WithRuntime for LocalKey<RefCell<Option<AsyncRuntime>>> {
390 fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R {
391 self.with_borrow(|rt| {
392 let rt_ref = rt.as_ref().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
393
394 f(rt_ref)
395 })
396 }
397
398 fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R {
399 self.with_borrow_mut(|rt| {
400 let rt_ref = rt.as_mut().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
401
402 f(rt_ref)
403 })
404 }
405}
406
407/// Use a godot waker to poll it's associated future.
408///
409/// # Panics
410/// - If called from a thread other than the main-thread.
411fn poll_future(godot_waker: Arc<GodotWaker>) {
412 let current_thread = thread::current().id();
413
414 assert_eq!(
415 godot_waker.thread_id,
416 current_thread,
417 "trying to poll future on a different thread!\n Current thread: {:?}\n Future thread: {:?}",
418 current_thread,
419 godot_waker.thread_id,
420 );
421
422 let waker = Waker::from(godot_waker.clone());
423 let mut ctx = Context::from_waker(&waker);
424
425 // Move future out of the runtime while we are polling it to avoid holding a mutable reference for the entire runtime.
426 let future = ASYNC_RUNTIME.with_runtime_mut(|rt| {
427 match rt.take_task_for_polling(godot_waker.runtime_index, godot_waker.task_id) {
428 FutureSlotState::Empty => {
429 panic!("Future slot is empty when waking it! This is a bug!");
430 }
431
432 FutureSlotState::Gone => None,
433
434 FutureSlotState::Polling => {
435 unreachable!("the same GodotWaker has been called recursively");
436 }
437
438 FutureSlotState::Pending(future) => Some(future),
439 }
440 });
441
442 let Some(future) = future else {
443 // Future has been canceled while the waker was already triggered.
444 return;
445 };
446
447 let error_context = || "Godot async task failed".to_string();
448
449 // If Future::poll() panics, the future is immediately dropped and cannot be accessed again,
450 // thus any state that may not have been unwind-safe cannot be observed later.
451 let mut future = AssertUnwindSafe(future);
452
453 let panic_result = handle_panic(error_context, move || {
454 (future.as_mut().poll(&mut ctx), future)
455 });
456
457 let Ok((poll_result, future)) = panic_result else {
458 // Polling the future caused a panic. The task state has to be cleaned up and we want track the panic if the trace feature is enabled.
459 ASYNC_RUNTIME.with_runtime_mut(|rt| {
460 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
461 rt.track_panic(godot_waker.task_id);
462 rt.clear_task(godot_waker.runtime_index);
463 });
464
465 return;
466 };
467
468 // Update the state of the Future in the runtime.
469 ASYNC_RUNTIME.with_runtime_mut(|rt| match poll_result {
470 // Future is still pending, so we park it again.
471 Poll::Pending => rt.park_task(godot_waker.runtime_index, future.0),
472
473 // Future has resolved, so we remove it from the runtime.
474 Poll::Ready(()) => rt.clear_task(godot_waker.runtime_index),
475 });
476}
477
478/// Implementation of a [`Waker`] to poll futures with the engine.
479struct GodotWaker {
480 runtime_index: usize,
481 task_id: u64,
482 thread_id: ThreadId,
483}
484
485impl GodotWaker {
486 fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
487 Self {
488 runtime_index: index,
489 thread_id,
490 task_id,
491 }
492 }
493}
494
495// Uses a deferred callable to poll the associated future, i.e. at the end of the current frame.
496impl Wake for GodotWaker {
497 fn wake(self: Arc<Self>) {
498 let mut waker = Some(self);
499
500 /// Enforce the passed closure is generic over its lifetime. The compiler gets confused about the livetime of the argument otherwise.
501 /// This appears to be a common issue: https://github.com/rust-lang/rust/issues/89976
502 fn callback_type_hint<F>(f: F) -> F
503 where
504 F: for<'a> FnMut(&'a [&Variant]) -> Result<Variant, ()>,
505 {
506 f
507 }
508
509 #[cfg(not(feature = "experimental-threads"))] #[cfg_attr(published_docs, doc(cfg(not(feature = "experimental-threads"))))]
510 let create_callable = Callable::from_local_fn;
511
512 #[cfg(feature = "experimental-threads")] #[cfg_attr(published_docs, doc(cfg(feature = "experimental-threads")))]
513 let create_callable = Callable::from_sync_fn;
514
515 let callable = create_callable(
516 "GodotWaker::wake",
517 callback_type_hint(move |_args| {
518 poll_future(waker.take().expect("Callable will never be called again"));
519 Ok(Variant::nil())
520 }),
521 );
522
523 // Schedule waker to poll the Future at the end of the frame.
524 callable.call_deferred(&[]);
525 }
526}