godot_core/task/async_runtime.rs
1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use std::cell::RefCell;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::panic::AssertUnwindSafe;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll, Wake, Waker};
15use std::thread::{self, LocalKey, ThreadId};
16
17use crate::builtin::{Callable, Variant};
18use crate::private::handle_panic;
19
20// ----------------------------------------------------------------------------------------------------------------------------------------------
21// Public interface
22
23/// Create a new async background task.
24///
25/// This function allows creating a new async task in which Godot signals can be awaited, like it is possible in GDScript. The
26/// [`TaskHandle`] that is returned provides synchronous introspection into the current state of the task.
27///
28/// Signals can be converted to futures in the following ways:
29///
30/// | Signal type | Simple future | Fallible future (handles freed object) |
31/// |-------------|------------------------------|----------------------------------------|
32/// | Untyped | [`Signal::to_future()`] | [`Signal::to_fallible_future()`] |
33/// | Typed | [`TypedSignal::to_future()`] | [`TypedSignal::to_fallible_future()`] |
34///
35/// [`Signal::to_future()`]: crate::builtin::Signal::to_future
36/// [`Signal::to_fallible_future()`]: crate::builtin::Signal::to_fallible_future
37/// [`TypedSignal::to_future()`]: crate::registry::signal::TypedSignal::to_future
38/// [`TypedSignal::to_fallible_future()`]: crate::registry::signal::TypedSignal::to_fallible_future
39///
40/// # Panics
41/// If called from any other thread than the main thread.
42///
43/// # Examples
44/// With typed signals:
45///
46/// ```no_run
47/// # use godot::prelude::*;
48/// #[derive(GodotClass)]
49/// #[class(init)]
50/// struct Building {
51/// base: Base<RefCounted>,
52/// }
53///
54/// #[godot_api]
55/// impl Building {
56/// #[signal]
57/// fn constructed(seconds: u32);
58/// }
59///
60/// let house = Building::new_gd();
61/// godot::task::spawn(async move {
62/// println!("Wait for construction...");
63///
64/// // Emitted arguments can be fetched in tuple form.
65/// // If the signal has no parameters, you can skip `let` and just await the future.
66/// let (seconds,) = house.signals().constructed().to_future().await;
67///
68/// println!("Construction complete after {seconds}s.");
69/// });
70/// ```
71///
72/// With untyped signals:
73/// ```no_run
74/// # use godot::builtin::Signal;
75/// # use godot::classes::Node;
76/// # use godot::obj::NewAlloc;
77/// let node = Node::new_alloc();
78/// let signal = Signal::from_object_signal(&node, "signal");
79///
80/// godot::task::spawn(async move {
81/// println!("Starting task...");
82///
83/// // Explicit generic arguments needed, here `()`:
84/// signal.to_future::<()>().await;
85///
86/// println!("Node has changed: {}", node.get_name());
87/// });
88/// ```
89#[doc(alias = "async")]
90pub fn spawn(future: impl Future<Output = ()> + 'static) -> TaskHandle {
91 // Spawning new tasks is only allowed on the main thread for now.
92 // We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
93 // same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
94 // a future will be polled.
95 // By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
96 //
97 // Once thread-safe futures are possible the restriction can be lifted.
98 assert!(
99 crate::init::is_main_thread(),
100 "godot_task() can only be used on the main thread"
101 );
102
103 let (task_handle, godot_waker) = ASYNC_RUNTIME.with_runtime_mut(move |rt| {
104 let task_handle = rt.add_task(Box::pin(future));
105 let godot_waker = Arc::new(GodotWaker::new(
106 task_handle.index,
107 task_handle.id,
108 thread::current().id(),
109 ));
110
111 (task_handle, godot_waker)
112 });
113
114 poll_future(godot_waker);
115 task_handle
116}
117
118/// Handle for an active background task.
119///
120/// This handle provides introspection into the current state of the task, as well as providing a way to cancel it.
121///
122/// The associated task will **not** be canceled if this handle is dropped.
123pub struct TaskHandle {
124 index: usize,
125 id: u64,
126 _no_send_sync: PhantomData<*const ()>,
127}
128
129impl TaskHandle {
130 fn new(index: usize, id: u64) -> Self {
131 Self {
132 index,
133 id,
134 _no_send_sync: PhantomData,
135 }
136 }
137
138 /// Cancels the task if it is still pending and does nothing if it is already completed.
139 pub fn cancel(self) {
140 ASYNC_RUNTIME.with_runtime_mut(|rt| {
141 let Some(task) = rt.tasks.get(self.index) else {
142 // Getting the task from the runtime might return None if the runtime has already been deinitialized. In this case, we just
143 // ignore the cancel request, as the entire runtime has already been canceled.
144 return;
145 };
146
147 let alive = match task.value {
148 FutureSlotState::Empty => {
149 panic!("Future slot is empty when canceling it! This is a bug!")
150 }
151 FutureSlotState::Gone => false,
152 FutureSlotState::Pending(_) => task.id == self.id,
153 FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
154 };
155
156 if !alive {
157 return;
158 }
159
160 rt.clear_task(self.index);
161 })
162 }
163
164 /// Synchronously checks if the task is still pending or has already completed.
165 pub fn is_pending(&self) -> bool {
166 ASYNC_RUNTIME.with_runtime(|rt| {
167 let slot = rt
168 .tasks
169 .get(self.index)
170 .unwrap_or_else(|| unreachable!("missing future slot at index {}", self.index));
171
172 if slot.id != self.id {
173 return false;
174 }
175
176 matches!(
177 slot.value,
178 FutureSlotState::Pending(_) | FutureSlotState::Polling
179 )
180 })
181 }
182}
183
184// ----------------------------------------------------------------------------------------------------------------------------------------------
185// Async Runtime
186
187const ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE: &str = "The async runtime is being accessed after it has been deinitialized. This should not be possible and is most likely a bug.";
188
189thread_local! {
190 /// The thread local is only initialized the first time it's used. This means the async runtime won't be allocated until a task is
191 /// spawned.
192 static ASYNC_RUNTIME: RefCell<Option<AsyncRuntime>> = RefCell::new(Some(AsyncRuntime::new()));
193}
194
195/// Will be called during engine shutdown.
196///
197/// We have to drop all the remaining Futures during engine shutdown. This avoids them being dropped at process termination where they would
198/// try to access engine resources, which leads to SEGFAULTs.
199pub(crate) fn cleanup() {
200 ASYNC_RUNTIME.set(None);
201}
202
/// Checks whether polling the task behind `task_handle` has caused a panic.
///
/// Only available with the `trace` feature.
///
/// # Panics
/// If the async runtime has already been deinitialized.
#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
pub fn has_godot_task_panicked(task_handle: TaskHandle) -> bool {
    let task_id = task_handle.id;

    ASYNC_RUNTIME.with_runtime(|rt| rt.panicked_tasks.contains(&task_id))
}
207
/// The current state of a future inside the async runtime.
enum FutureSlotState<T> {
    /// Slot is currently empty.
    Empty,
    /// Slot was previously occupied, but its future has been removed. Also reported by [`FutureSlot::take_for_polling`] when the slot
    /// holds a future with a different ID than the requested one.
    Gone,
    /// Slot contains a pending future.
    Pending(T),
    /// Slot contains a future which is currently being polled.
    Polling,
}

