Skip to main content

async_std/stream/stream/
partial_cmp.rs

1use core::cmp::Ordering;
2use core::future::Future;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use super::fuse::Fuse;
8use crate::stream::stream::StreamExt;
9use crate::stream::Stream;
10use crate::task::{Context, Poll};
11
pin_project! {
    // Future that lexicographically compares the elements of one `Stream`
    // with those of another, resolving to an `Option<Ordering>`.
    //
    // Plain `//` comments are used on purpose: the type is `#[doc(hidden)]`,
    // so nothing here is meant to show up in the rendered documentation.
    #[doc(hidden)]
    #[allow(missing_debug_implementations)]
    pub struct PartialCmpFuture<L: Stream, R: Stream> {
        // Left-hand stream, wrapped in `Fuse` so its `done` flag can be read
        // by `poll`.
        #[pin]
        l: Fuse<L>,
        // Right-hand stream, wrapped in `Fuse` for the same reason.
        #[pin]
        r: Fuse<R>,
        // Item already pulled from `l` that is still waiting for a partner
        // from `r`; `None` when nothing is buffered.
        l_cache: Option<L::Item>,
        // Item already pulled from `r` that is still waiting for a partner
        // from `l`; `None` when nothing is buffered.
        r_cache: Option<R::Item>,
    }
}
26
27impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
28    pub(super) fn new(l: L, r: R) -> Self {
29        Self {
30            l: l.fuse(),
31            r: r.fuse(),
32            l_cache: None,
33            r_cache: None,
34        }
35    }
36}
37
38impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
39where
40    L: Stream + Sized,
41    R: Stream + Sized,
42    L::Item: PartialOrd<R::Item>,
43{
44    type Output = Option<Ordering>;
45
46    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47        let mut this = self.project();
48        loop {
49            // Short circuit logic
50            // Stream that completes earliest can be considered Less, etc
51            let l_complete = this.l.done && this.l_cache.is_none();
52            let r_complete = this.r.done && this.r_cache.is_none();
53
54            if l_complete && r_complete {
55                return Poll::Ready(Some(Ordering::Equal));
56            } else if l_complete {
57                return Poll::Ready(Some(Ordering::Less));
58            } else if r_complete {
59                return Poll::Ready(Some(Ordering::Greater));
60            }
61
62            // Get next value if possible and necessary
63            if !this.l.done && this.l_cache.is_none() {
64                let l_next = futures_core::ready!(this.l.as_mut().poll_next(cx));
65                if let Some(item) = l_next {
66                    *this.l_cache = Some(item);
67                }
68            }
69
70            if !this.r.done && this.r_cache.is_none() {
71                let r_next = futures_core::ready!(this.r.as_mut().poll_next(cx));
72                if let Some(item) = r_next {
73                    *this.r_cache = Some(item);
74                }
75            }
76
77            // Compare if both values are available.
78            if this.l_cache.is_some() && this.r_cache.is_some() {
79                let l_value = this.l_cache.as_mut().take().unwrap();
80                let r_value = this.r_cache.as_mut().take().unwrap();
81                let result = l_value.partial_cmp(&r_value);
82
83                if let Some(Ordering::Equal) = result {
84                    // Reset cache to prepare for next comparison
85                    *this.l_cache = None;
86                    *this.r_cache = None;
87                } else {
88                    // Return non equal value
89                    return Poll::Ready(result);
90                }
91            }
92        }
93    }
94}