use
{
crate :: { SpawnHandle, JoinHandle, BlockingHandle } ,
std :: { fmt, sync::Arc, future::Future, convert::TryFrom } ,
futures_task :: { FutureObj, Spawn, SpawnError } ,
tokio::runtime :: { Runtime, RuntimeFlavor, Handle, Builder } ,
};
#[ derive( Debug, Clone ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "tokio_tp" )) ) ]
pub struct TokioTp
{
spawner: Spawner,
}
#[derive(Debug, Clone)]
enum Spawner
{
Runtime( Arc<Runtime> ) ,
Handle ( Handle ) ,
}
impl TryFrom<Runtime> for TokioTp
{
type Error = Runtime;
fn try_from( rt: Runtime ) -> Result<Self, Runtime>
{
match rt.handle().runtime_flavor()
{
RuntimeFlavor::MultiThread => Ok( Self
{
spawner: Spawner::Runtime( Arc::new(rt) ) ,
}),
_ => Err( rt ),
}
}
}
impl TryFrom<Handle> for TokioTp
{
type Error = Handle;
fn try_from( handle: Handle ) -> Result<Self, Handle>
{
match handle.runtime_flavor()
{
RuntimeFlavor::MultiThread => Ok( Self
{
spawner: Spawner::Handle( handle ) ,
}),
_ => Err( handle ),
}
}
}
impl TokioTp
{
pub fn new() -> Result<Self, TokioTpErr>
{
let mut builder = Builder::new_multi_thread();
#[ cfg( feature = "tokio_io" ) ]
builder.enable_io();
#[ cfg( feature = "tokio_timer" ) ]
builder.enable_time();
let rt = builder.build().map_err( |e| TokioTpErr::Builder(e.kind()) )?;
Ok(Self
{
spawner: Spawner::Runtime(Arc::new( rt )),
})
}
pub fn try_current() -> Result< Self, TokioTpErr >
{
let handle = Handle::try_current()
.map_err(|_| TokioTpErr::NoRuntime )?;
Self::try_from( handle )
.map_err(|_| TokioTpErr::WrongFlavour )
}
pub fn block_on< F: Future >( &self, f: F ) -> F::Output
{
match &self.spawner
{
Spawner::Runtime( rt ) => rt .block_on( f ) ,
Spawner::Handle ( handle ) => handle.block_on( f ) ,
}
}
pub fn shutdown_timeout( self, duration: std::time::Duration ) -> Result<(), TokioTpErr>
{
let Self{ spawner } = self;
let arc = match spawner
{
Spawner::Handle ( handle ) => return Err( TokioTpErr::Handle(Self{ spawner: Spawner::Handle(handle) }) ) ,
Spawner::Runtime( arc ) => arc,
};
let rt = match Arc::try_unwrap(arc)
{
Ok(rt) => rt,
Err(arc) =>
{
let this = Self{ spawner: Spawner::Runtime(arc) };
return Err( TokioTpErr::Cloned(this) );
}
};
rt.shutdown_timeout( duration );
Ok(())
}
pub fn shutdown_background( self ) -> Result<(), TokioTpErr>
{
let Self{ spawner } = self;
let arc = match spawner
{
Spawner::Handle ( handle ) => return Err( TokioTpErr::Handle(Self{ spawner: Spawner::Handle(handle) }) ) ,
Spawner::Runtime( arc ) => arc,
};
let rt = match Arc::try_unwrap(arc)
{
Ok(rt) => rt,
Err(arc) =>
{
let this = Self{ spawner: Spawner::Runtime(arc) };
return Err( TokioTpErr::Cloned(this) );
}
};
rt.shutdown_background();
Ok(())
}
}
#[ cfg( feature = "tokio_io" ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "tokio_io" )) ) ]
impl crate::TokioIo for TokioTp {}
impl Spawn for TokioTp
{
fn spawn_obj( &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError>
{
match &self.spawner
{
Spawner::Runtime( rt ) => drop( rt .spawn(future) ) ,
Spawner::Handle ( handle ) => drop( handle.spawn(future) ) ,
}
Ok(())
}
}
impl<Out: 'static + Send> SpawnHandle<Out> for TokioTp
{
fn spawn_handle_obj( &self, future: FutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
{
let handle = match &self.spawner
{
Spawner::Runtime( rt ) => rt .spawn(future) ,
Spawner::Handle ( handle ) => handle.spawn(future) ,
};
Ok( JoinHandle::tokio(handle) )
}
}
impl crate::YieldNow for TokioTp {}
impl<R: Send + 'static> crate::SpawnBlocking<R> for TokioTp
{
fn spawn_blocking<F>( &self, f: F ) -> BlockingHandle<R>
where F: FnOnce() -> R + Send + 'static ,
{
let handle = match &self.spawner
{
Spawner::Runtime( rt ) => rt .spawn_blocking( f ) ,
Spawner::Handle ( handle ) => handle.spawn_blocking( f ) ,
};
BlockingHandle::tokio( handle )
}
fn spawn_blocking_dyn( &self, f: Box< dyn FnOnce()->R + Send > ) -> BlockingHandle<R>
{
self.spawn_blocking( f )
}
}
#[ cfg(all( feature = "timer", not(feature="tokio_timer" )) ) ]
#[ cfg_attr( nightly, doc(cfg(all( feature = "timer", feature = "tokio_tp" ))) ) ]
impl crate::Timer for TokioTp
{
fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
{
Box::pin( futures_timer::Delay::new(dur) )
}
}
#[ cfg( feature = "tokio_timer" ) ]
#[ cfg_attr( nightly, doc(cfg(all( feature = "tokio_timer", feature = "tokio_tp" ))) ) ]
impl crate::Timer for TokioTp
{
fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
{
Box::pin( tokio::time::sleep(dur) )
}
}
#[cfg( feature = "tokio_tp" )]
#[derive(Debug, Clone)]
pub enum TokioTpErr
{
Builder( std::io::ErrorKind ),
Cloned ( TokioTp ),
Handle ( TokioTp ),
NoRuntime,
WrongFlavour,
}
impl fmt::Display for TokioTpErr
{
fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
{
use TokioTpErr::*;
match self
{
Builder(source) =>
write!( f, "tokio::runtime::Builder returned an error: {source}" ),
Cloned(_) => write!( f, "The TokioTp executor was cloned. Only the last copy can shut it down." ),
Handle(_) => write!( f, "The TokioTp was created from tokio::runtime::Handle. Only an owned executor (created from `Runtime`) can be shut down." ),
NoRuntime => write!( f, "Call to tokio::Handle::try_current failed, generally because no entered runtime is active." ),
WrongFlavour => write!( f, "Can't create TokioTp from a current thread `Runtime`." ),
}
}
}
impl std::error::Error for TokioTpErr {}