async_std/stream/stream/
min.rs1use core::cmp::{Ord, Ordering};
2use core::future::Future;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9
10pin_project! {
11 #[doc(hidden)]
12 #[allow(missing_debug_implementations)]
13 pub struct MinFuture<S, T> {
14 #[pin]
15 stream: S,
16 min: Option<T>,
17 }
18}
19
20impl<S, T> MinFuture<S, T> {
21 pub(super) fn new(stream: S) -> Self {
22 Self { stream, min: None }
23 }
24}
25
26impl<S> Future for MinFuture<S, S::Item>
27where
28 S: Stream,
29 S::Item: Ord,
30{
31 type Output = Option<S::Item>;
32
33 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
34 let this = self.project();
35 let next = futures_core::ready!(this.stream.poll_next(cx));
36
37 match next {
38 Some(new) => {
39 cx.waker().wake_by_ref();
40 match this.min.take() {
41 None => *this.min = Some(new),
42
43 Some(old) => match new.cmp(&old) {
44 Ordering::Less => *this.min = Some(new),
45 _ => *this.min = Some(old),
46 },
47 }
48 Poll::Pending
49 }
50 None => Poll::Ready(this.min.take()),
51 }
52 }
53}