futures_buffered/buffered/
unordered.rs1use core::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6use futures_core::Stream;
7use pin_project_lite::pin_project;
8
9use crate::FuturesUnorderedBounded;
10
11pin_project!(
12 #[must_use = "streams do nothing unless polled"]
70 pub struct BufferUnordered<S: Stream> {
71 #[pin]
72 pub(crate) stream: Option<S>,
73 pub(crate) in_progress_queue: FuturesUnorderedBounded<S::Item>,
74 }
75);
76
77impl<St> Stream for BufferUnordered<St>
78where
79 St: Stream,
80 St::Item: Future,
81{
82 type Item = <St::Item as Future>::Output;
83
84 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85 let mut this = self.project();
86
87 let unordered = this.in_progress_queue;
90 while unordered.tasks.len() < unordered.tasks.capacity() {
91 if let Some(s) = this.stream.as_mut().as_pin_mut() {
92 match s.poll_next(cx) {
93 Poll::Ready(Some(fut)) => {
94 unordered.push(fut);
95 continue;
96 }
97 Poll::Ready(None) => this.stream.as_mut().set(None),
98 Poll::Pending => {}
99 }
100 }
101 break;
102 }
103
104 match Pin::new(unordered).poll_next(cx) {
106 x @ (Poll::Pending | Poll::Ready(Some(_))) => return x,
107 Poll::Ready(None) => {}
108 }
109
110 if this.stream.as_pin_mut().is_none() {
112 Poll::Ready(None)
113 } else {
114 Poll::Pending
115 }
116 }
117
118 fn size_hint(&self) -> (usize, Option<usize>) {
119 let queue_len = self.in_progress_queue.len();
120 let (lower, upper) = self
121 .stream
122 .as_ref()
123 .map(|s| s.size_hint())
124 .unwrap_or((0, Some(0)));
125 let lower = lower.saturating_add(queue_len);
126 let upper = match upper {
127 Some(x) => x.checked_add(queue_len),
128 None => None,
129 };
130 (lower, upper)
131 }
132}
133
#[cfg(test)]
mod tests {
    use crate::BufferedStreamExt;

    use super::*;
    use futures::{channel::oneshot, stream, StreamExt};
    use futures_test::task::noop_context;
    use rand::{rng, Rng};
    use tokio::task::JoinSet;

    /// Outputs are yielded in completion order, not in the order the futures
    /// were produced by the inner stream.
    #[test]
    fn buffered_unordered() {
        let (send_one, recv_one) = oneshot::channel();
        let (send_two, recv_two) = oneshot::channel();

        let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
        let mut buffered = stream_of_futures.buffered_unordered(10);
        let mut cx = noop_context();

        // Nothing has been polled yet, so the hint comes from the two-element inner stream.
        assert_eq!(buffered.size_hint(), (2, Some(2)));

        // Neither sender has fired, so no output can be ready.
        assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Pending);

        // Completing the second receiver first makes its value come out first.
        send_two.send(2i32).unwrap();
        assert_eq!(
            buffered.poll_next_unpin(&mut cx),
            Poll::Ready(Some(Ok(2i32)))
        );

        send_one.send(1i32).unwrap();
        assert_eq!(
            buffered.poll_next_unpin(&mut cx),
            Poll::Ready(Some(Ok(1i32)))
        );

        // Inner stream exhausted and both futures consumed: the adapter is finished.
        assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Ready(None));
    }

    /// Runs many buffered streams of timed futures concurrently and checks
    /// that none of them hits its timeout.
    #[cfg(not(miri))]
    #[tokio::test(start_paused = true)]
    async fn high_concurrency() {
        let now = tokio::time::Instant::now();
        let dur = std::time::Duration::from_millis(10);
        let n = 1024 * 16;
        let c = 32;

        let estimated = dur.as_secs_f64() * 10.5 * (n as f64) / (c as f64) * 4.0;
        dbg!(estimated);

        let mut js = JoinSet::new();

        for _ in 0..32 {
            js.spawn(async move {
                let x = futures::stream::repeat_with(|| {
                    let n = rng().random_range(1..=20);
                    // Four sleeps of `n * dur` each, wrapped in a `5 * n * dur` timeout.
                    let fut = async move {
                        for _ in 0..4 {
                            tokio::time::sleep(n * dur).await;
                        }
                    };
                    tokio::time::timeout(dur * (5 * n), fut)
                });
                let x = x.take(n as usize).buffered_unordered(c as usize);
                x.for_each(|res| async { res.unwrap() }).await;
            });
        }

        // Unwrap every join result: a panic inside a spawned task (e.g. the
        // `res.unwrap()` above on an elapsed timeout) surfaces here as a
        // `JoinError`, and ignoring it would let the test pass regardless.
        while let Some(joined) = js.join_next().await {
            joined.expect("worker task panicked or was cancelled");
        }

        let elapsed = now.elapsed().as_secs_f64();
        dbg!(elapsed);
    }
}