use std::{future::Future, ops::{Deref, DerefMut}, pin::Pin};
use futures::{stream::FusedStream, Stream, StreamExt};
/// Handle used to submit futures for execution.
///
/// Wraps the sending half of an unbounded channel whose messages are pinned,
/// boxed futures with output `T` that may borrow data for `'env`.
#[derive(Debug)]
pub struct Spawner<'env, T = ()>(
    futures::channel::mpsc::UnboundedSender<Pin<Box<dyn Future<Output = T> + 'env>>>,
);

// Written by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, yet cloning a spawner only clones the channel sender and
// never touches a value of type `T`.
impl<'env, T> Clone for Spawner<'env, T> {
    fn clone(&self) -> Self {
        Spawner(self.0.clone())
    }
}
impl<'env, T> Spawner<'env, T> {
pub fn spawn(
&mut self,
future: impl Future<Output = T> + 'env,
) -> Result<(), futures::channel::mpsc::SendError> {
self.0.start_send(Box::pin(future))
}
}
/// Owns both halves of the task channel until it is turned into a [`Pool`].
///
/// Dereferences to its [`Spawner`], so `spawn` can be called directly on an
/// anchor.
#[derive(Debug)]
pub struct Anchor< 'env, T = ()> {
/// Receiving half of the channel; moved into the [`Pool`] by `stream`.
receiver: futures::channel::mpsc::UnboundedReceiver<
Pin<Box<dyn Future<Output = T> + 'env>>,
>,
/// Sending half of the channel, wrapped as a [`Spawner`].
pub spawner: Spawner< 'env, T>,
}
impl<'env, T> Anchor<'env, T> {
pub fn new() -> Self {
let (sender, receiver) = futures::channel::mpsc::unbounded();
Anchor {
receiver,
spawner: Spawner(sender),
}
}
pub fn stream(self) -> Pool<'env, T> {
Pool {
receiver: self.receiver,
tasks: futures::stream::FuturesUnordered::new(),
}
}
}
impl<'env, T> Deref for Anchor<'env, T> {
type Target = Spawner<'env, T>;
fn deref(&self) -> &Self::Target {
&self.spawner
}
}
impl<'env, T> DerefMut for Anchor<'env, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.spawner
}
}
/// Stream that receives spawned futures, polls them, and yields their
/// outputs.
///
/// Built by `Anchor::stream`; it makes no progress unless it is polled.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pool< 'env, T = ()> {
/// Channel end from which newly spawned futures arrive.
receiver: futures::channel::mpsc::UnboundedReceiver<
Pin<Box<dyn Future<Output = T> + 'env>>,
>,
/// Futures already taken off the channel and not yet completed.
tasks:
futures::stream::FuturesUnordered< Pin<Box<dyn Future<Output = T> + 'env>>>,
}
impl<'env, T> Stream for Pool<'env, T> {
    type Item = T;

    /// Moves newly spawned futures into the task set, then polls the set.
    ///
    /// Returns `Ready(Some(output))` when a task finishes, `Ready(None)` once
    /// the channel has ended and no tasks remain, and `Pending` otherwise.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        let pool = &mut *self;

        // The channel has already ended, so it must not be polled again;
        // whatever the task set reports is the final answer.
        if pool.receiver.is_terminated() {
            return pool.tasks.poll_next_unpin(cx);
        }

        // Drain everything that is currently queued on the channel.
        while let Poll::Ready(incoming) = pool.receiver.poll_next_unpin(cx) {
            match incoming {
                Some(task) => pool.tasks.push(task),
                // Channel ended and the task set had already reported that it
                // was exhausted: the stream is finished.
                None if pool.tasks.is_terminated() => return Poll::Ready(None),
                // Channel ended, but the task set still has to be polled.
                None => break,
            }
        }

        // A task set that already reported exhaustion and received nothing new
        // is not polled again; the channel poll above registered the waker.
        if pool.tasks.is_terminated() {
            return Poll::Pending;
        }

        match pool.tasks.poll_next_unpin(cx) {
            Poll::Ready(Some(val)) => Poll::Ready(Some(val)),
            Poll::Ready(None) if pool.receiver.is_terminated() => Poll::Ready(None),
            // Either tasks are still running, or the set is empty while the
            // channel may still deliver more work.
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }
}
impl<'env, T> FusedStream for Pool<'env, T> {
    /// Reports `true` only when both the task set and the channel receiver
    /// report that they are terminated.
    fn is_terminated(&self) -> bool {
        if !self.tasks.is_terminated() {
            return false;
        }
        self.receiver.is_terminated()
    }
}