use alloc::sync::{Arc, Weak};
use futures::future::LocalFutureObj;
use futures::{FutureExt};
use core::task::{Poll};
use crossbeam::queue::ArrayQueue;
use futures::task::UnsafeFutureObj;
use crate::poll_fn;
use futures::future::FutureObj;
use futures::task::Spawn;
use futures::task::SpawnError;
/// A pool of futures that all resolve to `Ret` (`()` by default).
///
/// Tasks are stored as `LocalFutureObj`s inside an `ArrayQueue` whose capacity
/// is chosen in `LocalPool::new`. The queue sits behind an `Arc` so that
/// `Spawner` handles can refer to it through a `Weak` pointer.
#[derive(Debug)]
pub struct LocalPool<'a, Ret = ()> {
// Queue of spawned tasks that have not produced their output yet.
pool: Arc<ArrayQueue<LocalFutureObj<'a, Ret>>>,
}
/// Handle used to submit futures to the queue of a `LocalPool`.
///
/// The handle only keeps a `Weak` reference to the queue, so it does not keep
/// the pool's queue alive on its own.
pub struct Spawner<'a, Ret> {
    // Weak pointer to the owning pool's task queue.
    tx: Weak<ArrayQueue<LocalFutureObj<'a, Ret>>>,
}

// Written by hand instead of `#[derive(Clone)]`: the derive would demand
// `Ret: Clone`, although cloning the handle only duplicates the `Weak`
// pointer and never touches a `Ret` value.
impl<'a, Ret> Clone for Spawner<'a, Ret> {
    fn clone(&self) -> Self {
        Self {
            tx: Weak::clone(&self.tx),
        }
    }
}
impl<'a> Spawner<'a, ()> {
    /// Wraps `f` in a `LocalFutureObj` and appends it to the pool's queue.
    ///
    /// # Errors
    ///
    /// Returns `SpawnError::shutdown()` when the queue no longer exists, i.e.
    /// the `Weak` reference held by this spawner cannot be upgraded.
    ///
    /// # Panics
    ///
    /// Panics with `"Queue full"` if the queue refuses the push.
    pub fn spawn<F>(&self, f: F) -> Result<(), SpawnError>
    where
        F: UnsafeFutureObj<'a, ()> + Send,
    {
        match self.tx.upgrade() {
            Some(queue) => {
                queue.push(LocalFutureObj::new(f)).expect("Queue full");
                Ok(())
            }
            None => Err(SpawnError::shutdown()),
        }
    }
}
impl Spawn for Spawner<'static, ()> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
let tx = self.tx.upgrade().ok_or(SpawnError::shutdown())?;
tx.push(future.into()).expect("Queue full");
Ok(())
}
}
impl<'a, Ret> LocalPool<'a, Ret> {
    /// Creates an empty pool whose queue is built with `ArrayQueue::new(cap)`.
    ///
    /// NOTE(review): crossbeam documents that `ArrayQueue::new` panics for a
    /// capacity of zero — confirm against the crossbeam version in use.
    pub fn new(cap: usize) -> Self {
        let pool = Arc::new(ArrayQueue::new(cap));
        Self { pool }
    }

    /// Returns a `Spawner` holding a `Weak` reference to this pool's queue.
    pub fn spawner(&self) -> Spawner<'a, Ret> {
        let tx = Arc::downgrade(&self.pool);
        Spawner { tx }
    }

    /// Wraps `f` in a `LocalFutureObj` and appends it to the queue.
    ///
    /// # Panics
    ///
    /// Panics with `"Queue full"` if the queue refuses the push.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: UnsafeFutureObj<'a, Ret>,
    {
        let task = LocalFutureObj::new(f);
        self.pool.push(task).expect("Queue full");
    }

    /// Drives the pool until the queue is empty and returns every output in
    /// completion order.
    ///
    /// This calls `poll_once` in a tight loop; a pending task makes the loop
    /// go around again immediately rather than wait.
    pub fn run(&mut self) -> alloc::vec::Vec<Ret> {
        let mut collected = alloc::vec::Vec::new();
        loop {
            match self.poll_once() {
                Poll::Ready(Some(output)) => collected.push(output),
                // Nothing left in the queue: every task has finished.
                Poll::Ready(None) => return collected,
                Poll::Pending => continue,
            }
        }
    }

    /// Performs one sweep with `poll_though` and reports the first output.
    ///
    /// Returns `Poll::Pending` both when no task finished during the sweep and
    /// when the pool is empty.
    pub fn try_run_one(&mut self) -> Poll<Ret> {
        match self.poll_though() {
            Poll::Ready(Some(output)) => Poll::Ready(output),
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }

    /// Polls the tasks that are queued when the call starts, each at most
    /// once, and stops at the first one that completes.
    ///
    /// Returns `Poll::Ready(None)` if the queue is empty,
    /// `Poll::Ready(Some(output))` for the first finished task, and
    /// `Poll::Pending` if the whole sweep produced no output. Tasks that are
    /// still pending are pushed back onto the queue.
    ///
    /// `crate::poll_fn` is defined outside this view; presumably it supplies a
    /// task context and invokes the closure once — confirm.
    ///
    /// # Panics
    ///
    /// Panics with `"Queue full"` if a pending task cannot be pushed back.
    pub fn poll_though(&mut self) -> Poll<Option<Ret>> {
        let queued = self.pool.len();
        if queued == 0 {
            return Poll::Ready(None);
        }
        poll_fn(|cx| {
            // Bounded by the initial length so that re-queued tasks are not
            // polled a second time within the same sweep.
            for _ in 0..queued {
                let mut task = match self.pool.pop() {
                    Some(task) => task,
                    None => continue,
                };
                if let Poll::Ready(output) = task.poll_unpin(cx) {
                    return Poll::Ready(Some(output));
                }
                self.pool.push(task).expect("Queue full");
            }
            Poll::Pending
        })
    }

    /// Pops a single task and polls it once.
    ///
    /// Returns `Poll::Ready(None)` if the queue is empty,
    /// `Poll::Ready(Some(output))` if the task finished, and `Poll::Pending`
    /// after pushing the unfinished task back onto the queue.
    ///
    /// # Panics
    ///
    /// Panics with `"Queue full"` if the pending task cannot be pushed back.
    pub fn poll_once(&mut self) -> Poll<Option<Ret>> {
        let mut task = match self.pool.pop() {
            Some(task) => task,
            None => return Poll::Ready(None),
        };
        match poll_fn(|cx| task.poll_unpin(cx)) {
            Poll::Ready(output) => Poll::Ready(Some(output)),
            Poll::Pending => {
                self.pool.push(task).expect("Queue full");
                Poll::Pending
            }
        }
    }
}