use
{
crate :: { SpawnHandle, LocalSpawnHandle, JoinHandle, BlockingHandle } ,
std :: { fmt, rc::Rc, future::Future, convert::TryFrom } ,
tokio :: { task::LocalSet, runtime::{ Builder, Runtime, Handle, RuntimeFlavor } } ,
futures_task :: { FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError } ,
};
#[derive(Debug, Clone)]
enum Spawner
{
Runtime( Rc<Runtime> ) ,
Handle ( Handle ) ,
}
#[ derive( Debug, Clone ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "tokio_ct" )) ) ]
pub struct TokioCt
{
spawner: Spawner,
local: Rc< LocalSet > ,
}
impl TryFrom<Runtime> for TokioCt
{
type Error = Runtime;
fn try_from( rt: Runtime ) -> Result<Self, Runtime>
{
match rt.handle().runtime_flavor()
{
RuntimeFlavor::CurrentThread => Ok( Self
{
spawner: Spawner::Runtime( Rc::new(rt) ) ,
local: Rc::new( LocalSet::new() ) ,
}),
_ => Err( rt ),
}
}
}
impl TryFrom<Handle> for TokioCt
{
type Error = Handle;
fn try_from( handle: Handle ) -> Result<Self, Handle>
{
match handle.runtime_flavor()
{
RuntimeFlavor::CurrentThread => Ok( Self
{
spawner: Spawner::Handle( handle ) ,
local: Rc::new( LocalSet::new() ) ,
}),
_ => Err( handle ),
}
}
}
impl TokioCt
{
pub fn new() -> Result<Self, TokioCtErr>
{
let mut builder = Builder::new_current_thread();
#[ cfg( feature = "tokio_io" ) ]
builder.enable_io();
#[ cfg( feature = "tokio_timer" ) ]
builder.enable_time();
let rt = builder.build().map_err( |e| TokioCtErr::Builder(e.kind()) )?;
Ok(Self
{
spawner: Spawner::Runtime(Rc::new( rt )),
local : Rc::new( LocalSet::new() ) ,
})
}
pub fn try_current() -> Result< Self, TokioCtErr >
{
let handle = Handle::try_current()
.map_err(|_| TokioCtErr::NoRuntime )?;
Self::try_from( handle )
.map_err(|_| TokioCtErr::WrongFlavour )
}
pub fn block_on<F: Future>( &self, f: F ) -> F::Output
{
match &self.spawner
{
Spawner::Runtime( rt ) => rt .block_on( self.local.run_until( f ) ) ,
Spawner::Handle ( handle ) => handle.block_on( self.local.run_until( f ) ) ,
}
}
pub async fn run_until<F: Future>( &self, f: F ) -> F::Output
{
self.local.run_until( f ).await
}
}
impl Spawn for TokioCt
{
fn spawn_obj( &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError>
{
drop( self.local.spawn_local(future) );
Ok(())
}
}
impl LocalSpawn for TokioCt
{
fn spawn_local_obj( &self, future: LocalFutureObj<'static, ()> ) -> Result<(), SpawnError>
{
drop( self.local.spawn_local(future) );
Ok(())
}
}
impl<Out: 'static + Send> SpawnHandle<Out> for TokioCt
{
fn spawn_handle_obj( &self, future: FutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
{
let handle = match &self.spawner
{
Spawner::Runtime( rt ) => rt .spawn( future ) ,
Spawner::Handle ( handle ) => handle.spawn( future ) ,
};
Ok( JoinHandle::tokio(handle) )
}
}
impl<Out: 'static> LocalSpawnHandle<Out> for TokioCt
{
fn spawn_handle_local_obj( &self, future: LocalFutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
{
let handle = self.local.spawn_local( future );
Ok( JoinHandle::tokio(handle) )
}
}
#[ cfg(all( feature = "timer", not(feature="tokio_timer" )) ) ]
#[ cfg_attr( nightly, doc(cfg(all( feature = "timer", feature = "tokio_ct" ))) ) ]
impl crate::Timer for TokioCt
{
fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
{
Box::pin( futures_timer::Delay::new(dur) )
}
}
#[ cfg( feature = "tokio_timer" ) ]
#[ cfg_attr( nightly, doc(cfg(all( feature = "tokio_timer", feature = "tokio_ct" ))) ) ]
impl crate::Timer for TokioCt
{
fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
{
Box::pin( tokio::time::sleep(dur) )
}
}
#[ cfg( feature = "tokio_io" ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "tokio_io" )) ) ]
impl crate::TokioIo for TokioCt {}
impl crate::YieldNow for TokioCt {}
impl<R: Send + 'static> crate::SpawnBlocking<R> for TokioCt
{
fn spawn_blocking<F>( &self, f: F ) -> BlockingHandle<R>
where F: FnOnce() -> R + Send + 'static ,
{
let handle = match &self.spawner
{
Spawner::Runtime( rt ) => rt .spawn_blocking( f ) ,
Spawner::Handle ( handle ) => handle.spawn_blocking( f ) ,
};
BlockingHandle::tokio( handle )
}
fn spawn_blocking_dyn( &self, f: Box< dyn FnOnce()->R + Send > ) -> BlockingHandle<R>
{
self.spawn_blocking( f )
}
}
#[cfg( feature = "tokio_ct" )]
#[derive(Debug, Clone)]
pub enum TokioCtErr
{
Builder( std::io::ErrorKind ),
Cloned( TokioCt ),
Handle( TokioCt ),
NoRuntime,
WrongFlavour,
}
impl fmt::Display for TokioCtErr
{
fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
{
use TokioCtErr::*;
match self
{
Builder(source) =>
write!( f, "tokio::runtime::Builder returned an error: {source}" ),
Cloned(_) => write!( f, "The TokioCt executor was cloned. Only the last copy can shut it down." ),
Handle(_) => write!( f, "The TokioCt was created from tokio::runtime::Handle. Only an owned executor (created from `Runtime`) can be shut down." ),
NoRuntime => write!( f, "Call to tokio::Handle::try_current failed, generally because no entered runtime is active." ),
WrongFlavour => write!( f, "Can't create TokioCt from a multithreaded `Runtime`." ),
}
}
}
impl std::error::Error for TokioCtErr {}
#[ cfg(test) ]
mod tests
{
use super::*;
static_assertions::assert_not_impl_any!( TokioCt: Send, Sync );
}