1use core::panic;
9use std::fmt::Display;
10use std::future::{Future, IntoFuture};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15
16use crate::builtin::{Callable, RustCallable, Signal, Variant};
17use crate::classes::object::ConnectFlags;
18use crate::global::godot_error;
19use crate::meta::InParamTuple;
20use crate::meta::sealed::Sealed;
21use crate::obj::signal::TypedSignal;
22use crate::obj::{Gd, GodotClass, WithSignals};
23use crate::sys;
24
25#[rustfmt::skip] pub(crate) use crate::impl_dynamic_send;
29
/// Future that resolves to the arguments `R` of an awaited signal.
///
/// This is a thin wrapper around [`FallibleSignalFuture`]. Where the inner future resolves to an error, this type's
/// [`Future`] implementation panics instead of returning the error to the caller.
pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
41
42impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
43 fn new(signal: Signal) -> Self {
44 Self(FallibleSignalFuture::new(signal))
45 }
46}
47
48impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
49 type Output = R;
50
51 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52 let poll_result = self.get_mut().0.poll(cx);
53
54 match poll_result {
55 Poll::Pending => Poll::Pending,
56 Poll::Ready(Ok(value)) => Poll::Ready(value),
57 Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
58 "the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
59 ),
60 }
61 }
62}
63
64struct SignalFutureData<T> {
66 state: SignalFutureState<T>,
67 waker: Option<Waker>,
68}
69
70impl<T> Default for SignalFutureData<T> {
71 fn default() -> Self {
72 Self {
73 state: Default::default(),
74 waker: None,
75 }
76 }
77}
78
/// Callable-side handle of a signal future.
///
/// Holds the same shared data as the future it belongs to. Clones share that data (see the `Clone` impl), and hashing
/// and equality are based on the identity of the shared allocation rather than on its contents.
pub struct SignalFutureResolver<R: IntoDynamicSend> {
    /// State shared with the future; the payload is stored in its `IntoDynamicSend::Target` form.
    data: Arc<Mutex<SignalFutureData<R::Target>>>,
}
83
84impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
85 fn clone(&self) -> Self {
86 Self {
87 data: self.data.clone(),
88 }
89 }
90}
91
/// Creates a resolver backed by fresh, default-initialized shared data that no future is attached to.
///
/// Only available with the `trace` feature.
#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
    let data = Arc::new(Mutex::new(SignalFutureData::default()));

    SignalFutureResolver { data }
}
99
100impl<R: IntoDynamicSend> SignalFutureResolver<R> {
101 fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
102 Self { data }
103 }
104}
105
106impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
107 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
108 state.write_usize(Arc::as_ptr(&self.data) as usize);
109 }
110}
111
112impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
113 fn eq(&self, other: &Self) -> bool {
114 Arc::ptr_eq(&self.data, &other.data)
115 }
116}
117
118impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
119 fn invoke(&mut self, args: &[&Variant]) -> Variant {
120 let waker = {
121 let mut data = self.data.lock().unwrap();
122 data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
123
124 data.waker.take()
126 };
127
128 if let Some(waker) = waker {
129 waker.wake();
130 }
131
132 Variant::nil()
133 }
134}
135
136impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
139 }
140}
141
142impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
145 fn drop(&mut self) {
146 let mut data = self.data.lock().unwrap();
147
148 if !matches!(data.state, SignalFutureState::Pending) {
149 return;
151 }
152
153 data.state = SignalFutureState::Dead;
155
156 if let Some(ref waker) = data.waker {
159 waker.wake_by_ref();
160 }
161 }
162}
163
/// Resolution state of a signal future.
#[derive(Default)]
enum SignalFutureState<T> {
    /// No value has been delivered yet. This is the default state.
    #[default]
    Pending,
    /// A value was delivered and has not been taken yet.
    Ready(T),
    /// No value will be delivered (anymore): either the resolver was dropped while pending, or the value was
    /// already taken.
    Dead,
    /// The future itself has been dropped.
    Dropped,
}

