futures_concurrency/future/join/
array.rs

1use super::Join as JoinTrait;
2use crate::utils::{FutureArray, OutputArray, PollArray, WakerArray};
3
4use core::fmt;
5use core::future::{Future, IntoFuture};
6use core::mem::ManuallyDrop;
7use core::ops::DerefMut;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use pin_project::{pin_project, pinned_drop};
12
13/// A future which waits for two similarly-typed futures to complete.
14///
15/// This `struct` is created by the [`join`] method on the [`Join`] trait. See
16/// its documentation for more.
17///
18/// [`join`]: crate::future::Join::join
19/// [`Join`]: crate::future::Join
20#[must_use = "futures do nothing unless you `.await` or poll them"]
21#[pin_project(PinnedDrop)]
22pub struct Join<Fut, const N: usize>
23where
24    Fut: Future,
25{
26    /// A boolean which holds whether the future has completed
27    consumed: bool,
28    /// The number of futures which are currently still in-flight
29    pending: usize,
30    /// The output data, to be returned after the future completes
31    items: OutputArray<<Fut as Future>::Output, N>,
32    /// A structure holding the waker passed to the future, and the various
33    /// sub-wakers passed to the contained futures.
34    wakers: WakerArray<N>,
35    /// The individual poll state of each future.
36    state: PollArray<N>,
37    #[pin]
38    /// The array of futures passed to the structure.
39    futures: FutureArray<Fut, N>,
40}
41
42impl<Fut, const N: usize> Join<Fut, N>
43where
44    Fut: Future,
45{
46    #[inline]
47    pub(crate) fn new(futures: [Fut; N]) -> Self {
48        Join {
49            consumed: false,
50            pending: N,
51            items: OutputArray::uninit(),
52            wakers: WakerArray::new(),
53            state: PollArray::new_pending(),
54            futures: FutureArray::new(futures),
55        }
56    }
57}
58
59impl<Fut, const N: usize> JoinTrait for [Fut; N]
60where
61    Fut: IntoFuture,
62{
63    type Output = [Fut::Output; N];
64    type Future = Join<Fut::IntoFuture, N>;
65
66    #[inline]
67    fn join(self) -> Self::Future {
68        Join::new(self.map(IntoFuture::into_future))
69    }
70}
71
72impl<Fut, const N: usize> fmt::Debug for Join<Fut, N>
73where
74    Fut: Future + fmt::Debug,
75{
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        f.debug_list().entries(self.state.iter()).finish()
78    }
79}
80
impl<Fut, const N: usize> Future for Join<Fut, N>
where
    Fut: Future,
{
    /// One output per joined future, collected into an array of length `N`.
    type Output = [Fut::Output; N];

    /// Polls the contained futures that are flagged as ready and completes
    /// once none of them is pending any more.
    ///
    /// Returns `Poll::Pending` while `pending` is non-zero after this pass,
    /// and `Poll::Ready` with the collected outputs once it reaches zero.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // `consumed` is set right before the output is taken below, so a true
        // value here means the caller is polling a finished future.
        assert!(
            !*this.consumed,
            "Futures must not be polled after completing"
        );

        // Register the caller's waker before inspecting readiness.
        // NOTE(review): the very first poll only makes progress if
        // `WakerArray::new` starts with every index flagged ready — confirm
        // in `utils`.
        let mut readiness = this.wakers.readiness();
        readiness.set_waker(cx.waker());
        // The `pending != 0` guard lets a join with nothing left to wait for
        // (e.g. `N == 0`, since `new` sets `pending` to `N`) fall through to
        // the completion branch instead of returning `Pending` forever.
        if *this.pending != 0 && !readiness.any_ready() {
            // Nothing is ready yet
            return Poll::Pending;
        }

        // Poll all ready futures
        for (i, mut fut) in this.futures.iter().enumerate() {
            // Only futures that are still pending are considered.
            // NOTE(review): `clear_ready` presumably reports whether index `i`
            // was flagged and resets the flag — confirm in `utils`.
            if this.state[i].is_pending() && readiness.clear_ready(i) {
                // unlock readiness so we don't deadlock when polling
                #[allow(clippy::drop_non_drop)]
                drop(readiness);

                // Obtain the intermediate waker.
                let mut cx = Context::from_waker(this.wakers.get(i).unwrap());

                // Poll the future
                // SAFETY: the future's state was "pending", so it's safe to poll.
                // A future is only dropped below after its state has been set to
                // "ready", so a pending slot still holds a live future. The
                // projection only derefs the wrapper in place; it does not move
                // the future.
                if let Poll::Ready(value) = unsafe {
                    fut.as_mut()
                        .map_unchecked_mut(|t| t.deref_mut())
                        .poll(&mut cx)
                } {
                    // Store the output in the slot matching the future's index
                    // and record that one fewer future is outstanding.
                    this.items.write(i, value);
                    this.state[i].set_ready();
                    *this.pending -= 1;
                    // SAFETY: the future state has been changed to "ready" which
                    // means we'll no longer poll the future, so it's safe to drop
                    unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
                }

                // Lock readiness so we can use it again
                readiness = this.wakers.readiness();
            }
        }

        // Check whether we're all done now or need to keep going.
        if *this.pending == 0 {
            // Mark all data as "consumed" before we take it
            *this.consumed = true;
            // Move every slot from "ready" to "none" so the drop impl, which
            // walks the ready and pending indexes, leaves the taken outputs
            // and the already-dropped futures alone.
            for state in this.state.iter_mut() {
                debug_assert!(
                    state.is_ready(),
                    "Future should have reached a `Ready` state"
                );
                state.set_none();
            }

            // SAFETY: we've checked with the state that all of our outputs have been
            // filled, which means we're ready to take the data and assume it's initialized.
            Poll::Ready(unsafe { this.items.take() })
        } else {
            Poll::Pending
        }
    }
}
153
154/// Drop the already initialized values on cancellation.
155#[pinned_drop]
156impl<Fut, const N: usize> PinnedDrop for Join<Fut, N>
157where
158    Fut: Future,
159{
160    fn drop(self: Pin<&mut Self>) {
161        let mut this = self.project();
162
163        // Drop all initialized values.
164        for i in this.state.ready_indexes() {
165            // SAFETY: we've just filtered down to *only* the initialized values.
166            // We can assume they're initialized, and this is where we drop them.
167            unsafe { this.items.drop(i) };
168        }
169
170        // Drop all pending futures.
171        for i in this.state.pending_indexes() {
172            // SAFETY: we've just filtered down to *only* the pending futures,
173            // which have not yet been dropped.
174            unsafe { this.futures.as_mut().drop(i) };
175        }
176    }
177}
178
#[cfg(test)]
mod test {
    use super::*;

