futures_concurrency/future/join/
vec.rs

1use super::Join as JoinTrait;
2use crate::utils::{FutureVec, OutputVec, PollVec, WakerVec};
3
4#[cfg(all(feature = "alloc", not(feature = "std")))]
5use alloc::vec::Vec;
6
7use core::fmt;
8use core::future::{Future, IntoFuture};
9use core::mem::ManuallyDrop;
10use core::ops::DerefMut;
11use core::pin::Pin;
12use core::task::{Context, Poll};
13
14use pin_project::{pin_project, pinned_drop};
15
16/// A future which waits for multiple futures to complete.
17///
18/// This `struct` is created by the [`join`] method on the [`Join`] trait. See
19/// its documentation for more.
20///
21/// [`join`]: crate::future::Join::join
22/// [`Join`]: crate::future::Join
23#[must_use = "futures do nothing unless you `.await` or poll them"]
24#[pin_project(PinnedDrop)]
25pub struct Join<Fut>
26where
27    Fut: Future,
28{
29    consumed: bool,
30    pending: usize,
31    items: OutputVec<<Fut as Future>::Output>,
32    wakers: WakerVec,
33    state: PollVec,
34    #[pin]
35    futures: FutureVec<Fut>,
36}
37
38impl<Fut> Join<Fut>
39where
40    Fut: Future,
41{
42    pub(crate) fn new(futures: Vec<Fut>) -> Self {
43        let len = futures.len();
44        Join {
45            consumed: false,
46            pending: len,
47            items: OutputVec::uninit(len),
48            wakers: WakerVec::new(len),
49            state: PollVec::new_pending(len),
50            futures: FutureVec::new(futures),
51        }
52    }
53}
54
55impl<Fut> JoinTrait for Vec<Fut>
56where
57    Fut: IntoFuture,
58{
59    type Output = Vec<Fut::Output>;
60    type Future = Join<Fut::IntoFuture>;
61
62    fn join(self) -> Self::Future {
63        Join::new(self.into_iter().map(IntoFuture::into_future).collect())
64    }
65}
66
67impl<Fut> fmt::Debug for Join<Fut>
68where
69    Fut: Future + fmt::Debug,
70{
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        f.debug_list().entries(self.state.iter()).finish()
73    }
74}
75
76impl<Fut> Future for Join<Fut>
77where
78    Fut: Future,
79{
80    type Output = Vec<Fut::Output>;
81
82    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        let mut this = self.project();
84
85        assert!(
86            !*this.consumed,
87            "Futures must not be polled after completing"
88        );
89
90        let mut readiness = this.wakers.readiness();
91        readiness.set_waker(cx.waker());
92        if *this.pending != 0 && !readiness.any_ready() {
93            // Nothing is ready yet
94            return Poll::Pending;
95        }
96
97        // Poll all ready futures
98        let futures = this.futures.as_mut();
99        let states = &mut this.state[..];
100        for (i, mut fut) in futures.iter().enumerate() {
101            if states[i].is_pending() && readiness.clear_ready(i) {
102                // unlock readiness so we don't deadlock when polling
103                #[allow(clippy::drop_non_drop)]
104                drop(readiness);
105
106                // Obtain the intermediate waker.
107                let mut cx = Context::from_waker(this.wakers.get(i).unwrap());
108
109                // Poll the future
110                // SAFETY: the future's state was "pending", so it's safe to poll
111                if let Poll::Ready(value) = unsafe {
112                    fut.as_mut()
113                        .map_unchecked_mut(|t| t.deref_mut())
114                        .poll(&mut cx)
115                } {
116                    this.items.write(i, value);
117                    states[i].set_ready();
118                    *this.pending -= 1;
119                    // SAFETY: the future state has been changed to "ready" which
120                    // means we'll no longer poll the future, so it's safe to drop
121                    unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
122                }
123
124                // Lock readiness so we can use it again
125                readiness = this.wakers.readiness();
126            }
127        }
128
129        // Check whether we're all done now or need to keep going.
130        if *this.pending == 0 {
131            // Mark all data as "consumed" before we take it
132            *this.consumed = true;
133            this.state.iter_mut().for_each(|state| {
134                debug_assert!(
135                    state.is_ready(),
136                    "Future should have reached a `Ready` state"
137                );
138                state.set_none();
139            });
140
141            // SAFETY: we've checked with the state that all of our outputs have been
142            // filled, which means we're ready to take the data and assume it's initialized.
143            Poll::Ready(unsafe { this.items.take() })
144        } else {
145            Poll::Pending
146        }
147    }
148}
149
150/// Drop the already initialized values on cancellation.
151#[pinned_drop]
152impl<Fut> PinnedDrop for Join<Fut>
153where
154    Fut: Future,
155{
156    fn drop(self: Pin<&mut Self>) {
157        let mut this = self.project();
158
159        // Drop all initialized values.
160        for i in this.state.ready_indexes() {
161            // SAFETY: we've just filtered down to *only* the initialized values.
162            // We can assume they're initialized, and this is where we drop them.
163            unsafe { this.items.drop(i) };
164        }
165
166        // Drop all pending futures.
167        for i in this.state.pending_indexes() {
168            // SAFETY: we've just filtered down to *only* the pending futures,
169            // which have not yet been dropped.
170            unsafe { this.futures.as_mut().drop(i) };
171        }
172    }
173}
174
#[cfg(test)]
mod test {
    use super::*;
    use crate::utils::DummyWaker;

    use alloc::format;
    use alloc::sync::Arc;
    use alloc::vec;
    use core::future;

    /// Joining two immediately-ready futures yields both outputs in order.
    #[test]
    fn smoke() {
        let joined = vec![future::ready("hello"), future::ready("world")].join();
        let output = futures_lite::future::block_on(joined);
        assert_eq!(output, vec!["hello", "world"]);
    }

    /// Joining an empty vector resolves to an empty vector.
    #[test]
    fn empty() {
        let input: Vec<future::Ready<()>> = vec![];
        let output = futures_lite::future::block_on(input.join());
        assert_eq!(output, Vec::<()>::new());
    }

    /// The `Debug` output lists the poll states before and after completion.
    #[test]
    fn debug() {
        let mut join = vec![future::ready("hello"), future::ready("world")].join();
        assert_eq!(format!("{:?}", join), "[Pending, Pending]");

        let waker = Arc::new(DummyWaker()).into();
        let mut cx = Context::from_waker(&waker);

        let mut pinned = Pin::new(&mut join);
        let _ = pinned.as_mut().poll(&mut cx);
        assert_eq!(format!("{:?}", pinned), "[None, None]");
    }
}