async_std/stream/stream/
partial_cmp.rs1use core::cmp::Ordering;
2use core::future::Future;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use super::fuse::Fuse;
8use crate::stream::stream::StreamExt;
9use crate::stream::Stream;
10use crate::task::{Context, Poll};
11
pin_project! {
    // Future that compares the items of two streams pairwise, in order, and
    // resolves to an `Option<Ordering>` (see the `Future` impl in this file).
    //
    // Hidden from the rendered docs; no `Debug` impl is provided, hence the
    // `missing_debug_implementations` allowance.
    #[doc(hidden)]
    #[allow(missing_debug_implementations)]
    pub struct PartialCmpFuture<L: Stream, R: Stream> {
        // Left-hand stream, wrapped with `fuse()` by `new`. `poll` reads the
        // wrapper's `done` flag to decide whether the stream may still be polled.
        #[pin]
        l: Fuse<L>,
        // Right-hand stream, wrapped and used the same way as `l`.
        #[pin]
        r: Fuse<R>,
        // Item pulled from `l` that has not yet been compared; `None` when no
        // left item is currently buffered.
        l_cache: Option<L::Item>,
        // Item pulled from `r` that has not yet been compared; `None` when no
        // right item is currently buffered.
        r_cache: Option<R::Item>,
    }
}
26
27impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
28 pub(super) fn new(l: L, r: R) -> Self {
29 Self {
30 l: l.fuse(),
31 r: r.fuse(),
32 l_cache: None,
33 r_cache: None,
34 }
35 }
36}
37
38impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
39where
40 L: Stream + Sized,
41 R: Stream + Sized,
42 L::Item: PartialOrd<R::Item>,
43{
44 type Output = Option<Ordering>;
45
46 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47 let mut this = self.project();
48 loop {
49 let l_complete = this.l.done && this.l_cache.is_none();
52 let r_complete = this.r.done && this.r_cache.is_none();
53
54 if l_complete && r_complete {
55 return Poll::Ready(Some(Ordering::Equal));
56 } else if l_complete {
57 return Poll::Ready(Some(Ordering::Less));
58 } else if r_complete {
59 return Poll::Ready(Some(Ordering::Greater));
60 }
61
62 if !this.l.done && this.l_cache.is_none() {
64 let l_next = futures_core::ready!(this.l.as_mut().poll_next(cx));
65 if let Some(item) = l_next {
66 *this.l_cache = Some(item);
67 }
68 }
69
70 if !this.r.done && this.r_cache.is_none() {
71 let r_next = futures_core::ready!(this.r.as_mut().poll_next(cx));
72 if let Some(item) = r_next {
73 *this.r_cache = Some(item);
74 }
75 }
76
77 if this.l_cache.is_some() && this.r_cache.is_some() {
79 let l_value = this.l_cache.as_mut().take().unwrap();
80 let r_value = this.r_cache.as_mut().take().unwrap();
81 let result = l_value.partial_cmp(&r_value);
82
83 if let Some(Ordering::Equal) = result {
84 *this.l_cache = None;
86 *this.r_cache = None;
87 } else {
88 return Poll::Ready(result);
90 }
91 }
92 }
93 }
94}