    use core::future;

    /// Two ready futures resolve to their outputs in input order.
    #[test]
    fn smoke() {
        futures_lite::future::block_on(async {
            let inputs = [future::ready("hello"), future::ready("world")];
            let outputs = inputs.join().await;
            assert_eq!(outputs, ["hello", "world"]);
        });
    }

    /// Joining a zero-length array completes with an empty array.
    #[test]
    fn empty() {
        futures_lite::future::block_on(async {
            let inputs: [future::Ready<()>; 0] = [];
            let outputs = inputs.join().await;
            assert_eq!(outputs, []);
        });
    }

    /// The `Debug` output lists per-future states, before and after a poll.
    #[test]
    #[cfg(feature = "alloc")]
    fn debug() {
        use crate::utils::DummyWaker;
        use alloc::format;
        use alloc::sync::Arc;
        use core::task::Context;

        let mut joined = [future::ready("hello"), future::ready("world")].join();
        let before = format!("{:?}", joined);
        assert_eq!(before, "[Pending, Pending]");
        let mut pinned = Pin::new(&mut joined);

        let waker = Arc::new(DummyWaker()).into();
        let mut cx = Context::from_waker(&waker);
        let _ = pinned.as_mut().poll(&mut cx);
        let after = format!("{:?}", pinned);
        assert_eq!(after, "[None, None]");
    }
}