futures_util/stream/
for_each_concurrent.rs1use futures_core::{Async, Future, IntoFuture, Poll, Stream};
2use futures_core::task;
3
4use super::futures_unordered::FuturesUnordered;
5
6#[derive(Debug)]
11#[must_use = "streams do nothing unless polled"]
12pub struct ForEachConcurrent<S, U, F> where U: IntoFuture {
13 stream: Option<S>,
14 stream_done: bool,
15 f: F,
16 futures: FuturesUnordered<U::Future>,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> ForEachConcurrent<S, U, F>
20 where S: Stream,
21 F: FnMut(S::Item) -> U,
22 U: IntoFuture<Item = (), Error = S::Error>,
23{
24 ForEachConcurrent {
25 stream: Some(s),
26 stream_done: false,
27 f: f,
28 futures: FuturesUnordered::new(),
29 }
30}
31
32impl<S, U, F> Future for ForEachConcurrent<S, U, F>
33 where S: Stream,
34 F: FnMut(S::Item) -> U,
35 U: IntoFuture<Item= (), Error = S::Error>,
36{
37 type Item = S;
38 type Error = S::Error;
39
40 fn poll(&mut self, cx: &mut task::Context) -> Poll<S, S::Error> {
41 loop {
42 let mut made_progress_this_iter = false;
43
44 if !self.stream_done {
46 match self.stream.as_mut().unwrap().poll_next(cx)? {
48 Async::Ready(Some(x)) => {
49 self.futures.push((self.f)(x).into_future());
50 made_progress_this_iter = true;
51 }
52 Async::Ready(None) => self.stream_done = true,
55 Async::Pending => {},
56 }
57 }
58
59 match self.futures.poll_next(cx)? {
60 Async::Ready(Some(())) => made_progress_this_iter = true,
61 Async::Ready(None) if self.stream_done => {
62 return Ok(Async::Ready(self.stream.take().expect(
65 "polled for_each_concurrent after completion"
66 )));
67 }
68 Async::Ready(None)
69 | Async::Pending => {}
70 }
71
72 if !made_progress_this_iter {
73 return Ok(Async::Pending);
74 }
75 }
76 }
77}