use crate::timer_reactor::TimerReactor;
use crate::pool::{LocalPool, LocalSpawner};
use futures_core::future::{FutureObj, LocalFutureObj};
use futures_core::task::{Spawn, LocalSpawn, SpawnError};
use futures_executor::Enter;
use std::future::Future;
use std::io;
/// Executor runtime that owns a `TimerReactor` together with a `LocalPool`.
///
/// The methods on this type hand futures to the pool and run the pool with the
/// timer reactor passed alongside it.
///
/// Fields are dropped in declaration order (`timer_reactor` first), so
/// reordering them changes drop order as well as the derived `Debug` output.
#[derive(Debug)]
pub struct Runtime {
/// Source of the reactor handle and timer handle; also passed to the pool's
/// `run` / `run_until` calls.
timer_reactor: TimerReactor,
/// Pool that receives spawned futures and is driven by `run` / `run_until`.
local_pool: LocalPool,
}
impl Runtime {
    /// Creates a runtime backed by a new `TimerReactor` and a new `LocalPool`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by `TimerReactor::new` if it fails.
    pub fn new() -> io::Result<Self> {
        Ok(Self {
            timer_reactor: TimerReactor::new()?,
            local_pool: LocalPool::new(),
        })
    }

    /// Returns a [`Handle`] holding this runtime's reactor handle, timer handle
    /// and a spawner for its local pool.
    pub fn handle(&self) -> Handle {
        Handle {
            reactor_handle: self.timer_reactor.reactor_handle(),
            timer_handle: self.timer_reactor.timer_handle(),
            local_spawner: self.local_pool.spawner(),
        }
    }

    /// Calls `f` with this runtime's context installed.
    ///
    /// The nesting order is: the reactor handle is entered, the timer handle is
    /// installed through `tokio_timer::timer::set_default`, and finally the
    /// pool's spawner is entered. `f` receives the runtime itself and the
    /// `Enter` guard, and its return value is passed back to the caller.
    fn enter<F, T>(&mut self, enter: &mut Enter, f: F) -> T
    where
        F: FnOnce(&mut Self, &mut Enter) -> T,
    {
        self.timer_reactor.reactor_handle().enter(enter, move |enter| {
            let timer_handle = self.timer_reactor.timer_handle();
            // The guard must stay bound to a name so it lives until the end of
            // this closure; presumably dropping it restores the previous
            // default timer (confirm against the tokio-timer docs).
            let _scoped_timer = tokio_timer::timer::set_default(&timer_handle);
            self.local_pool
                .spawner()
                .enter(enter, move |enter| f(self, enter))
        })
    }

    /// Spawns `future` onto the local pool.
    ///
    /// The future is boxed and pinned, then converted into the object type
    /// accepted by the pool's `spawn` method.
    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + 'static,
    {
        self.local_pool.spawn(Box::pin(future).into())
    }

    /// Spawns an already type-erased local future onto the local pool.
    ///
    /// This inherent method takes `&self` and returns `()`. The `LocalSpawn`
    /// trait method of the same name takes `&mut self` and returns a `Result`.
    pub fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) {
        self.local_pool.spawn(future)
    }

    /// Runs `future` through the pool's `run_until` with this runtime's context
    /// installed, using the caller-supplied `Enter` guard, and returns the
    /// future's output.
    ///
    /// The timer reactor is handed to the pool alongside the guard.
    pub fn enter_run_until<F, T>(&mut self, enter: &mut Enter, future: F) -> T
    where
        F: Future<Output = T>,
    {
        self.enter(enter, |this, enter| {
            this.local_pool
                .run_until(&mut this.timer_reactor, enter, future)
        })
    }

    /// Like [`Runtime::enter_run_until`], but obtains the `Enter` guard itself
    /// from `futures_executor::enter`.
    ///
    /// # Panics
    ///
    /// Panics if `futures_executor::enter` returns an error, which it does when
    /// the current thread is already inside an executor.
    pub fn run_until<F, T>(&mut self, future: F) -> T
    where
        F: Future<Output = T>,
    {
        let mut enter = futures_executor::enter()
            .expect("Runtime::run_until: current thread is already inside an executor");
        self.enter_run_until(&mut enter, future)
    }

    /// Runs the pool's `run` method with this runtime's context installed,
    /// using the caller-supplied `Enter` guard.
    ///
    /// The timer reactor is handed to the pool alongside the guard.
    pub fn enter_run(&mut self, enter: &mut Enter) {
        self.enter(enter, |this, enter| {
            this.local_pool.run(&mut this.timer_reactor, enter)
        })
    }

    /// Like [`Runtime::enter_run`], but obtains the `Enter` guard itself from
    /// `futures_executor::enter`.
    ///
    /// NOTE(review): the type parameters `F` and `T` appear nowhere in the
    /// parameters, return type or body, so they cannot be inferred and callers
    /// have to name them explicitly (e.g. `runtime.run::<(), ()>()`). They are
    /// kept only so existing call sites keep compiling; removing them would be
    /// a breaking change and should be done in a major release.
    ///
    /// # Panics
    ///
    /// Panics if `futures_executor::enter` returns an error, which it does when
    /// the current thread is already inside an executor.
    pub fn run<F, T>(&mut self) {
        let mut enter = futures_executor::enter()
            .expect("Runtime::run: current thread is already inside an executor");
        self.enter_run(&mut enter)
    }
}
/// `Spawn` support for `Runtime`: both methods defer to the `LocalSpawn`
/// methods available on the same type.
impl Spawn for Runtime {
    /// Converts `future` into its local object form and spawns it through
    /// `spawn_local_obj`, returning that call's result.
    fn spawn_obj(
        &mut self,
        future: FutureObj<'static, ()>,
    ) -> Result<(), SpawnError> {
        // The target type of the conversion is fixed by the call below.
        let local_future = future.into();
        // With a `&mut self` receiver this has to be the `LocalSpawn` trait
        // method: the inherent `spawn_local_obj` returns `()`, not a `Result`.
        self.spawn_local_obj(local_future)
    }

