use core::panic;
use std::fmt::Display;
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread::ThreadId;
use crate::builtin::{Callable, RustCallable, Signal, Variant};
use crate::classes::object::ConnectFlags;
use crate::global::godot_error;
use crate::meta::InParamTuple;
use crate::meta::sealed::Sealed;
use crate::obj::signal::TypedSignal;
use crate::obj::{Gd, GodotClass, WithSignals};
use crate::sys;
#[rustfmt::skip] pub(crate) use crate::impl_dynamic_send;
pub struct SignalFuture<R: InParamTuple + IntoDynamicSend>(FallibleSignalFuture<R>);
impl<R: InParamTuple + IntoDynamicSend> SignalFuture<R> {
fn new(signal: Signal) -> Self {
Self(FallibleSignalFuture::new(signal))
}
}
impl<R: InParamTuple + IntoDynamicSend> Future for SignalFuture<R> {
type Output = R;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_result = self.get_mut().0.poll(cx);
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(value)) => Poll::Ready(value),
Poll::Ready(Err(FallibleSignalFutureError)) => panic!(
"the signal object of a SignalFuture was freed, while the future was still waiting for the signal to be emitted"
),
}
}
}
struct SignalFutureData<T> {
state: SignalFutureState<T>,
waker: Option<Waker>,
}
impl<T> Default for SignalFutureData<T> {
fn default() -> Self {
Self {
state: Default::default(),
waker: None,
}
}
}
pub struct SignalFutureResolver<R: IntoDynamicSend> {
data: Arc<Mutex<SignalFutureData<R::Target>>>,
}
impl<R: IntoDynamicSend> Clone for SignalFutureResolver<R> {
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
}
}
}
#[cfg(feature = "trace")] #[cfg_attr(published_docs, doc(cfg(feature = "trace")))]
pub fn create_test_signal_future_resolver<R: IntoDynamicSend>() -> SignalFutureResolver<R> {
SignalFutureResolver {
data: Arc::new(Mutex::new(SignalFutureData::default())),
}
}
impl<R: IntoDynamicSend> SignalFutureResolver<R> {
fn new(data: Arc<Mutex<SignalFutureData<R::Target>>>) -> Self {
Self { data }
}
}
impl<R: IntoDynamicSend> std::hash::Hash for SignalFutureResolver<R> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
state.write_usize(Arc::as_ptr(&self.data) as usize);
}
}
impl<R: IntoDynamicSend> PartialEq for SignalFutureResolver<R> {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.data, &other.data)
}
}
impl<R: InParamTuple + IntoDynamicSend> RustCallable for SignalFutureResolver<R> {
fn invoke(&mut self, args: &[&Variant]) -> Variant {
let waker = {
let mut data = self.data.lock().unwrap();
data.state = SignalFutureState::Ready(R::from_variant_array(args).into_dynamic_send());
data.waker.take()
};
if let Some(waker) = waker {
waker.wake();
}
Variant::nil()
}
}
impl<R: IntoDynamicSend> Display for SignalFutureResolver<R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SignalFutureResolver::<{}>", std::any::type_name::<R>())
}
}
impl<R: IntoDynamicSend> Drop for SignalFutureResolver<R> {
fn drop(&mut self) {
let mut data = self.data.lock().unwrap();
if !matches!(data.state, SignalFutureState::Pending) {
return;
}
data.state = SignalFutureState::Dead;
if let Some(ref waker) = data.waker {
waker.wake_by_ref();
}
}
}
#[derive(Default)]
enum SignalFutureState<T> {
#[default]
Pending,
Ready(T),
Dead,
Dropped,
}
impl<T> SignalFutureState<T> {
fn take(&mut self) -> Self {
let new_value = match self {
Self::Pending => Self::Pending,
Self::Ready(_) | Self::Dead => Self::Dead,
Self::Dropped => Self::Dropped,
};
std::mem::replace(self, new_value)
}
}
pub struct FallibleSignalFuture<R: InParamTuple + IntoDynamicSend> {
data: Arc<Mutex<SignalFutureData<R::Target>>>,
callable: SignalFutureResolver<R>,
signal: Signal,
}
impl<R: InParamTuple + IntoDynamicSend> FallibleSignalFuture<R> {
fn new(signal: Signal) -> Self {
sys::strict_assert!(
!signal.is_null(),
"Failed to create future for invalid signal:\n\
Either the signal object was already freed, or it\n\
was not registered in the object before being used.",
);
let data = Arc::new(Mutex::new(SignalFutureData::default()));
let callable = SignalFutureResolver::new(data.clone());
signal.connect_flags(
&Callable::from_custom(callable.clone()),
ConnectFlags::ONE_SHOT,
);
Self {
data,
callable,
signal,
}
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<R, FallibleSignalFutureError>> {
let mut data = self.data.lock().unwrap();
data.waker.replace(cx.waker().clone());
let value = data.state.take();
drop(data);
match value {
SignalFutureState::Pending => Poll::Pending,
SignalFutureState::Dropped => unreachable!(),
SignalFutureState::Dead => Poll::Ready(Err(FallibleSignalFutureError)),
SignalFutureState::Ready(value) => {
let Some(value) = DynamicSend::extract_if_safe(value) else {
panic!(
"the awaited signal was not emitted on the main-thread, but contained a non Send argument"
);
};
Poll::Ready(Ok(value))
}
}
}
}
#[derive(Debug)]
pub struct FallibleSignalFutureError;
impl Display for FallibleSignalFutureError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"The signal object was freed before the awaited signal was emitted"
)
}
}
impl std::error::Error for FallibleSignalFutureError {}
impl<R: InParamTuple + IntoDynamicSend> Future for FallibleSignalFuture<R> {
type Output = Result<R, FallibleSignalFutureError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().poll(cx)
}
}
impl<R: InParamTuple + IntoDynamicSend> Drop for FallibleSignalFuture<R> {
fn drop(&mut self) {
if self.signal.is_null() {
return;
}
let mut data_lock = self.data.lock().unwrap();
data_lock.state = SignalFutureState::Dropped;
drop(data_lock);
let gd_callable = Callable::from_custom(self.callable.clone());
if !self.signal.is_null() && self.signal.is_connected(&gd_callable) {
self.signal.disconnect(&gd_callable);
}
}
}
impl Signal {
pub fn to_fallible_future<R: InParamTuple + IntoDynamicSend>(&self) -> FallibleSignalFuture<R> {
FallibleSignalFuture::new(self.clone())
}
pub fn to_future<R: InParamTuple + IntoDynamicSend>(&self) -> SignalFuture<R> {
SignalFuture::new(self.clone())
}
}
impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> TypedSignal<'_, C, R> {
pub fn to_fallible_future(&self) -> FallibleSignalFuture<R> {
FallibleSignalFuture::new(self.to_untyped())
}
pub fn to_future(&self) -> SignalFuture<R> {
SignalFuture::new(self.to_untyped())
}
}
impl<C: WithSignals, R: InParamTuple + IntoDynamicSend> IntoFuture for &TypedSignal<'_, C, R> {
type Output = R;
type IntoFuture = SignalFuture<R>;
fn into_future(self) -> Self::IntoFuture {
self.to_future()
}
}
pub trait IntoDynamicSend: Sealed + 'static {
type Target: DynamicSend<Inner = Self>;
fn into_dynamic_send(self) -> Self::Target;
}
pub unsafe trait DynamicSend: Send + Sealed {
type Inner;
fn extract_if_safe(self) -> Option<Self::Inner>;
}
pub struct ThreadConfined<T> {
value: Option<T>,
thread_id: ThreadId,
}
unsafe impl<T> Send for ThreadConfined<T> {}
impl<T> ThreadConfined<T> {
pub(crate) fn new(value: T) -> Self {
Self {
value: Some(value),
thread_id: std::thread::current().id(),
}
}
pub(crate) fn extract(mut self) -> Option<T> {
if self.is_original_thread() {
self.value.take()
} else {
None }
}
fn is_original_thread(&self) -> bool {
self.thread_id == std::thread::current().id()
}
}
impl<T> Drop for ThreadConfined<T> {
fn drop(&mut self) {
if !self.is_original_thread() {
std::mem::forget(self.value.take());
godot_error!(
"Dropped ThreadConfined<T> on a different thread than it was created on. The inner T value will be leaked."
);
}
}
}
unsafe impl<T: GodotClass> DynamicSend for ThreadConfined<Gd<T>> {
type Inner = Gd<T>;
fn extract_if_safe(self) -> Option<Self::Inner> {
self.extract()
}
}
impl<T: GodotClass> Sealed for ThreadConfined<Gd<T>> {}
impl<T: GodotClass> IntoDynamicSend for Gd<T> {
type Target = ThreadConfined<Self>;
fn into_dynamic_send(self) -> Self::Target {
ThreadConfined::new(self)
}
}
#[macro_export(local_inner_macros)]
macro_rules! impl_dynamic_send {
(Send; $($ty:ty),+) => {
$(
unsafe impl $crate::task::DynamicSend for $ty {
type Inner = Self;
fn extract_if_safe(self) -> Option<Self::Inner> {
Some(self)
}
}
impl $crate::task::IntoDynamicSend for $ty {
type Target = Self;
fn into_dynamic_send(self) -> Self::Target {
self
}
}
)+
};
(tuple; $($arg:ident: $ty:ident),*) => {
unsafe impl<$($ty: $crate::task::DynamicSend ),*> $crate::task::DynamicSend for ($($ty,)*) {
type Inner = ($($ty::Inner,)*);
fn extract_if_safe(self) -> Option<Self::Inner> {
#[allow(non_snake_case)]
let ($($arg,)*) = self;
#[allow(clippy::unused_unit)]
match ($($arg.extract_if_safe(),)*) {
($(Some($arg),)*) => Some(($($arg,)*)),
#[allow(unreachable_patterns)]
_ => None,
}
}
}
impl<$($ty: $crate::task::IntoDynamicSend),*> $crate::task::IntoDynamicSend for ($($ty,)*) {
type Target = ($($ty::Target,)*);
fn into_dynamic_send(self) -> Self::Target {
#[allow(non_snake_case)]
let ($($arg,)*) = self;
#[allow(clippy::unused_unit)]
($($arg.into_dynamic_send(),)*)
}
}
};
(!Send; $($ty:ident),+) => {
$(
impl $crate::meta::sealed::Sealed for $crate::task::ThreadConfined<$crate::builtin::$ty> {}
unsafe impl $crate::task::DynamicSend for $crate::task::ThreadConfined<$crate::builtin::$ty> {
type Inner = $crate::builtin::$ty;
fn extract_if_safe(self) -> Option<Self::Inner> {
self.extract()
}
}
impl $crate::task::IntoDynamicSend for $crate::builtin::$ty {
type Target = $crate::task::ThreadConfined<$crate::builtin::$ty>;
fn into_dynamic_send(self) -> Self::Target {
$crate::task::ThreadConfined::new(self)
}
}
)+
};
}
#[cfg(test)] #[cfg_attr(published_docs, doc(cfg(test)))]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use super::{SignalFutureResolver, ThreadConfined};
use crate::classes::Object;
use crate::obj::Gd;
use crate::sys;
#[test]
fn future_resolver_cloned_hash() {
let resolver_a = SignalFutureResolver::<(Gd<Object>, i64)>::new(Arc::default());
let resolver_b = resolver_a.clone();
let hash_a = sys::hash_value(&resolver_a);
let hash_b = sys::hash_value(&resolver_b);
assert_eq!(hash_a, hash_b);
}
#[test]
#[cfg_attr(
all(target_family = "wasm", not(target_feature = "atomics")),
ignore = "Threading not available"
)]
fn thread_confined_extract() {
let confined = ThreadConfined::new(772);
assert_eq!(confined.extract(), Some(772));
let confined = ThreadConfined::new(772);
let handle = thread::spawn(move || {
assert!(confined.extract().is_none());
});
handle.join().unwrap();
}
#[test]
#[cfg_attr(
all(target_family = "wasm", not(target_feature = "atomics")),
ignore = "Threading not available"
)]
fn thread_confined_leak_on_other_thread() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
struct DropCounter;
impl Drop for DropCounter {
fn drop(&mut self) {
COUNTER.fetch_add(1, Ordering::SeqCst);
}
}
let drop_counter = DropCounter;
let confined = ThreadConfined::new(drop_counter);
let handle = thread::spawn(move || drop(confined));
handle.join().unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 0);
}
}