ruchei-itertools 0.0.0-a.3

async itertools, subproject of ruchei
Documentation
use core::{
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::{Stream, StreamExt, future::Either, ready, stream::Fuse};
use pin_project::pin_project;

use crate::EitherOrBoth;

#[pin_project]
pub struct ZipLongest<L, R, Lt = <L as Stream>::Item, Rt = <R as Stream>::Item> {
    #[pin]
    l: Fuse<L>,
    #[pin]
    r: Fuse<R>,
    state: Option<Either<Lt, Rt>>,
}

pub fn zip_longest<L: Stream, R: Stream>(l: L, r: R) -> crate::ZipLongest<L, R> {
    ZipLongest {
        l: l.fuse(),
        r: r.fuse(),
        state: None,
    }
}

impl<L: Stream<Item = Lt>, R: Stream<Item = Rt>, Lt, Rt> Stream for ZipLongest<L, R, Lt, Rt> {
    type Item = EitherOrBoth<Lt, Rt>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        Poll::Ready(loop {
            if let Some(state) = this.state.as_ref() {
                match state {
                    Either::Left(_) => {
                        let r = ready!(this.r.as_mut().poll_next(cx));
                        let l = match this.state.take().unwrap() {
                            Either::Left(l) => l,
                            Either::Right(_) => unreachable!(),
                        };
                        break Some(if let Some(r) = r {
                            EitherOrBoth::Both(l, r)
                        } else {
                            EitherOrBoth::Left(l)
                        });
                    }
                    Either::Right(_) => {
                        let l = ready!(this.l.as_mut().poll_next(cx));
                        let r = match this.state.take().unwrap() {
                            Either::Left(_) => unreachable!(),
                            Either::Right(r) => r,
                        };
                        break Some(if let Some(l) = l {
                            EitherOrBoth::Both(l, r)
                        } else {
                            EitherOrBoth::Right(r)
                        });
                    }
                }
            } else if let Poll::Ready(Some(l)) = this.l.as_mut().poll_next(cx) {
                *this.state = Some(Either::Left(l));
            } else if let Poll::Ready(Some(r)) = this.r.as_mut().poll_next(cx) {
                *this.state = Some(Either::Right(r));
            } else if this.l.is_done() && this.r.is_done() {
                break None;
            } else {
                return Poll::Pending;
            }
        })
    }
}

#[cfg(test)]
mod test {
    use crate::{AsyncItertools, EitherOrBoth};

    #[test]
    fn left_longer() {
        let l = futures_util::stream::iter([1, 2, 3]);
        let r = futures_util::stream::iter([1, 2]);
        assert!(futures_executor::block_on_stream(l.zip_longest(r)).eq([
            EitherOrBoth::Both(1, 1),
            EitherOrBoth::Both(2, 2),
            EitherOrBoth::Left(3),
        ]));
    }

    #[test]
    fn equal_length() {
        let l = futures_util::stream::iter([1, 2, 3]);
        let r = futures_util::stream::iter([1, 2, 3]);
        assert!(futures_executor::block_on_stream(l.zip_longest(r)).eq([
            EitherOrBoth::Both(1, 1),
            EitherOrBoth::Both(2, 2),
            EitherOrBoth::Both(3, 3),
        ]));
    }

    #[test]
    fn right_longer() {
        let l = futures_util::stream::iter([1, 2]);
        let r = futures_util::stream::iter([1, 2, 3]);
        assert!(futures_executor::block_on_stream(l.zip_longest(r)).eq([
            EitherOrBoth::Both(1, 1),
            EitherOrBoth::Both(2, 2),
            EitherOrBoth::Right(3),
        ]));
    }
}