futures_util/stream/
for_each_concurrent.rs

1use futures_core::{Async, Future, IntoFuture, Poll, Stream};
2use futures_core::task;
3
4use super::futures_unordered::FuturesUnordered;
5
6/// A stream combinator which executes a unit closure over each item on a
7/// stream concurrently.
8///
9/// This structure is returned by the `Stream::for_each` method.
10#[derive(Debug)]
11#[must_use = "streams do nothing unless polled"]
12pub struct ForEachConcurrent<S, U, F> where U: IntoFuture {
13    stream: Option<S>,
14    stream_done: bool,
15    f: F,
16    futures: FuturesUnordered<U::Future>,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> ForEachConcurrent<S, U, F>
20    where S: Stream,
21          F: FnMut(S::Item) -> U,
22          U: IntoFuture<Item = (), Error = S::Error>,
23{
24    ForEachConcurrent {
25        stream: Some(s),
26        stream_done: false,
27        f: f,
28        futures: FuturesUnordered::new(),
29    }
30}
31
32impl<S, U, F> Future for ForEachConcurrent<S, U, F>
33    where S: Stream,
34          F: FnMut(S::Item) -> U,
35          U: IntoFuture<Item= (), Error = S::Error>,
36{
37    type Item = S;
38    type Error = S::Error;
39
40    fn poll(&mut self, cx: &mut task::Context) -> Poll<S, S::Error> {
41        loop {
42            let mut made_progress_this_iter = false;
43
44            // Try and pull an item off of the stream
45            if !self.stream_done {
46                // `unwrap` is valid because the stream is only taken after `stream_done` is set
47                match self.stream.as_mut().unwrap().poll_next(cx)? {
48                    Async::Ready(Some(x)) => {
49                        self.futures.push((self.f)(x).into_future());
50                        made_progress_this_iter = true;
51                    }
52                    // The stream completed, so it shouldn't be polled
53                    // anymore.
54                    Async::Ready(None) => self.stream_done = true,
55                    Async::Pending => {},
56                }
57            }
58
59            match self.futures.poll_next(cx)? {
60                Async::Ready(Some(())) => made_progress_this_iter = true,
61                Async::Ready(None) if self.stream_done => {
62                    // We've processed all of self.futures and self.stream,
63                    // so return self.stream
64                    return Ok(Async::Ready(self.stream.take().expect(
65                        "polled for_each_concurrent after completion"
66                    )));
67                }
68                Async::Ready(None)
69                | Async::Pending => {}
70            }
71
72            if !made_progress_this_iter {
73                return Ok(Async::Pending);
74            }
75        }
76    }
77}