use alloc::sync::{Arc, Weak};
use futures::stream::FuturesUnordered;
use futures::future::LocalFutureObj;
use futures::StreamExt;
use core::task::{Poll};
use crossbeam::queue::SegQueue;
use futures::task::UnsafeFutureObj;
use crate::poll_fn;
use futures::future::FutureObj;
use futures::task::Spawn;
use futures::task::SpawnError;
#[derive(Debug)]
pub struct LocalPool<'a, Ret = ()> {
pool: FuturesUnordered<LocalFutureObj<'a, Ret>>,
other: Arc<SegQueue<LocalFutureObj<'static, Ret>>>,
}
#[derive(Clone)]
pub struct Spawner<Ret> {
tx: Weak<SegQueue<LocalFutureObj<'static, Ret>>>,
}
impl Spawner<()> {
pub fn spawn<F>(&self, f: F) -> Result<(), SpawnError>
where F: UnsafeFutureObj<'static, ()> + Send {
self.spawn_obj(FutureObj::new(f))
}
}
impl Spawn for Spawner<()> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
let tx = self.tx.upgrade().ok_or(SpawnError::shutdown())?;
tx.push(future.into());
Ok(())
}
}
impl<'a, Ret> LocalPool<'a, Ret> {
pub fn new() -> Self {
Self {
pool: FuturesUnordered::new(),
other: Arc::new(SegQueue::new())
}
}
pub fn spawner(&self) -> Spawner<Ret> {
Spawner {
tx: Arc::downgrade(&self.other),
}
}
pub fn spawn<F>(&mut self, f: F)
where F: UnsafeFutureObj<'a, Ret> {
self.pool.push(LocalFutureObj::new(f))
}
pub fn run(&mut self) -> alloc::vec::Vec<Ret> {
let mut results = alloc::vec::Vec::new();
loop {
let ret = self.poll_once();
match ret {
Poll::Pending => {}
Poll::Ready(None) => break,
Poll::Ready(Some(r)) => { results.push(r); }
}
}
results
}
pub fn try_run_one(&mut self) -> Poll<Ret> {
let ret = self.poll_once();
match ret {
Poll::Ready(Some(ret)) => {
Poll::Ready(ret)
}
Poll::Ready(None) => {
Poll::Pending
}
Poll::Pending => {
Poll::Pending
}
}
}
pub fn poll_once(&mut self) -> Poll<Option<Ret>> {
poll_fn(|cx| {
while let Some(fut) = self.other.pop() {
self.pool.push(fut);
}
self.pool.poll_next_unpin(cx)
})
}
}
impl<'a, Ret> Default for LocalPool<'a, Ret> {
fn default() -> Self {
Self::new()
}
}