futures_buffered/join_all.rs
1use alloc::{boxed::Box, vec::Vec};
2use core::{
3 future::Future,
4 mem::MaybeUninit,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use crate::FuturesUnorderedBounded;
10
11#[must_use = "futures do nothing unless you `.await` or poll them"]
12/// Future for the [`join_all`] function.
13pub struct JoinAll<F: Future> {
14 queue: FuturesUnorderedBounded<F>,
15 output: Box<[MaybeUninit<F::Output>]>,
16}
17
18impl<F: Future> Unpin for JoinAll<F> {}
19
20/// Creates a future which represents a collection of the outputs of the futures
21/// given.
22///
23/// The returned future will drive execution for all of its underlying futures,
24/// collecting the results into a destination `Vec<T>` in the same order as they
25/// were provided.
26///
27/// # Examples
28///
29/// ```
30/// # futures::executor::block_on(async {
31/// use futures_buffered::join_all;
32///
33/// async fn foo(i: u32) -> u32 { i }
34///
35/// let futures = vec![foo(1), foo(2), foo(3)];
36/// assert_eq!(join_all(futures).await, [1, 2, 3]);
37/// # });
38/// ```
39///
40/// ## Benchmarks
41///
42/// ### Speed
43///
44/// Running 256 100us timers in a single threaded tokio runtime:
45///
46/// ```text
47/// futures::future::join_all time: [3.3207 ms 3.3904 ms 3.4552 ms]
48/// futures_buffered::join_all time: [2.6058 ms 2.6616 ms 2.7189 ms]
49/// ```
50///
51/// ### Memory usage
52///
53/// Running 256 `Ready<i32>` futures.
54///
55/// - count: the number of times alloc/dealloc was called
56/// - alloc: the number of cumulative bytes allocated
57/// - dealloc: the number of cumulative bytes deallocated
58///
59/// ```text
60/// futures::future::join_all
61/// count: 512
62/// alloc: 26744 B
63/// dealloc: 26744 B
64///
65/// futures_buffered::join_all
66/// count: 6
67/// alloc: 10312 B
68/// dealloc: 10312 B
69/// ```
70pub fn join_all<I>(iter: I) -> JoinAll<<I as IntoIterator>::Item>
71where
72 I: IntoIterator,
73 <I as IntoIterator>::Item: Future,
74{
75 // create the queue
76 let queue = FuturesUnorderedBounded::from_iter(iter);
77
78 // create the output buffer
79 let mut output = Vec::with_capacity(queue.capacity());
80 output.resize_with(queue.capacity(), MaybeUninit::uninit);
81
82 JoinAll {
83 queue,
84 output: output.into_boxed_slice(),
85 }
86}
87
88impl<F: Future> Future for JoinAll<F> {
89 type Output = Vec<F::Output>;
90
91 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 loop {
93 match self.as_mut().queue.poll_inner(cx) {
94 Poll::Ready(Some((i, x))) => {
95 self.output[i].write(x);
96 }
97 Poll::Ready(None) => {
98 // SAFETY: for Ready(None) to be returned, we know that every future in the queue
99 // must be consumed. Since we have a 1:1 mapping in the queue to our output, we
100 // know that every output entry is init.
101 let boxed = unsafe {
102 // take the boxed slice
103 let boxed =
104 core::mem::replace(&mut self.output, Vec::new().into_boxed_slice());
105
106 // Box::assume_init
107 let raw = Box::into_raw(boxed);
108 Box::from_raw(raw as *mut [F::Output])
109 };
110
111 break Poll::Ready(boxed.into_vec());
112 }
113 Poll::Pending => break Poll::Pending,
114 }
115 }
116 }
117}
118
#[cfg(test)]
mod tests {
    use core::future::ready;

    /// Joins ten already-ready futures and checks the values, their order, and
    /// the length and capacity of the resulting `Vec`.
    #[test]
    fn join_all() {
        let joined = futures::executor::block_on(crate::join_all((0..10).map(ready)));

        // Outputs must come back in the order the futures were supplied.
        assert_eq!(joined, (0..10).collect::<Vec<i32>>());

        // The result is built from an exactly-sized boxed slice, so the
        // capacity should match the length.
        assert_eq!(joined.len(), 10);
        assert_eq!(joined.capacity(), 10);
    }
}