/// A future stored in the async runtime, together with the metadata the runtime needs.
struct FutureSlot<T> {
    /// State of the slot, including the future itself while it is parked.
    value: FutureSlotState<T>,
    /// ID of the task this slot was last assigned to.
    id: u64,
}

impl<T> FutureSlot<T> {
    /// Create a new slot with a pending future.
    fn pending(id: u64, value: T) -> Self {
        let value = FutureSlotState::Pending(value);

        Self { value, id }
    }

    /// Checks if the slot holds no future, either because it never did or because its future has been removed.
    fn is_empty(&self) -> bool {
        match self.value {
            FutureSlotState::Empty | FutureSlotState::Gone => true,
            FutureSlotState::Pending(_) | FutureSlotState::Polling => false,
        }
    }

    /// Drop the future from this slot.
    ///
    /// This transitions the slot into the [`FutureSlotState::Gone`] state.
    fn clear(&mut self) {
        self.value = FutureSlotState::Gone;
    }

    /// Attempts to extract the future with the given ID from the slot.
    ///
    /// On success the future is returned inside [`FutureSlotState::Pending`] and the slot switches to [`FutureSlotState::Polling`].
    /// The caller is expected to either park the future again or clear the slot.
    ///
    /// In cases where the slot is not in state [`FutureSlotState::Pending`], a copy of the state is returned and the slot remains
    /// untouched. A pending future with a different ID is reported as [`FutureSlotState::Gone`], again without touching the slot.
    fn take_for_polling(&mut self, id: u64) -> FutureSlotState<T> {
        let id_matches = self.id == id;

        match self.value {
            FutureSlotState::Pending(_) if id_matches => {
                std::mem::replace(&mut self.value, FutureSlotState::Polling)
            }
            FutureSlotState::Pending(_) | FutureSlotState::Gone => FutureSlotState::Gone,
            FutureSlotState::Polling => FutureSlotState::Polling,
            FutureSlotState::Empty => FutureSlotState::Empty,
        }
    }

