futures_util/stream/try_stream/
try_for_each_concurrent.rs1use crate::stream::{FuturesUnordered, StreamExt};
2use core::fmt;
3use core::num::NonZeroUsize;
4use core::pin::Pin;
5use futures_core::future::{FusedFuture, Future};
6use futures_core::stream::TryStream;
7use futures_core::task::{Context, Poll};
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[must_use = "futures do nothing unless you `.await` or poll them"]
15 pub struct TryForEachConcurrent<St, Fut, F> {
16 #[pin]
17 stream: Option<St>,
18 f: F,
19 futures: FuturesUnordered<Fut>,
20 limit: Option<NonZeroUsize>,
21 }
22}
23
24impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F>
25where
26 St: fmt::Debug,
27 Fut: fmt::Debug,
28{
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 f.debug_struct("TryForEachConcurrent")
31 .field("stream", &self.stream)
32 .field("futures", &self.futures)
33 .field("limit", &self.limit)
34 .finish()
35 }
36}
37
38impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F>
39where
40 St: TryStream,
41 F: FnMut(St::Ok) -> Fut,
42 Fut: Future<Output = Result<(), St::Error>>,
43{
44 fn is_terminated(&self) -> bool {
45 self.stream.is_none() && self.futures.is_empty()
46 }
47}
48
49impl<St, Fut, F> TryForEachConcurrent<St, Fut, F>
50where
51 St: TryStream,
52 F: FnMut(St::Ok) -> Fut,
53 Fut: Future<Output = Result<(), St::Error>>,
54{
55 pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
56 Self {
57 stream: Some(stream),
58 limit: limit.and_then(NonZeroUsize::new),
60 f,
61 futures: FuturesUnordered::new(),
62 }
63 }
64}
65
66impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
67where
68 St: TryStream,
69 F: FnMut(St::Ok) -> Fut,
70 Fut: Future<Output = Result<(), St::Error>>,
71{
72 type Output = Result<(), St::Error>;
73
74 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75 let mut this = self.project();
76 loop {
77 let mut made_progress_this_iter = false;
78
79 if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
81 let poll_res = match this.stream.as_mut().as_pin_mut() {
82 Some(stream) => stream.try_poll_next(cx),
83 None => Poll::Ready(None),
84 };
85
86 let elem = match poll_res {
87 Poll::Ready(Some(Ok(elem))) => {
88 made_progress_this_iter = true;
89 Some(elem)
90 }
91 Poll::Ready(None) => {
92 this.stream.set(None);
93 None
94 }
95 Poll::Pending => None,
96 Poll::Ready(Some(Err(e))) => {
97 this.stream.set(None);
100 drop(core::mem::take(this.futures));
101 return Poll::Ready(Err(e));
102 }
103 };
104
105 if let Some(elem) = elem {
106 this.futures.push((this.f)(elem));
107 }
108 }
109
110 match this.futures.poll_next_unpin(cx) {
111 Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
112 Poll::Ready(None) => {
113 if this.stream.is_none() {
114 return Poll::Ready(Ok(()));
115 }
116 }
117 Poll::Pending => {}
118 Poll::Ready(Some(Err(e))) => {
119 this.stream.set(None);
122 drop(core::mem::take(this.futures));
123 return Poll::Ready(Err(e));
124 }
125 }
126
127 if !made_progress_this_iter {
128 return Poll::Pending;
129 }
130 }
131 }
132}