godot_core/task/
futures.rs

1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use core::panic;
9use std::fmt::Display;
10use std::future::{Future, IntoFuture};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15
16use crate::builtin::{Callable, RustCallable, Signal, Variant};
17use crate::classes::object::ConnectFlags;
18use crate::meta::sealed::Sealed;
19use crate::meta::InParamTuple;
20use crate::obj::{EngineBitfield, Gd, GodotClass, WithSignals};
21use crate::registry::signal::TypedSignal;
22
23pub(crate) use crate::impl_dynamic_send;
24
25/// The panicking counter part to the [`FallibleSignalFuture`].
26///
27/// This future works in the same way as `FallibleSignalFuture`, but panics when the signal object is freed, instead of resolving to a
28/// [`Result::Err`].
29///
30/// # Panics
31/// - If the signal object is freed before the signal has been emitted.
32/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
33/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
34///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
35pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
36
37impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
38    fn new(signal: Signal) -> Self {
39        Self(FallibleSignalFuture::new(signal))
40    }
41}
42
43impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
44    type Output = R;
45
46    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47        let poll_result = self.get_mut().0.poll(cx);
48
49        match poll_result {
50            Poll::Pending => Poll::Pending,
51            Poll::Ready(Ok(value)) => Poll::Ready(value),
52            Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
53                "the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
54            ),
55        }
56    }
57}
58
59// Not derived, otherwise an extra bound `Output: Default` is required.
60struct SignalFutureData<T> {
61    state: SignalFutureState<T>,
62    waker: Option<Waker>,
63}
64
65impl<T> Default for SignalFutureData<T> {
66    fn default() -> Self {
67        Self {
68            state: Default::default(),
69            waker: None,
70        }
71    }
72}
73
74// Only public for itest.
75pub struct SignalFutureResolver<R: IntoDynamicSend> {
76    data: Arc<Mutex<SignalFutureData<R::Target>>>,
77}
78
79impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
80    fn clone(&self) -> Self {
81        Self {
82            data: self.data.clone(),
83        }
84    }
85}
86
87/// For itest to construct and test a resolver.
88#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
89pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
90    SignalFutureResolver {
91        data: Arc::new(Mutex::new(SignalFutureData::default())),
92    }
93}
94
95impl<R: IntoDynamicSend> SignalFutureResolver<R> {
96    fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
97        Self { data }
98    }
99}
100
101impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
102    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
103        state.write_usize(Arc::as_ptr(&self.data) as usize);
104    }
105}
106
107impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
108    fn eq(&self, other: &Self) -> bool {
109        Arc::ptr_eq(&self.data, &other.data)
110    }
111}
112
113impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
114    fn invoke(&mut self, args: &[&Variant]) -> Result<Variant, ()> {
115        let waker = {
116            let mut data = self.data.lock().unwrap();
117            data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
118
119            // We no longer need the waker after we resolved. If the future is polled again, we'll also get a new waker.
120            data.waker.take()
121        };
122
123        if let Some(waker) = waker {
124            waker.wake();
125        }
126
127        Ok(Variant::nil())
128    }
129}
130
131impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
134    }
135}
136
137// This resolver will change the futures state when it's being dropped (i.e. the engine removes all connected signal callables). By marking
138// the future as dead we can resolve it to an error value the next time it gets polled.
139impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
140    fn drop(&mut self) {
141        let mut data = self.data.lock().unwrap();
142
143        if !matches!(data.state, SignalFutureState::Pending) {
144            // The future is no longer pending, so no clean up is required.
145            return;
146        }
147
148        // We mark the future as dead, so the next time it gets polled we can react to it's inability to resolve.
149        data.state = SignalFutureState::Dead;
150
151        // If we got a waker we trigger it to get the future polled. If there is no waker, then the future has not been polled yet and we
152        // simply wait for the runtime to perform the first poll.
153        if let Some(ref waker) = data.waker {
154            waker.wake_by_ref();
155        }
156    }
157}
158
159#[derive(Default)]
160enum SignalFutureState<T> {
161    #[default]
162    Pending,
163    Ready(T),
164    Dead,
165    Dropped,
166}
167
168impl<T> SignalFutureState<T> {
169    fn take(&mut self) -> Self {
170        let new_value = match self {
171            Self::Pending => Self::Pending,
172            Self::Ready(_) | Self::Dead => Self::Dead,
173            Self::Dropped => Self::Dropped,
174        };
175
176        std::mem::replace(self, new_value)
177    }
178}
179
180/// A future that tries to resolve as soon as the provided Godot signal was emitted.
181///
182/// The future might resolve to an error if the signal object is freed before the signal is emitted.
183///
184/// # Panics
185/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
186/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
187///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
188pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
189    data: Arc<Mutex<SignalFutureData<R::Target>>>,
190    callable: SignalFutureResolver<R>,
191    signal: Signal,
192}
193
194impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
195    fn new(signal: Signal) -> Self {
196        debug_assert!(
197            !signal.is_null(),
198            "Failed to create a future for an invalid Signal!\nEither the signal object was already freed or the signal was not registered in the object before using it.",
199        );
200
201        let data = Arc::new(Mutex::new(SignalFutureData::default()));
202
203        // The callable currently requires that the return value is Sync + Send.
204        let callable = SignalFutureResolver::new(data.clone());
205
206        signal.connect(
207            &Callable::from_custom(callable.clone()),
208            ConnectFlags::ONE_SHOT.ord() as i64,
209        );
210
211        Self {
212            data,
213            callable,
214            signal,
215        }
216    }
217
218    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
219        let mut data = self.data.lock().unwrap();
220
221        data.waker.replace(cx.waker().clone());
222
223        let value = data.state.take();
224
225        // Drop the data mutex lock to prevent the mutext from getting poisoned by the potential later panic.
226        drop(data);
227
228        match value {
229            SignalFutureState::Pending => Poll::Pending,
230            SignalFutureState::Dropped => unreachable!(),
231            SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
232            SignalFutureState::Ready(value) => {
233                let Some(value) = DynamicSend::extract_if_safe(value) else {
234                    panic!("the awaited signal was not emitted on the main-thread, but contained a non Send argument");
235                };
236
237                Poll::Ready(Ok(value))
238            }
239        }
240    }
241}
242
243/// Error that might be returned  by the [`FallibleSignalFuture`].
244///
245/// This error is being resolved to when the signal object is freed before the awaited singal is emitted.
246#[derive(Debug)]
247pub struct FallibleSignalFutureError;
248
249impl Display for FallibleSignalFutureError {
250    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251        write!(
252            f,
253            "The signal object was freed before the awaited signal was emitted"
254        )
255    }
256}
257
258impl std::error::Error for FallibleSignalFutureError {}
259
260impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
261    type Output = Result<R, FallibleSignalFutureError>;
262
263    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
264        self.get_mut().poll(cx)
265    }
266}
267
268impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
269    fn drop(&mut self) {
270        // The callable might alredy be destroyed, this occurs during engine shutdown.
271        if self.signal.is_null() {
272            return;
273        }
274
275        let mut data_lock = self.data.lock().unwrap();
276
277        data_lock.state = SignalFutureState::Dropped;
278
279        drop(data_lock);
280
281        // We create a new Godot Callable from our RustCallable so we get independent reference counting.
282        let gd_callable = Callable::from_custom(self.callable.clone());
283
284        // is_connected will return true if the signal was never emited before the future is dropped.
285        //
286        // There is a TOCTOU issue here that can occur when the FallibleSignalFuture is dropped at the same time as the signal object is
287        // freed on a different thread.
288        // We check in the beginning if the signal object is still alive, and we check here again, but the signal object still can be freed
289        // between our check and our usage of the object in `is_connected` and `disconnect`. The race condition will manifest in a
290        // non-unwinding panic that is hard to track down.
291        if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
292            self.signal.disconnect(&gd_callable);
293        }
294    }
295}
296
297impl Signal {
298    /// Creates a fallible future for this signal.
299    ///
300    /// The future will resolve the next time the signal is emitted.
301    /// See [`FallibleSignalFuture`] for details.
302    ///
303    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
304    /// the call to this function.
305    pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
306        FallibleSignalFuture::new(self.clone())
307    }
308
309    /// Creates a future for this signal.
310    ///
311    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
312    /// See [`SignalFuture`] for details.
313    ///
314    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
315    /// the call to this function.
316    pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
317        SignalFuture::new(self.clone())
318    }
319}
320
321impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
322    /// Creates a fallible future for this signal.
323    ///
324    /// The future will resolve the next time the signal is emitted.
325    /// See [`FallibleSignalFuture`] for details.
326    pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
327        FallibleSignalFuture::new(self.to_untyped())
328    }
329
330    /// Creates a future for this signal.
331    ///
332    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
333    /// See [`SignalFuture`] for details.
334    pub fn to_future(&self) -> SignalFuture<R> {
335        SignalFuture::new(self.to_untyped())
336    }
337}
338
339impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
340    type Output = R;
341
342    type IntoFuture = SignalFuture<R>;
343
344    fn into_future(self) -> Self::IntoFuture {
345        self.to_future()
346    }
347}
348
349/// Convert a value into a type that is [`Send`] at compile-time while the value might not be.
350///
351/// This allows to turn any implementor into a type that is `Send`, but requires to also implement [`DynamicSend`] as well.
352/// The later trait will verify if a value can actually be sent between threads at runtime.
353pub trait IntoDynamicSend: Sealed + 'static {
354    type Target: DynamicSend<Inner = Self>;
355
356    fn into_dynamic_send(self) -> Self::Target;
357}
358
359/// Runtime-checked `Send` capability.
360///
361/// Implemented for types that need a static `Send` bound, but where it is determined at runtime whether sending a value was
362/// actually safe. Only allows to extract the value if sending across threads is safe, thus fulfilling the `Send` supertrait.
363///
364/// # Safety
365/// The implementor has to guarantee that `extract_if_safe` returns `None`, if the value has been sent between threads while being `!Send`.
366///
367/// To uphold the `Send` supertrait guarantees, no public API apart from `extract_if_safe` must exist that would give access to the inner value from another thread.
368pub unsafe trait DynamicSend: Send + Sealed {
369    type Inner;
370
371    fn extract_if_safe(self) -> Option<Self::Inner>;
372}
373
374/// Value that can be sent across threads, but only accessed on its original thread.
375pub struct ThreadConfined<T> {
376    value: T,
377    thread_id: ThreadId,
378}
379
380// SAFETY: This type can always be sent across threads, but the inner value can only be accessed on its original thread.
381unsafe impl<T> Send for ThreadConfined<T> {}
382
383impl<T> ThreadConfined<T> {
384    pub(crate) fn new(value: T) -> Self {
385        Self {
386            value,
387            thread_id: std::thread::current().id(),
388        }
389    }
390
391    pub(crate) fn extract(self) -> Option<T> {
392        (self.thread_id == std::thread::current().id()).then_some(self.value)
393    }
394}
395
396unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
397    type Inner = Gd<T>;
398
399    fn extract_if_safe(self) -> Option<Self::Inner> {
400        self.extract()
401    }
402}
403
404impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
405
406impl<T: GodotClass> IntoDynamicSend for Gd<T> {
407    type Target = ThreadConfined<Self>;
408
409    fn into_dynamic_send(self) -> Self::Target {
410        ThreadConfined::new(self)
411    }
412}
413
414// ----------------------------------------------------------------------------------------------------------------------------------------------
415// Generated impls
416
417#[macro_export(local_inner_macros)]
418macro_rules! impl_dynamic_send {
419    (Send; $($ty:ty),+) => {
420        $(
421            unsafe impl $crate::task::DynamicSend for $ty {
422                type Inner = Self;
423
424                fn extract_if_safe(self) -> Option<Self::Inner> {
425                    Some(self)
426                }
427            }
428
429            impl $crate::task::IntoDynamicSend for $ty {
430                type Target = Self;
431                fn into_dynamic_send(self) -> Self::Target {
432                    self
433                }
434            }
435        )+
436    };
437
438    (Send; builtin::{$($ty:ident),+}) => {
439        impl_dynamic_send!(Send; $($crate::builtin::$ty),+);
440    };
441
442    (tuple; $($arg:ident: $ty:ident),*) => {
443        unsafe impl<$($ty: $crate::task::DynamicSend ),*> $crate::task::DynamicSend for ($($ty,)*) {
444            type Inner = ($($ty::Inner,)*);
445
446            fn extract_if_safe(self) -> Option<Self::Inner> {
447                #[allow(non_snake_case)]
448                let ($($arg,)*) = self;
449
450                #[allow(clippy::unused_unit)]
451                match ($($arg.extract_if_safe(),)*) {
452                    ($(Some($arg),)*) => Some(($($arg,)*)),
453
454                    #[allow(unreachable_patterns)]
455                    _ => None,
456                }
457            }
458        }
459
460        impl<$($ty: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($ty,)*) {
461            type Target = ($($ty::Target,)*);
462
463            fn into_dynamic_send(self) -> Self::Target {
464                #[allow(non_snake_case)]
465                let ($($arg,)*) = self;
466
467                #[allow(clippy::unused_unit)]
468                ($($arg.into_dynamic_send(),)*)
469            }
470        }
471    };
472
473    (!Send; $($ty:ident),+) => {
474        $(
475            impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$ty> {}
476
477            unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$ty> {
478                type Inner = $crate::builtin::$ty;
479
480                fn extract_if_safe(self) -> Option<Self::Inner> {
481                    self.extract()
482                }
483            }
484
485            impl $crate::task::IntoDynamicSend for $crate::builtin::$ty {
486                type Target = $crate::task::ThreadConfined<$crate::builtin::$ty>;
487
488                fn into_dynamic_send(self) -> Self::Target {
489                    $crate::task::ThreadConfined::new(self)
490                }
491            }
492        )+
493    };
494}
495
496#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
497mod tests {
498    use std::sync::Arc;
499
500    use crate::classes::Object;
501    use crate::obj::Gd;
502    use crate::sys;
503
504    use super::SignalFutureResolver;
505
506    /// Test that the hash of a cloned future resolver is equal to its original version. With this equality in place, we can create new
507    /// Callables that are equal to their original version but have separate reference counting.
508    #[test]
509    fn future_resolver_cloned_hash() {
510        let resolver_a = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
511        let resolver_b = resolver_a.clone();
512
513        let hash_a = sys::hash_value(&resolver_a);
514        let hash_b = sys::hash_value(&resolver_b);
515
516        assert_eq!(hash_a, hash_b);
517    }
518}