use futures::stream::FuturesUnordered;
use futures::future::LocalFutureObj;
use futures::StreamExt;
use core::task::{Poll};
use futures::task::UnsafeFutureObj;
use crate::poll_fn;
use futures::future::FutureObj;
use futures::task::Spawn;
use futures::task::SpawnError;
/// A collection of futures that is driven by explicit polling from its owner.
///
/// Futures reach the pool in two ways:
///
/// * directly, through the pool's own `spawn` method, which accepts futures
///   bounded by the lifetime `'a` and does not require them to be `Send`;
/// * indirectly, through a `Spawner` handle, which pushes `'static + Send`
///   futures onto a channel that the pool drains each time it is polled.
///
/// `Ret` is the output type shared by every future in the pool.
#[derive(Debug)]
pub struct LocalPool<'a, Ret = ()> {
    /// The futures currently owned by the pool.
    pool: FuturesUnordered<LocalFutureObj<'a, Ret>>,
    /// Receiving half of the channel fed by `Spawner` handles.
    rx: kanal::Receiver<FutureObj<'static, Ret>>,
    /// Sending half of the same channel, kept so new `Spawner` handles can be
    /// created by cloning it.
    tx: kanal::Sender<FutureObj<'static, Ret>>,
}
/// Handle used to submit futures to a `LocalPool` through its channel.
///
/// A handle only wraps the sending half of the pool's channel. Futures sent
/// through it must be `'static` and `Send`, and all produce the pool's `Ret`
/// output type.
pub struct Spawner<Ret> {
    /// Sending half of the channel whose receiver is held by the pool.
    tx: kanal::Sender<FutureObj<'static, Ret>>,
}

// Implemented by hand rather than with `#[derive(Clone)]`: the derive would add
// a `Ret: Clone` bound, although cloning a handle only clones the channel
// sender and never touches a `Ret` value.
impl<Ret> Clone for Spawner<Ret> {
    /// Creates another handle feeding the same channel.
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}
impl<Ret> Spawner<Ret> {
    /// Wraps `f` in a `FutureObj` and sends it over the channel to the pool.
    ///
    /// The future is not polled here; it is only queued on the channel.
    ///
    /// # Errors
    ///
    /// Returns `SpawnError::shutdown()` when the channel send fails
    /// (presumably because the channel is closed or its receiver is gone;
    /// confirm against kanal's `send` contract).
    pub fn spawn<F>(&self, f: F) -> Result<(), SpawnError>
    where
        F: UnsafeFutureObj<'static, Ret> + Send,
    {
        let task = FutureObj::new(f);
        match self.tx.send(task) {
            Ok(sent) => Ok(sent),
            // The underlying send error carries no detail that `SpawnError`
            // can represent, so it is collapsed into `shutdown`.
            Err(_) => Err(SpawnError::shutdown()),
        }
    }
}
impl Spawn for Spawner<()> {
    /// Sends an already type-erased future over the channel to the pool.
    ///
    /// # Errors
    ///
    /// Returns `SpawnError::shutdown()` when the channel send fails.
    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
        if self.tx.send(future).is_err() {
            return Err(SpawnError::shutdown());
        }
        Ok(())
    }
}
impl<'a, Ret> LocalPool<'a, Ret> {
    /// Creates an empty pool together with the unbounded channel that
    /// `Spawner` handles use to feed it.
    pub fn new() -> Self {
        let (tx, rx) = kanal::unbounded();
        let pool = FuturesUnordered::new();
        Self { pool, rx, tx }
    }

    /// Returns a handle that submits futures to this pool through its channel.
    pub fn spawner(&self) -> Spawner<Ret> {
        let tx = self.tx.clone();
        Spawner { tx }
    }

    /// Adds `f` directly to the pool, bypassing the channel.
    ///
    /// Unlike futures submitted through a `Spawner`, `f` only has to live for
    /// `'a` and is not required to be `Send`.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: UnsafeFutureObj<'a, Ret>,
    {
        let task = LocalFutureObj::new(f);
        self.pool.push(task);
    }

    /// Polls the pool until its stream ends and returns every output produced
    /// along the way, in the order the futures completed.
    ///
    /// A `Pending` result is retried straight away; this method itself never
    /// waits between attempts.
    pub fn run(&mut self) -> alloc::vec::Vec<Ret> {
        let mut outputs = alloc::vec::Vec::new();
        loop {
            match self.poll_once() {
                Poll::Ready(Some(value)) => outputs.push(value),
                // The stream yielded `None`: nothing is left to drive.
                Poll::Ready(None) => return outputs,
                Poll::Pending => continue,
            }
        }
    }

    /// Polls the pool once and reports the output of a finished future, if any.
    ///
    /// Returns `Poll::Pending` both when no future finished during this poll
    /// and when the pool's stream yielded `None`.
    pub fn try_run_one(&mut self) -> Poll<Ret> {
        match self.poll_once() {
            Poll::Ready(Some(value)) => Poll::Ready(value),
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }

    /// Moves every future waiting on the channel into the pool, then polls the
    /// pool's stream a single time.
    ///
    /// The poll runs inside the crate-local `poll_fn` helper, which supplies
    /// the task context (NOTE(review): its exact behaviour is defined outside
    /// this block; confirm there).
    pub fn poll_once(&mut self) -> Poll<Option<Ret>> {
        poll_fn(|cx| {
            // Stop draining on the first empty read or receive error.
            while let Ok(Some(queued)) = self.rx.try_recv() {
                self.pool.push(LocalFutureObj::from(queued));
            }
            self.pool.poll_next_unpin(cx)
        })
    }
}
impl<'a, Ret> Default for LocalPool<'a, Ret> {
fn default() -> Self {
Self::new()
}
}