godot_core/task/
futures.rs

1/*
2 * Copyright (c) godot-rust; Bromeon and contributors.
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
6 */
7
8use core::panic;
9use std::fmt::Display;
10use std::future::{Future, IntoFuture};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15
16use crate::builtin::{Callable, RustCallable, Signal, Variant};
17use crate::classes::object::ConnectFlags;
18use crate::godot_error;
19use crate::meta::sealed::Sealed;
20use crate::meta::InParamTuple;
21use crate::obj::{Gd, GodotClass, WithSignals};
22use crate::registry::signal::TypedSignal;
23
24// ----------------------------------------------------------------------------------------------------------------------------------------------
25// Internal re-exports
26#[rustfmt::skip] // Do not reorder.
27pub(crate) use crate::impl_dynamic_send;
28
29/// The panicking counter part to the [`FallibleSignalFuture`].
30///
31/// This future works in the same way as `FallibleSignalFuture`, but panics when the signal object is freed, instead of resolving to a
32/// [`Result::Err`].
33///
34/// # Panics
35/// - If the signal object is freed before the signal has been emitted.
36/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
37/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
38///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
39pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
40
41impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
42    fn new(signal: Signal) -> Self {
43        Self(FallibleSignalFuture::new(signal))
44    }
45}
46
47impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
48    type Output = R;
49
50    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51        let poll_result = self.get_mut().0.poll(cx);
52
53        match poll_result {
54            Poll::Pending => Poll::Pending,
55            Poll::Ready(Ok(value)) => Poll::Ready(value),
56            Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
57                "the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
58            ),
59        }
60    }
61}
62
63// Not derived, otherwise an extra bound `Output: Default` is required.
64struct SignalFutureData<T> {
65    state: SignalFutureState<T>,
66    waker: Option<Waker>,
67}
68
69impl<T> Default for SignalFutureData<T> {
70    fn default() -> Self {
71        Self {
72            state: Default::default(),
73            waker: None,
74        }
75    }
76}
77
78// Only public for itest.
79pub struct SignalFutureResolver<R: IntoDynamicSend> {
80    data: Arc<Mutex<SignalFutureData<R::Target>>>,
81}
82
83impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
84    fn clone(&self) -> Self {
85        Self {
86            data: self.data.clone(),
87        }
88    }
89}
90
91/// For itest to construct and test a resolver.
92#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
93pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
94    SignalFutureResolver {
95        data: Arc::new(Mutex::new(SignalFutureData::default())),
96    }
97}
98
99impl<R: IntoDynamicSend> SignalFutureResolver<R> {
100    fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
101        Self { data }
102    }
103}
104
105impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
106    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
107        state.write_usize(Arc::as_ptr(&self.data) as usize);
108    }
109}
110
111impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
112    fn eq(&self, other: &Self) -> bool {
113        Arc::ptr_eq(&self.data, &other.data)
114    }
115}
116
117impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
118    fn invoke(&mut self, args: &[&Variant]) -> Variant {
119        let waker = {
120            let mut data = self.data.lock().unwrap();
121            data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
122
123            // We no longer need the waker after we resolved. If the future is polled again, we'll also get a new waker.
124            data.waker.take()
125        };
126
127        if let Some(waker) = waker {
128            waker.wake();
129        }
130
131        Variant::nil()
132    }
133}
134
135impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
138    }
139}
140
141// This resolver will change the futures state when it's being dropped (i.e. the engine removes all connected signal callables). By marking
142// the future as dead we can resolve it to an error value the next time it gets polled.
143impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
144    fn drop(&mut self) {
145        let mut data = self.data.lock().unwrap();
146
147        if !matches!(data.state, SignalFutureState::Pending) {
148            // The future is no longer pending, so no clean up is required.
149            return;
150        }
151
152        // We mark the future as dead, so the next time it gets polled we can react to it's inability to resolve.
153        data.state = SignalFutureState::Dead;
154
155        // If we got a waker we trigger it to get the future polled. If there is no waker, then the future has not been polled yet and we
156        // simply wait for the runtime to perform the first poll.
157        if let Some(ref waker) = data.waker {
158            waker.wake_by_ref();
159        }
160    }
161}
162
163#[derive(Default)]
164enum SignalFutureState<T> {
165    #[default]
166    Pending,
167    Ready(T),
168    Dead,
169    Dropped,
170}
171
172impl<T> SignalFutureState<T> {
173    fn take(&mut self) -> Self {
174        let new_value = match self {
175            Self::Pending => Self::Pending,
176            Self::Ready(_) | Self::Dead => Self::Dead,
177            Self::Dropped => Self::Dropped,
178        };
179
180        std::mem::replace(self, new_value)
181    }
182}
183
184/// A future that tries to resolve as soon as the provided Godot signal was emitted.
185///
186/// The future might resolve to an error if the signal object is freed before the signal is emitted.
187///
188/// # Panics
189/// - If one of the signal arguments is `!Send`, but the signal was emitted on a different thread.
190/// - The future's `Drop` implementation can cause a non-unwinding panic in rare cases, should the signal object be freed at the same time
191///   as the future is dropped. Make sure to keep signal objects alive until there are no pending futures anymore.
192pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
193    data: Arc<Mutex<SignalFutureData<R::Target>>>,
194    callable: SignalFutureResolver<R>,
195    signal: Signal,
196}
197
198impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
199    fn new(signal: Signal) -> Self {
200        debug_assert!(
201            !signal.is_null(),
202            "Failed to create a future for an invalid Signal!\nEither the signal object was already freed or the signal was not registered in the object before using it.",
203        );
204
205        let data = Arc::new(Mutex::new(SignalFutureData::default()));
206
207        // The callable currently requires that the return value is Sync + Send.
208        let callable = SignalFutureResolver::new(data.clone());
209
210        signal.connect_flags(
211            &Callable::from_custom(callable.clone()),
212            ConnectFlags::ONE_SHOT,
213        );
214
215        Self {
216            data,
217            callable,
218            signal,
219        }
220    }
221
222    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
223        let mut data = self.data.lock().unwrap();
224
225        data.waker.replace(cx.waker().clone());
226
227        let value = data.state.take();
228
229        // Drop the data mutex lock to prevent the mutext from getting poisoned by the potential later panic.
230        drop(data);
231
232        match value {
233            SignalFutureState::Pending => Poll::Pending,
234            SignalFutureState::Dropped => unreachable!(),
235            SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
236            SignalFutureState::Ready(value) => {
237                let Some(value) = DynamicSend::extract_if_safe(value) else {
238                    panic!("the awaited signal was not emitted on the main-thread, but contained a non Send argument");
239                };
240
241                Poll::Ready(Ok(value))
242            }
243        }
244    }
245}
246
247/// Error that might be returned  by the [`FallibleSignalFuture`].
248///
249/// This error is being resolved to when the signal object is freed before the awaited singal is emitted.
250#[derive(Debug)]
251pub struct FallibleSignalFutureError;
252
253impl Display for FallibleSignalFutureError {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        write!(
256            f,
257            "The signal object was freed before the awaited signal was emitted"
258        )
259    }
260}
261
262impl std::error::Error for FallibleSignalFutureError {}
263
264impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
265    type Output = Result<R, FallibleSignalFutureError>;
266
267    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
268        self.get_mut().poll(cx)
269    }
270}
271
272impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
273    fn drop(&mut self) {
274        // The callable might alredy be destroyed, this occurs during engine shutdown.
275        if self.signal.is_null() {
276            return;
277        }
278
279        let mut data_lock = self.data.lock().unwrap();
280
281        data_lock.state = SignalFutureState::Dropped;
282
283        drop(data_lock);
284
285        // We create a new Godot Callable from our RustCallable so we get independent reference counting.
286        let gd_callable = Callable::from_custom(self.callable.clone());
287
288        // is_connected will return true if the signal was never emited before the future is dropped.
289        //
290        // There is a TOCTOU issue here that can occur when the FallibleSignalFuture is dropped at the same time as the signal object is
291        // freed on a different thread.
292        // We check in the beginning if the signal object is still alive, and we check here again, but the signal object still can be freed
293        // between our check and our usage of the object in `is_connected` and `disconnect`. The race condition will manifest in a
294        // non-unwinding panic that is hard to track down.
295        if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
296            self.signal.disconnect(&gd_callable);
297        }
298    }
299}
300
301impl Signal {
302    /// Creates a fallible future for this signal.
303    ///
304    /// The future will resolve the next time the signal is emitted.
305    /// See [`FallibleSignalFuture`] for details.
306    ///
307    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
308    /// the call to this function.
309    pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
310        FallibleSignalFuture::new(self.clone())
311    }
312
313    /// Creates a future for this signal.
314    ///
315    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
316    /// See [`SignalFuture`] for details.
317    ///
318    /// Since the `Signal` type does not contain information on the signal argument types, the future output type has to be inferred from
319    /// the call to this function.
320    pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
321        SignalFuture::new(self.clone())
322    }
323}
324
325impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
326    /// Creates a fallible future for this signal.
327    ///
328    /// The future will resolve the next time the signal is emitted.
329    /// See [`FallibleSignalFuture`] for details.
330    pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
331        FallibleSignalFuture::new(self.to_untyped())
332    }
333
334    /// Creates a future for this signal.
335    ///
336    /// The future will resolve the next time the signal is emitted, but might panic if the signal object is freed.
337    /// See [`SignalFuture`] for details.
338    pub fn to_future(&self) -> SignalFuture<R> {
339        SignalFuture::new(self.to_untyped())
340    }
341}
342
343impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
344    type Output = R;
345
346    type IntoFuture = SignalFuture<R>;
347
348    fn into_future(self) -> Self::IntoFuture {
349        self.to_future()
350    }
351}
352
353/// Convert a value into a type that is [`Send`] at compile-time while the value might not be.
354///
355/// This allows to turn any implementor into a type that is `Send`, but requires to also implement [`DynamicSend`] as well.
356/// The later trait will verify if a value can actually be sent between threads at runtime.
357pub trait IntoDynamicSend: Sealed + 'static {
358    type Target: DynamicSend<Inner = Self>;
359
360    fn into_dynamic_send(self) -> Self::Target;
361}
362
363/// Runtime-checked `Send` capability.
364///
365/// Implemented for types that need a static `Send` bound, but where it is determined at runtime whether sending a value was
366/// actually safe. Only allows to extract the value if sending across threads is safe, thus fulfilling the `Send` supertrait.
367///
368/// # Safety
369/// The implementor has to guarantee that `extract_if_safe` returns `None`, if the value has been sent between threads while being `!Send`.
370///
371/// To uphold the `Send` supertrait guarantees, no public API apart from `extract_if_safe` must exist that would give access to the inner value from another thread.
372pub unsafe trait DynamicSend: Send + Sealed {
373    type Inner;
374
375    fn extract_if_safe(self) -> Option<Self::Inner>;
376}
377
378/// Value that can be sent across threads, but only accessed on its original thread.
379///
380/// When moved to another thread, the inner value can no longer be accessed and will be leaked when the `ThreadConfined` is dropped.
381pub struct ThreadConfined<T> {
382    value: Option<T>,
383    thread_id: ThreadId,
384}
385
386// SAFETY: This type can always be sent across threads, but the inner value can only be accessed on its original thread.
387unsafe impl<T> Send for ThreadConfined<T> {}
388
389impl<T> ThreadConfined<T> {
390    pub(crate) fn new(value: T) -> Self {
391        Self {
392            value: Some(value),
393            thread_id: std::thread::current().id(),
394        }
395    }
396
397    /// Retrieve the inner value, if the current thread is the one in which the `ThreadConfined` was created.
398    ///
399    /// If this fails, the value will be leaked immediately.
400    pub(crate) fn extract(mut self) -> Option<T> {
401        if self.is_original_thread() {
402            self.value.take()
403        } else {
404            None // causes Drop -> leak.
405        }
406    }
407
408    fn is_original_thread(&self) -> bool {
409        self.thread_id == std::thread::current().id()
410    }
411}
412
413impl<T> Drop for ThreadConfined<T> {
414    fn drop(&mut self) {
415        if !self.is_original_thread() {
416            std::mem::forget(self.value.take());
417
418            // Cannot panic, potentially during unwind already.
419            godot_error!(
420                "Dropped ThreadConfined<T> on a different thread than it was created on. The inner T value will be leaked."
421            );
422        }
423    }
424}
425
426unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
427    type Inner = Gd<T>;
428
429    fn extract_if_safe(self) -> Option<Self::Inner> {
430        self.extract()
431    }
432}
433
434impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
435
436impl<T: GodotClass> IntoDynamicSend for Gd<T> {
437    type Target = ThreadConfined<Self>;
438
439    fn into_dynamic_send(self) -> Self::Target {
440        ThreadConfined::new(self)
441    }
442}
443
444// ----------------------------------------------------------------------------------------------------------------------------------------------
445// Generated impls
446
447#[macro_export(local_inner_macros)]
448macro_rules! impl_dynamic_send {
449    (Send; $($ty:ty),+) => {
450        $(
451            unsafe impl $crate::task::DynamicSend for $ty {
452                type Inner = Self;
453
454                fn extract_if_safe(self) -> Option<Self::Inner> {
455                    Some(self)
456                }
457            }
458
459            impl $crate::task::IntoDynamicSend for $ty {
460                type Target = Self;
461                fn into_dynamic_send(self) -> Self::Target {
462                    self
463                }
464            }
465        )+
466    };
467
468    (tuple; $($arg:ident: $ty:ident),*) => {
469        unsafe impl<$($ty: $crate::task::DynamicSend ),*> $crate::task::DynamicSend for ($($ty,)*) {
470            type Inner = ($($ty::Inner,)*);
471
472            fn extract_if_safe(self) -> Option<Self::Inner> {
473                #[allow(non_snake_case)]
474                let ($($arg,)*) = self;
475
476                #[allow(clippy::unused_unit)]
477                match ($($arg.extract_if_safe(),)*) {
478                    ($(Some($arg),)*) => Some(($($arg,)*)),
479
480                    #[allow(unreachable_patterns)]
481                    _ => None,
482                }
483            }
484        }
485
486        impl<$($ty: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($ty,)*) {
487            type Target = ($($ty::Target,)*);
488
489            fn into_dynamic_send(self) -> Self::Target {
490                #[allow(non_snake_case)]
491                let ($($arg,)*) = self;
492
493                #[allow(clippy::unused_unit)]
494                ($($arg.into_dynamic_send(),)*)
495            }
496        }
497    };
498
499    (!Send; $($ty:ident),+) => {
500        $(
501            impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$ty> {}
502
503            unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$ty> {
504                type Inner = $crate::builtin::$ty;
505
506                fn extract_if_safe(self) -> Option<Self::Inner> {
507                    self.extract()
508                }
509            }
510
511            impl $crate::task::IntoDynamicSend for $crate::builtin::$ty {
512                type Target = $crate::task::ThreadConfined<$crate::builtin::$ty>;
513
514                fn into_dynamic_send(self) -> Self::Target {
515                    $crate::task::ThreadConfined::new(self)
516                }
517            }
518        )+
519    };
520}
521
522#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
523mod tests {
524    use std::sync::atomic::{AtomicUsize, Ordering};
525    use std::sync::Arc;
526    use std::thread;
527
528    use super::{SignalFutureResolver, ThreadConfined};
529    use crate::classes::Object;
530    use crate::obj::Gd;
531    use crate::sys;
532
533    /// Test that the hash of a cloned future resolver is equal to its original version. With this equality in place, we can create new
534    /// Callables that are equal to their original version but have separate reference counting.
535    #[test]
536    fn future_resolver_cloned_hash() {
537        let resolver_a = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
538        let resolver_b = resolver_a.clone();
539
540        let hash_a = sys::hash_value(&resolver_a);
541        let hash_b = sys::hash_value(&resolver_b);
542
543        assert_eq!(hash_a, hash_b);
544    }
545
546    // Test that dropping ThreadConfined<T> on another thread leaks the inner value.
547    #[test]
548    fn thread_confined_extract() {
549        let confined = ThreadConfined::new(772);
550        assert_eq!(confined.extract(), Some(772));
551
552        let confined = ThreadConfined::new(772);
553
554        let handle = thread::spawn(move || {
555            assert!(confined.extract().is_none());
556        });
557        handle.join().unwrap();
558    }
559
560    #[test]
561    fn thread_confined_leak_on_other_thread() {
562        static COUNTER: AtomicUsize = AtomicUsize::new(0);
563
564        struct DropCounter;
565        impl Drop for DropCounter {
566            fn drop(&mut self) {
567                COUNTER.fetch_add(1, Ordering::SeqCst);
568            }
569        }
570
571        let drop_counter = DropCounter;
572        let confined = ThreadConfined::new(drop_counter);
573
574        let handle = thread::spawn(move || drop(confined));
575        handle.join().unwrap();
576
577        // The counter should still be 0, meaning Drop was not called (leaked).
578        assert_eq!(COUNTER.load(Ordering::SeqCst), 0);
579    }
580}