1#![forbid(unsafe_code)]
88#![allow(clippy::many_single_char_names)]
89
90mod option_ab;
91mod option_abc;
92mod option_abcd;
93mod option_abcde;
94pub use option_ab::*;
95pub use option_abc::*;
96pub use option_abcd::*;
97pub use option_abcde::*;
98
99use core::future::Future;
100use core::pin::Pin;
101use core::task::{Context, Poll};
102
103struct PendingFuture;
104impl Unpin for PendingFuture {}
105impl Future for PendingFuture {
106 type Output = ();
107 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
108 Poll::Pending
109 }
110}
111
112#[must_use = "futures stay idle unless you await them"]
115pub struct SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
116where
117 FutA: Future<Output = A> + Send + Unpin + 'static,
118 FutB: Future<Output = B> + Send + Unpin + 'static,
119 FutC: Future<Output = C> + Send + Unpin + 'static,
120 FutD: Future<Output = D> + Send + Unpin + 'static,
121 FutE: Future<Output = E> + Send + Unpin + 'static,
122{
123 a: FutA,
124 b: FutB,
125 c: FutC,
126 d: FutD,
127 e: FutE,
128}
129
130impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
131 SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
132where
133 FutA: Future<Output = A> + Send + Unpin + 'static,
134 FutB: Future<Output = B> + Send + Unpin + 'static,
135 FutC: Future<Output = C> + Send + Unpin + 'static,
136 FutD: Future<Output = D> + Send + Unpin + 'static,
137 FutE: Future<Output = E> + Send + Unpin + 'static,
138{
139 pub fn new(
149 a: FutA,
150 b: FutB,
151 c: FutC,
152 d: FutD,
153 e: FutE,
154 ) -> SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> {
155 SelectFuture { a, b, c, d, e }
156 }
157}
158
159impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> Future
160 for SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
161where
162 FutA: Future<Output = A> + Send + Unpin + 'static,
163 FutB: Future<Output = B> + Send + Unpin + 'static,
164 FutC: Future<Output = C> + Send + Unpin + 'static,
165 FutD: Future<Output = D> + Send + Unpin + 'static,
166 FutE: Future<Output = E> + Send + Unpin + 'static,
167{
168 type Output = OptionAbcde<A, B, C, D, E>;
169
170 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 let mut_self = self.get_mut();
172 match Pin::new(&mut mut_self.a).poll(cx) {
183 Poll::Ready(value) => return Poll::Ready(OptionAbcde::A(value)),
184 Poll::Pending => {}
185 }
186 match Pin::new(&mut mut_self.b).poll(cx) {
187 Poll::Ready(value) => return Poll::Ready(OptionAbcde::B(value)),
188 Poll::Pending => {}
189 }
190 match Pin::new(&mut mut_self.c).poll(cx) {
191 Poll::Ready(value) => return Poll::Ready(OptionAbcde::C(value)),
192 Poll::Pending => {}
193 }
194 match Pin::new(&mut mut_self.d).poll(cx) {
195 Poll::Ready(value) => return Poll::Ready(OptionAbcde::D(value)),
196 Poll::Pending => {}
197 }
198 match Pin::new(&mut mut_self.e).poll(cx) {
199 Poll::Ready(value) => return Poll::Ready(OptionAbcde::E(value)),
200 Poll::Pending => {}
201 }
202 Poll::Pending
203 }
204}
205
206#[allow(clippy::missing_panics_doc)]
214pub async fn select_ab<A, B, FutA, FutB>(a: FutA, b: FutB) -> OptionAb<A, B>
215where
216 FutA: Future<Output = A> + Send + 'static,
217 FutB: Future<Output = B> + Send + 'static,
218{
219 match SelectFuture::new(
220 Box::pin(a),
221 Box::pin(b),
222 PendingFuture {},
223 PendingFuture {},
224 PendingFuture {},
225 )
226 .await
227 {
228 OptionAbcde::A(value) => OptionAb::A(value),
229 OptionAbcde::B(value) => OptionAb::B(value),
230 _ => unreachable!(),
231 }
232}
233
234#[allow(clippy::missing_panics_doc)]
242pub async fn select_abc<A, B, C, FutA, FutB, FutC>(a: FutA, b: FutB, c: FutC) -> OptionAbc<A, B, C>
243where
244 FutA: Future<Output = A> + Send + 'static,
245 FutB: Future<Output = B> + Send + 'static,
246 FutC: Future<Output = C> + Send + 'static,
247{
248 match SelectFuture::new(
249 Box::pin(a),
250 Box::pin(b),
251 Box::pin(c),
252 PendingFuture {},
253 PendingFuture {},
254 )
255 .await
256 {
257 OptionAbcde::A(value) => OptionAbc::A(value),
258 OptionAbcde::B(value) => OptionAbc::B(value),
259 OptionAbcde::C(value) => OptionAbc::C(value),
260 _ => unreachable!(),
261 }
262}
263
264#[allow(clippy::missing_panics_doc)]
272pub async fn select_abcd<A, B, C, D, FutA, FutB, FutC, FutD>(
273 a: FutA,
274 b: FutB,
275 c: FutC,
276 d: FutD,
277) -> OptionAbcd<A, B, C, D>
278where
279 FutA: Future<Output = A> + Send + 'static,
280 FutB: Future<Output = B> + Send + 'static,
281 FutC: Future<Output = C> + Send + 'static,
282 FutD: Future<Output = D> + Send + 'static,
283{
284 match SelectFuture::new(
285 Box::pin(a),
286 Box::pin(b),
287 Box::pin(c),
288 Box::pin(d),
289 PendingFuture {},
290 )
291 .await
292 {
293 OptionAbcde::A(value) => OptionAbcd::A(value),
294 OptionAbcde::B(value) => OptionAbcd::B(value),
295 OptionAbcde::C(value) => OptionAbcd::C(value),
296 OptionAbcde::D(value) => OptionAbcd::D(value),
297 OptionAbcde::E(_) => unreachable!(),
298 }
299}
300
301pub async fn select_abcde<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>(
309 a: FutA,
310 b: FutB,
311 c: FutC,
312 d: FutD,
313 e: FutE,
314) -> OptionAbcde<A, B, C, D, E>
315where
316 FutA: Future<Output = A> + Send + 'static,
317 FutB: Future<Output = B> + Send + 'static,
318 FutC: Future<Output = C> + Send + 'static,
319 FutD: Future<Output = D> + Send + 'static,
320 FutE: Future<Output = E> + Send + 'static,
321{
322 SelectFuture::new(
323 Box::pin(a),
324 Box::pin(b),
325 Box::pin(c),
326 Box::pin(d),
327 Box::pin(e),
328 )
329 .await
330}
331
332#[cfg(test)]
333#[allow(clippy::float_cmp)]
334mod tests {
335 use super::*;
336 use core::ops::Range;
337 use core::time::Duration;
338 use safina_async_test::async_test;
339 use safina_timer::sleep_for;
340 use std::time::Instant;
341
342 pub fn expect_elapsed(before: Instant, range_ms: Range<u64>) {
343 assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms);
344 let elapsed = before.elapsed();
345 let duration_range =
346 Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end);
347 assert!(
348 duration_range.contains(&elapsed),
349 "{:?} elapsed, out of range {:?}",
350 elapsed,
351 duration_range
352 );
353 }
354
355 #[async_test]
356 async fn should_return_proper_types() {
357 let _: OptionAb<u8, bool> = select_ab(async { 42_u8 }, async { true }).await;
358 let _: OptionAbc<u8, bool, &'static str> =
359 select_abc(async { 42_u8 }, async { true }, async { "s1" }).await;
360 let _: OptionAbcd<u8, bool, &'static str, usize> =
361 select_abcd(async { 42_u8 }, async { true }, async { "s1" }, async {
362 7_usize
363 })
364 .await;
365 let _: OptionAbcde<u8, bool, &'static str, usize, f32> = select_abcde(
366 async { 42_u8 },
367 async { true },
368 async { "s1" },
369 async { 7_usize },
370 async { 0.99_f32 },
371 )
372 .await;
373 }
374
375 #[async_test]
376 async fn all_complete() {
377 select_ab(async { 42_u8 }, async { true }).await.unwrap_a();
378 select_abc(async { 42_u8 }, async { true }, async { "s1" })
379 .await
380 .unwrap_a();
381 select_abcd(async { 42_u8 }, async { true }, async { "s1" }, async {
382 7_usize
383 })
384 .await
385 .unwrap_a();
386 select_abcde(
387 async { 42_u8 },
388 async { true },
389 async { "s1" },
390 async { 7_usize },
391 async { 0.99_f32 },
392 )
393 .await
394 .unwrap_a();
395 }
396
397 #[async_test]
398 async fn one_complete() {
399 let ready_a = || async { 42_u8 };
400 let ready_b = || async { true };
401 let ready_c = || async { "s1" };
402 let ready_d = || async { 7_usize };
403 let ready_e = || async { 0.99_f32 };
404 let wait_a = || async {
405 sleep_for(Duration::from_millis(10)).await;
406 42_u8
407 };
408 let wait_b = || async {
409 sleep_for(Duration::from_millis(10)).await;
410 true
411 };
412 let wait_c = || async {
413 sleep_for(Duration::from_millis(10)).await;
414 "s1"
415 };
416 let wait_d = || async {
417 sleep_for(Duration::from_millis(10)).await;
418 7_usize
419 };
420 let wait_e = || async {
421 sleep_for(Duration::from_millis(10)).await;
422 0.99_f32
423 };
424
425 assert_eq!(42_u8, select_ab(ready_a(), wait_b()).await.unwrap_a());
426 assert!(select_ab(wait_a(), ready_b()).await.unwrap_b());
427
428 assert_eq!(
429 42_u8,
430 select_abc(ready_a(), wait_b(), wait_c()).await.unwrap_a()
431 );
432 assert!(select_abc(wait_a(), ready_b(), wait_c()).await.unwrap_b());
433 assert_eq!(
434 "s1",
435 select_abc(wait_a(), wait_b(), ready_c()).await.unwrap_c()
436 );
437
438 assert_eq!(
439 42_u8,
440 select_abcd(ready_a(), wait_b(), wait_c(), wait_d())
441 .await
442 .unwrap_a()
443 );
444 assert!(select_abcd(wait_a(), ready_b(), wait_c(), wait_d())
445 .await
446 .unwrap_b());
447 assert_eq!(
448 "s1",
449 select_abcd(wait_a(), wait_b(), ready_c(), wait_d())
450 .await
451 .unwrap_c()
452 );
453 assert_eq!(
454 7_usize,
455 select_abcd(wait_a(), wait_b(), wait_c(), ready_d())
456 .await
457 .unwrap_d()
458 );
459
460 assert_eq!(
461 42_u8,
462 select_abcde(ready_a(), wait_b(), wait_c(), wait_d(), wait_e())
463 .await
464 .unwrap_a()
465 );
466 assert!(
467 select_abcde(wait_a(), ready_b(), wait_c(), wait_d(), wait_e())
468 .await
469 .unwrap_b()
470 );
471 assert_eq!(
472 "s1",
473 select_abcde(wait_a(), wait_b(), ready_c(), wait_d(), wait_e())
474 .await
475 .unwrap_c()
476 );
477 assert_eq!(
478 7_usize,
479 select_abcde(wait_a(), wait_b(), wait_c(), ready_d(), wait_e())
480 .await
481 .unwrap_d()
482 );
483 assert_eq!(
484 0.99_f32,
485 select_abcde(wait_a(), wait_b(), wait_c(), wait_d(), ready_e())
486 .await
487 .unwrap_e()
488 );
489 }
490
491 #[async_test]
492 async fn should_poll_all() {
493 let (sender_a, receiver_a) = std::sync::mpsc::channel::<()>();
494 let (sender_b, receiver_b) = std::sync::mpsc::channel::<()>();
495 let (sender_c, receiver_c) = std::sync::mpsc::channel::<()>();
496 let (sender_d, receiver_d) = std::sync::mpsc::channel::<()>();
497 let (sender_e, receiver_e) = std::sync::mpsc::channel::<()>();
498 let fut_a = async move {
499 sender_a.send(()).unwrap();
500 sleep_for(Duration::from_millis(10)).await;
501 };
502 let fut_b = async move {
503 sender_b.send(()).unwrap();
504 sleep_for(Duration::from_millis(10)).await;
505 };
506 let fut_c = async move {
507 sender_c.send(()).unwrap();
508 sleep_for(Duration::from_millis(10)).await;
509 };
510 let fut_d = async move {
511 sender_d.send(()).unwrap();
512 sleep_for(Duration::from_millis(10)).await;
513 };
514 let fut_e = async move {
515 sender_e.send(()).unwrap();
516 sleep_for(Duration::from_millis(10)).await;
517 };
518 select_abcde(fut_a, fut_b, fut_c, fut_d, fut_e).await;
519 receiver_a.recv().unwrap();
520 receiver_b.recv().unwrap();
521 receiver_c.recv().unwrap();
522 receiver_d.recv().unwrap();
523 receiver_e.recv().unwrap();
524 }
525
526 #[async_test]
527 async fn awaits_a() {
528 let before = Instant::now();
529 select_ab(
530 async move {
531 sleep_for(Duration::from_millis(100)).await;
532 42_u8
533 },
534 async move {
535 sleep_for(Duration::from_millis(200)).await;
536 true
537 },
538 )
539 .await
540 .unwrap_a();
541 expect_elapsed(before, 100..190);
542 }
543
544 #[async_test]
545 async fn awaits_b() {
546 let before = Instant::now();
547 select_ab(
548 async move {
549 sleep_for(Duration::from_millis(200)).await;
550 42_u8
551 },
552 async move {
553 sleep_for(Duration::from_millis(100)).await;
554 true
555 },
556 )
557 .await
558 .unwrap_b();
559 expect_elapsed(before, 100..190);
560 }
561
562 #[async_test]
563 async fn match_expression() {
564 match select_ab(async move { 42_u8 }, async move { true }).await {
565 OptionAb::A(42_u8) => {}
566 _ => unreachable!(),
567 }
568 match select_abcde(
569 async { 42_u8 },
570 async { true },
571 async { "s1" },
572 async { 7_usize },
573 async { 0.99_f32 },
574 )
575 .await
576 {
577 OptionAbcde::A(42_u8) => {}
578 OptionAbcde::B(true) | OptionAbcde::C("s1") | OptionAbcde::D(7_usize) => unreachable!(),
579 OptionAbcde::E(value) if value == 0.99_f32 => unreachable!(),
580 _ => unreachable!(),
581 }
582 }
583}