deluge 0.2.0

A highly concurrent stream library driving the underlying futures either concurrently or in parallel to process streaming operations as quickly as possible.
Documentation
use pin_project::pin_project;

use super::collect_par::CollectPar;
use super::map::Map;
use crate::deluge::Deluge;
use futures::task::{Context, Poll};
use futures::Stream;
use std::future::Future;
use std::pin::Pin;

#[pin_project]
pub struct AllPar<'a, Del, Fut, F>
where
    Del: Deluge + 'a,
    F: Fn(Del::Item) -> Fut + Send + 'a,
    Fut: Future<Output = bool> + Send,
{
    #[pin]
    stream: CollectPar<'a, Map<Del, F>, ()>,
}

impl<'a, Del, Fut, F> AllPar<'a, Del, Fut, F>
where
    Del: Deluge + 'a,
    F: Fn(Del::Item) -> Fut + Send + 'a,
    Fut: Future<Output = bool> + Send,
{
    pub(crate) fn new(
        deluge: Del,
        worker_count: impl Into<Option<usize>>,
        worker_concurrency: impl Into<Option<usize>>,
        f: F,
    ) -> Self {
        Self {
            stream: CollectPar::new(Map::new(deluge, f), worker_count, worker_concurrency),
        }
    }
}

impl<'a, Del, Fut, F> Future for AllPar<'a, Del, Fut, F>
where
    Del: Deluge + 'a,
    F: Fn(Del::Item) -> Fut + Send + 'a,
    Fut: Future<Output = bool> + Send,
{
    type Output = bool;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.as_mut().project();
        match this.stream.poll_next(cx) {
            Poll::Ready(Some(true)) => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Poll::Ready(Some(false)) => Poll::Ready(false),
            Poll::Ready(None) => Poll::Ready(true),
            Poll::Pending => Poll::Pending,
        }
    }
}