    /// Parks the future in this slot again.
    ///
    /// # Panics
    /// - If the slot is not in state [`FutureSlotState::Polling`].
    fn park(&mut self, value: T) {
        match self.value {
            FutureSlotState::Polling => self.value = FutureSlotState::Pending(value),
            FutureSlotState::Pending(_) => {
                panic!("cannot park future in slot, which is already occupied by a different future")
            }
            FutureSlotState::Empty | FutureSlotState::Gone => {
                panic!("cannot park future in slot which is unoccupied")
            }
        }
    }
}
286
287/// The storage for the pending tasks of the async runtime.
288#[derive(Default)]
289struct AsyncRuntime {
290 tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
291 next_task_id: u64,
292 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
293 panicked_tasks: std::collections::HashSet<u64>,
294}
295
296impl AsyncRuntime {
297 fn new() -> Self {
298 Self {
299 // We only create a new async runtime inside a thread_local, which has lazy initialization on first use.
300 tasks: Vec::with_capacity(16),
301 next_task_id: 0,
302 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
303 panicked_tasks: std::collections::HashSet::default(),
304 }
305 }
306
307 /// Get the next task ID.
308 fn next_id(&mut self) -> u64 {
309 let id = self.next_task_id;
310 self.next_task_id += 1;
311 id
312 }
313
314 /// Store a new async task in the runtime.
315 ///
316 /// First, a linear search is performed to locate an already existing but currently unoccupied slot in the task buffer. If there is no
317 /// free slot, a new slot is added which may grow the underlying [`Vec`].
318 ///
319 /// The future storage always starts out with a capacity of 10 tasks.
320 fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
321 let id = self.next_id();
322 let index_slot = self
323 .tasks
324 // If we find an available slot, we will assign the new future to it.
325 .iter_mut()
326 .enumerate()
327 .find(|(_, slot)| slot.is_empty());
328
329 let boxed = Box::pin(future);
330
331 let index = match index_slot {
332 Some((index, slot)) => {
333 *slot = FutureSlot::pending(id, boxed);
334 index
335 }
336 None => {
337 self.tasks.push(FutureSlot::pending(id, boxed));
338 self.tasks.len() - 1
339 }
340 };
341
342 TaskHandle::new(index, id)
343 }
344
345 /// Extract a pending task from the storage.
346 ///
347 /// Attempts to extract a future with the given ID from the specified index and leaves the slot in state [`FutureSlotState::Polling`].
348 /// In cases were the slot state is not [`FutureSlotState::Pending`], a copy of the state is returned but the slot remains untouched.
349 fn take_task_for_polling(
350 &mut self,
351 index: usize,
352 id: u64,
353 ) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
354 let slot = self.tasks.get_mut(index);
355 slot.map(|inner| inner.take_for_polling(id))
356 .unwrap_or(FutureSlotState::Empty)
357 }
358
359 /// Remove a future from the storage and free up its slot.
360 ///
361 /// The slot is left in the [`FutureSlotState::Gone`] state.
362 fn clear_task(&mut self, index: usize) {
363 self.tasks[index].clear();
364 }
365
366 /// Move a future back into its slot.
367 ///
368 /// # Panic
369 /// - If the underlying slot is not in the [`FutureSlotState::Polling`] state.
370 fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
371 self.tasks[index].park(future);
372 }
373
374 /// Track that a future caused a panic.
375 ///
376 /// This is only available for itest.
377 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
378 fn track_panic(&mut self, task_id: u64) {
379 self.panicked_tasks.insert(task_id);
380 }
381}
382
383trait WithRuntime {
384 fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R;
385 fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R;
386}
387
388impl WithRuntime for LocalKey<RefCell<Option<AsyncRuntime>>> {
389 fn with_runtime<R>(&'static self, f: impl FnOnce(&AsyncRuntime) -> R) -> R {
390 self.with_borrow(|rt| {
391 let rt_ref = rt.as_ref().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
392
393 f(rt_ref)
394 })
395 }
396
397 fn with_runtime_mut<R>(&'static self, f: impl FnOnce(&mut AsyncRuntime) -> R) -> R {
398 self.with_borrow_mut(|rt| {
399 let rt_ref = rt.as_mut().expect(ASYNC_RUNTIME_DEINIT_PANIC_MESSAGE);
400
401 f(rt_ref)
402 })
403 }
404}
405
406/// Use a godot waker to poll it's associated future.
407///
408/// # Panics
409/// - If called from a thread other than the main-thread.
410fn poll_future(godot_waker: Arc<GodotWaker>) {
411 let current_thread = thread::current().id();
412
413 assert_eq!(
414 godot_waker.thread_id,
415 current_thread,
416 "trying to poll future on a different thread!\n Current thread: {:?}\n Future thread: {:?}",
417 current_thread,
418 godot_waker.thread_id,
419 );
420
421 let waker = Waker::from(godot_waker.clone());
422 let mut ctx = Context::from_waker(&waker);
423
424 // Move future out of the runtime while we are polling it to avoid holding a mutable reference for the entire runtime.
425 let future = ASYNC_RUNTIME.with_runtime_mut(|rt| {
426 match rt.take_task_for_polling(godot_waker.runtime_index, godot_waker.task_id) {
427 FutureSlotState::Empty => {
428 panic!("Future slot is empty when waking it! This is a bug!");
429 }
430
431 FutureSlotState::Gone => None,
432
433 FutureSlotState::Polling => {
434 unreachable!("the same GodotWaker has been called recursively");
435 }
436
437 FutureSlotState::Pending(future) => Some(future),
438 }
439 });
440
441 let Some(future) = future else {
442 // Future has been canceled while the waker was already triggered.
443 return;
444 };
445
446 let error_context = || "Godot async task failed".to_string();
447
448 // If Future::poll() panics, the future is immediately dropped and cannot be accessed again,
449 // thus any state that may not have been unwind-safe cannot be observed later.
450 let mut future = AssertUnwindSafe(future);
451
452 let panic_result = handle_panic(error_context, move || {
453 (future.as_mut().poll(&mut ctx), future)
454 });
455
456 let Ok((poll_result, future)) = panic_result else {
457 // Polling the future caused a panic. The task state has to be cleaned up and we want track the panic if the trace feature is enabled.
458 ASYNC_RUNTIME.with_runtime_mut(|rt| {
459 #[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
460 rt.track_panic(godot_waker.task_id);
461 rt.clear_task(godot_waker.runtime_index);
462 });
463
464 return;
465 };
466
467 // Update the state of the Future in the runtime.
468 ASYNC_RUNTIME.with_runtime_mut(|rt| match poll_result {
469 // Future is still pending, so we park it again.
470 Poll::Pending => rt.park_task(godot_waker.runtime_index, future.0),
471
472 // Future has resolved, so we remove it from the runtime.
473 Poll::Ready(()) => rt.clear_task(godot_waker.runtime_index),
474 });
475}
476
/// Implementation of a [`Waker`] to poll futures with the engine.
struct GodotWaker {
    /// Index of the task's slot inside the runtime.
    runtime_index: usize,
    /// ID of the task this waker belongs to.
    task_id: u64,
    /// Thread on which the associated future has to be polled.
    thread_id: ThreadId,
}