impl<T> SignalFutureState<T> {
    /// Returns the current state and leaves its successor in place.
    ///
    /// `Pending` and `Dropped` stay as they are; `Ready` and `Dead` both leave `Dead` behind.
    fn take(&mut self) -> Self {
        let successor = match *self {
            Self::Pending => Self::Pending,
            Self::Dropped => Self::Dropped,
            Self::Ready(_) | Self::Dead => Self::Dead,
        };

        std::mem::replace(self, successor)
    }
}
184
/// Future that resolves once the connected signal is emitted, or with an error if that can no longer happen.
///
/// Resolves to `Ok(R)` with the signal arguments, or to `Err(FallibleSignalFutureError)` when the shared state was
/// marked dead before a value arrived.
pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
    /// State shared with the resolver callable.
    data: Arc<Mutex<SignalFutureData<R::Target>>>,
    /// Resolver handle kept by the future; used to rebuild the callable for disconnecting on drop.
    callable: SignalFutureResolver<R>,
    /// The signal this future is connected to.
    signal: Signal,
}
198
199impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
200 fn new(signal: Signal) -> Self {
201 sys::strict_assert!(
202 !signal.is_null(),
203 "Failed to create future for invalid signal:\n\
204 Either the signal object was already freed, or it\n\
205 was not registered in the object before being used.",
206 );
207
208 let data = Arc::new(Mutex::new(SignalFutureData::default()));
209
210 let callable = SignalFutureResolver::new(data.clone());
212
213 signal.connect_flags(
214 &Callable::from_custom(callable.clone()),
215 ConnectFlags::ONE_SHOT,
216 );
217
218 Self {
219 data,
220 callable,
221 signal,
222 }
223 }
224
225 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
226 let mut data = self.data.lock().unwrap();
227
228 data.waker.replace(cx.waker().clone());
229
230 let value = data.state.take();
231
232 drop(data);
234
235 match value {
236 SignalFutureState::Pending => Poll::Pending,
237 SignalFutureState::Dropped => unreachable!(),
238 SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
239 SignalFutureState::Ready(value) => {
240 let Some(value) = DynamicSend::extract_if_safe(value) else {
241 panic!(
242 "the awaited signal was not emitted on the main-thread, but contained a non Send argument"
243 );
244 };
245
246 Poll::Ready(Ok(value))
247 }
248 }
249 }
250}
251
/// Error a [`FallibleSignalFuture`] resolves to when the awaited signal can no longer be emitted.
#[derive(Debug)]
pub struct FallibleSignalFutureError;

impl Display for FallibleSignalFutureError {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("The signal object was freed before the awaited signal was emitted")
    }
}

impl std::error::Error for FallibleSignalFutureError {}
268
269impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
270 type Output = Result<R, FallibleSignalFutureError>;
271
272 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273 self.get_mut().poll(cx)
274 }
275}
276
277impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
278 fn drop(&mut self) {
279 if self.signal.is_null() {
281 return;
282 }
283
284 let mut data_lock = self.data.lock().unwrap();
285
286 data_lock.state = SignalFutureState::Dropped;
287
288 drop(data_lock);
289
290 let gd_callable = Callable::from_custom(self.callable.clone());
292
293 if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
301 self.signal.disconnect(&gd_callable);
302 }
303 }
304}
305
306impl Signal {
307 pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
315 FallibleSignalFuture::new(self.clone())
316 }
317
318 pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
326 SignalFuture::new(self.clone())
327 }
328}
329
330impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
331 pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
336 FallibleSignalFuture::new(self.to_untyped())
337 }
338
339 pub fn to_future(&self) -> SignalFuture<R> {
344 SignalFuture::new(self.to_untyped())
345 }
346}
347
348impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
349 type Output = R;
350
351 type IntoFuture = SignalFuture<R>;
352
353 fn into_future(self) -> Self::IntoFuture {
354 self.to_future()
355 }
356}
357
/// Conversion of a value into a representation that implements [`DynamicSend`].
///
/// The trait is sealed (`Sealed` supertrait) and limited to `'static` types.
pub trait IntoDynamicSend: Sealed + 'static {
    /// The `DynamicSend` representation; its `Inner` type is `Self`, so the original value can be recovered.
    type Target: DynamicSend<Inner = Self>;

    /// Consumes `self` and wraps it into its `Target` representation.
    fn into_dynamic_send(self) -> Self::Target;
}
367
/// A `Send` value whose inner payload may only be obtainable under certain conditions.
///
/// The trait is sealed (`Sealed` supertrait).
///
/// # Safety
/// NOTE(review): the exact contract is not visible in this file. Presumably implementors must guarantee that moving
/// the value across threads is sound and that `extract_if_safe` only yields the inner value when accessing it is
/// sound — confirm.
pub unsafe trait DynamicSend: Send + Sealed {
    /// The payload type handed out by `extract_if_safe`.
    type Inner;

