Skip to main content

futures_util/stream/try_stream/
try_for_each_concurrent.rs

1use crate::stream::{FuturesUnordered, StreamExt};
2use core::fmt;
3use core::num::NonZeroUsize;
4use core::pin::Pin;
5use futures_core::future::{FusedFuture, Future};
6use futures_core::stream::TryStream;
7use futures_core::task::{Context, Poll};
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// Future for the
12    /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
13    /// method.
14    #[must_use = "futures do nothing unless you `.await` or poll them"]
15    pub struct TryForEachConcurrent<St, Fut, F> {
16        #[pin]
17        stream: Option<St>,
18        f: F,
19        futures: FuturesUnordered<Fut>,
20        limit: Option<NonZeroUsize>,
21    }
22}
23
24impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F>
25where
26    St: fmt::Debug,
27    Fut: fmt::Debug,
28{
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        f.debug_struct("TryForEachConcurrent")
31            .field("stream", &self.stream)
32            .field("futures", &self.futures)
33            .field("limit", &self.limit)
34            .finish()
35    }
36}
37
38impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F>
39where
40    St: TryStream,
41    F: FnMut(St::Ok) -> Fut,
42    Fut: Future<Output = Result<(), St::Error>>,
43{
44    fn is_terminated(&self) -> bool {
45        self.stream.is_none() && self.futures.is_empty()
46    }
47}
48
49impl<St, Fut, F> TryForEachConcurrent<St, Fut, F>
50where
51    St: TryStream,
52    F: FnMut(St::Ok) -> Fut,
53    Fut: Future<Output = Result<(), St::Error>>,
54{
55    pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
56        Self {
57            stream: Some(stream),
58            // Note: `limit` = 0 gets ignored.
59            limit: limit.and_then(NonZeroUsize::new),
60            f,
61            futures: FuturesUnordered::new(),
62        }
63    }
64}
65
66impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
67where
68    St: TryStream,
69    F: FnMut(St::Ok) -> Fut,
70    Fut: Future<Output = Result<(), St::Error>>,
71{
72    type Output = Result<(), St::Error>;
73
74    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75        let mut this = self.project();
76        loop {
77            let mut made_progress_this_iter = false;
78
79            // Check if we've already created a number of futures greater than `limit`
80            if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
81                let poll_res = match this.stream.as_mut().as_pin_mut() {
82                    Some(stream) => stream.try_poll_next(cx),
83                    None => Poll::Ready(None),
84                };
85
86                let elem = match poll_res {
87                    Poll::Ready(Some(Ok(elem))) => {
88                        made_progress_this_iter = true;
89                        Some(elem)
90                    }
91                    Poll::Ready(None) => {
92                        this.stream.set(None);
93                        None
94                    }
95                    Poll::Pending => None,
96                    Poll::Ready(Some(Err(e))) => {
97                        // Empty the stream and futures so that we know
98                        // the future has completed.
99                        this.stream.set(None);
100                        drop(core::mem::take(this.futures));
101                        return Poll::Ready(Err(e));
102                    }
103                };
104
105                if let Some(elem) = elem {
106                    this.futures.push((this.f)(elem));
107                }
108            }
109
110            match this.futures.poll_next_unpin(cx) {
111                Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
112                Poll::Ready(None) => {
113                    if this.stream.is_none() {
114                        return Poll::Ready(Ok(()));
115                    }
116                }
117                Poll::Pending => {}
118                Poll::Ready(Some(Err(e))) => {
119                    // Empty the stream and futures so that we know
120                    // the future has completed.
121                    this.stream.set(None);
122                    drop(core::mem::take(this.futures));
123                    return Poll::Ready(Err(e));
124                }
125            }
126
127            if !made_progress_this_iter {
128                return Poll::Pending;
129            }
130        }
131    }
132}