async_std/stream/stream/
eq.rs

1use core::future::Future;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use super::fuse::Fuse;
7use crate::stream::stream::StreamExt;
8use crate::stream::Stream;
9use crate::task::{Context, Poll};
10
11pin_project! {
12    // Lexicographically compares the elements of this `Stream` with those
13    // of another.
14    #[doc(hidden)]
15    #[allow(missing_debug_implementations)]
16    pub struct EqFuture<L: Stream, R: Stream> {
17        #[pin]
18        l: Fuse<L>,
19        #[pin]
20        r: Fuse<R>,
21    }
22}
23
24impl<L: Stream, R: Stream> EqFuture<L, R>
25where
26    L::Item: PartialEq<R::Item>,
27{
28    pub(super) fn new(l: L, r: R) -> Self {
29        Self {
30            l: l.fuse(),
31            r: r.fuse(),
32        }
33    }
34}
35
36impl<L: Stream, R: Stream> Future for EqFuture<L, R>
37where
38    L: Stream + Sized,
39    R: Stream + Sized,
40    L::Item: PartialEq<R::Item>,
41{
42    type Output = bool;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        let mut this = self.project();
46
47        loop {
48            let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
49            let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
50
51            if this.l.done && this.r.done {
52                return Poll::Ready(true);
53            }
54
55            match (l_val, r_val) {
56                (Some(l), Some(r)) if l != r => {
57                    return Poll::Ready(false);
58                }
59                _ => {}
60            }
61        }
62    }
63}