1use core::panic;
9use std::fmt::Display;
10use std::future::{Future, IntoFuture};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15
16use crate::builtin::{Callable, RustCallable, Signal, Variant};
17use crate::classes::object::ConnectFlags;
18use crate::global::godot_error;
19use crate::meta::sealed::Sealed;
20use crate::meta::InParamTuple;
21use crate::obj::{Gd, GodotClass, WithSignals};
22use crate::registry::signal::TypedSignal;
23use crate::sys;
24
25#[rustfmt::skip] pub(crate) use crate::impl_dynamic_send;
29
30pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
41
42impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
43 fn new(signal: Signal) -> Self {
44 Self(FallibleSignalFuture::new(signal))
45 }
46}
47
48impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
49 type Output = R;
50
51 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52 let poll_result = self.get_mut().0.poll(cx);
53
54 match poll_result {
55 Poll::Pending => Poll::Pending,
56 Poll::Ready(Ok(value)) => Poll::Ready(value),
57 Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
58 "the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
59 ),
60 }
61 }
62}
63
64struct SignalFutureData<T> {
66 state: SignalFutureState<T>,
67 waker: Option<Waker>,
68}
69
70impl<T> Default for SignalFutureData<T> {
71 fn default() -> Self {
72 Self {
73 state: Default::default(),
74 waker: None,
75 }
76 }
77}
78
79pub struct SignalFutureResolver<R: IntoDynamicSend> {
81 data: Arc<Mutex<SignalFutureData<R::Target>>>,
82}
83
84impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
85 fn clone(&self) -> Self {
86 Self {
87 data: self.data.clone(),
88 }
89 }
90}
91
/// Creates a resolver backed by freshly allocated default state, without connecting it to any signal.
///
/// Only compiled when the `trace` feature is enabled.
#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
    let data = Arc::new(Mutex::new(SignalFutureData::default()));

    SignalFutureResolver { data }
}
99
100impl<R: IntoDynamicSend> SignalFutureResolver<R> {
101 fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
102 Self { data }
103 }
104}
105
106impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
107 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
108 state.write_usize(Arc::as_ptr(&self.data) as usize);
109 }
110}
111
112impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
113 fn eq(&self, other: &Self) -> bool {
114 Arc::ptr_eq(&self.data, &other.data)
115 }
116}
117
118impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
119 fn invoke(&mut self, args: &[&Variant]) -> Variant {
120 let waker = {
121 let mut data = self.data.lock().unwrap();
122 data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
123
124 data.waker.take()
126 };
127
128 if let Some(waker) = waker {
129 waker.wake();
130 }
131
132 Variant::nil()
133 }
134}
135
136impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
139 }
140}
141
142impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
145 fn drop(&mut self) {
146 let mut data = self.data.lock().unwrap();
147
148 if !matches!(data.state, SignalFutureState::Pending) {
149 return;
151 }
152
153 data.state = SignalFutureState::Dead;
155
156 if let Some(ref waker) = data.waker {
159 waker.wake_by_ref();
160 }
161 }
162}
163
/// Resolution state of a signal future.
#[derive(Default)]
enum SignalFutureState<T> {
    /// Not resolved yet; this is the initial state.
    #[default]
    Pending,
    /// Resolved with a value that has not been taken out yet.
    Ready(T),
    /// No value is (or will become) available: set when a pending resolver is dropped, and left behind by `take`
    /// after a `Ready` value was taken out.
    Dead,
    /// The future itself was dropped.
    Dropped,
}

impl<T> SignalFutureState<T> {
    /// Returns the current state and leaves its successor behind.
    ///
    /// `Pending` and `Dropped` stay as they are; `Ready` and `Dead` both leave `Dead` behind, so a ready value can
    /// only be taken out once.
    fn take(&mut self) -> Self {
        let successor = match self {
            Self::Ready(_) | Self::Dead => Self::Dead,
            Self::Dropped => Self::Dropped,
            Self::Pending => Self::Pending,
        };

        std::mem::replace(self, successor)
    }
}
184
185pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
194 data: Arc<Mutex<SignalFutureData<R::Target>>>,
195 callable: SignalFutureResolver<R>,
196 signal: Signal,
197}
198
199impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
200 fn new(signal: Signal) -> Self {
201 sys::strict_assert!(
202 !signal.is_null(),
203 "Failed to create future for invalid signal:\n\
204 Either the signal object was already freed, or it\n\
205 was not registered in the object before being used.",
206 );
207
208 let data = Arc::new(Mutex::new(SignalFutureData::default()));
209
210 let callable = SignalFutureResolver::new(data.clone());
212
213 signal.connect_flags(
214 &Callable::from_custom(callable.clone()),
215 ConnectFlags::ONE_SHOT,
216 );
217
218 Self {
219 data,
220 callable,
221 signal,
222 }
223 }
224
225 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
226 let mut data = self.data.lock().unwrap();
227
228 data.waker.replace(cx.waker().clone());
229
230 let value = data.state.take();
231
232 drop(data);
234
235 match value {
236 SignalFutureState::Pending => Poll::Pending,
237 SignalFutureState::Dropped => unreachable!(),
238 SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
239 SignalFutureState::Ready(value) => {
240 let Some(value) = DynamicSend::extract_if_safe(value) else {
241 panic!("the awaited signal was not emitted on the main-thread, but contained a non Send argument");
242 };
243
244 Poll::Ready(Ok(value))
245 }
246 }
247 }
248}
249
/// Error a `FallibleSignalFuture` resolves to when the awaited signal can no longer be emitted.
#[derive(Debug)]
pub struct FallibleSignalFutureError;

