async_std/stream/stream/
min.rs

1use core::cmp::{Ord, Ordering};
2use core::future::Future;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9
10pin_project! {
11    #[doc(hidden)]
12    #[allow(missing_debug_implementations)]
13    pub struct MinFuture<S, T> {
14        #[pin]
15        stream: S,
16        min: Option<T>,
17    }
18}
19
20impl<S, T> MinFuture<S, T> {
21    pub(super) fn new(stream: S) -> Self {
22        Self { stream, min: None }
23    }
24}
25
26impl<S> Future for MinFuture<S, S::Item>
27where
28    S: Stream,
29    S::Item: Ord,
30{
31    type Output = Option<S::Item>;
32
33    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
34        let this = self.project();
35        let next = futures_core::ready!(this.stream.poll_next(cx));
36
37        match next {
38            Some(new) => {
39                cx.waker().wake_by_ref();
40                match this.min.take() {
41                    None => *this.min = Some(new),
42
43                    Some(old) => match new.cmp(&old) {
44                        Ordering::Less => *this.min = Some(new),
45                        _ => *this.min = Some(old),
46                    },
47                }
48                Poll::Pending
49            }
50            None => Poll::Ready(this.min.take()),
51        }
52    }
53}