use
{
crate :: { TokioHandle, SpawnHandle, LocalSpawnHandle, JoinHandle, join_handle::InnerJh } ,
std :: { rc::Rc, cell::RefCell, convert::TryFrom, future::Future, sync::atomic::AtomicBool } ,
tokio :: { task::LocalSet, runtime::{ Builder, Runtime, Handle as TokioRtHandle } } ,
futures_task :: { FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError } ,
futures_util :: { future::abortable } ,
};
#[ derive( Debug, Clone ) ]
#[ cfg_attr( nightly, doc(cfg( feature = "tokio_ct" )) ) ]
pub struct TokioCt
{
pub(crate) exec : Rc<RefCell< Runtime >> ,
pub(crate) local : Rc< LocalSet > ,
pub(crate) handle: TokioRtHandle,
}
impl TokioCt
{
pub fn block_on< F: Future >( &self, f: F ) -> F::Output
{
self.exec.borrow_mut().block_on( self.local.run_until( f ) )
}
pub fn handle( &self ) -> TokioHandle
{
TokioHandle::new( self.handle.clone() )
}
}
impl TryFrom<&mut Builder> for TokioCt
{
type Error = std::io::Error;
fn try_from( builder: &mut Builder ) -> Result<Self, Self::Error>
{
let exec = builder.basic_scheduler().build()?;
let local = LocalSet::new();
Ok( Self
{
handle : exec.handle().clone() ,
exec : Rc::new( RefCell::new(exec ) ) ,
local : Rc::new( local ) ,
})
}
}
impl Spawn for TokioCt
{
fn spawn_obj( &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError>
{
let _ = self.local.spawn_local( future );
Ok(())
}
}
impl LocalSpawn for TokioCt
{
fn spawn_local_obj( &self, future: LocalFutureObj<'static, ()> ) -> Result<(), SpawnError>
{
let _ = self.local.spawn_local( future );
Ok(())
}
}
impl<Out: 'static + Send> SpawnHandle<Out> for TokioCt
{
fn spawn_handle_obj( &self, future: FutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
{
let (fut, a_handle) = abortable( future );
Ok( JoinHandle{ inner: InnerJh::Tokio
{
handle : self.handle.spawn( fut ) ,
detached: AtomicBool::new( false ) ,
a_handle ,
}})
}
}
impl<Out: 'static> LocalSpawnHandle<Out> for TokioCt
{
fn spawn_handle_local_obj( &self, future: LocalFutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
{
let (fut, a_handle) = abortable( future );
Ok( JoinHandle{ inner: InnerJh::Tokio
{
handle : self.local.spawn_local( fut ) ,
detached: AtomicBool::new( false ) ,
a_handle ,
}})
}
}
#[ cfg(test) ]
mod tests
{
use super::*;
static_assertions::assert_not_impl_any!( TokioCt: Send, Sync );
}