futures_concurrency/future/join/
array.rs1use super::Join as JoinTrait;
2use crate::utils::{FutureArray, OutputArray, PollArray, WakerArray};
3
4use core::fmt;
5use core::future::{Future, IntoFuture};
6use core::mem::ManuallyDrop;
7use core::ops::DerefMut;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use pin_project::{pin_project, pinned_drop};
12
13#[must_use = "futures do nothing unless you `.await` or poll them"]
21#[pin_project(PinnedDrop)]
22pub struct Join<Fut, const N: usize>
23where
24 Fut: Future,
25{
26 consumed: bool,
28 pending: usize,
30 items: OutputArray<<Fut as Future>::Output, N>,
32 wakers: WakerArray<N>,
35 state: PollArray<N>,
37 #[pin]
38 futures: FutureArray<Fut, N>,
40}
41
42impl<Fut, const N: usize> Join<Fut, N>
43where
44 Fut: Future,
45{
46 #[inline]
47 pub(crate) fn new(futures: [Fut; N]) -> Self {
48 Join {
49 consumed: false,
50 pending: N,
51 items: OutputArray::uninit(),
52 wakers: WakerArray::new(),
53 state: PollArray::new_pending(),
54 futures: FutureArray::new(futures),
55 }
56 }
57}
58
59impl<Fut, const N: usize> JoinTrait for [Fut; N]
60where
61 Fut: IntoFuture,
62{
63 type Output = [Fut::Output; N];
64 type Future = Join<Fut::IntoFuture, N>;
65
66 #[inline]
67 fn join(self) -> Self::Future {
68 Join::new(self.map(IntoFuture::into_future))
69 }
70}
71
72impl<Fut, const N: usize> fmt::Debug for Join<Fut, N>
73where
74 Fut: Future + fmt::Debug,
75{
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 f.debug_list().entries(self.state.iter()).finish()
78 }
79}
80
81impl<Fut, const N: usize> Future for Join<Fut, N>
82where
83 Fut: Future,
84{
85 type Output = [Fut::Output; N];
86
87 #[inline]
88 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89 let this = self.project();
90
91 assert!(
92 !*this.consumed,
93 "Futures must not be polled after completing"
94 );
95
96 let mut readiness = this.wakers.readiness();
97 readiness.set_waker(cx.waker());
98 if *this.pending != 0 && !readiness.any_ready() {
99 return Poll::Pending;
101 }
102
103 for (i, mut fut) in this.futures.iter().enumerate() {
105 if this.state[i].is_pending() && readiness.clear_ready(i) {
106 #[allow(clippy::drop_non_drop)]
108 drop(readiness);
109
110 let mut cx = Context::from_waker(this.wakers.get(i).unwrap());
112
113 if let Poll::Ready(value) = unsafe {
116 fut.as_mut()
117 .map_unchecked_mut(|t| t.deref_mut())
118 .poll(&mut cx)
119 } {
120 this.items.write(i, value);
121 this.state[i].set_ready();
122 *this.pending -= 1;
123 unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
126 }
127
128 readiness = this.wakers.readiness();
130 }
131 }
132
133 if *this.pending == 0 {
135 *this.consumed = true;
137 for state in this.state.iter_mut() {
138 debug_assert!(
139 state.is_ready(),
140 "Future should have reached a `Ready` state"
141 );
142 state.set_none();
143 }
144
145 Poll::Ready(unsafe { this.items.take() })
148 } else {
149 Poll::Pending
150 }
151 }
152}
153
154#[pinned_drop]
156impl<Fut, const N: usize> PinnedDrop for Join<Fut, N>
157where
158 Fut: Future,
159{
160 fn drop(self: Pin<&mut Self>) {
161 let mut this = self.project();
162
163 for i in this.state.ready_indexes() {
165 unsafe { this.items.drop(i) };
168 }
169
170 for i in this.state.pending_indexes() {
172 unsafe { this.futures.as_mut().drop(i) };
175 }
176 }
177}
178
179#[cfg(test)]
180mod test {
181 use super::*;
182
183 use core::future;
184
185 #[test]
186 fn smoke() {
187 futures_lite::future::block_on(async {
188 let fut = [future::ready("hello"), future::ready("world")].join();
189 assert_eq!(fut.await, ["hello", "world"]);
190 });
191 }
192
193 #[test]
194 fn empty() {
195 futures_lite::future::block_on(async {
196 let data: [future::Ready<()>; 0] = [];
197 let fut = data.join();
198 assert_eq!(fut.await, []);
199 });
200 }
201
202 #[test]
203 #[cfg(feature = "alloc")]
204 fn debug() {
205 use crate::utils::DummyWaker;
206 use alloc::format;
207 use alloc::sync::Arc;
208 use core::task::Context;
209
210 let mut fut = [future::ready("hello"), future::ready("world")].join();
211 assert_eq!(format!("{:?}", fut), "[Pending, Pending]");
212 let mut fut = Pin::new(&mut fut);
213
214 let waker = Arc::new(DummyWaker()).into();
215 let mut cx = Context::from_waker(&waker);
216 let _ = fut.as_mut().poll(&mut cx);
217 assert_eq!(format!("{:?}", fut), "[None, None]");
218 }
219}