async_std/stream/stream/
count.rs1use core::future::Future;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::stream::Stream;
7use crate::task::{Context, Poll};
8
9pin_project! {
10 #[doc(hidden)]
11 #[allow(missing_debug_implementations)]
12 #[cfg(feature = "unstable")]
13 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
14 pub struct CountFuture<S> {
15 #[pin]
16 stream: S,
17 count: usize,
18 }
19}
20
21impl<S> CountFuture<S> {
22 pub(crate) fn new(stream: S) -> Self {
23 Self { stream, count: 0 }
24 }
25}
26
27impl<S> Future for CountFuture<S>
28where
29 S: Stream,
30{
31 type Output = usize;
32
33 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
34 let this = self.project();
35 let next = futures_core::ready!(this.stream.poll_next(cx));
36
37 match next {
38 Some(_) => {
39 cx.waker().wake_by_ref();
40 *this.count += 1;
41 Poll::Pending
42 }
43 None => Poll::Ready(*this.count),
44 }
45 }
46}