deluge 0.2.1

A highly concurrent stream library driving the underlying futures either concurrently or in parallel to process streaming operations as quickly as possible.
Documentation
use pin_project::pin_project;

use super::collect::Collect;
use super::map::Map;
use crate::deluge::Deluge;
use futures::task::{Context, Poll};
use futures::Stream;
use std::future::Future;
use std::pin::Pin;

#[pin_project]
pub struct All<'a, Del, Fut, F>
where
    Del: Deluge + 'a,
    F: Fn(Del::Item) -> Fut + Send + 'a,
    Fut: Future<Output = bool> + Send,
{
    #[pin]
    stream: Collect<'a, Map<Del, F>, ()>,
}

impl<'a, Del, Fut, F> All<'a, Del, Fut, F>
where
    Del: Deluge + 'a,
    F: Fn(Del::Item) -> Fut + Send + 'a,
    Fut: Future<Output = bool> + Send,
{
    pub(crate) fn new(deluge: Del, concurrency: impl Into<Option<usize>>, f: F) -> Self {
        Self {
            stream: Collect::new(Map::new(deluge, f), concurrency),
        }
    }
}

impl<'a, Del, Fut, F> Future for All<'a, Del, Fut, F>
where
    Del: Deluge + 'a,
    F: Fn(Del::Item) -> Fut + Send + 'a,
    Fut: Future<Output = bool> + Send,
{
    type Output = bool;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.as_mut().project();
        match this.stream.poll_next(cx) {
            Poll::Ready(Some(true)) => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Poll::Ready(Some(false)) => Poll::Ready(false),
            Poll::Ready(None) => Poll::Ready(true),
            Poll::Pending => Poll::Pending,
        }
    }
}