ruchei 0.1.3-a.0

Utilities for working with many streams
Documentation
use std::{
    cmp::Ordering,
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::{Stream, future::Either};
use pin_project::pin_project;

use super::pair_item::{PairCategory, PairItem, PairStream, StreamPair};

/// Stream adapter that combines a left and a right pair stream by key.
///
/// Constructed by [`group_sorted`] and [`GroupSortedExt::group_sorted`].
///
/// Type parameters:
/// * `L`, `R` — the left and right inner streams.
/// * `K` — the key type; defaults to the key of the `(L, R)` stream pair.
/// * `Lv`, `Rv` — the value types of the left and right streams.
#[pin_project]
#[derive(Debug)]
pub struct GroupSorted<
    L,
    R,
    K = <(L, R) as StreamPair>::K,
    Lv = <L as PairStream>::V,
    Rv = <R as PairStream>::V,
> {
    /// Left inner stream (structurally pinned).
    #[pin]
    l: L,
    /// Right inner stream (structurally pinned).
    #[pin]
    r: R,
    /// Item buffered from one side, stored as its key plus the value tagged
    /// `Either::Left` / `Either::Right` according to the side it came from.
    /// `None` when nothing is buffered.
    last: Option<(K, Either<Lv, Rv>)>,
}

/// A default `GroupSorted` wraps default-constructed inner streams and has no buffered item.
impl<L, R, K, Lv, Rv> Default for GroupSorted<L, R, K, Lv, Rv>
where
    L: Default,
    R: Default,
{
    fn default() -> Self {
        // Left is constructed before right, matching field order.
        let (l, r) = (L::default(), R::default());
        Self { l, r, last: None }
    }
}

/// Yields one pair per key match between the left and the right stream.
///
/// The adapter buffers a single item (`last`) from one side and polls the other side for a
/// counterpart. When both keys are known:
///
/// * equal keys produce `(key, (left_value, right_value))`; the left value is cloned and stays
///   buffered, so it can be matched against further right items with the same key;
/// * otherwise the item with the smaller key is dropped and the one with the larger key stays
///   buffered, which is only meaningful if both inputs yield keys in ascending order.
///
/// The stream ends as soon as either inner stream that is being polled ends. Items for which
/// `into_kv` returns `Err` are forwarded as-is and do not disturb the buffered item.
impl<
    C: PairCategory,
    K: Ord,
    Lv: Clone,
    Rv,
    Li: PairItem<C = C, K = K, V = Lv>,
    Ri: PairItem<C = C, K = K, V = Rv>,
    L: PairStream<C = C, K = K, V = Lv, Item = Li>,
    R: PairStream<C = C, K = K, V = Rv, Item = Ri>,
> Stream for GroupSorted<L, R, K, Lv, Rv>
{
    type Item = C::Pair<K, (Lv, Rv)>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // Polls side `$s` once and evaluates to `Poll<(key, value)>`.
        //
        // Early-returns from `poll_next` when the side has ended (`Ready(None)`) or when the
        // polled item is an error (`into_kv` returned `Err`). Callers move the buffered item out
        // of `this.last` before polling, so `$held` is written back to `this.last` ahead of the
        // error return; otherwise an error on one side would silently discard the valid item
        // buffered from the other side.
        macro_rules! try_next {
            ($s:ident, $held:expr) => {{
                assert!(this.last.is_none());
                match this.$s.as_mut().poll_next(cx) {
                    Poll::Ready(Some(kv)) => match kv.into_kv::<(Lv, Rv)>() {
                        Ok(kv) => Poll::Ready(kv),
                        Err(e) => {
                            *this.last = $held;
                            return Poll::Ready(Some(e));
                        }
                    },
                    Poll::Ready(None) => {
                        return Poll::Ready(None);
                    }
                    Poll::Pending => Poll::Pending,
                }
            }};
        }
        Poll::Ready(loop {
            if let Some((k, v)) = this.last.take() {
                // One side is buffered: fetch the counterpart from the opposite side.
                let (lk, lv, rk, rv) = match v {
                    Either::Left(lv) => {
                        let lk = k;
                        let Poll::Ready((rk, rv)) = try_next!(r, Some((lk, Either::Left(lv))))
                        else {
                            // Right side is not ready: keep the left item for the next poll.
                            *this.last = Some((lk, Either::Left(lv)));
                            return Poll::Pending;
                        };
                        (lk, lv, rk, rv)
                    }
                    Either::Right(rv) => {
                        let rk = k;
                        let Poll::Ready((lk, lv)) = try_next!(l, Some((rk, Either::Right(rv))))
                        else {
                            // Left side is not ready: keep the right item for the next poll.
                            *this.last = Some((rk, Either::Right(rv)));
                            return Poll::Pending;
                        };
                        (lk, lv, rk, rv)
                    }
                };
                match lk.cmp(&rk) {
                    // Left key is behind: drop it and wait for a left item matching the right.
                    Ordering::Less => *this.last = Some((rk, Either::Right(rv))),
                    Ordering::Equal => {
                        // Keep a copy of the left value buffered so that later right items
                        // carrying the same key are paired with it as well.
                        *this.last = Some((lk, Either::Left(lv.clone())));
                        break Some(PairItem::from_kv(rk, (lv, rv)));
                    }
                    // Right key is behind: drop it and wait for a right item matching the left.
                    Ordering::Greater => *this.last = Some((lk, Either::Left(lv))),
                }
            } else if let Poll::Ready((lk, lv)) = try_next!(l, None) {
                // Nothing buffered: prefer the left side ...
                *this.last = Some((lk, Either::Left(lv)));
            } else if let Poll::Ready((rk, rv)) = try_next!(r, None) {
                // ... and fall back to the right side when the left one is pending.
                *this.last = Some((rk, Either::Right(rv)));
            } else {
                return Poll::Pending;
            }
        })
    }
}

