futures_buffered/futures_unordered.rs
use alloc::vec::Vec;
use core::{
    fmt,
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use crate::FuturesUnorderedBounded;
use futures_core::{FusedStream, Stream};

/// A set of futures which may complete in any order.
///
/// Much like [`futures::stream::FuturesUnordered`](https://docs.rs/futures/0.3.25/futures/stream/struct.FuturesUnordered.html),
/// this is a thread-safe, `Pin`-friendly, lifetime-friendly, concurrent processing stream.
///
/// This differs from [`FuturesUnorderedBounded`] in that it does not have a fixed capacity,
/// yet it still manages to achieve good efficiency.
///
/// ## Benchmarks
///
/// All benchmarks are run with `FuturesUnordered::new()`, no predefined capacity.
///
/// ### Speed
///
/// Running 65536 100us timers with 256 concurrent jobs in a single-threaded tokio runtime:
///
/// ```text
/// futures::FuturesUnordered time:   [412.52 ms 414.47 ms 416.41 ms]
/// crate::FuturesUnordered   time:   [412.96 ms 414.69 ms 416.65 ms]
/// FuturesUnorderedBounded   time:   [361.81 ms 362.96 ms 364.13 ms]
/// ```
///
/// ### Memory usage
///
/// Running 512000 `Ready<i32>` futures with 256 concurrent jobs.
///
/// - count: the number of times alloc/dealloc was called
/// - alloc: the number of cumulative bytes allocated
/// - dealloc: the number of cumulative bytes deallocated
///
/// ```text
/// futures::FuturesUnordered
///     count:   1024002
///     alloc:   40960144 B
///     dealloc: 40960000 B
///
/// crate::FuturesUnordered
///     count:   9
///     alloc:   15840 B
///     dealloc: 0 B
/// ```
///
/// ### Conclusion
///
/// As you can see, our `FuturesUnordered` massively reduces your memory overhead while maintaining good performance.
///
/// # Example
///
/// Making 1024 total HTTP requests, with a max concurrency of 128
///
/// ```
/// use futures::future::Future;
/// use futures::stream::StreamExt;
/// use futures_buffered::FuturesUnordered;
/// use hyper::client::conn::http1::{handshake, SendRequest};
/// use hyper::body::Incoming;
/// use hyper::{Request, Response};
/// use hyper_util::rt::TokioIo;
/// use tokio::net::TcpStream;
///
/// # #[cfg(miri)] fn main() {}
/// # #[cfg(not(miri))] #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // create a tcp connection
/// let stream = TcpStream::connect("example.com:80").await?;
///
/// // perform the http handshakes
/// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
/// tokio::spawn(conn);
///
/// /// make http request to example.com and read the response
/// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
///     let req = Request::builder()
///         .header("Host", "example.com")
///         .method("GET")
///         .body(String::new())
///         .unwrap();
///     rs.send_request(req)
/// }
///
/// // create a queue that can hold 128 concurrent requests
/// let mut queue = FuturesUnordered::with_capacity(128);
///
/// // start up 128 requests
/// for _ in 0..128 {
///     queue.push(make_req(&mut rs));
/// }
/// // wait for a request to finish and start another to fill its place - up to 1024 total requests
/// for _ in 128..1024 {
///     queue.next().await;
///     queue.push(make_req(&mut rs));
/// }
/// // wait for the tail end to finish
/// for _ in 0..128 {
///     queue.next().await;
/// }
/// # Ok(()) }
/// ```
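///
/// For a smaller, self-contained sketch of the same API (no network required; this
/// assumes the `futures` crate's `block_on` executor, but any executor works):
///
/// ```
/// use futures::{executor::block_on, stream::StreamExt};
/// use futures_buffered::FuturesUnordered;
///
/// let mut set = FuturesUnordered::new();
/// for i in 0..10 {
///     set.push(async move { i * 2 });
/// }
///
/// // drain the set - completion order is not guaranteed
/// let results: Vec<i32> = block_on(set.collect());
/// assert_eq!(results.len(), 10);
/// ```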
pub struct FuturesUnordered<F> {
    rem: usize,
    pub(crate) groups: Vec<FuturesUnorderedBounded<F>>,
    poll_next: usize,
}

pub(crate) const MIN_CAPACITY: usize = 32;

impl<F> Unpin for FuturesUnordered<F> {}

impl<F> Default for FuturesUnordered<F> {
    fn default() -> Self {
        Self::new()
    }
}

impl<F> FuturesUnordered<F> {
    /// Constructs a new, empty [`FuturesUnordered`].
    ///
    /// The returned [`FuturesUnordered`] does not contain any futures.
    /// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will
    /// return [`Poll::Ready(None)`](Poll::Ready).
    pub const fn new() -> Self {
        Self {
            rem: 0,
            groups: Vec::new(),
            poll_next: 0,
        }
    }

    /// Constructs a new, empty [`FuturesUnordered`] with the given initial capacity.
    ///
    /// The returned [`FuturesUnordered`] does not contain any futures.
    /// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will
    /// return [`Poll::Ready(None)`](Poll::Ready).
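    ///
    /// # Example
    ///
    /// A small usage sketch; the set pre-allocates space for `n` futures, but it can
    /// still grow beyond that:
    ///
    /// ```
    /// use futures_buffered::FuturesUnordered;
    ///
    /// let set = FuturesUnordered::<core::future::Ready<()>>::with_capacity(128);
    /// assert!(set.is_empty());
    /// assert_eq!(set.capacity(), 128);
    /// ```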
    pub fn with_capacity(n: usize) -> Self {
        if n > 0 {
            Self {
                rem: 0,
                groups: Vec::from_iter([FuturesUnorderedBounded::new(n)]),
                poll_next: 0,
            }
        } else {
            Self::new()
        }
    }

    /// Push a future into the set.
    ///
    /// This method adds the given future to the set. This method will not
    /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must
    /// ensure that [`FuturesUnordered::poll_next`](Stream::poll_next) is called
    /// in order to receive wake-up notifications for the given future.
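    ///
    /// # Example
    ///
    /// A minimal sketch, driving the set with `futures::executor::block_on`:
    ///
    /// ```
    /// use futures::{executor::block_on, stream::StreamExt};
    /// use futures_buffered::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::new();
    /// set.push(async { 1 + 1 });
    /// assert_eq!(set.len(), 1);
    /// assert_eq!(block_on(set.next()), Some(2));
    /// ```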
    pub fn push(&mut self, fut: F) {
        self.rem += 1;

        let last = match self.groups.last_mut() {
            Some(last) => last,
            None => {
                self.groups.push(FuturesUnorderedBounded::new(MIN_CAPACITY));
                self.groups
                    .last_mut()
                    .expect("group should have at least one entry")
            }
        };
        match last.try_push(fut) {
            Ok(()) => {}
            Err(future) => {
                // the current group is full - allocate a new group with double the capacity
                let mut next = FuturesUnorderedBounded::new(last.capacity() * 2);
                next.push(future);
                self.groups.push(next);
            }
        }
    }

    /// Returns `true` if the set contains no futures.
    pub fn is_empty(&self) -> bool {
        self.rem == 0
    }

    /// Returns the number of futures contained in the set.
    ///
    /// This represents the total number of in-flight futures.
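    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// use core::future::ready;
    /// use futures_buffered::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::new();
    /// assert!(set.is_empty());
    ///
    /// set.push(ready(1));
    /// set.push(ready(2));
    /// assert_eq!(set.len(), 2);
    /// ```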
    pub fn len(&self) -> usize {
        self.rem
    }

    /// Returns the number of futures that the set can hold without allocating additional space.
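    ///
    /// # Example
    ///
    /// A rough sketch; the exact capacity after growth depends on the internal allocation
    /// strategy and may change:
    ///
    /// ```
    /// use futures_buffered::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::with_capacity(2);
    /// assert_eq!(set.capacity(), 2);
    ///
    /// // pushing beyond the current capacity allocates additional space
    /// for i in 0..3 {
    ///     set.push(async move { i });
    /// }
    /// assert!(set.capacity() >= 3);
    /// ```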
    pub fn capacity(&self) -> usize {
        match self.groups.as_slice() {
            [] => 0,
            [only] => only.capacity(),
            [.., last] => {
                let spare_cap = last.capacity() - last.len();
                self.rem + spare_cap
            }
        }
    }
}

impl<F: Future> Stream for FuturesUnordered<F> {
    type Item = F::Output;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Self {
            rem,
            groups,
            poll_next,
        } = &mut *self;
        if groups.is_empty() {
            return Poll::Ready(None);
        }

        // cycle through the groups round-robin, resuming from where the previous poll left off
        for _ in 0..groups.len() {
            if *poll_next >= groups.len() {
                *poll_next = 0;
            }

            let poll = Pin::new(&mut groups[*poll_next]).poll_next(cx);
            match poll {
                Poll::Ready(Some(x)) => {
                    *rem -= 1;
                    return Poll::Ready(Some(x));
                }
                Poll::Ready(None) => {
                    // this group has been fully drained - remove it
                    let group = groups.remove(*poll_next);
                    debug_assert!(group.is_empty());

                    if groups.is_empty() {
                        // group should contain at least 1 set
                        groups.push(group);
                        debug_assert_eq!(*rem, 0);
                        return Poll::Ready(None);
                    }

                    // we do not want to drop the last set as it contains
                    // the largest allocation that we want to keep a hold of
                    if *poll_next == groups.len() {
                        groups.push(group);
                        *poll_next = 0;
                    }
                }
                Poll::Pending => {
                    *poll_next += 1;
                }
            }
        }
        Poll::Pending
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.rem, Some(self.rem))
    }
}
impl<F: Future> FusedStream for FuturesUnordered<F> {
    fn is_terminated(&self) -> bool {
        self.is_empty()
    }
}

impl<F> FromIterator<F> for FuturesUnordered<F> {
    /// Constructs a new [`FuturesUnordered`] containing the futures from the iterator,
    /// reserving capacity for at least the iterator's reported length.
    ///
    /// # Example
    ///
    /// Making 1024 total HTTP requests, with a max concurrency of 128
    ///
    /// ```
    /// use futures::future::Future;
    /// use futures::stream::StreamExt;
    /// use futures_buffered::FuturesUnordered;
    /// use hyper::client::conn::http1::{handshake, SendRequest};
    /// use hyper::body::Incoming;
    /// use hyper::{Request, Response};
    /// use hyper_util::rt::TokioIo;
    /// use tokio::net::TcpStream;
    ///
    /// # #[cfg(miri)] fn main() {}
    /// # #[cfg(not(miri))] #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // create a tcp connection
    /// let stream = TcpStream::connect("example.com:80").await?;
    ///
    /// // perform the http handshakes
    /// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
    /// tokio::spawn(conn);
    ///
    /// /// make http request to example.com and read the response
    /// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
    ///     let req = Request::builder()
    ///         .header("Host", "example.com")
    ///         .method("GET")
    ///         .body(String::new())
    ///         .unwrap();
    ///     rs.send_request(req)
    /// }
    ///
    /// // create a queue with an initial 128 concurrent requests
    /// let mut queue: FuturesUnordered<_> = (0..128).map(|_| make_req(&mut rs)).collect();
    ///
    /// // wait for a request to finish and start another to fill its place - up to 1024 total requests
    /// for _ in 128..1024 {
    ///     queue.next().await;
    ///     queue.push(make_req(&mut rs));
    /// }
    /// // wait for the tail end to finish
    /// for _ in 0..128 {
    ///     queue.next().await;
    /// }
    /// # Ok(()) }
    /// ```
    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
        let iter = iter.into_iter();
        let mut this =
            FuturesUnordered::with_capacity(usize::max(iter.size_hint().0, MIN_CAPACITY));
        for fut in iter {
            this.push(fut);
        }
        this
    }
}

impl<Fut> fmt::Debug for FuturesUnordered<Fut> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FuturesUnordered")
            .field("queues", &self.groups)
            .field("len", &self.rem)
            .finish_non_exhaustive()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use core::{cell::Cell, future::ready, time::Duration};
    use futures::StreamExt;
    use pin_project_lite::pin_project;
    use std::{thread, time::Instant};

    pin_project!(
        struct PollCounter<'c, F> {
            count: &'c Cell<usize>,
            #[pin]
            inner: F,
        }
    );

    impl<F: Future> Future for PollCounter<'_, F> {
        type Output = F::Output;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            self.count.set(self.count.get() + 1);
            self.project().inner.poll(cx)
        }
    }

    struct Sleep {
        until: Instant,
    }
    impl Unpin for Sleep {}
    impl Future for Sleep {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let until = self.until;
            if until > Instant::now() {
                let waker = cx.waker().clone();
                thread::spawn(move || {
                    thread::sleep(until.duration_since(Instant::now()));
                    waker.wake();
                });
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    struct Yield {
        done: bool,
    }
    impl Unpin for Yield {}
    impl Future for Yield {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.as_mut().done {
                Poll::Ready(())
            } else {
                cx.waker().wake_by_ref();
                self.as_mut().done = true;
                Poll::Pending
            }
        }
    }

    fn yield_now(count: &Cell<usize>) -> PollCounter<'_, Yield> {
        PollCounter {
            count,
            inner: Yield { done: false },
        }
    }

    // wraps `Sleep` so the timed futures in `very_slow_task` are also poll-counted
    fn sleep(count: &Cell<usize>, dur: Duration) -> PollCounter<'_, Sleep> {
        PollCounter {
            count,
            inner: Sleep {
                until: Instant::now() + dur,
            },
        }
    }

    #[test]
    fn single() {
        let c = Cell::new(0);

        let mut buffer = FuturesUnordered::new();
        buffer.push(yield_now(&c));
        futures::executor::block_on(buffer.next());

        drop(buffer);
        assert_eq!(c.into_inner(), 2);
    }

    #[test]
    fn len() {
        let mut buffer = FuturesUnordered::with_capacity(1);

        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.capacity(), 1);
        assert_eq!(buffer.size_hint(), (0, Some(0)));
        assert!(buffer.is_terminated());

        buffer.push(ready(()));

        assert_eq!(buffer.len(), 1);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.capacity(), 1);
        assert_eq!(buffer.size_hint(), (1, Some(1)));
        assert!(!buffer.is_terminated());

        buffer.push(ready(()));

        assert_eq!(buffer.len(), 2);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.capacity(), 3);
        assert_eq!(buffer.size_hint(), (2, Some(2)));
        assert!(!buffer.is_terminated());

        futures::executor::block_on(buffer.next());
        futures::executor::block_on(buffer.next());

        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.capacity(), 2);
        assert_eq!(buffer.size_hint(), (0, Some(0)));
        assert!(buffer.is_terminated());
    }

    #[test]
    fn from_iter() {
        let buffer = FuturesUnordered::from_iter((0..10).map(|_| ready(())));

        assert_eq!(buffer.len(), 10);
        assert_eq!(buffer.capacity(), 32);
        assert_eq!(buffer.size_hint(), (10, Some(10)));
    }

    #[test]
    fn multi() {
        fn wait(count: &Cell<usize>) -> PollCounter<'_, Yield> {
            yield_now(count)
        }

        let c = Cell::new(0);

        let mut buffer = FuturesUnordered::with_capacity(1);
        // build up
        for _ in 0..10 {
            buffer.push(wait(&c));
        }
        // poll and insert
        for _ in 0..100 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
            buffer.push(wait(&c));
        }
        // drain down
        for _ in 0..10 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
        }

        let count = c.into_inner();
        assert_eq!(count, 220);
    }

    #[test]
    fn very_slow_task() {
        let c = Cell::new(0);

        let now = Instant::now();

        let mut buffer = FuturesUnordered::with_capacity(1);
        // build up
        for _ in 0..9 {
            buffer.push(sleep(&c, Duration::from_millis(10)));
        }
        // spawn a slow future among a bunch of fast ones.
        // the test is to make sure this doesn't block the rest getting completed
        buffer.push(sleep(&c, Duration::from_secs(2)));
        // poll and insert
        for _ in 0..100 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
            buffer.push(sleep(&c, Duration::from_millis(10)));
        }
        // drain down
        for _ in 0..10 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
        }

        let dur = now.elapsed();
        assert!(dur < Duration::from_millis(2050));

        let count = c.into_inner();
        assert_eq!(count, 220);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn unordered_large() {
        for i in 0..256 {
            let mut queue: FuturesUnorderedBounded<_> = ((0..i).map(|_| async move {
                tokio::time::sleep(Duration::from_nanos(1)).await;
            }))
            .collect();
            for _ in 0..i {
                queue.next().await.unwrap();
            }
        }
    }
}