use {
core::{
future::pending,
pin::Pin,
task::{Context, Poll},
},
futures::{Stream, StreamExt, stream::FuturesUnordered},
};
/// A set of boxed, type-erased futures that all resolve to `T`, exposed as a
/// [`Stream`] of their outputs.
///
/// Futures are added with `enqueue` and their results are obtained by polling
/// the queue as a stream. The wrapped [`FuturesUnordered`] is the only field.
///
/// `T` defaults to `()`, which suits fire-and-forget work items that produce
/// no value.
#[derive(Debug)]
pub struct AsyncWorkQueue<T = ()>(
    // Each entry is heap-allocated and pinned so that futures of different
    // concrete types can live in the same set.
    FuturesUnordered<Pin<Box<dyn futures::Future<Output = T> + Send + 'static>>>,
)
where
    T: Send + 'static;
impl<T: Send + 'static> Default for AsyncWorkQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> AsyncWorkQueue<T>
where
    T: Send + 'static,
{
    /// Creates a queue that holds no caller-supplied work.
    ///
    /// The underlying set is seeded with one future that never completes
    /// (`pending`). With that sentinel always present the set is never empty;
    /// presumably this keeps the stream from reporting exhaustion while the
    /// queue is merely idle — confirm against the `FuturesUnordered` docs.
    pub fn new() -> Self {
        let queue = Self(FuturesUnordered::new());
        // Seed the sentinel through the regular insertion path.
        queue.enqueue(pending::<T>());
        queue
    }

    /// Returns `true` when no caller-supplied futures remain in the queue.
    ///
    /// The sentinel inserted by [`AsyncWorkQueue::new`] is always counted by
    /// the underlying set, so "empty" corresponds to a length of exactly one.
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        let tracked = self.0.len();
        tracked == 1
    }

    /// Adds `fut` to the set of futures driven by this queue.
    ///
    /// The future is boxed and pinned before insertion. Its output becomes
    /// available by polling the queue as a [`Stream`].
    pub fn enqueue<F: Future<Output = T> + Send + 'static>(&self, fut: F) {
        let boxed: Pin<Box<dyn Future<Output = T> + Send + 'static>> = Box::pin(fut);
        self.0.push(boxed);
    }
}
impl<T> Stream for AsyncWorkQueue<T>
where
    T: Send + 'static,
{
    /// Each item is the output of one enqueued future.
    type Item = T;

    /// Polls the wrapped [`FuturesUnordered`] and forwards its result
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The queue is `Unpin`, so the pin can be unwrapped to reach the
        // inner set directly.
        let this = self.get_mut();
        this.0.poll_next_unpin(cx)
    }
}