godot_core/task/
futures.rs

1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use core::panic;
9use std::fmt::Display;
10use std::future::{Future, IntoFuture};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15
16use crate::builtin::{Callable, RustCallable, Signal, Variant};
17use crate::classes::object::ConnectFlags;
18use crate::global::godot_error;
19use crate::meta::sealed::Sealed;
20use crate::meta::InParamTuple;
21use crate::obj::{Gd, GodotClass, WithSignals};
22use crate::registry::signal::TypedSignal;
23use crate::sys;
24
25// ----------------------------------------------------------------------------------------------------------------------------------------------
26// Internal re-exports
27#[rustfmt::skip] // Do not reorder.
28pub(crate) use crate::impl_dynamic_send;
29
30/// The panicking counter part to the [`FallibleSignalFuture`].
31///
32/// This future works in the same way as `FallibleSignalFuture`, but panics when the signal object is freed, instead of resolving to a
33/// [`Result::Err`].
34///
35/// # Panics
36/// - If the signal object is freed before the signal has been emitted.
37/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
38/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
39///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
40pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
41
42impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
43    fn new(signal: Signal) -> Self {
44        Self(FallibleSignalFuture::new(signal))
45    }
46}
47
48impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
49    type Output = R;
50
51    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52        let poll_result = self.get_mut().0.poll(cx);
53
54        match poll_result {
55            Poll::Pending => Poll::Pending,
56            Poll::Ready(Ok(value)) => Poll::Ready(value),
57            Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
58                "the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
59            ),
60        }
61    }
62}
63
64// Not derived, otherwise an extra bound `Output: Default` is required.
65struct SignalFutureData<T> {
66    state: SignalFutureState<T>,
67    waker: Option<Waker>,
68}
69
70impl<T> Default for SignalFutureData<T> {
71    fn default() -> Self {
72        Self {
73            state: Default::default(),
74            waker: None,
75        }
76    }
77}
78
79// Only public for itest.
80pub struct SignalFutureResolver<R: IntoDynamicSend> {
81    data: Arc<Mutex<SignalFutureData<R::Target>>>,
82}
83
84impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
85    fn clone(&self) -> Self {
86        Self {
87            data: self.data.clone(),
88        }
89    }
90}
91
92/// For itest to construct and test a resolver.
93#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
94pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
95    SignalFutureResolver {
96        data: Arc::new(Mutex::new(SignalFutureData::default())),
97    }
98}
99
100impl<R: IntoDynamicSend> SignalFutureResolver<R> {
101    fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
102        Self { data }
103    }
104}
105
106impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
107    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
108        state.write_usize(Arc::as_ptr(&self.data) as usize);
109    }
110}
111
112impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
113    fn eq(&self, other: &Self) -> bool {
114        Arc::ptr_eq(&self.data, &other.data)
115    }
116}
117
118impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
119    fn invoke(&mut self, args: &[&Variant]) -> Variant {
120        let waker = {
121            let mut data = self.data.lock().unwrap();
122            data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
123
124            // We no longer need the waker after we resolved. If the future is polled again, we'll also get a new waker.
125            data.waker.take()
126        };
127
128        if let Some(waker) = waker {
129            waker.wake();
130        }
131
132        Variant::nil()
133    }
134}
135
136impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
139    }
140}
141
142// This resolver will change the futures state when it's being dropped (i.e. the engine removes all connected signal callables). By marking
143// the future as dead we can resolve it to an error value the next time it gets polled.
144impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
145    fn drop(&mut self) {
146        let mut data = self.data.lock().unwrap();
147
148        if !matches!(data.state, SignalFutureState::Pending) {
149            // The future is no longer pending, so no clean up is required.
150            return;
151        }
152
153        // We mark the future as dead, so the next time it gets polled we can react to it's inability to resolve.
154        data.state = SignalFutureState::Dead;
155
156        // If we got a waker we trigger it to get the future polled. If there is no waker, then the future has not been polled yet and we
157        // simply wait for the runtime to perform the first poll.
158        if let Some(ref waker) = data.waker {
159            waker.wake_by_ref();
160        }
161    }
162}
163
164#[derive(Default)]
165enum SignalFutureState<T> {
166    #[default]
167    Pending,
168    Ready(T),
169    Dead,
170    Dropped,
171}
172
173impl<T> SignalFutureState<T> {
174    fn take(&mut self) -> Self {
175        let new_value = match self {
176            Self::Pending => Self::Pending,
177            Self::Ready(_) | Self::Dead => Self::Dead,
178            Self::Dropped => Self::Dropped,
179        };
180
181        std::mem::replace(self, new_value)
182    }
183}
184
185/// A future that tries to resolve as soon as the provided Godot signal was emitted.
186///
187/// The future might resolve to an error if the signal object is freed before the signal is emitted.
188///
189/// # Panics
190/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
191/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
192///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
193pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
194    data: Arc<Mutex<SignalFutureData<R::Target>>>,
195    callable: SignalFutureResolver<R>,
196    signal: Signal,
197}
198
199impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
200    fn new(signal: Signal) -> Self {
201        sys::strict_assert!(
202            !signal.is_null(),
203            "Failed to create future for invalid signal:\n\
204            Either the signal object was already freed, or it\n\
205            was not registered in the object before being used.",
206        );
207
208        let data = Arc::new(Mutex::new(SignalFutureData::default()));
209
210        // The callable currently requires that the return value is Sync + Send.
211        let callable = SignalFutureResolver::new(data.clone());
212
213        signal.connect_flags(
214            &Callable::from_custom(callable.clone()),
215            ConnectFlags::ONE_SHOT,
216        );
217
218        Self {
219            data,
220            callable,
221            signal,
222        }
223    }
224
225    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
226        let mut data = self.data.lock().unwrap();
227
228        data.waker.replace(cx.waker().clone());
229
230        let value = data.state.take();
231
232        // Drop the data mutex lock to prevent the mutext from getting poisoned by the potential later panic.
233        drop(data);
234
235        match value {
236            SignalFutureState::Pending => Poll::Pending,
237            SignalFutureState::Dropped => unreachable!(),
238            SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
239            SignalFutureState::Ready(value) => {
240                let Some(value) = DynamicSend::extract_if_safe(value) else {
241                    panic!("the awaited signal was not emitted on the main-thread, but contained a non Send argument");
242                };
243
244                Poll::Ready(Ok(value))
245            }
246        }
247    }
248}
249
250/// Error that might be returned  by the [`FallibleSignalFuture`].
251///
252/// This error is being resolved to when the signal object is freed before the awaited singal is emitted.
253#[derive(Debug)]
254pub struct FallibleSignalFutureError;
255
256impl Display for FallibleSignalFutureError {
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        write!(
259            f,
260            "The signal object was freed before the awaited signal was emitted"
261        )
262    }
263}
264
265impl std::error::Error for FallibleSignalFutureError {}
266
267impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
268    type Output = Result<R, FallibleSignalFutureError>;
269
270    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
271        self.get_mut().poll(cx)
272    }
273}
274
275impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
276    fn drop(&mut self) {
277        // The callable might alredy be destroyed, this occurs during engine shutdown.
278        if self.signal.is_null() {
279            return;
280        }
281
282        let mut data_lock = self.data.lock().unwrap();
283
284        data_lock.state = SignalFutureState::Dropped;
285
286        drop(data_lock);
287
288        // We create a new Godot Callable from our RustCallable so we get independent reference counting.
289        let gd_callable = Callable::from_custom(self.callable.clone());
290
291        // is_connected will return true if the signal was never emited before the future is dropped.
292        //
293        // There is a TOCTOU issue here that can occur when the FallibleSignalFuture is dropped at the same time as the signal object is
294        // freed on a different thread.
295        // We check in the beginning if the signal object is still alive, and we check here again, but the signal object still can be freed
296        // between our check and our usage of the object in `is_connected` and `disconnect`. The race condition will manifest in a
297        // non-unwinding panic that is hard to track down.
298        if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
299            self.signal.disconnect(&gd_callable);
300        }
301    }
302}
303
304impl Signal {
305    /// Creates a fallible future for this signal.
306    ///
307    /// The future will resolve the next time the signal is emitted.
308    /// See [`FallibleSignalFuture`] for details.
309    ///
310    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
311    /// the call to this function.
312    pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
313        FallibleSignalFuture::new(self.clone())
314    }
315
316    /// Creates a future for this signal.
317    ///
318    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
319    /// See [`SignalFuture`] for details.
320    ///
321    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
322    /// the call to this function.
323    pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
324        SignalFuture::new(self.clone())
325    }
326}
327
328impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
329    /// Creates a fallible future for this signal.
330    ///
331    /// The future will resolve the next time the signal is emitted.
332    /// See [`FallibleSignalFuture`] for details.
333    pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
334        FallibleSignalFuture::new(self.to_untyped())
335    }
336
337    /// Creates a future for this signal.
338    ///
339    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
340    /// See [`SignalFuture`] for details.
341    pub fn to_future(&self) -> SignalFuture<R> {
342        SignalFuture::new(self.to_untyped())
343    }
344}
345
346impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
347    type Output = R;
348
349    type IntoFuture = SignalFuture<R>;
350
351    fn into_future(self) -> Self::IntoFuture {
352        self.to_future()
353    }
354}
355
356/// Convert a value into a type that is [`Send`] at compile-time while the value might not be.
357///
358/// This allows to turn any implementor into a type that is `Send`, but requires to also implement [`DynamicSend`] as well.
359/// The later trait will verify if a value can actually be sent between threads at runtime.
360pub trait IntoDynamicSend: Sealed + 'static {
361    type Target: DynamicSend<Inner = Self>;
362
363    fn into_dynamic_send(self) -> Self::Target;
364}
365
366/// Runtime-checked `Send` capability.
367///
368/// Implemented for types that need a static `Send` bound, but where it is determined at runtime whether sending a value was
369/// actually safe. Only allows to extract the value if sending across threads is safe, thus fulfilling the `Send` supertrait.
370///
371/// # Safety
372/// The implementor has to guarantee that `extract_if_safe` returns `None`, if the value has been sent between threads while being `!Send`.
373///
374/// To uphold the `Send` supertrait guarantees, no public API apart from `extract_if_safe` must exist that would give access to the inner value from another thread.
375pub unsafe trait DynamicSend: Send + Sealed {
376    type Inner;
377
378    fn extract_if_safe(self) -> Option<Self::Inner>;
379}
380
381/// Value that can be sent across threads, but only accessed on its original thread.
382///
383/// When moved to another thread, the inner value can no longer be accessed and will be leaked when the `ThreadConfined` is dropped.
384pub struct ThreadConfined<T> {
385    value: Option<T>,
386    thread_id: ThreadId,
387}
388
389// SAFETY: This type can always be sent across threads, but the inner value can only be accessed on its original thread.
390unsafe impl<T> Send for ThreadConfined<T> {}
391
392impl<T> ThreadConfined<T> {
393    pub(crate) fn new(value: T) -> Self {
394        Self {
395            value: Some(value),
396            thread_id: std::thread::current().id(),
397        }
398    }
399
400    /// Retrieve the inner value, if the current thread is the one in which the `ThreadConfined` was created.
401    ///
402    /// If this fails, the value will be leaked immediately.
403    pub(crate) fn extract(mut self) -> Option<T> {
404        if self.is_original_thread() {
405            self.value.take()
406        } else {
407            None // causes Drop -> leak.
408        }
409    }
410
411    fn is_original_thread(&self) -> bool {
412        self.thread_id == std::thread::current().id()
413    }
414}
415
416impl<T> Drop for ThreadConfined<T> {
417    fn drop(&mut self) {
418        if !self.is_original_thread() {
419            std::mem::forget(self.value.take());
420
421            // Cannot panic, potentially during unwind already.
422            godot_error!(
423                "Dropped ThreadConfined<T> on a different thread than it was created on. The inner T value will be leaked."
424            );
425        }
426    }
427}
428
429unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
430    type Inner = Gd<T>;
431
432    fn extract_if_safe(self) -> Option<Self::Inner> {
433        self.extract()
434    }
435}
436
437impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
438
439impl<T: GodotClass> IntoDynamicSend for Gd<T> {
440    type Target = ThreadConfined<Self>;
441
442    fn into_dynamic_send(self) -> Self::Target {
443        ThreadConfined::new(self)
444    }
445}
446
447// ----------------------------------------------------------------------------------------------------------------------------------------------
448// Generated impls
449
450#[macro_export(local_inner_macros)]
451macro_rules! impl_dynamic_send {
452    (Send; $($ty:ty),+) => {
453        $(
454            unsafe impl $crate::task::DynamicSend for $ty {
455                type Inner = Self;
456
457                fn extract_if_safe(self) -> Option<Self::Inner> {
458                    Some(self)
459                }
460            }
461
462            impl $crate::task::IntoDynamicSend for $ty {
463                type Target = Self;
464                fn into_dynamic_send(self) -> Self::Target {
465                    self
466                }
467            }
468        )+
469    };
470
471    (tuple; $($arg:ident: $ty:ident),*) => {
472        unsafe impl<$($ty: $crate::task::DynamicSend ),*> $crate::task::DynamicSend for ($($ty,)*) {
473            type Inner = ($($ty::Inner,)*);
474
475            fn extract_if_safe(self) -> Option<Self::Inner> {
476                #[allow(non_snake_case)]
477                let ($($arg,)*) = self;
478
479                #[allow(clippy::unused_unit)]
480                match ($($arg.extract_if_safe(),)*) {
481                    ($(Some($arg),)*) => Some(($($arg,)*)),
482
483                    #[allow(unreachable_patterns)]
484                    _ => None,
485                }
486            }
487        }
488
489        impl<$($ty: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($ty,)*) {
490            type Target = ($($ty::Target,)*);
491
492            fn into_dynamic_send(self) -> Self::Target {
493                #[allow(non_snake_case)]
494                let ($($arg,)*) = self;
495
496                #[allow(clippy::unused_unit)]
497                ($($arg.into_dynamic_send(),)*)
498            }
499        }
500    };
501
502    (!Send; $($ty:ident),+) => {
503        $(
504            impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$ty> {}
505
506            unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$ty> {
507                type Inner = $crate::builtin::$ty;
508
509                fn extract_if_safe(self) -> Option<Self::Inner> {
510                    self.extract()
511                }
512            }
513
514            impl $crate::task::IntoDynamicSend for $crate::builtin::$ty {
515                type Target = $crate::task::ThreadConfined<$crate::builtin::$ty>;
516
517                fn into_dynamic_send(self) -> Self::Target {
518                    $crate::task::ThreadConfined::new(self)
519                }
520            }
521        )+
522    };
523}
524
525#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
526mod tests {
527    use std::sync::atomic::{AtomicUsize, Ordering};
528    use std::sync::Arc;
529    use std::thread;
530
531    use super::{SignalFutureResolver, ThreadConfined};
532    use crate::classes::Object;
533    use crate::obj::Gd;
534    use crate::sys;
535
536    /// Test that the hash of a cloned future resolver is equal to its original version. With this equality in place, we can create new
537    /// Callables that are equal to their original version but have separate reference counting.
538    #[test]
539    fn future_resolver_cloned_hash() {
540        let resolver_a = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
541        let resolver_b = resolver_a.clone();
542
543        let hash_a = sys::hash_value(&resolver_a);
544        let hash_b = sys::hash_value(&resolver_b);
545
546        assert_eq!(hash_a, hash_b);
547    }
548
549    // Test that dropping ThreadConfined<T> on another thread leaks the inner value.
550    #[test]
551    fn thread_confined_extract() {
552        let confined = ThreadConfined::new(772);
553        assert_eq!(confined.extract(), Some(772));
554
555        let confined = ThreadConfined::new(772);
556
557        let handle = thread::spawn(move || {
558            assert!(confined.extract().is_none());
559        });
560        handle.join().unwrap();
561    }
562
563    #[test]
564    fn thread_confined_leak_on_other_thread() {
565        static COUNTER: AtomicUsize = AtomicUsize::new(0);
566
567        struct DropCounter;
568        impl Drop for DropCounter {
569            fn drop(&mut self) {
570                COUNTER.fetch_add(1, Ordering::SeqCst);
571            }
572        }
573
574        let drop_counter = DropCounter;
575        let confined = ThreadConfined::new(drop_counter);
576
577        let handle = thread::spawn(move || drop(confined));
578        handle.join().unwrap();
579
580        // The counter should still be 0, meaning Drop was not called (leaked).
581        assert_eq!(COUNTER.load(Ordering::SeqCst), 0);
582    }
583}