futures_concurrency/future/join/
vec.rs1use super::Join as JoinTrait;
2use crate::utils::{FutureVec, OutputVec, PollVec, WakerVec};
3
4#[cfg(all(feature = "alloc", not(feature = "std")))]
5use alloc::vec::Vec;
6
7use core::fmt;
8use core::future::{Future, IntoFuture};
9use core::mem::ManuallyDrop;
10use core::ops::DerefMut;
11use core::pin::Pin;
12use core::task::{Context, Poll};
13
14use pin_project::{pin_project, pinned_drop};
15
16#[must_use = "futures do nothing unless you `.await` or poll them"]
24#[pin_project(PinnedDrop)]
25pub struct Join<Fut>
26where
27 Fut: Future,
28{
29 consumed: bool,
30 pending: usize,
31 items: OutputVec<<Fut as Future>::Output>,
32 wakers: WakerVec,
33 state: PollVec,
34 #[pin]
35 futures: FutureVec<Fut>,
36}
37
38impl<Fut> Join<Fut>
39where
40 Fut: Future,
41{
42 pub(crate) fn new(futures: Vec<Fut>) -> Self {
43 let len = futures.len();
44 Join {
45 consumed: false,
46 pending: len,
47 items: OutputVec::uninit(len),
48 wakers: WakerVec::new(len),
49 state: PollVec::new_pending(len),
50 futures: FutureVec::new(futures),
51 }
52 }
53}
54
55impl<Fut> JoinTrait for Vec<Fut>
56where
57 Fut: IntoFuture,
58{
59 type Output = Vec<Fut::Output>;
60 type Future = Join<Fut::IntoFuture>;
61
62 fn join(self) -> Self::Future {
63 Join::new(self.into_iter().map(IntoFuture::into_future).collect())
64 }
65}
66
67impl<Fut> fmt::Debug for Join<Fut>
68where
69 Fut: Future + fmt::Debug,
70{
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 f.debug_list().entries(self.state.iter()).finish()
73 }
74}
75
76impl<Fut> Future for Join<Fut>
77where
78 Fut: Future,
79{
80 type Output = Vec<Fut::Output>;
81
82 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 let mut this = self.project();
84
85 assert!(
86 !*this.consumed,
87 "Futures must not be polled after completing"
88 );
89
90 let mut readiness = this.wakers.readiness();
91 readiness.set_waker(cx.waker());
92 if *this.pending != 0 && !readiness.any_ready() {
93 return Poll::Pending;
95 }
96
97 let futures = this.futures.as_mut();
99 let states = &mut this.state[..];
100 for (i, mut fut) in futures.iter().enumerate() {
101 if states[i].is_pending() && readiness.clear_ready(i) {
102 #[allow(clippy::drop_non_drop)]
104 drop(readiness);
105
106 let mut cx = Context::from_waker(this.wakers.get(i).unwrap());
108
109 if let Poll::Ready(value) = unsafe {
112 fut.as_mut()
113 .map_unchecked_mut(|t| t.deref_mut())
114 .poll(&mut cx)
115 } {
116 this.items.write(i, value);
117 states[i].set_ready();
118 *this.pending -= 1;
119 unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
122 }
123
124 readiness = this.wakers.readiness();
126 }
127 }
128
129 if *this.pending == 0 {
131 *this.consumed = true;
133 this.state.iter_mut().for_each(|state| {
134 debug_assert!(
135 state.is_ready(),
136 "Future should have reached a `Ready` state"
137 );
138 state.set_none();
139 });
140
141 Poll::Ready(unsafe { this.items.take() })
144 } else {
145 Poll::Pending
146 }
147 }
148}
149
150#[pinned_drop]
152impl<Fut> PinnedDrop for Join<Fut>
153where
154 Fut: Future,
155{
156 fn drop(self: Pin<&mut Self>) {
157 let mut this = self.project();
158
159 for i in this.state.ready_indexes() {
161 unsafe { this.items.drop(i) };
164 }
165
166 for i in this.state.pending_indexes() {
168 unsafe { this.futures.as_mut().drop(i) };
171 }
172 }
173}
174
175#[cfg(test)]
176mod test {
177 use super::*;
178 use crate::utils::DummyWaker;
179
180 use alloc::format;
181 use alloc::sync::Arc;
182 use alloc::vec;
183 use core::future;
184
185 #[test]
186 fn smoke() {
187 futures_lite::future::block_on(async {
188 let fut = vec![future::ready("hello"), future::ready("world")].join();
189 assert_eq!(fut.await, vec!["hello", "world"]);
190 });
191 }
192
193 #[test]
194 fn empty() {
195 futures_lite::future::block_on(async {
196 let data: Vec<future::Ready<()>> = vec![];
197 let fut = data.join();
198 assert_eq!(fut.await, vec![]);
199 });
200 }
201
202 #[test]
203 fn debug() {
204 let mut fut = vec![future::ready("hello"), future::ready("world")].join();
205 assert_eq!(format!("{:?}", fut), "[Pending, Pending]");
206 let mut fut = Pin::new(&mut fut);
207
208 let waker = Arc::new(DummyWaker()).into();
209 let mut cx = Context::from_waker(&waker);
210 let _ = fut.as_mut().poll(&mut cx);
211 assert_eq!(format!("{:?}", fut), "[None, None]");
212 }
213}