1use core::panic;
9use std::fmt::Display;
10use std::future::{Future, IntoFuture};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15
16use crate::builtin::{Callable, RustCallable, Signal, Variant};
17use crate::classes::object::ConnectFlags;
18use crate::godot_error;
19use crate::meta::sealed::Sealed;
20use crate::meta::InParamTuple;
21use crate::obj::{Gd, GodotClass, WithSignals};
22use crate::registry::signal::TypedSignal;
23
24#[rustfmt::skip] pub(crate) use crate::impl_dynamic_send;
28
29pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
40
41impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
42 fn new(signal: Signal) -> Self {
43 Self(FallibleSignalFuture::new(signal))
44 }
45}
46
47impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
48 type Output = R;
49
50 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51 let poll_result = self.get_mut().0.poll(cx);
52
53 match poll_result {
54 Poll::Pending => Poll::Pending,
55 Poll::Ready(Ok(value)) => Poll::Ready(value),
56 Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
57 "the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
58 ),
59 }
60 }
61}
62
63struct SignalFutureData<T> {
65 state: SignalFutureState<T>,
66 waker: Option<Waker>,
67}
68
69impl<T> Default for SignalFutureData<T> {
70 fn default() -> Self {
71 Self {
72 state: Default::default(),
73 waker: None,
74 }
75 }
76}
77
78pub struct SignalFutureResolver<R: IntoDynamicSend> {
80 data: Arc<Mutex<SignalFutureData<R::Target>>>,
81}
82
83impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
84 fn clone(&self) -> Self {
85 Self {
86 data: self.data.clone(),
87 }
88 }
89}
90
91#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
93pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
94 SignalFutureResolver {
95 data: Arc::new(Mutex::new(SignalFutureData::default())),
96 }
97}
98
99impl<R: IntoDynamicSend> SignalFutureResolver<R> {
100 fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
101 Self { data }
102 }
103}
104
105impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
106 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
107 state.write_usize(Arc::as_ptr(&self.data) as usize);
108 }
109}
110
111impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
112 fn eq(&self, other: &Self) -> bool {
113 Arc::ptr_eq(&self.data, &other.data)
114 }
115}
116
117impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
118 fn invoke(&mut self, args: &[&Variant]) -> Variant {
119 let waker = {
120 let mut data = self.data.lock().unwrap();
121 data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
122
123 data.waker.take()
125 };
126
127 if let Some(waker) = waker {
128 waker.wake();
129 }
130
131 Variant::nil()
132 }
133}
134
135impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
138 }
139}
140
141impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
144 fn drop(&mut self) {
145 let mut data = self.data.lock().unwrap();
146
147 if !matches!(data.state, SignalFutureState::Pending) {
148 return;
150 }
151
152 data.state = SignalFutureState::Dead;
154
155 if let Some(ref waker) = data.waker {
158 waker.wake_by_ref();
159 }
160 }
161}
162
163#[derive(Default)]
164enum SignalFutureState<T> {
165 #[default]
166 Pending,
167 Ready(T),
168 Dead,
169 Dropped,
170}
171
172impl<T> SignalFutureState<T> {
173 fn take(&mut self) -> Self {
174 let new_value = match self {
175 Self::Pending => Self::Pending,
176 Self::Ready(_) | Self::Dead => Self::Dead,
177 Self::Dropped => Self::Dropped,
178 };
179
180 std::mem::replace(self, new_value)
181 }
182}
183
184pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
193 data: Arc<Mutex<SignalFutureData<R::Target>>>,
194 callable: SignalFutureResolver<R>,
195 signal: Signal,
196}
197
198impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
199 fn new(signal: Signal) -> Self {
200 debug_assert!(
201 !signal.is_null(),
202 "Failed to create a future for an invalid Signal!\nEither the signal object was already freed or the signal was not registered in the object before using it.",
203 );
204
205 let data = Arc::new(Mutex::new(SignalFutureData::default()));
206
207 let callable = SignalFutureResolver::new(data.clone());
209
210 signal.connect_flags(
211 &Callable::from_custom(callable.clone()),
212 ConnectFlags::ONE_SHOT,
213 );
214
215 Self {
216 data,
217 callable,
218 signal,
219 }
220 }
221
222 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
223 let mut data = self.data.lock().unwrap();
224
225 data.waker.replace(cx.waker().clone());
226
227 let value = data.state.take();
228
229 drop(data);
231
232 match value {
233 SignalFutureState::Pending => Poll::Pending,
234 SignalFutureState::Dropped => unreachable!(),
235 SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
236 SignalFutureState::Ready(value) => {
237 let Some(value) = DynamicSend::extract_if_safe(value) else {
238 panic!("the awaited signal was not emitted on the main-thread, but contained a non Send argument");
239 };
240
241 Poll::Ready(Ok(value))
242 }
243 }
244 }
245}
246
247#[derive(Debug)]
251pub struct FallibleSignalFutureError;
252
253impl Display for FallibleSignalFutureError {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 write!(
256 f,
257 "The signal object was freed before the awaited signal was emitted"
258 )
259 }
260}
261
262impl std::error::Error for FallibleSignalFutureError {}
263
264impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
265 type Output = Result<R, FallibleSignalFutureError>;
266
267 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
268 self.get_mut().poll(cx)
269 }
270}
271
272impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
273 fn drop(&mut self) {
274 if self.signal.is_null() {
276 return;
277 }
278
279 let mut data_lock = self.data.lock().unwrap();
280
281 data_lock.state = SignalFutureState::Dropped;
282
283 drop(data_lock);
284
285 let gd_callable = Callable::from_custom(self.callable.clone());
287
288 if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
296 self.signal.disconnect(&gd_callable);
297 }
298 }
299}
300
301impl Signal {
302 pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
310 FallibleSignalFuture::new(self.clone())
311 }
312
313 pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
321 SignalFuture::new(self.clone())
322 }
323}
324
325impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
326 pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
331 FallibleSignalFuture::new(self.to_untyped())
332 }
333
334 pub fn to_future(&self) -> SignalFuture<R> {
339 SignalFuture::new(self.to_untyped())
340 }
341}
342
343impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
344 type Output = R;
345
346 type IntoFuture = SignalFuture<R>;
347
348 fn into_future(self) -> Self::IntoFuture {
349 self.to_future()
350 }
351}
352
353pub trait IntoDynamicSend: Sealed + 'static {
358 type Target: DynamicSend<Inner = Self>;
359
360 fn into_dynamic_send(self) -> Self::Target;
361}
362
363pub unsafe trait DynamicSend: Send + Sealed {
373 type Inner;
374
375 fn extract_if_safe(self) -> Option<Self::Inner>;
376}
377
378pub struct ThreadConfined<T> {
382 value: Option<T>,
383 thread_id: ThreadId,
384}
385
386unsafe impl<T> Send for ThreadConfined<T> {}
388
389impl<T> ThreadConfined<T> {
390 pub(crate) fn new(value: T) -> Self {
391 Self {
392 value: Some(value),
393 thread_id: std::thread::current().id(),
394 }
395 }
396
397 pub(crate) fn extract(mut self) -> Option<T> {
401 if self.is_original_thread() {
402 self.value.take()
403 } else {
404 None }
406 }
407
408 fn is_original_thread(&self) -> bool {
409 self.thread_id == std::thread::current().id()
410 }
411}
412
413impl<T> Drop for ThreadConfined<T> {
414 fn drop(&mut self) {
415 if !self.is_original_thread() {
416 std::mem::forget(self.value.take());
417
418 godot_error!(
420 "Dropped ThreadConfined<T> on a different thread than it was created on. The inner T value will be leaked."
421 );
422 }
423 }
424}
425
426unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
427 type Inner = Gd<T>;
428
429 fn extract_if_safe(self) -> Option<Self::Inner> {
430 self.extract()
431 }
432}
433
434impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
435
436impl<T: GodotClass> IntoDynamicSend for Gd<T> {
437 type Target = ThreadConfined<Self>;
438
439 fn into_dynamic_send(self) -> Self::Target {
440 ThreadConfined::new(self)
441 }
442}
443
444#[macro_export(local_inner_macros)]
448macro_rules! impl_dynamic_send {
449 (Send; $($ty:ty),+) => {
450 $(
451 unsafe impl $crate::task::DynamicSend for $ty {
452 type Inner = Self;
453
454 fn extract_if_safe(self) -> Option<Self::Inner> {
455 Some(self)
456 }
457 }
458
459 impl $crate::task::IntoDynamicSend for $ty {
460 type Target = Self;
461 fn into_dynamic_send(self) -> Self::Target {
462 self
463 }
464 }
465 )+
466 };
467
468 (tuple; $($arg:ident: $ty:ident),*) => {
469 unsafe impl<$($ty: $crate::task::DynamicSend ),*> $crate::task::DynamicSend for ($($ty,)*) {
470 type Inner = ($($ty::Inner,)*);
471
472 fn extract_if_safe(self) -> Option<Self::Inner> {
473 #[allow(non_snake_case)]
474 let ($($arg,)*) = self;
475
476 #[allow(clippy::unused_unit)]
477 match ($($arg.extract_if_safe(),)*) {
478 ($(Some($arg),)*) => Some(($($arg,)*)),
479
480 #[allow(unreachable_patterns)]
481 _ => None,
482 }
483 }
484 }
485
486 impl<$($ty: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($ty,)*) {
487 type Target = ($($ty::Target,)*);
488
489 fn into_dynamic_send(self) -> Self::Target {
490 #[allow(non_snake_case)]
491 let ($($arg,)*) = self;
492
493 #[allow(clippy::unused_unit)]
494 ($($arg.into_dynamic_send(),)*)
495 }
496 }
497 };
498
499 (!Send; $($ty:ident),+) => {
500 $(
501 impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$ty> {}
502
503 unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$ty> {
504 type Inner = $crate::builtin::$ty;
505
506 fn extract_if_safe(self) -> Option<Self::Inner> {
507 self.extract()
508 }
509 }
510
511 impl $crate::task::IntoDynamicSend for $crate::builtin::$ty {
512 type Target = $crate::task::ThreadConfined<$crate::builtin::$ty>;
513
514 fn into_dynamic_send(self) -> Self::Target {
515 $crate::task::ThreadConfined::new(self)
516 }
517 }
518 )+
519 };
520}
521
522#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
523mod tests {
524 use std::sync::atomic::{AtomicUsize, Ordering};
525 use std::sync::Arc;
526 use std::thread;
527
528 use super::{SignalFutureResolver, ThreadConfined};
529 use crate::classes::Object;
530 use crate::obj::Gd;
531 use crate::sys;
532
533 #[test]
536 fn future_resolver_cloned_hash() {
537 let resolver_a = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
538 let resolver_b = resolver_a.clone();
539
540 let hash_a = sys::hash_value(&resolver_a);
541 let hash_b = sys::hash_value(&resolver_b);
542
543 assert_eq!(hash_a, hash_b);
544 }
545
546 #[test]
548 fn thread_confined_extract() {
549 let confined = ThreadConfined::new(772);
550 assert_eq!(confined.extract(), Some(772));
551
552 let confined = ThreadConfined::new(772);
553
554 let handle = thread::spawn(move || {
555 assert!(confined.extract().is_none());
556 });
557 handle.join().unwrap();
558 }
559
560 #[test]
561 fn thread_confined_leak_on_other_thread() {
562 static COUNTER: AtomicUsize = AtomicUsize::new(0);
563
564 struct DropCounter;
565 impl Drop for DropCounter {
566 fn drop(&mut self) {
567 COUNTER.fetch_add(1, Ordering::SeqCst);
568 }
569 }
570
571 let drop_counter = DropCounter;
572 let confined = ThreadConfined::new(drop_counter);
573
574 let handle = thread::spawn(move || drop(confined));
575 handle.join().unwrap();
576
577 assert_eq!(COUNTER.load(Ordering::SeqCst), 0);
579 }
580}