aiur/
join.rs

1//  \ O /
2//  / * \    aiur: the homeplanet for the famous executors
3// |' | '|   (c) 2020 - present, Vladimir Zvezda
4//   / \
5use crate::pinned_any_of;
6use std::future::Future;
7
/// Runs several futures concurrently and completes once every one of them has finished.
///
/// The macro only dispatches on the number of arguments: two to eight expressions are
/// accepted (a trailing comma is allowed) and forwarded to the matching `joinN()` function.
/// For instance `join!(fut1, fut2, fut3).await` is the same as
/// [`join3`]`(fut1, fut2, fut3).await`.
///
/// Please note that, unlike the join implementations found in other crates, this macro
/// evaluates to a future: nothing is executed until the result is `.await`ed.
#[macro_export]
macro_rules! join {
    // Two futures.
    ($a:expr, $b:expr $(,)?) => {
        $crate::join2($a, $b)
    };
    // Three futures.
    ($a:expr, $b:expr, $c:expr $(,)?) => {
        $crate::join3($a, $b, $c)
    };
    // Four futures.
    ($a:expr, $b:expr, $c:expr, $d:expr $(,)?) => {
        $crate::join4($a, $b, $c, $d)
    };
    // Five futures.
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr $(,)?) => {
        $crate::join5($a, $b, $c, $d, $e)
    };
    // Six futures.
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr $(,)?) => {
        $crate::join6($a, $b, $c, $d, $e, $f)
    };
    // Seven futures.
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr $(,)?) => {
        $crate::join7($a, $b, $c, $d, $e, $f, $g)
    };
    // Eight futures.
    ($a:expr, $b:expr, $c:expr, $d:expr, $e:expr, $f:expr, $g:expr, $h:expr $(,)?) => {
        $crate::join8($a, $b, $c, $d, $e, $f, $g, $h)
    };
}
41
42/// Polls two futures concurrently until both are completed.
43pub async fn join2<FutT1, FutT2>(f1: FutT1, f2: FutT2) -> (FutT1::Output, FutT2::Output)
44where
45    FutT1: Future,
46    FutT2: Future,
47{
48    pinned_any_of!(stream, f1, f2);
49    let mut res = std::mem::MaybeUninit::<(FutT1::Output, FutT2::Output)>::uninit();
50    unsafe {
51        let ptr = res.as_mut_ptr();
52        while let Some(val) = stream.next().await {
53            match val {
54                crate::OneOf2::First(x) => (*ptr).0 = x,
55                crate::OneOf2::Second(x) => (*ptr).1 = x,
56            }
57        }
58    }
59    unsafe { res.assume_init() }
60}
61
62/// Polls three futures concurrently until all are completed.
63pub async fn join3<FutT1, FutT2, FutT3>(
64    f1: FutT1,
65    f2: FutT2,
66    f3: FutT3,
67) -> (FutT1::Output, FutT2::Output, FutT3::Output)
68where
69    FutT1: Future,
70    FutT2: Future,
71    FutT3: Future,
72{
73    let mut res = std::mem::MaybeUninit::<(FutT1::Output, FutT2::Output, FutT3::Output)>::uninit();
74
75    pinned_any_of!(stream, f1, f2, f3);
76
77    unsafe {
78        let ptr = res.as_mut_ptr();
79        while let Some(val) = stream.next().await {
80            match val {
81                crate::OneOf3::First(x) => (*ptr).0 = x,
82                crate::OneOf3::Second(x) => (*ptr).1 = x,
83                crate::OneOf3::Third(x) => (*ptr).2 = x,
84            }
85        }
86    }
87    unsafe { res.assume_init() }
88}
89
90/// Polls four futures concurrently until all are completed.
91pub async fn join4<FutT1, FutT2, FutT3, FutT4>(
92    f1: FutT1,
93    f2: FutT2,
94    f3: FutT3,
95    f4: FutT4,
96) -> (FutT1::Output, FutT2::Output, FutT3::Output, FutT4::Output)
97where
98    FutT1: Future,
99    FutT2: Future,
100    FutT3: Future,
101    FutT4: Future,
102{
103    let mut res = std::mem::MaybeUninit::<(
104        FutT1::Output,
105        FutT2::Output,
106        FutT3::Output,
107        FutT4::Output,
108    )>::uninit();
109
110    pinned_any_of!(stream, f1, f2, f3, f4);
111
112    unsafe {
113        let ptr = res.as_mut_ptr();
114        while let Some(val) = stream.next().await {
115            match val {
116                crate::OneOf4::First(x) => (*ptr).0 = x,
117                crate::OneOf4::Second(x) => (*ptr).1 = x,
118                crate::OneOf4::Third(x) => (*ptr).2 = x,
119                crate::OneOf4::Fourth(x) => (*ptr).3 = x,
120            }
121        }
122    }
123    unsafe { res.assume_init() }
124}
125
126/// Polls five futures concurrently until all are completed.
127pub async fn join5<FutT1, FutT2, FutT3, FutT4, FutT5>(
128    f1: FutT1,
129    f2: FutT2,
130    f3: FutT3,
131    f4: FutT4,
132    f5: FutT5,
133) -> (
134    FutT1::Output,
135    FutT2::Output,
136    FutT3::Output,
137    FutT4::Output,
138    FutT5::Output,
139)
140where
141    FutT1: Future,
142    FutT2: Future,
143    FutT3: Future,
144    FutT4: Future,
145    FutT5: Future,
146{
147    let mut res = std::mem::MaybeUninit::<(
148        FutT1::Output,
149        FutT2::Output,
150        FutT3::Output,
151        FutT4::Output,
152        FutT5::Output,
153    )>::uninit();
154
155    pinned_any_of!(stream, f1, f2, f3, f4, f5);
156
157    unsafe {
158        let ptr = res.as_mut_ptr();
159        while let Some(val) = stream.next().await {
160            match val {
161                crate::OneOf5::First(x) => (*ptr).0 = x,
162                crate::OneOf5::Second(x) => (*ptr).1 = x,
163                crate::OneOf5::Third(x) => (*ptr).2 = x,
164                crate::OneOf5::Fourth(x) => (*ptr).3 = x,
165                crate::OneOf5::Fifth(x) => (*ptr).4 = x,
166            }
167        }
168    }
169    unsafe { res.assume_init() }
170}
171
172/// Polls six futures concurrently until all are completed.
173pub async fn join6<FutT1, FutT2, FutT3, FutT4, FutT5, FutT6>(
174    f1: FutT1,
175    f2: FutT2,
176    f3: FutT3,
177    f4: FutT4,
178    f5: FutT5,
179    f6: FutT6,
180) -> (
181    FutT1::Output,
182    FutT2::Output,
183    FutT3::Output,
184    FutT4::Output,
185    FutT5::Output,
186    FutT6::Output,
187)
188where
189    FutT1: Future,
190    FutT2: Future,
191    FutT3: Future,
192    FutT4: Future,
193    FutT5: Future,
194    FutT6: Future,
195{
196    let mut res = std::mem::MaybeUninit::<(
197        FutT1::Output,
198        FutT2::Output,
199        FutT3::Output,
200        FutT4::Output,
201        FutT5::Output,
202        FutT6::Output,
203    )>::uninit();
204
205    pinned_any_of!(stream, f1, f2, f3, f4, f5, f6);
206
207    unsafe {
208        let ptr = res.as_mut_ptr();
209        while let Some(val) = stream.next().await {
210            match val {
211                crate::OneOf6::First(x) => (*ptr).0 = x,
212                crate::OneOf6::Second(x) => (*ptr).1 = x,
213                crate::OneOf6::Third(x) => (*ptr).2 = x,
214                crate::OneOf6::Fourth(x) => (*ptr).3 = x,
215                crate::OneOf6::Fifth(x) => (*ptr).4 = x,
216                crate::OneOf6::Sixth(x) => (*ptr).5 = x,
217            }
218        }
219    }
220    unsafe { res.assume_init() }
221}
222
223/// Polls seven futures concurrently until all are completed.
224pub async fn join7<FutT1, FutT2, FutT3, FutT4, FutT5, FutT6, FutT7>(
225    f1: FutT1,
226    f2: FutT2,
227    f3: FutT3,
228    f4: FutT4,
229    f5: FutT5,
230    f6: FutT6,
231    f7: FutT7,
232) -> (
233    FutT1::Output,
234    FutT2::Output,
235    FutT3::Output,
236    FutT4::Output,
237    FutT5::Output,
238    FutT6::Output,
239    FutT7::Output,
240)
241where
242    FutT1: Future,
243    FutT2: Future,
244    FutT3: Future,
245    FutT4: Future,
246    FutT5: Future,
247    FutT6: Future,
248    FutT7: Future,
249{
250    let mut res = std::mem::MaybeUninit::<(
251        FutT1::Output,
252        FutT2::Output,
253        FutT3::Output,
254        FutT4::Output,
255        FutT5::Output,
256        FutT6::Output,
257        FutT7::Output,
258    )>::uninit();
259
260    pinned_any_of!(stream, f1, f2, f3, f4, f5, f6, f7);
261
262    unsafe {
263        let ptr = res.as_mut_ptr();
264        while let Some(val) = stream.next().await {
265            match val {
266                crate::OneOf7::First(x) => (*ptr).0 = x,
267                crate::OneOf7::Second(x) => (*ptr).1 = x,
268                crate::OneOf7::Third(x) => (*ptr).2 = x,
269                crate::OneOf7::Fourth(x) => (*ptr).3 = x,
270                crate::OneOf7::Fifth(x) => (*ptr).4 = x,
271                crate::OneOf7::Sixth(x) => (*ptr).5 = x,
272                crate::OneOf7::Seventh(x) => (*ptr).6 = x,
273            }
274        }
275    }
276    unsafe { res.assume_init() }
277}
278
279/// Polls eight futures concurrently until all are completed.
280pub async fn join8<FutT1, FutT2, FutT3, FutT4, FutT5, FutT6, FutT7, FutT8>(
281    f1: FutT1,
282    f2: FutT2,
283    f3: FutT3,
284    f4: FutT4,
285    f5: FutT5,
286    f6: FutT6,
287    f7: FutT7,
288    f8: FutT8,
289) -> (
290    FutT1::Output,
291    FutT2::Output,
292    FutT3::Output,
293    FutT4::Output,
294    FutT5::Output,
295    FutT6::Output,
296    FutT7::Output,
297    FutT8::Output,
298)
299where
300    FutT1: Future,
301    FutT2: Future,
302    FutT3: Future,
303    FutT4: Future,
304    FutT5: Future,
305    FutT6: Future,
306    FutT7: Future,
307    FutT8: Future,
308{
309    let mut res = std::mem::MaybeUninit::<(
310        FutT1::Output,
311        FutT2::Output,
312        FutT3::Output,
313        FutT4::Output,
314        FutT5::Output,
315        FutT6::Output,
316        FutT7::Output,
317        FutT8::Output,
318    )>::uninit();
319
320    pinned_any_of!(stream, f1, f2, f3, f4, f5, f6, f7, f8);
321
322    unsafe {
323        let ptr = res.as_mut_ptr();
324        while let Some(val) = stream.next().await {
325            match val {
326                crate::OneOf8::First(x) => (*ptr).0 = x,
327                crate::OneOf8::Second(x) => (*ptr).1 = x,
328                crate::OneOf8::Third(x) => (*ptr).2 = x,
329                crate::OneOf8::Fourth(x) => (*ptr).3 = x,
330                crate::OneOf8::Fifth(x) => (*ptr).4 = x,
331                crate::OneOf8::Sixth(x) => (*ptr).5 = x,
332                crate::OneOf8::Seventh(x) => (*ptr).6 = x,
333                crate::OneOf8::Eighth(x) => (*ptr).7 = x,
334            }
335        }
336    }
337    unsafe { res.assume_init() }
338}