Skip to main content

godot_core/task/
futures.rs

1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use core::panic;
9use std::fmt::Display;
10use std::future::{Future, IntoFuture};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15
16use crate::builtin::{Callable, RustCallable, Signal, Variant};
17use crate::classes::object::ConnectFlags;
18use crate::global::godot_error;
19use crate::meta::InParamTuple;
20use crate::meta::sealed::Sealed;
21use crate::obj::signal::TypedSignal;
22use crate::obj::{Gd, GodotClass, WithSignals};
23use crate::sys;
24
25// ----------------------------------------------------------------------------------------------------------------------------------------------
26// Internal re-exports
27#[rustfmt::skip] // Do not reorder.
28pub(crate) use crate::impl_dynamic_send;
29
30/// The panicking counter part to the [`FallibleSignalFuture`].
31///
32/// This future works in the same way as `FallibleSignalFuture`, but panics when the signal object is freed, instead of resolving to a
33/// [`Result::Err`].
34///
35/// # Panics
36/// - If the signal object is freed before the signal has been emitted.
37/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
38/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
39///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
40pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
41
42impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
43    fn new(signal: Signal) -> Self {
44        Self(FallibleSignalFuture::new(signal))
45    }
46}
47
48impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
49    type Output = R;
50
51    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52        let poll_result = self.get_mut().0.poll(cx);
53
54        match poll_result {
55            Poll::Pending => Poll::Pending,
56            Poll::Ready(Ok(value)) => Poll::Ready(value),
57            Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
58                "the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
59            ),
60        }
61    }
62}
63
64// Not derived, otherwise an extra bound `Output: Default` is required.
65struct SignalFutureData<T> {
66    state: SignalFutureState<T>,
67    waker: Option<Waker>,
68}
69
70impl<T> Default for SignalFutureData<T> {
71    fn default() -> Self {
72        Self {
73            state: Default::default(),
74            waker: None,
75        }
76    }
77}
78
79// Only public for itest.
80pub struct SignalFutureResolver<R: IntoDynamicSend> {
81    data: Arc<Mutex<SignalFutureData<R::Target>>>,
82}
83
84impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
85    fn clone(&self) -> Self {
86        Self {
87            data: self.data.clone(),
88        }
89    }
90}
91
92/// For itest to construct and test a resolver.
93#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
94pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
95    SignalFutureResolver {
96        data: Arc::new(Mutex::new(SignalFutureData::default())),
97    }
98}
99
100impl<R: IntoDynamicSend> SignalFutureResolver<R> {
101    fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
102        Self { data }
103    }
104}
105
106impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
107    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
108        state.write_usize(Arc::as_ptr(&self.data) as usize);
109    }
110}
111
112impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
113    fn eq(&self, other: &Self) -> bool {
114        Arc::ptr_eq(&self.data, &other.data)
115    }
116}
117
118impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
119    fn invoke(&mut self, args: &[&Variant]) -> Variant {
120        let waker = {
121            let mut data = self.data.lock().unwrap();
122            data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
123
124            // We no longer need the waker after we resolved. If the future is polled again, we'll also get a new waker.
125            data.waker.take()
126        };
127
128        if let Some(waker) = waker {
129            waker.wake();
130        }
131
132        Variant::nil()
133    }
134}
135
136impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
139    }
140}
141
142// This resolver will change the futures state when it's being dropped (i.e. the engine removes all connected signal callables). By marking
143// the future as dead we can resolve it to an error value the next time it gets polled.
144impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
145    fn drop(&mut self) {
146        let mut data = self.data.lock().unwrap();
147
148        if !matches!(data.state, SignalFutureState::Pending) {
149            // The future is no longer pending, so no clean up is required.
150            return;
151        }
152
153        // We mark the future as dead, so the next time it gets polled we can react to it's inability to resolve.
154        data.state = SignalFutureState::Dead;
155
156        // If we got a waker we trigger it to get the future polled. If there is no waker, then the future has not been polled yet and we
157        // simply wait for the runtime to perform the first poll.
158        if let Some(ref waker) = data.waker {
159            waker.wake_by_ref();
160        }
161    }
162}
163
164#[derive(Default)]
165enum SignalFutureState<T> {
166    #[default]
167    Pending,
168    Ready(T),
169    Dead,
170    Dropped,
171}
172
173impl<T> SignalFutureState<T> {
174    fn take(&mut self) -> Self {
175        let new_value = match self {
176            Self::Pending => Self::Pending,
177            Self::Ready(_) | Self::Dead => Self::Dead,
178            Self::Dropped => Self::Dropped,
179        };
180
181        std::mem::replace(self, new_value)
182    }
183}
184
185/// A future that tries to resolve as soon as the provided Godot signal was emitted.
186///
187/// The future might resolve to an error if the signal object is freed before the signal is emitted.
188///
189/// # Panics
190/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
191/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
192///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
193pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
194    data: Arc<Mutex<SignalFutureData<R::Target>>>,
195    callable: SignalFutureResolver<R>,
196    signal: Signal,
197}
198
199impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
200    fn new(signal: Signal) -> Self {
201        sys::strict_assert!(
202            !signal.is_null(),
203            "Failed to create future for invalid signal:\n\
204            Either the signal object was already freed, or it\n\
205            was not registered in the object before being used.",
206        );
207
208        let data = Arc::new(Mutex::new(SignalFutureData::default()));
209
210        // The callable currently requires that the return value is Sync + Send.
211        let callable = SignalFutureResolver::new(data.clone());
212
213        signal.connect_flags(
214            &Callable::from_custom(callable.clone()),
215            ConnectFlags::ONE_SHOT,
216        );
217
218        Self {
219            data,
220            callable,
221            signal,
222        }
223    }
224
225    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
226        let mut data = self.data.lock().unwrap();
227
228        data.waker.replace(cx.waker().clone());
229
230        let value = data.state.take();
231
232        // Drop the data mutex lock to prevent the mutext from getting poisoned by the potential later panic.
233        drop(data);
234
235        match value {
236            SignalFutureState::Pending => Poll::Pending,
237            SignalFutureState::Dropped => unreachable!(),
238            SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
239            SignalFutureState::Ready(value) => {
240                let Some(value) = DynamicSend::extract_if_safe(value) else {
241                    panic!(
242                        "the awaited signal was not emitted on the main-thread, but contained a non Send argument"
243                    );
244                };
245
246                Poll::Ready(Ok(value))
247            }
248        }
249    }
250}
251
252/// Error that might be returned  by the [`FallibleSignalFuture`].
253///
254/// This error is being resolved to when the signal object is freed before the awaited singal is emitted.
255#[derive(Debug)]
256pub struct FallibleSignalFutureError;
257
258impl Display for FallibleSignalFutureError {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        write!(
261            f,
262            "The signal object was freed before the awaited signal was emitted"
263        )
264    }
265}
266
267impl std::error::Error for FallibleSignalFutureError {}
268
269impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
270    type Output = Result<R, FallibleSignalFutureError>;
271
272    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273        self.get_mut().poll(cx)
274    }
275}
276
277impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
278    fn drop(&mut self) {
279        // The callable might alredy be destroyed, this occurs during engine shutdown.
280        if self.signal.is_null() {
281            return;
282        }
283
284        let mut data_lock = self.data.lock().unwrap();
285
286        data_lock.state = SignalFutureState::Dropped;
287
288        drop(data_lock);
289
290        // We create a new Godot Callable from our RustCallable so we get independent reference counting.
291        let gd_callable = Callable::from_custom(self.callable.clone());
292
293        // is_connected will return true if the signal was never emited before the future is dropped.
294        //
295        // There is a TOCTOU issue here that can occur when the FallibleSignalFuture is dropped at the same time as the signal object is
296        // freed on a different thread.
297        // We check in the beginning if the signal object is still alive, and we check here again, but the signal object still can be freed
298        // between our check and our usage of the object in `is_connected` and `disconnect`. The race condition will manifest in a
299        // non-unwinding panic that is hard to track down.
300        if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
301            self.signal.disconnect(&gd_callable);
302        }
303    }
304}
305
306impl Signal {
307    /// Creates a fallible future for this signal.
308    ///
309    /// The future will resolve the next time the signal is emitted.
310    /// See [`FallibleSignalFuture`] for details.
311    ///
312    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
313    /// the call to this function.
314    pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
315        FallibleSignalFuture::new(self.clone())
316    }
317
318    /// Creates a future for this signal.
319    ///
320    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
321    /// See [`SignalFuture`] for details.
322    ///
323    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
324    /// the call to this function.
325    pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
326        SignalFuture::new(self.clone())
327    }
328}
329
330impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
331    /// Creates a fallible future for this signal.
332    ///
333    /// The future will resolve the next time the signal is emitted.
334    /// See [`FallibleSignalFuture`] for details.
335    pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
336        FallibleSignalFuture::new(self.to_untyped())
337    }
338
339    /// Creates a future for this signal.
340    ///
341    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
342    /// See [`SignalFuture`] for details.
343    pub fn to_future(&self) -> SignalFuture<R> {
344        SignalFuture::new(self.to_untyped())
345    }
346}
347
348impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
349    type Output = R;
350
351    type IntoFuture = SignalFuture<R>;
352
353    fn into_future(self) -> Self::IntoFuture {
354        self.to_future()
355    }
356}
357
358/// Convert a value into a type that is [`Send`] at compile-time while the value might not be.
359///
360/// This allows to turn any implementor into a type that is `Send`, but requires to also implement [`DynamicSend`] as well.
361/// The later trait will verify if a value can actually be sent between threads at runtime.
362pub trait IntoDynamicSend: Sealed + 'static {
363    type Target: DynamicSend<Inner = Self>;
364
365    fn into_dynamic_send(self) -> Self::Target;
366}
367
368/// Runtime-checked `Send` capability.
369///
370/// Implemented for types that need a static `Send` bound, but where it is determined at runtime whether sending a value was
371/// actually safe. Only allows to extract the value if sending across threads is safe, thus fulfilling the `Send` supertrait.
372///
373/// # Safety
374/// The implementor has to guarantee that `extract_if_safe` returns `None`, if the value has been sent between threads while being `!Send`.
375///
376/// To uphold the `Send` supertrait guarantees, no public API apart from `extract_if_safe` must exist that would give access to the inner value from another thread.
377pub unsafe trait DynamicSend: Send + Sealed {
378    type Inner;
379
380    fn extract_if_safe(self) -> Option<Self::Inner>;
381}
382
383/// Value that can be sent across threads, but only accessed on its original thread.
384///
385/// When moved to another thread, the inner value can no longer be accessed and will be leaked when the `ThreadConfined` is dropped.
386pub struct ThreadConfined<T> {
387    value: Option<T>,
388    thread_id: ThreadId,
389}
390
391// SAFETY: This type can always be sent across threads, but the inner value can only be accessed on its original thread.
392unsafe impl<T> Send for ThreadConfined<T> {}
393
394impl<T> ThreadConfined<T> {
395    pub(crate) fn new(value: T) -> Self {
396        Self {
397            value: Some(value),
398            thread_id: std::thread::current().id(),
399        }
400    }
401
402    /// Retrieve the inner value, if the current thread is the one in which the `ThreadConfined` was created.
403    ///
404    /// If this fails, the value will be leaked immediately.
405    pub(crate) fn extract(mut self) -> Option<T> {
406        if self.is_original_thread() {
407            self.value.take()
408        } else {
409            None // causes Drop -> leak.
410        }
411    }
412
413    fn is_original_thread(&self) -> bool {
414        self.thread_id == std::thread::current().id()
415    }
416}
417
418impl<T> Drop for ThreadConfined<T> {
419    fn drop(&mut self) {
420        if !self.is_original_thread() {
421            std::mem::forget(self.value.take());
422
423            // Cannot panic, potentially during unwind already.
424            godot_error!(
425                "Dropped ThreadConfined<T> on a different thread than it was created on. The inner T value will be leaked."
426            );
427        }
428    }
429}
430
431unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
432    type Inner = Gd<T>;
433
434    fn extract_if_safe(self) -> Option<Self::Inner> {
435        self.extract()
436    }
437}
438
439impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
440
441impl<T: GodotClass> IntoDynamicSend for Gd<T> {
442    type Target = ThreadConfined<Self>;
443
444    fn into_dynamic_send(self) -> Self::Target {
445        ThreadConfined::new(self)
446    }
447}
448
449// ----------------------------------------------------------------------------------------------------------------------------------------------
450// Generated impls
451
452#[macro_export(local_inner_macros)]
453macro_rules! impl_dynamic_send {
454    (Send; $($ty:ty),+) => {
455        $(
456            unsafe impl $crate::task::DynamicSend for $ty {
457                type Inner = Self;
458
459                fn extract_if_safe(self) -> Option<Self::Inner> {
460                    Some(self)
461                }
462            }
463
464            impl $crate::task::IntoDynamicSend for $ty {
465                type Target = Self;
466                fn into_dynamic_send(self) -> Self::Target {
467                    self
468                }
469            }
470        )+
471    };
472
473    (tuple; $($arg:ident: $ty:ident),*) => {
474        unsafe impl<$($ty: $crate::task::DynamicSend ),*> $crate::task::DynamicSend for ($($ty,)*) {
475            type Inner = ($($ty::Inner,)*);
476
477            fn extract_if_safe(self) -> Option<Self::Inner> {
478                #[allow(non_snake_case)]
479                let ($($arg,)*) = self;
480
481                #[allow(clippy::unused_unit)]
482                match ($($arg.extract_if_safe(),)*) {
483                    ($(Some($arg),)*) => Some(($($arg,)*)),
484
485                    #[allow(unreachable_patterns)]
486                    _ => None,
487                }
488            }
489        }
490
491        impl<$($ty: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($ty,)*) {
492            type Target = ($($ty::Target,)*);
493
494            fn into_dynamic_send(self) -> Self::Target {
495                #[allow(non_snake_case)]
496                let ($($arg,)*) = self;
497
498                #[allow(clippy::unused_unit)]
499                ($($arg.into_dynamic_send(),)*)
500            }
501        }
502    };
503
504    (!Send; $($ty:ident),+) => {
505        $(
506            impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$ty> {}
507
508            unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$ty> {
509                type Inner = $crate::builtin::$ty;
510
511                fn extract_if_safe(self) -> Option<Self::Inner> {
512                    self.extract()
513                }
514            }
515
516            impl $crate::task::IntoDynamicSend for $crate::builtin::$ty {
517                type Target = $crate::task::ThreadConfined<$crate::builtin::$ty>;
518
519                fn into_dynamic_send(self) -> Self::Target {
520                    $crate::task::ThreadConfined::new(self)
521                }
522            }
523        )+
524    };
525}
526
527#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
528mod tests {
529    use std::sync::Arc;
530    use std::sync::atomic::{AtomicUsize, Ordering};
531    use std::thread;
532
533    use super::{SignalFutureResolver, ThreadConfined};
534    use crate::classes::Object;
535    use crate::obj::Gd;
536    use crate::sys;
537
538    /// Test that the hash of a cloned future resolver is equal to its original version. With this equality in place, we can create new
539    /// Callables that are equal to their original version but have separate reference counting.
540    #[test]
541    fn future_resolver_cloned_hash() {
542        let resolver_a = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
543        let resolver_b = resolver_a.clone();
544
545        let hash_a = sys::hash_value(&resolver_a);
546        let hash_b = sys::hash_value(&resolver_b);
547
548        assert_eq!(hash_a, hash_b);
549    }
550
551    // Test that dropping ThreadConfined<T> on another thread leaks the inner value.
552    #[test]
553    #[cfg_attr(
554        all(target_family = "wasm", not(target_feature = "atomics")),
555        ignore = "Threading not available"
556    )]
557    fn thread_confined_extract() {
558        let confined = ThreadConfined::new(772);
559        assert_eq!(confined.extract(), Some(772));
560
561        let confined = ThreadConfined::new(772);
562
563        let handle = thread::spawn(move || {
564            assert!(confined.extract().is_none());
565        });
566        handle.join().unwrap();
567    }
568
569    #[test]
570    #[cfg_attr(
571        all(target_family = "wasm", not(target_feature = "atomics")),
572        ignore = "Threading not available"
573    )]
574    fn thread_confined_leak_on_other_thread() {
575        static COUNTER: AtomicUsize = AtomicUsize::new(0);
576
577        struct DropCounter;
578        impl Drop for DropCounter {
579            fn drop(&mut self) {
580                COUNTER.fetch_add(1, Ordering::SeqCst);
581            }
582        }
583
584        let drop_counter = DropCounter;
585        let confined = ThreadConfined::new(drop_counter);
586
587        let handle = thread::spawn(move || drop(confined));
588        handle.join().unwrap();
589
590        // The counter should still be 0, meaning Drop was not called (leaked).
591        assert_eq!(COUNTER.load(Ordering::SeqCst), 0);
592    }
593}