ruchei_itertools/
zip_longest.rs1use core::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_util::{Stream, StreamExt, future::Either, ready, stream::Fuse};
7use pin_project::pin_project;
8
9use crate::EitherOrBoth;
10
11#[pin_project]
12pub struct ZipLongest<L, R, Lt = <L as Stream>::Item, Rt = <R as Stream>::Item> {
13 #[pin]
14 l: Fuse<L>,
15 #[pin]
16 r: Fuse<R>,
17 state: Option<Either<Lt, Rt>>,
18}
19
20pub fn zip_longest<L: Stream, R: Stream>(l: L, r: R) -> crate::ZipLongest<L, R> {
21 ZipLongest {
22 l: l.fuse(),
23 r: r.fuse(),
24 state: None,
25 }
26}
27
28impl<L: Stream<Item = Lt>, R: Stream<Item = Rt>, Lt, Rt> Stream for ZipLongest<L, R, Lt, Rt> {
29 type Item = EitherOrBoth<Lt, Rt>;
30
31 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 let mut this = self.project();
33 Poll::Ready(loop {
34 if let Some(state) = this.state.as_ref() {
35 match state {
36 Either::Left(_) => {
37 let r = ready!(this.r.as_mut().poll_next(cx));
38 let l = match this.state.take().unwrap() {
39 Either::Left(l) => l,
40 Either::Right(_) => unreachable!(),
41 };
42 break Some(if let Some(r) = r {
43 EitherOrBoth::Both(l, r)
44 } else {
45 EitherOrBoth::Left(l)
46 });
47 }
48 Either::Right(_) => {
49 let l = ready!(this.l.as_mut().poll_next(cx));
50 let r = match this.state.take().unwrap() {
51 Either::Left(_) => unreachable!(),
52 Either::Right(r) => r,
53 };
54 break Some(if let Some(l) = l {
55 EitherOrBoth::Both(l, r)
56 } else {
57 EitherOrBoth::Right(r)
58 });
59 }
60 }
61 } else if let Poll::Ready(Some(l)) = this.l.as_mut().poll_next(cx) {
62 *this.state = Some(Either::Left(l));
63 } else if let Poll::Ready(Some(r)) = this.r.as_mut().poll_next(cx) {
64 *this.state = Some(Either::Right(r));
65 } else if this.l.is_done() && this.r.is_done() {
66 break None;
67 } else {
68 return Poll::Pending;
69 }
70 })
71 }
72}
73
74#[cfg(test)]
75mod test {
76 use crate::{AsyncItertools, EitherOrBoth};
77
78 #[test]
79 fn left_longer() {
80 let l = futures_util::stream::iter([1, 2, 3]);
81 let r = futures_util::stream::iter([1, 2]);
82 assert!(futures_executor::block_on_stream(l.zip_longest(r)).eq([
83 EitherOrBoth::Both(1, 1),
84 EitherOrBoth::Both(2, 2),
85 EitherOrBoth::Left(3),
86 ]));
87 }
88
89 #[test]
90 fn equal_length() {
91 let l = futures_util::stream::iter([1, 2, 3]);
92 let r = futures_util::stream::iter([1, 2, 3]);
93 assert!(futures_executor::block_on_stream(l.zip_longest(r)).eq([
94 EitherOrBoth::Both(1, 1),
95 EitherOrBoth::Both(2, 2),
96 EitherOrBoth::Both(3, 3),
97 ]));
98 }
99
100 #[test]
101 fn right_longer() {
102 let l = futures_util::stream::iter([1, 2]);
103 let r = futures_util::stream::iter([1, 2, 3]);
104 assert!(futures_executor::block_on_stream(l.zip_longest(r)).eq([
105 EitherOrBoth::Both(1, 1),
106 EitherOrBoth::Both(2, 2),
107 EitherOrBoth::Right(3),
108 ]));
109 }
110}