impl Display for FallibleSignalFutureError {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("The signal object was freed before the awaited signal was emitted")
    }
}

impl std::error::Error for FallibleSignalFutureError {}
266
267impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
268 type Output = Result<R, FallibleSignalFutureError>;
269
270 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
271 self.get_mut().poll(cx)
272 }
273}
274
275impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
276 fn drop(&mut self) {
277 if self.signal.is_null() {
279 return;
280 }
281
282 let mut data_lock = self.data.lock().unwrap();
283
284 data_lock.state = SignalFutureState::Dropped;
285
286 drop(data_lock);
287
288 let gd_callable = Callable::from_custom(self.callable.clone());
290
291 if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
299 self.signal.disconnect(&gd_callable);
300 }
301 }
302}
303
304impl Signal {
305 pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
313 FallibleSignalFuture::new(self.clone())
314 }
315
316 pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
324 SignalFuture::new(self.clone())
325 }
326}
327
328impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
329 pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
334 FallibleSignalFuture::new(self.to_untyped())
335 }
336
337 pub fn to_future(&self) -> SignalFuture<R> {
342 SignalFuture::new(self.to_untyped())
343 }
344}
345
346impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
347 type Output = R;
348
349 type IntoFuture = SignalFuture<R>;
350
351 fn into_future(self) -> Self::IntoFuture {
352 self.to_future()
353 }
354}
355
356pub trait IntoDynamicSend: Sealed + 'static {
361 type Target: DynamicSend<Inner = Self>;
362
363 fn into_dynamic_send(self) -> Self::Target;
364}
365
366pub unsafe trait DynamicSend: Send + Sealed {
376 type Inner;
377
378 fn extract_if_safe(self) -> Option<Self::Inner>;
379}
380
/// Wrapper that remembers the thread it was created on and only hands its value back out on that thread.
pub struct ThreadConfined<T> {
    /// The wrapped value; `None` once it has been taken out.
    value: Option<T>,
    /// Thread on which `new` was called.
    thread_id: ThreadId,
}

// SAFETY: the wrapper may move between threads because the inner value is only handed out by `extract` after a
// thread check, and the `Drop` impl forgets the value instead of dropping it on a foreign thread.
unsafe impl<T> Send for ThreadConfined<T> {}

impl<T> ThreadConfined<T> {
    /// Wraps `value` and records the current thread as its original thread.
    pub(crate) fn new(value: T) -> Self {
        let thread_id = std::thread::current().id();

        Self {
            value: Some(value),
            thread_id,
        }
    }

    /// Returns the inner value if called on the thread that created this wrapper, and `None` otherwise.
    ///
    /// `self` is consumed either way, so in the `None` case the wrapper is dropped while still holding its value.
    pub(crate) fn extract(mut self) -> Option<T> {
        if !self.is_original_thread() {
            return None;
        }

        self.value.take()
    }