/// Wraps `l` and `r` into a [`GroupSorted`] with no item buffered yet.
///
/// `l` becomes the left side and `r` the right side of the resulting stream.
#[must_use]
pub fn group_sorted<L, R>(l: L, r: R) -> GroupSorted<L, R>
where
    L: PairStream,
    R: PairStream,
    (L, R): StreamPair,
    GroupSorted<L, R>: Stream,
{
    let last = None;
    GroupSorted { l, r, last }
}

/// Method-call syntax for [`group_sorted`] on pair streams whose key is `Ord` and whose
/// value is `Clone`.
pub trait GroupSortedExt: PairStream<K: Ord, V: Clone> + Sized {
    /// Combines `self` (left side) with `right` (right side) into a [`GroupSorted`].
    ///
    /// `right` must share the category and the key type of `self`.
    #[must_use]
    fn group_sorted<R>(self, right: R) -> GroupSorted<Self, R>
    where
        R: PairStream<C = Self::C, K = Self::K>,
    {
        // Delegates to the free function of the same name.
        group_sorted::<Self, R>(self, right)
    }
}

/// Blanket implementation for every pair stream with an `Ord` key and a `Clone` value.
impl<L> GroupSortedExt for L where L: PairStream<K: Ord, V: Clone> {}

#[test]
fn simple() {
    use futures_util::{StreamExt, stream};
    // Both sides carry the same keys, so every item finds its partner.
    let left = stream::iter([(1, 2), (4, 5)]);
    let right = stream::iter([(1, 3), (4, 6)]);
    let joined = left.group_sorted(right).collect::<Vec<_>>();
    let pairs = async_io::block_on(joined);
    assert_eq!(pairs, [(1, (2, 3)), (4, (5, 6))]);
}

#[test]
fn l_extra() {
    use futures_util::{StreamExt, stream};
    // Key 3 exists only on the left side and must not show up in the output.
    let left = stream::iter([(1, 2), (3, 3), (4, 5)]);
    let right = stream::iter([(1, 3), (4, 6)]);
    let joined = left.group_sorted(right).collect::<Vec<_>>();
    let pairs = async_io::block_on(joined);
    assert_eq!(pairs, [(1, (2, 3)), (4, (5, 6))]);
}

#[test]
fn r_extra() {
    use futures_util::{StreamExt, stream};
    // Key 3 exists only on the right side and must not show up in the output.
    let left = stream::iter([(1, 2), (4, 5)]);
    let right = stream::iter([(1, 3), (3, 3), (4, 6)]);
    let joined = left.group_sorted(right).collect::<Vec<_>>();
    let pairs = async_io::block_on(joined);
    assert_eq!(pairs, [(1, (2, 3)), (4, (5, 6))]);
}

#[test]
fn duplicate_key() {
    use futures_util::{StreamExt, stream};
    // With key 1 repeated on both sides, the first left value (2) is paired with every
    // right value; the second left item is not part of the expected output.
    let left = stream::iter([(1, 2), (1, 4)]);
    let right = stream::iter([(1, 3), (1, 5)]);
    let joined = left.group_sorted(right).collect::<Vec<_>>();
    let pairs = async_io::block_on(joined);
    assert_eq!(pairs, [(1, (2, 3)), (1, (2, 5))]);
}