    /// Consumes `self` and returns the inner value, or `None` if it cannot be handed out.
    fn extract_if_safe(self) -> Option<Self::Inner>;
}
382
/// Wrapper that remembers the thread it was created on and only hands out its value on that same thread.
pub struct ThreadConfined<T> {
    /// The wrapped value; `None` once it has been taken out.
    value: Option<T>,
    /// Id of the thread on which the wrapper was created.
    thread_id: ThreadId,
}

// SAFETY: The wrapper may be moved between threads regardless of `T`, because `extract` yields the inner value only
// on the creating thread, and the `Drop` impl in this file forgets (rather than drops) the value on any other thread.
unsafe impl<T> Send for ThreadConfined<T> {}

impl<T> ThreadConfined<T> {
    /// Wraps `value` and records the calling thread as its home thread.
    pub(crate) fn new(value: T) -> Self {
        let thread_id = std::thread::current().id();

        Self {
            value: Some(value),
            thread_id,
        }
    }

    /// Returns the inner value if called on the thread that created the wrapper, otherwise `None`.
    pub(crate) fn extract(mut self) -> Option<T> {
        match self.is_original_thread() {
            true => self.value.take(),
            // The value stays inside `self`, which goes out of scope at the end of this call.
            false => None,
        }
    }

    /// Whether the calling thread is the one that created this wrapper.
    fn is_original_thread(&self) -> bool {
        std::thread::current().id() == self.thread_id
    }
}
417
418impl<T> Drop for ThreadConfined<T> {
419 fn drop(&mut self) {
420 if !self.is_original_thread() {
421 std::mem::forget(self.value.take());
422
423 godot_error!(
425 "Dropped ThreadConfined<T> on a different thread than it was created on. The inner T value will be leaked."
426 );
427 }
428 }
429}
430
431unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
432 type Inner = Gd<T>;
433
434 fn extract_if_safe(self) -> Option<Self::Inner> {
435 self.extract()
436 }
437}
438
// `Sealed` is a supertrait of `DynamicSend`, so this marker impl is required for the `DynamicSend` impl on the same type.
impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
440
441impl<T: GodotClass> IntoDynamicSend for Gd<T> {
442 type Target = ThreadConfined<Self>;
443
444 fn into_dynamic_send(self) -> Self::Target {
445 ThreadConfined::new(self)
446 }
447}
448
/// Generates `DynamicSend` / `IntoDynamicSend` implementations. Three forms are supported:
///
/// - `Send; T1, T2, ...` — each type is its own `Target`/`Inner`; extraction always succeeds.
/// - `tuple; a: A, b: B, ...` — element-wise implementation for the tuple `(A, B, ...)` (the list may be empty).
/// - `!Send; T1, T2, ...` — each `$crate::builtin::T` is wrapped in `ThreadConfined`, and `Sealed` is implemented
///   for the wrapper.
///
/// The `Send` and `tuple` forms do not emit a `Sealed` impl themselves; since both traits have `Sealed` as a
/// supertrait, such impls presumably exist elsewhere — verify against the call sites.
#[macro_export(local_inner_macros)]
macro_rules! impl_dynamic_send {
    // Types that can be sent as-is: identity conversion in both directions.
    (Send; $($ty:ty),+) => {
        $(
            unsafe impl $crate::task::DynamicSend for $ty {
                type Inner = Self;

                fn extract_if_safe(self) -> Option<Self::Inner> {
                    Some(self)
                }
            }

            impl $crate::task::IntoDynamicSend for $ty {
                type Target = Self;
                fn into_dynamic_send(self) -> Self::Target {
                    self
                }
            }
        )+
    };

    // Tuples: `$ty` doubles as the generic parameter name and `$arg` as the binding for the matching element.
    (tuple; $($arg:ident: $ty:ident),*) => {
        unsafe impl<$($ty: $crate::task::DynamicSend ),*> $crate::task::DynamicSend for ($($ty,)*) {
            type Inner = ($($ty::Inner,)*);

            fn extract_if_safe(self) -> Option<Self::Inner> {
                #[allow(non_snake_case)]
                let ($($arg,)*) = self;

                // Every element is extracted; the tuple is only returned if all extractions succeeded.
                #[allow(clippy::unused_unit)]
                match ($($arg.extract_if_safe(),)*) {
                    ($(Some($arg),)*) => Some(($($arg,)*)),

                    // For the empty tuple the arm above already matches everything.
                    #[allow(unreachable_patterns)]
                    _ => None,
                }
            }
        }

        impl<$($ty: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($ty,)*) {
            type Target = ($($ty::Target,)*);

            fn into_dynamic_send(self) -> Self::Target {
                #[allow(non_snake_case)]
                let ($($arg,)*) = self;

                #[allow(clippy::unused_unit)]
                ($($arg.into_dynamic_send(),)*)
            }
        }
    };

    // Builtin types that are not `Send`: routed through `ThreadConfined`, which only releases the value on the
    // thread it was created on.
    (!Send; $($ty:ident),+) => {
        $(
            impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$ty> {}

            unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$ty> {
                type Inner = $crate::builtin::$ty;

                fn extract_if_safe(self) -> Option<Self::Inner> {
                    self.extract()
                }
            }

            impl $crate::task::IntoDynamicSend for $crate::builtin::$ty {
                type Target = $crate::task::ThreadConfined<$crate::builtin::$ty>;

                fn into_dynamic_send(self) -> Self::Target {
                    $crate::task::ThreadConfined::new(self)
                }
            }
        )+
    };
}
526
#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    use super::{SignalFutureResolver, ThreadConfined};
    use crate::classes::Object;
    use crate::obj::Gd;
    use crate::sys;

    /// A resolver and its clone share the same data and must therefore produce the same hash.
    #[test]
    fn future_resolver_cloned_hash() {
        let original = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
        let duplicate = original.clone();

        assert_eq!(sys::hash_value(&original), sys::hash_value(&duplicate));
    }

    /// Extraction succeeds on the creating thread and yields `None` on any other thread.
    #[test]
    #[cfg_attr(
        all(target_family = "wasm", not(target_feature = "atomics")),
        ignore = "Threading not available"
    )]
    fn thread_confined_extract() {
        let same_thread = ThreadConfined::new(772);
        assert_eq!(same_thread.extract(), Some(772));

        let other_thread = ThreadConfined::new(772);

        thread::spawn(move || {
            assert!(other_thread.extract().is_none());
        })
        .join()
        .unwrap();
    }

    /// Dropping the wrapper on a foreign thread must leak the inner value instead of running its destructor.
    #[test]
    #[cfg_attr(
        all(target_family = "wasm", not(target_feature = "atomics")),
        ignore = "Threading not available"
    )]
    fn thread_confined_leak_on_other_thread() {
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::SeqCst);
            }
        }

        let confined = ThreadConfined::new(DropCounter);

        thread::spawn(move || drop(confined)).join().unwrap();

        // The destructor of the inner value must never have run.
        assert_eq!(DROPS.load(Ordering::SeqCst), 0);
    }
}