Skip to main content

async_std/stream/stream/
count.rs

1use core::future::Future;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::stream::Stream;
7use crate::task::{Context, Poll};
8
9pin_project! {
10    #[doc(hidden)]
11    #[allow(missing_debug_implementations)]
12    #[cfg(feature = "unstable")]
13    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
14    pub struct CountFuture<S> {
15        #[pin]
16        stream: S,
17        count: usize,
18    }
19}
20
21impl<S> CountFuture<S> {
22    pub(crate) fn new(stream: S) -> Self {
23        Self { stream, count: 0 }
24    }
25}
26
27impl<S> Future for CountFuture<S>
28where
29    S: Stream,
30{
31    type Output = usize;
32
33    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
34        let this = self.project();
35        let next = futures_core::ready!(this.stream.poll_next(cx));
36
37        match next {
38            Some(_) => {
39                cx.waker().wake_by_ref();
40                *this.count += 1;
41                Poll::Pending
42            }
43            None => Poll::Ready(*this.count),
44        }
45    }
46}