ruchei_itertools/
zip_longest.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_util::{Stream, StreamExt, future::Either, ready, stream::Fuse};
7use pin_project::pin_project;
8
9use crate::EitherOrBoth;
10
11#[pin_project]
12pub struct ZipLongest<L, R, Lt = <L as Stream>::Item, Rt = <R as Stream>::Item> {
13    #[pin]
14    l: Fuse<L>,
15    #[pin]
16    r: Fuse<R>,
17    state: Option<Either<Lt, Rt>>,
18}
19
20pub fn zip_longest<L: Stream, R: Stream>(l: L, r: R) -> crate::ZipLongest<L, R> {
21    ZipLongest {
22        l: l.fuse(),
23        r: r.fuse(),
24        state: None,
25    }
26}
27
28impl<L: Stream<Item = Lt>, R: Stream<Item = Rt>, Lt, Rt> Stream for ZipLongest<L, R, Lt, Rt> {
29    type Item = EitherOrBoth<Lt, Rt>;
30
31    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        let mut this = self.project();
33        Poll::Ready(loop {
34            if let Some(state) = this.state.as_ref() {
35                match state {
36                    Either::Left(_) => {
37                        let r = ready!(this.r.as_mut().poll_next(cx));
38                        let l = match this.state.take().unwrap() {
39                            Either::Left(l) => l,
40                            Either::Right(_) => unreachable!(),
41                        };
42                        break Some(if let Some(r) = r {
43                            EitherOrBoth::Both(l, r)
44                        } else {
45                            EitherOrBoth::Left(l)
46                        });
47                    }
48                    Either::Right(_) => {
49                        let l = ready!(this.l.as_mut().poll_next(cx));
50                        let r = match this.state.take().unwrap() {
51                            Either::Left(_) => unreachable!(),
52                            Either::Right(r) => r,
53                        };
54                        break Some(if let Some(l) = l {
55                            EitherOrBoth::Both(l, r)
56                        } else {
57                            EitherOrBoth::Right(r)
58                        });
59                    }
60                }
61            } else if let Poll::Ready(Some(l)) = this.l.as_mut().poll_next(cx) {
62                *this.state = Some(Either::Left(l));
63            } else if let Poll::Ready(Some(r)) = this.r.as_mut().poll_next(cx) {
64                *this.state = Some(Either::Right(r));
65            } else if this.l.is_done() && this.r.is_done() {
66                break None;
67            } else {
68                return Poll::Pending;
69            }
70        })
71    }
72}
73
#[cfg(test)]
mod test {
    use futures_executor::block_on_stream;
    use futures_util::stream;

    use crate::{AsyncItertools, EitherOrBoth};

    /// The left input has one extra item, which must come out as `Left`.
    #[test]
    fn left_longer() {
        let left = stream::iter([1, 2, 3]);
        let right = stream::iter([1, 2]);
        let zipped = block_on_stream(left.zip_longest(right));
        let expected = [
            EitherOrBoth::Both(1, 1),
            EitherOrBoth::Both(2, 2),
            EitherOrBoth::Left(3),
        ];
        assert!(zipped.eq(expected));
    }

    /// Inputs of the same length yield only `Both` pairs.
    #[test]
    fn equal_length() {
        let left = stream::iter([1, 2, 3]);
        let right = stream::iter([1, 2, 3]);
        let zipped = block_on_stream(left.zip_longest(right));
        let expected = [
            EitherOrBoth::Both(1, 1),
            EitherOrBoth::Both(2, 2),
            EitherOrBoth::Both(3, 3),
        ];
        assert!(zipped.eq(expected));
    }

    /// The right input has one extra item, which must come out as `Right`.
    #[test]
    fn right_longer() {
        let left = stream::iter([1, 2]);
        let right = stream::iter([1, 2, 3]);
        let zipped = block_on_stream(left.zip_longest(right));
        let expected = [
            EitherOrBoth::Both(1, 1),
            EitherOrBoth::Both(2, 2),
            EitherOrBoth::Right(3),
        ];
        assert!(zipped.eq(expected));
    }
}