futures_buffered/
join_all.rs

1use alloc::{boxed::Box, vec::Vec};
2use core::{
3    future::Future,
4    mem::MaybeUninit,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use crate::FuturesUnorderedBounded;
10
11#[must_use = "futures do nothing unless you `.await` or poll them"]
12/// Future for the [`join_all`] function.
13pub struct JoinAll<F: Future> {
14    queue: FuturesUnorderedBounded<F>,
15    output: Box<[MaybeUninit<F::Output>]>,
16}
17
18impl<F: Future> Unpin for JoinAll<F> {}
19
20/// Creates a future which represents a collection of the outputs of the futures
21/// given.
22///
23/// The returned future will drive execution for all of its underlying futures,
24/// collecting the results into a destination `Vec<T>` in the same order as they
25/// were provided.
26///
27/// # Examples
28///
29/// ```
30/// # futures::executor::block_on(async {
31/// use futures_buffered::join_all;
32///
33/// async fn foo(i: u32) -> u32 { i }
34///
35/// let futures = vec![foo(1), foo(2), foo(3)];
36/// assert_eq!(join_all(futures).await, [1, 2, 3]);
37/// # });
38/// ```
39///
40/// ## Benchmarks
41///
42/// ### Speed
43///
44/// Running 256 100us timers in a single threaded tokio runtime:
45///
46/// ```text
47/// futures::future::join_all   time:   [3.3207 ms 3.3904 ms 3.4552 ms]
48/// futures_buffered::join_all  time:   [2.6058 ms 2.6616 ms 2.7189 ms]
49/// ```
50///
51/// ### Memory usage
52///
53/// Running 256 `Ready<i32>` futures.
54///
55/// - count: the number of times alloc/dealloc was called
56/// - alloc: the number of cumulative bytes allocated
57/// - dealloc: the number of cumulative bytes deallocated
58///
59/// ```text
60/// futures::future::join_all
61///     count:    512
62///     alloc:    26744 B
63///     dealloc:  26744 B
64///
65/// futures_buffered::join_all
66///     count:    6
67///     alloc:    10312 B
68///     dealloc:  10312 B
69/// ```
70pub fn join_all<I>(iter: I) -> JoinAll<<I as IntoIterator>::Item>
71where
72    I: IntoIterator,
73    <I as IntoIterator>::Item: Future,
74{
75    // create the queue
76    let queue = FuturesUnorderedBounded::from_iter(iter);
77
78    // create the output buffer
79    let mut output = Vec::with_capacity(queue.capacity());
80    output.resize_with(queue.capacity(), MaybeUninit::uninit);
81
82    JoinAll {
83        queue,
84        output: output.into_boxed_slice(),
85    }
86}
87
88impl<F: Future> Future for JoinAll<F> {
89    type Output = Vec<F::Output>;
90
91    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        loop {
93            match self.as_mut().queue.poll_inner(cx) {
94                Poll::Ready(Some((i, x))) => {
95                    self.output[i].write(x);
96                }
97                Poll::Ready(None) => {
98                    // SAFETY: for Ready(None) to be returned, we know that every future in the queue
99                    // must be consumed. Since we have a 1:1 mapping in the queue to our output, we
100                    // know that every output entry is init.
101                    let boxed = unsafe {
102                        // take the boxed slice
103                        let boxed =
104                            core::mem::replace(&mut self.output, Vec::new().into_boxed_slice());
105
106                        // Box::assume_init
107                        let raw = Box::into_raw(boxed);
108                        Box::from_raw(raw as *mut [F::Output])
109                    };
110
111                    break Poll::Ready(boxed.into_vec());
112                }
113                Poll::Pending => break Poll::Pending,
114            }
115        }
116    }
117}
118
#[cfg(test)]
mod tests {
    use core::future::ready;

    /// Joins ten already-ready futures and checks that the result has the
    /// right size, no spare capacity, and the outputs in input order.
    #[test]
    fn join_all() {
        let x = futures::executor::block_on(crate::join_all((0..10).map(ready)));

        assert_eq!(x.len(), 10);
        assert_eq!(x.capacity(), 10);
        // The documented contract is "same order as provided"; pin it here
        // rather than only checking the length.
        assert_eq!(x, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }
}