Skip to main content

async_std/stream/stream/
try_for_each.rs

1use core::future::Future;
2use core::pin::Pin;
3
4use crate::stream::Stream;
5use crate::task::{Context, Poll};
6
/// Future that feeds each item of a mutably borrowed stream to a fallible closure.
///
/// Its `Future` implementation resolves to `Ok(())` when the stream is exhausted, or to the
/// first `Err` the closure returns.
#[allow(missing_debug_implementations)]
#[doc(hidden)]
pub struct TryForEachFuture<'a, S, F> {
    /// Stream being drained; borrowed rather than owned, so the caller keeps the stream.
    stream: &'a mut S,
    /// Closure invoked once per item yielded by `stream`.
    f: F,
}
13
14impl<'a, S, F> Unpin for TryForEachFuture<'a, S, F> {}
15
16impl<'a, S, F> TryForEachFuture<'a, S, F> {
17    pub(crate) fn new(stream: &'a mut S, f: F) -> Self {
18        Self { stream, f }
19    }
20}
21
22impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
23where
24    S: Stream + Unpin,
25    F: FnMut(S::Item) -> Result<(), E>,
26{
27    type Output = Result<(), E>;
28
29    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
30        loop {
31            let item = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
32
33            match item {
34                None => return Poll::Ready(Ok(())),
35                Some(v) => {
36                    let res = (&mut self.f)(v);
37                    if let Err(e) = res {
38                        return Poll::Ready(Err(e));
39                    }
40                }
41            }
42        }
43    }
44}