ruchei 0.1.2

Utilities for working with many streams
Documentation
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::{
    Stream, StreamExt, ready,
    stream::{Fuse, FusedStream},
};
use pin_project::pin_project;

/// Stream adapter that collects runs of consecutive, equal-keyed items into groups.
///
/// * `S` is the source stream; it is wrapped in [`Fuse`] on construction.
/// * `T` is the collection each group's values are gathered into.
/// * `K` is the key type; it defaults to the key type declared by the source
///   item's [`GroupItem`] implementation.
#[pin_project]
#[derive(Debug)]
pub struct Grouped<S, T, K = <<S as Stream>::Item as GroupItem>::K> {
    /// Fused source stream the keyed items are pulled from.
    #[pin]
    stream: Fuse<S>,
    /// Key and partially filled collection of the group currently being
    /// accumulated; `None` when no group is open.
    current: Option<(K, T)>,
}

/// A default `Grouped` wraps the default value of the source stream and has no open group.
impl<S, T, K> Default for Grouped<S, T, K>
where
    S: Default + Stream,
{
    fn default() -> Self {
        Self::from(S::default())
    }
}

/// Item type that can be grouped by key by [`Grouped`].
///
/// The implementation decides how an item is split into a key and a value and
/// what the grouped output item looks like.
pub trait GroupItem: Sized {
    /// Key that consecutive items are compared by.
    type K: PartialEq;
    /// Value that is appended to the collection of the current group.
    type V;
    /// Output item produced for a finished group collected into `T`.
    type Grouped<T: Extend<Self::V>>;

    /// Polls `stream` and tries to produce the next finished group.
    ///
    /// * `stream` - fused source of items of this type.
    /// * `current` - state carried between calls: the key and collection of the
    ///   group that is still being accumulated, if any.
    /// * `cx` - task context forwarded to the source stream.
    fn poll_next_group<S, T>(
        stream: Pin<&mut S>,
        current: &mut Option<(Self::K, T)>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Grouped<T>>>
    where
        S: FusedStream<Item = Self>,
        T: Default + Extend<Self::V>;
}

/// Groups plain `(key, value)` pairs: every run of consecutive pairs with equal
/// keys is emitted as a single `(key, collection)` item.
impl<K: PartialEq, V> GroupItem for (K, V) {
    type K = K;
    type V = V;
    type Grouped<T: Extend<Self::V>> = (K, T);

    fn poll_next_group<S: FusedStream<Item = Self>, T: Default + Extend<Self::V>>(
        mut stream: Pin<&mut S>,
        current: &mut Option<(Self::K, T)>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Grouped<T>>> {
        // A stream that already reported termination is never polled again.
        let mut poll_source = || {
            if stream.is_terminated() {
                return Poll::Ready(None);
            }
            stream.as_mut().poll_next(cx)
        };

        // Resume the open group, or open a new one from the next source item.
        let (group_key, group_values) = match current.as_mut() {
            Some(open_group) => open_group,
            None => match ready!(poll_source()) {
                Some((first_key, first_value)) => {
                    let mut values = T::default();
                    values.extend(std::iter::once(first_value));
                    current.insert((first_key, values))
                }
                // Nothing buffered and the source is exhausted.
                None => return Poll::Ready(None),
            },
        };

        loop {
            let Some((key, value)) = ready!(poll_source()) else {
                break;
            };
            if key != *group_key {
                // Key changed: the incoming item opens the next group and the
                // one accumulated so far is handed out.
                let finished_key = std::mem::replace(group_key, key);
                let mut next_values = T::default();
                next_values.extend(std::iter::once(value));
                let finished_values = std::mem::replace(group_values, next_values);
                return Poll::Ready(Some((finished_key, finished_values)));
            }
            group_values.extend(std::iter::once(value));
        }

        // The source ended while a group was open: flush that last group.
        let finished = current.take().expect("has just dealt with Some");
        Poll::Ready(Some(finished))
    }
}

/// Groups the `Ok((key, value))` items of a fallible stream: every run of
/// consecutive pairs with equal keys is emitted as one `Ok((key, collection))`.
///
/// An `Err` from the source is forwarded immediately as its own item; the group
/// being accumulated at that moment stays buffered and is continued by later polls.
impl<K: PartialEq, V, E> GroupItem for Result<(K, V), E> {
    type K = K;
    type V = V;
    type Grouped<T: Extend<Self::V>> = Result<(K, T), E>;

