async_std/stream/stream/
eq.rs1use core::future::Future;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use super::fuse::Fuse;
7use crate::stream::stream::StreamExt;
8use crate::stream::Stream;
9use crate::task::{Context, Poll};
10
11pin_project! {
12 #[doc(hidden)]
15 #[allow(missing_debug_implementations)]
16 pub struct EqFuture<L: Stream, R: Stream> {
17 #[pin]
18 l: Fuse<L>,
19 #[pin]
20 r: Fuse<R>,
21 }
22}
23
24impl<L: Stream, R: Stream> EqFuture<L, R>
25where
26 L::Item: PartialEq<R::Item>,
27{
28 pub(super) fn new(l: L, r: R) -> Self {
29 Self {
30 l: l.fuse(),
31 r: r.fuse(),
32 }
33 }
34}
35
36impl<L: Stream, R: Stream> Future for EqFuture<L, R>
37where
38 L: Stream + Sized,
39 R: Stream + Sized,
40 L::Item: PartialEq<R::Item>,
41{
42 type Output = bool;
43
44 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45 let mut this = self.project();
46
47 loop {
48 let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
49 let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
50
51 if this.l.done && this.r.done {
52 return Poll::Ready(true);
53 }
54
55 match (l_val, r_val) {
56 (Some(l), Some(r)) if l != r => {
57 return Poll::Ready(false);
58 }
59 _ => {}
60 }
61 }
62 }
63}