    /// Reports whatever `status_local` reports.
    fn status(&self) -> Result<(), SpawnError> {
        self.status_local()
    }
}
/// `LocalSpawn` support for `Runtime`: both methods forward to the runtime's
/// `local_pool` field.
impl LocalSpawn for Runtime {
/// Hands `future` to the local pool's `spawn_local_obj` and returns its
/// result unchanged.
fn spawn_local_obj(
&mut self,
future: LocalFutureObj<'static, ()>,
) -> Result<(), SpawnError> {
self.local_pool.spawn_local_obj(future)
}
/// Returns the local pool's `status_local` result unchanged.
fn status_local(&self) -> Result<(), SpawnError> {
self.local_pool.status_local()
}
}
/// Cloneable bundle of the three handles that `Runtime::handle` collects: a
/// reactor handle, a timer handle and a spawner for the local pool.
#[derive(Clone, Debug)]
pub struct Handle {
/// Reactor handle; entered first by `Handle::enter`.
reactor_handle: crate::reactor::Handle,
/// Timer handle; passed to `tokio_timer::timer::set_default` by `Handle::enter`.
timer_handle: tokio_timer::timer::Handle,
/// Spawner for the local pool; used for spawning and entered last by `Handle::enter`.
local_spawner: LocalSpawner,
}
impl Handle {
    /// Calls `f` with the context described by this handle installed and
    /// returns its result.
    ///
    /// A clone of the reactor handle is entered first, then the timer handle
    /// is installed via `tokio_timer::timer::set_default`, and finally a clone
    /// of the spawner is entered with `f` as its body.
    pub fn enter<F, T>(&self, enter: &mut Enter, f: F) -> T
    where
        F: FnOnce(&mut Enter) -> T,
    {
        self.reactor_handle.clone().enter(enter, move |guard| {
            // Bound to a name so the value lives until this closure returns.
            let _timer_guard = tokio_timer::timer::set_default(&self.timer_handle);
            self.local_spawner.clone().enter(guard, f)
        })
    }

    /// Returns a clone of the reactor handle.
    pub fn reactor(&self) -> crate::reactor::Handle {
        self.reactor_handle.clone()
    }

    /// Returns a clone of the timer handle.
    pub fn timer(&self) -> tokio_timer::timer::Handle {
        self.timer_handle.clone()
    }

    /// Returns a clone of the local-pool spawner.
    pub fn spawner(&self) -> LocalSpawner {
        self.local_spawner.clone()
    }
}
/// `Spawn` support for `Handle`: both methods defer to the `LocalSpawn`
/// methods available on the same type.
impl Spawn for Handle {
    /// Converts `future` into its local object form and spawns it through
    /// `spawn_local_obj`, returning that call's result.
    fn spawn_obj(
        &mut self,
        future: FutureObj<'static, ()>,
    ) -> Result<(), SpawnError> {
        // The target type of the conversion is fixed by the call below.
        let local_future = future.into();
        self.spawn_local_obj(local_future)
    }

    /// Reports whatever `status_local` reports.
    fn status(&self) -> Result<(), SpawnError> {
        self.status_local()
    }
}
/// `LocalSpawn` support for `Handle`: both methods forward to the handle's
/// `local_spawner` field.
impl LocalSpawn for Handle {
/// Hands `future` to the spawner's `spawn_local_obj` and returns its result
/// unchanged.
fn spawn_local_obj(
&mut self,
future: LocalFutureObj<'static, ()>,
) -> Result<(), SpawnError> {
self.local_spawner.spawn_local_obj(future)
}
/// Returns the spawner's `status_local` result unchanged.
fn status_local(&self) -> Result<(), SpawnError> {
self.local_spawner.status_local()
}
}