use
{
crate :: { SpawnHandle, JoinHandle, BlockingHandle } ,
std :: { sync::Arc, future::Future } ,
futures_task :: { FutureObj, Spawn, SpawnError } ,
tokio::runtime :: { Runtime } ,
};
#[ derive( Debug, Clone ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "tokio_tp" )) ) ]
pub struct TokioTp
{
pub(crate) exec: Option< Arc<Runtime> >,
}
impl TokioTp
{
pub fn block_on< F: Future >( &self, f: F ) -> F::Output
{
self.exec.as_ref().unwrap().block_on( f )
}
pub fn shutdown_timeout( mut self, duration: std::time::Duration ) -> Result<(), Self>
{
let arc = self.exec.take().unwrap();
let rt = match Arc::try_unwrap( arc )
{
Ok(rt) => rt,
Err(arc) =>
{
self.exec = Some(arc);
return Err(self);
}
};
rt.shutdown_timeout( duration );
Ok(())
}
pub fn shutdown_background( mut self ) -> Result<(), Self>
{
let arc = self.exec.take().unwrap();
let rt = match Arc::try_unwrap( arc )
{
Ok(rt) => rt,
Err(arc) =>
{
self.exec = Some(arc);
return Err(self);
}
};
rt.shutdown_background();
Ok(())
}
}
#[ cfg( feature = "tokio_io" ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "tokio_io" )) ) ]
impl crate::TokioIo for TokioTp {}
impl Spawn for TokioTp
{
fn spawn_obj( &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError>
{
let _ = self.exec.as_ref().unwrap().spawn( future );
Ok(())
}
}
impl<Out: 'static + Send> SpawnHandle<Out> for TokioTp
{
fn spawn_handle_obj( &self, future: FutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
{
let handle = self.exec.as_ref().unwrap().spawn( future );
Ok( JoinHandle::tokio(handle) )
}
}
impl crate::YieldNow for TokioTp {}
impl<R: Send + 'static> crate::SpawnBlocking<R> for TokioTp
{
fn spawn_blocking<F>( &self, f: F ) -> BlockingHandle<R>
where F: FnOnce() -> R + Send + 'static ,
{
let handle = self.exec.as_ref().unwrap().spawn_blocking( f );
BlockingHandle::tokio( handle )
}
fn spawn_blocking_dyn( &self, f: Box< dyn FnOnce()->R + Send > ) -> BlockingHandle<R>
{
self.spawn_blocking( f )
}
}
#[ cfg(all( feature = "timer", not(feature="tokio_timer" )) ) ]
#[ cfg_attr( nightly, doc(cfg(all( feature = "timer", feature = "tokio_tp" ))) ) ]
impl crate::Timer for TokioTp
{
fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
{
Box::pin( futures_timer::Delay::new(dur) )
}
}
#[ cfg( feature = "tokio_timer" ) ]
#[ cfg_attr( nightly, doc(cfg(all( feature = "tokio_timer", feature = "tokio_tp" ))) ) ]
impl crate::Timer for TokioTp
{
fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
{
Box::pin( tokio::time::sleep(dur) )
}
}