impl GodotWaker {
    /// Creates a waker for the task `task_id` stored at slot `index`, to be polled on thread `thread_id`.
    fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
        let runtime_index = index;

        Self {
            runtime_index,
            task_id,
            thread_id,
        }
    }
}
493
494// Uses a deferred callable to poll the associated future, i.e. at the end of the current frame.
495impl Wake for GodotWaker {
496 fn wake(self: Arc<Self>) {
497 let mut waker = Some(self);
498
499 /// Enforce the passed closure is generic over its lifetime. The compiler gets confused about the livetime of the argument otherwise.
500 /// This appears to be a common issue: https://github.com/rust-lang/rust/issues/89976
501 fn callback_type_hint<F>(f: F) -> F
502 where
503 F: for<'a> FnMut(&'a [&Variant]) -> Result<Variant, ()>,
504 {
505 f
506 }
507
508 #[cfg(not(feature = "experimental-threads"))] #[cfg_attr(published_docs, doc(cfg(not(feature = "experimental-threads"))))]
509 let create_callable = Callable::from_local_fn;
510
511 #[cfg(feature = "experimental-threads")] #[cfg_attr(published_docs, doc(cfg(feature = "experimental-threads")))]
512 let create_callable = Callable::from_sync_fn;
513
514 let callable = create_callable(
515 "GodotWaker::wake",
516 callback_type_hint(move |_args| {
517 poll_future(waker.take().expect("Callable will never be called again"));
518 Ok(Variant::nil())
519 }),
520 );
521
522 // Schedule waker to poll the Future at the end of the frame.
523 callable.call_deferred(&[]);
524 }
525}