    fn poll_next_group<S: FusedStream<Item = Self>, T: Default + Extend<Self::V>>(
        mut stream: Pin<&mut S>,
        current: &mut Option<(Self::K, T)>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Grouped<T>>> {
        // A stream that already reported termination is never polled again.
        let mut poll_source = || {
            if stream.is_terminated() {
                return Poll::Ready(None);
            }
            stream.as_mut().poll_next(cx)
        };

        // Resume the open group, or open a new one from the next source item.
        // `?` turns a source error into an early `Poll::Ready(Some(Err(_)))`.
        let (group_key, group_values) = match current.as_mut() {
            Some(open_group) => open_group,
            None => match ready!(poll_source()?) {
                Some((first_key, first_value)) => {
                    let mut values = T::default();
                    values.extend(std::iter::once(first_value));
                    current.insert((first_key, values))
                }
                // Nothing buffered and the source is exhausted.
                None => return Poll::Ready(None),
            },
        };

        loop {
            let Some((key, value)) = ready!(poll_source()?) else {
                break;
            };
            if key != *group_key {
                // Key changed: the incoming item opens the next group and the
                // one accumulated so far is handed out.
                let finished_key = std::mem::replace(group_key, key);
                let mut next_values = T::default();
                next_values.extend(std::iter::once(value));
                let finished_values = std::mem::replace(group_values, next_values);
                return Poll::Ready(Some(Ok((finished_key, finished_values))));
            }
            group_values.extend(std::iter::once(value));
        }

        // The source ended while a group was open: flush that last group.
        let finished = current.take().expect("has just dealt with Some");
        Poll::Ready(Some(Ok(finished)))
    }
}

/// Yields one item per finished group, as defined by the source item's [`GroupItem`] impl.
impl<S, I, T> Stream for Grouped<S, T, I::K>
where
    S: Stream<Item = I>,
    I: GroupItem,
    T: Default + Extend<I::V>,
{
    type Item = I::Grouped<T>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All grouping logic lives in the item type; hand it the pinned source
        // and the buffered group state.
        let projected = self.project();
        <I as GroupItem>::poll_next_group(projected.stream, projected.current, cx)
    }
}

/// The adapter is terminated once no group is buffered and the source has terminated.
impl<S, I, T> FusedStream for Grouped<S, T, I::K>
where
    S: Stream<Item = I>,
    I: GroupItem,
    T: Default + Extend<I::V>,
{
    fn is_terminated(&self) -> bool {
        match self.current {
            // A buffered group still has to be emitted.
            Some(_) => false,
            None => self.stream.is_terminated(),
        }
    }
}

impl<S: Stream, T, K> From<S> for Grouped<S, T, K> {
    fn from(stream: S) -> Self {
        Self {
            stream: stream.fuse(),
            current: None,
        }
    }
}

pub trait GroupSequential: Sized + Stream<Item: GroupItem<V = Self::V>> {
    type V;
    #[must_use]
    fn group_sequential<T: Default + Extend<Self::V>>(self) -> Grouped<Self, T> {
        self.into()
    }
}

/// Every stream of [`GroupItem`]s gets `group_sequential`; `V` is the item's value type.
impl<S, I> GroupSequential for S
where
    S: Stream<Item = I>,
    I: GroupItem,
{
    type V = <I as GroupItem>::V;
}