#[ allow(unused_imports) ] use
{
std :: { future::Future, sync::atomic::{ AtomicBool, Ordering } } ,
std :: { task::{ Poll, Context }, pin::Pin } ,
futures_util:: { future::{ AbortHandle, Aborted, RemoteHandle }, ready } ,
super :: *,
};
/// A handle to a task spawned on one of the supported executor backends.
///
/// The handle implements `Future` (for `T: 'static`) and resolves to the
/// task's output `T`. For the Tokio and async-std backends, the `Drop` impl
/// in this file aborts the task when the handle is dropped, unless
/// `detach` was called first. For the AsyncGlobal and RemoteHandle backends
/// `Drop` does nothing explicit; cancellation then depends on what the
/// wrapped handle does when it is dropped (presumably cancel-on-drop, which
/// is what the `must_use` message below states -- confirm against the
/// backend crates).
//
#[ derive( Debug ) ]
#[ must_use = "JoinHandle will cancel your future when dropped unless you await it." ]
pub struct JoinHandle<T> { inner: InnerJh<T> }
impl<T> JoinHandle<T> {
    /// Wrap a Tokio join handle.
    ///
    /// The resulting handle starts out non-detached.
    #[cfg(any(feature = "tokio_tp", feature = "tokio_ct"))]
    pub fn tokio(handle: TokioJoinHandle<T>) -> Self {
        Self {
            inner: InnerJh::Tokio {
                handle,
                detached: false,
            },
        }
    }

    /// Wrap a task coming from the async-global executor backend.
    ///
    /// The task is stored in an `Option` so that `detach` can move it out
    /// later on.
    #[cfg(feature = "async_global")]
    pub fn async_global(task: AsyncGlobalTask<T>) -> Self {
        Self {
            inner: InnerJh::AsyncGlobal { task: Some(task) },
        }
    }

    /// Wrap an async-std join handle.
    ///
    /// `handle` resolves to `Result<T, Aborted>` and `a_handle` is the
    /// `AbortHandle` stored next to it. The resulting handle starts out
    /// non-detached.
    #[cfg(feature = "async_std")]
    pub fn async_std(
        handle: AsyncStdJoinHandle<Result<T, Aborted>>,
        a_handle: AbortHandle,
    ) -> Self {
        Self {
            inner: InnerJh::AsyncStd {
                handle,
                a_handle,
                detached: false,
            },
        }
    }