    /// Whether the current thread is the one on which this wrapper was created.
    fn is_original_thread(&self) -> bool {
        let current = std::thread::current().id();
        current == self.thread_id
    }
}
415
416impl<T> Drop for ThreadConfined<T> {
417 fn drop(&mut self) {
418 if !self.is_original_thread() {
419 std::mem::forget(self.value.take());
420
421 godot_error!(
423 "Dropped ThreadConfined<T> on a different thread than it was created on. The inner T value will be leaked."
424 );
425 }
426 }
427}
428
429unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
430 type Inner = Gd<T>;
431
432 fn extract_if_safe(self) -> Option<Self::Inner> {
433 self.extract()
434 }
435}
436
437impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
438
439impl<T: GodotClass> IntoDynamicSend for Gd<T> {
440 type Target = ThreadConfined<Self>;
441
442 fn into_dynamic_send(self) -> Self::Target {
443 ThreadConfined::new(self)
444 }
445}
446
/// Implements `DynamicSend` and `IntoDynamicSend` for a list of types.
///
/// Three invocation forms exist, selected by the leading tokens:
/// - `Send; T1, T2, ...` — each type is its own `DynamicSend` representation.
/// - `tuple; a: A, b: B, ...` — the tuple `(A, B, ...)` is handled element-wise.
/// - `!Send; T1, T2, ...` — each `$crate::builtin::Tn` is represented as `ThreadConfined<Tn>`.
#[macro_export(local_inner_macros)]
macro_rules! impl_dynamic_send {
    // Identity case. The `Send` and `Sealed` supertraits of `DynamicSend` must already hold for every listed type.
    (Send; $($send_ty:ty),+) => {
        $(
            unsafe impl $crate::task::DynamicSend for $send_ty {
                type Inner = Self;

                fn extract_if_safe(self) -> Option<Self::Inner> {
                    Some(self)
                }
            }

            impl $crate::task::IntoDynamicSend for $send_ty {
                type Target = Self;

                fn into_dynamic_send(self) -> Self::Target {
                    self
                }
            }
        )+
    };

    // Tuple case: `$elem` names the binding for one tuple element, `$param` the generic parameter of its type.
    (tuple; $($elem:ident: $param:ident),*) => {
        unsafe impl<$($param: $crate::task::DynamicSend),*> $crate::task::DynamicSend for ($($param,)*) {
            type Inner = ($($param::Inner,)*);

            fn extract_if_safe(self) -> Option<Self::Inner> {
                #[allow(non_snake_case)]
                let ($($elem,)*) = self;

                // Every element is extracted before the results are inspected; the tuple as a whole is only
                // returned when all of its elements could be extracted.
                #[allow(clippy::unused_unit)]
                match ($($elem.extract_if_safe(),)*) {
                    ($(Some($elem),)*) => Some(($($elem,)*)),

                    #[allow(unreachable_patterns)]
                    _ => None,
                }
            }
        }

        impl<$($param: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($param,)*) {
            type Target = ($($param::Target,)*);

            fn into_dynamic_send(self) -> Self::Target {
                #[allow(non_snake_case)]
                let ($($elem,)*) = self;

                #[allow(clippy::unused_unit)]
                ($($elem.into_dynamic_send(),)*)
            }
        }
    };

    // Thread-confined case: each identifier names a type in `$crate::builtin`.
    (!Send; $($builtin_ty:ident),+) => {
        $(
            impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$builtin_ty> {}

            unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$builtin_ty> {
                type Inner = $crate::builtin::$builtin_ty;

                fn extract_if_safe(self) -> Option<Self::Inner> {
                    self.extract()
                }
            }

            impl $crate::task::IntoDynamicSend for $crate::builtin::$builtin_ty {
                type Target = $crate::task::ThreadConfined<$crate::builtin::$builtin_ty>;

                fn into_dynamic_send(self) -> Self::Target {
                    $crate::task::ThreadConfined::new(self)
                }
            }
        )+
    };
}
524
#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    use super::{SignalFutureResolver, ThreadConfined};
    use crate::classes::Object;
    use crate::obj::Gd;
    use crate::sys;

    /// A resolver and its clone share their state and must therefore hash identically.
    #[test]
    fn future_resolver_cloned_hash() {
        let original = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
        let duplicate = original.clone();

        let original_hash = sys::hash_value(&original);
        let duplicate_hash = sys::hash_value(&duplicate);

        assert_eq!(original_hash, duplicate_hash);
    }

    /// Extraction succeeds on the creating thread and yields `None` on any other thread.
    #[test]
    fn thread_confined_extract() {
        let same_thread = ThreadConfined::new(772);
        assert_eq!(same_thread.extract(), Some(772));

        let other_thread = ThreadConfined::new(772);

        let worker = thread::spawn(move || {
            assert!(other_thread.extract().is_none());
        });
        worker.join().unwrap();
    }

    /// Dropping a `ThreadConfined` on a foreign thread must leak the inner value instead of running its destructor.
    #[test]
    fn thread_confined_leak_on_other_thread() {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                COUNTER.fetch_add(1, Ordering::SeqCst);
            }
        }

        let confined = ThreadConfined::new(DropCounter);

        let worker = thread::spawn(move || drop(confined));
        worker.join().unwrap();

        let observed_drops = COUNTER.load(Ordering::SeqCst);
        assert_eq!(observed_drops, 0);
    }
}