mod task;
use fumio_utils::park::Park;
use futures_core::future::{Future, FutureObj, LocalFutureObj};
use futures_core::task::{Spawn, LocalSpawn, SpawnError};
use futures_executor::Enter;
use futures_util::pin_mut;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
/// Repeatedly calls `f` until it yields `Poll::Ready`, parking in between.
///
/// A waker is obtained from `park` once, up front, and the same task context
/// built from it is handed to every invocation of `f`. Whenever `f` reports
/// `Poll::Pending`, the function parks via `park.park(enter, None)` (no
/// timeout is passed) before trying again.
///
/// Returns the value carried by the first `Poll::Ready` produced by `f`.
fn run_executor<P: Park, T, F: FnMut(&mut Context<'_>) -> Poll<T>>(park: &mut P, enter: &mut Enter, mut f: F) -> T {
    let park_waker = park.waker();
    let mut context = Context::from_waker(&park_waker);
    loop {
        match f(&mut context) {
            Poll::Ready(value) => break value,
            Poll::Pending => {
                // Nothing is ready yet: park, then poll again afterwards.
                park.park(enter, None);
            }
        }
    }
}
/// A pool of spawned `LocalFutureObj<'static, ()>` tasks that is driven
/// explicitly through `run`, `run_until` or `poll_pool`.
///
/// The pool holds the strong reference to its task list; handles created by
/// `spawner` only hold weak references to it.
#[derive(Debug)]
pub struct LocalPool {
/// Task list shared (weakly) with every `LocalSpawner` created from this pool.
task_list: Rc<task::LocalTaskList>,
}
impl LocalPool {
    /// Creates a pool with a newly constructed task list.
    pub fn new() -> Self {
        let task_list = Rc::new(task::LocalTaskList::new());
        Self { task_list }
    }

    /// Returns a [`LocalSpawner`] that refers to this pool's task list
    /// through a weak reference.
    pub fn spawner(&self) -> LocalSpawner {
        let task_list = Rc::downgrade(&self.task_list);
        LocalSpawner { task_list }
    }

    /// Polls the pool, parking on `park` between polls, until
    /// [`poll_pool`](Self::poll_pool) reports `Poll::Ready`.
    pub fn run<P: Park>(&mut self, park: &mut P, enter: &mut Enter) {
        run_executor(park, enter, |ctx| self.poll_pool(ctx))
    }

    /// Drives `future` to completion and returns its output.
    ///
    /// On every iteration `future` is polled first. If it is still pending,
    /// the pool is polled once (its result is ignored) and the executor parks
    /// on `park` before the next iteration.
    pub fn run_until<P: Park, F: Future>(&mut self, park: &mut P, enter: &mut Enter, future: F) -> F::Output {
        pin_mut!(future);
        run_executor(park, enter, |ctx| match future.as_mut().poll(ctx) {
            Poll::Ready(output) => Poll::Ready(output),
            Poll::Pending => {
                // The pool finishing must not end the loop; only `future` decides that.
                let _ = self.poll_pool(ctx);
                Poll::Pending
            }
        })
    }

    /// Polls the task list once with the given context and forwards its result.
    // NOTE(review): presumably `Poll::Ready(())` means no tasks remain — confirm
    // against `task::LocalTaskList::poll`.
    pub fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let tasks = &self.task_list;
        tasks.poll(cx)
    }

    /// Adds `future` to the pool's task list.
    pub fn spawn(&self, future: LocalFutureObj<'static, ()>) {
        let tasks = &self.task_list;
        tasks.add_task(future);
    }
}
impl Default for LocalPool {
fn default() -> Self {
Self::new()
}
}
impl Spawn for LocalPool {
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
self.spawn_local_obj(future.into())
}
fn status(&self) -> Result<(), SpawnError> {
self.status_local()
}
}
impl LocalSpawn for LocalPool {
fn spawn_local_obj(
&mut self,
future: LocalFutureObj<'static, ()>,
) -> Result<(), SpawnError> {
self.spawn(future);
Ok(())
}
fn status_local(&self) -> Result<(), SpawnError> {
Ok(())
}
}
/// Cloneable handle for spawning tasks onto a `LocalPool`.
///
/// Only a weak reference to the task list is held. Once that reference can no
/// longer be upgraded, spawning and status checks return
/// `SpawnError::shutdown()`.
#[derive(Clone, Debug)]
pub struct LocalSpawner {
/// Weak reference to the task list of the pool this spawner was created from.
task_list: Weak<task::LocalTaskList>,
}
impl LocalSpawner {
    /// Consumes this spawner and runs `f`, returning whatever `f` returns.
    ///
    /// The call is forwarded unchanged to `crate::current::enter_local`.
    // NOTE(review): presumably this installs the spawner as the "current" local
    // spawner while `f` runs — confirm against `crate::current`.
    pub fn enter<F, T>(self, enter: &mut Enter, f: F) -> T
    where
        F: FnOnce(&mut Enter) -> T
    {
        let spawner = self;
        crate::current::enter_local(spawner, enter, f)
    }
}
impl Spawn for LocalSpawner {
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
self.spawn_local_obj(future.into())
}
fn status(&self) -> Result<(), SpawnError> {
self.status_local()
}
}
impl LocalSpawn for LocalSpawner {
    /// Adds `future` to the task list if it is still alive.
    ///
    /// # Errors
    ///
    /// Returns `SpawnError::shutdown()` when the weak reference to the task
    /// list can no longer be upgraded.
    fn spawn_local_obj(
        &mut self,
        future: LocalFutureObj<'static, ()>,
    ) -> Result<(), SpawnError> {
        match self.task_list.upgrade() {
            Some(tasks) => {
                tasks.add_task(future);
                Ok(())
            }
            None => Err(SpawnError::shutdown()),
        }
    }

    /// Checks whether the task list is still alive.
    ///
    /// # Errors
    ///
    /// Returns `SpawnError::shutdown()` when the weak reference to the task
    /// list can no longer be upgraded.
    fn status_local(&self) -> Result<(), SpawnError> {
        match self.task_list.upgrade() {
            Some(_) => Ok(()),
            None => Err(SpawnError::shutdown()),
        }
    }
}