    /// Wrap a `futures_util` `RemoteHandle`.
    ///
    /// The remote handle is stored in an `Option` so that `detach` can move
    /// it out later on.
    pub fn remote_handle(handle: RemoteHandle<T>) -> Self {
        let handle = Some(handle);

        Self {
            inner: InnerJh::RemoteHandle { handle },
        }
    }
}
/// Backend specific state behind a `JoinHandle`.
///
/// Each variant is only compiled in when the corresponding cargo feature is
/// enabled, except `RemoteHandle`, which is always available.
//
// NOTE(review): `dead_code` is presumably allowed because, depending on the
// enabled features, some variants or fields are never constructed or read --
// confirm.
//
#[ derive(Debug) ] #[ allow(dead_code) ]
enum InnerJh<T>
{
/// Task spawned on a Tokio executor.
#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
Tokio
{
/// The underlying Tokio join handle; polled to obtain the output.
handle : TokioJoinHandle<T> ,
/// Set by `JoinHandle::detach`. While `false`, dropping the `JoinHandle`
/// calls `abort` on `handle`.
detached: bool ,
},
/// Task spawned on the async-global executor backend.
#[ cfg( feature = "async_global" ) ]
AsyncGlobal
{
/// `Some` from construction on; `JoinHandle::detach` takes the task out
/// in order to call its `detach` method.
task: Option< AsyncGlobalTask<T> > ,
},
/// Task spawned on async-std.
#[ cfg( feature = "async_std" ) ]
AsyncStd
{
/// The underlying join handle; it resolves to `Err(Aborted)` rather than
/// `Ok(T)` when the task was aborted.
handle : AsyncStdJoinHandle<Result<T, Aborted>> ,
/// Used by the `Drop` impl of `JoinHandle` to abort the task.
a_handle: AbortHandle ,
/// Set by `JoinHandle::detach`. While `false`, dropping the `JoinHandle`
/// calls `abort` on `a_handle`.
detached: bool ,
},
/// Task represented by a `futures_util` `RemoteHandle`.
RemoteHandle
{
/// `Some` from construction on; `JoinHandle::detach` takes the handle
/// out in order to call `forget` on it.
handle: Option<RemoteHandle<T>>,
},
}
impl<T> JoinHandle<T> {
    /// Consume the handle without cancelling the task it refers to.
    ///
    /// What happens depends on the backend:
    ///
    /// - Tokio / async-std: the `detached` flag is set, so the `Drop` impl
    ///   (which runs when `self` goes out of scope at the end of this
    ///   method) skips its `abort` call.
    /// - AsyncGlobal: the task is moved out and its own `detach` is called.
    /// - RemoteHandle: the handle is moved out and `forget` is called on it.
    pub fn detach(mut self) {
        // `JoinHandle` implements `Drop`, so fields cannot be moved out of
        // `self` directly; the `Option` wrapped handles are `take`n instead.
        //
        // The patterns rely on default binding modes (matching on `&mut`),
        // so no explicit `ref mut` is written. Explicit binding modifiers in
        // this position are rejected by the 2024 edition.
        match &mut self.inner {
            #[cfg(any(feature = "tokio_tp", feature = "tokio_ct"))]
            InnerJh::Tokio { detached, .. } => {
                *detached = true;
            }

            #[cfg(feature = "async_global")]
            InnerJh::AsyncGlobal { task } => {
                // The task is `Some` from construction until this point;
                // should it ever be `None` there is simply nothing left to
                // detach, so do not panic.
                if let Some(task) = task.take() {
                    task.detach();
                }
            }

            #[cfg(feature = "async_std")]
            InnerJh::AsyncStd { detached, .. } => {
                *detached = true;
            }

            InnerJh::RemoteHandle { handle } => {
                if let Some(rh) = handle.take() {
                    rh.forget();
                }
            }
        }
    }
}
impl<T: 'static> Future for JoinHandle<T> {
    type Output = T;

    /// Poll the wrapped backend handle and forward its readiness.
    ///
    /// Returns `Poll::Pending` while the backend handle is pending and
    /// `Poll::Ready(t)` with the task's output once it has finished.
    ///
    /// # Panics
    ///
    /// - Tokio: when the underlying join handle resolves to an error (the
    ///   message states the task was cancelled or panicked).
    /// - async-std: when the underlying handle resolves to `Err(Aborted)`.
    /// - AsyncGlobal / RemoteHandle: when the inner `Option` is `None`,
    ///   i.e. the handle was already taken by `detach`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Patterns below rely on default binding modes (matching on `&mut`);
        // explicit `ref mut` is redundant here and rejected by the 2024
        // edition.
        match &mut self.get_mut().inner {
            #[cfg(any(feature = "tokio_tp", feature = "tokio_ct"))]
            InnerJh::Tokio { handle, .. } => match ready!(Pin::new(handle).poll(cx)) {
                Ok(t) => Poll::Ready(t),

                Err(e) => {
                    panic!( "Task has been canceled or it has panicked. Are you dropping the executor to early? Error: {}", e );
                }
            },

            #[cfg(feature = "async_std")]
            InnerJh::AsyncStd { handle, .. } => match ready!(Pin::new(handle).poll(cx)) {
                Ok(t) => Poll::Ready(t),

                // Within this file the abort handle is only triggered from
                // `Drop`, after which the `JoinHandle` can no longer be
                // polled.
                Err(_) => unreachable!("JoinHandle polled after its task was aborted"),
            },

            #[cfg(feature = "async_global")]
            InnerJh::AsyncGlobal { task, .. } => {
                Pin::new(task.as_mut().expect("no polling after detach")).poll(cx)
            }

            InnerJh::RemoteHandle { handle } => Pin::new(handle)
                .as_pin_mut()
                .expect("no polling after detach")
                .poll(cx),
        }
    }
}
impl<T> Drop for JoinHandle<T> {
    /// Abort the task when the handle is dropped without being detached.
    ///
    /// Only the Tokio and async-std backends need an explicit `abort` call
    /// here. For AsyncGlobal and RemoteHandle nothing is done in this
    /// method; the wrapped handle is dropped right after as an ordinary
    /// field (presumably cancelling the task -- confirm against the backend
    /// crates).
    fn drop(&mut self) {
        match &mut self.inner {
            #[cfg(any(feature = "tokio_tp", feature = "tokio_ct"))]
            InnerJh::Tokio {
                handle, detached, ..
            } => {
                if *detached {
                    return;
                }

                handle.abort();
            }

            #[cfg(feature = "async_std")]
            InnerJh::AsyncStd {
                a_handle, detached, ..
            } => {
                if *detached {
                    return;
                }

                a_handle.abort();
            }

            #[cfg(feature = "async_global")]
            InnerJh::AsyncGlobal { .. } => {}

            InnerJh::RemoteHandle { .. } => {}
        }
    }
}