safina_select/
lib.rs

1//! # ARCHIVED ARCHIVED ARCHIVED
2//! This crate is archived and will not be updated.
3//!
4//! The code is now at
5//! [`safina::select`](https://docs.rs/safina/latest/safina/select/) in the
6//! [`safina`](https://crates.io/crates/safina) crate.
7//!
8//! ----
9//!
10//! # safina-select
11//!
12//! This is a Rust library for awaiting multiple futures
13//! and getting the value of the first one that completes.
14//!
15//! It is part of [`safina`](https://crates.io/crates/safina), a safe async runtime.
16//!
17//! # Features
18//! - `forbid(unsafe_code)`
19//! - Depends only on `std`
20//! - Good test coverage (96%)
21//! - Works with [`safina-executor`](https://crates.io/crates/safina-executor)
22//!   or any async executor
23//!
24//! # Limitations
25//! - Can await 2-5 futures.  Nest them if you need more.
26//!
27//! # Examples
28//! ```rust
29//! use safina_async_test::async_test;
30//! use safina_select::{select_ab, OptionAb};
31//! # async fn make_new(_: String) -> Result<(), String> { Ok(()) }
32//! # async fn get_from_pool(_: String) -> Result<(), String> { Ok(()) }
33//! # #[async_test]
34//! # async fn test1() -> Result<(), String> {
35//! #     let addr = String::new();
36//! let conn = match select_ab(make_new(addr.clone()), get_from_pool(addr.clone())).await {
37//!     OptionAb::A(result) => result?,
38//!     OptionAb::B(result) => result?,
39//! };
40//! // When both futures return the same type, you can use `take`:
41//! let conn = select_ab(make_new(addr.clone()), get_from_pool(addr.clone())).await.take()?;
42//! #     Ok(())
43//! # }
44//! ```
45//! ```rust
46//! use safina_async_test::async_test;
47//! use safina_select::{select_ab, OptionAb};
48//! # async fn read_data() -> Result<(), String> { Ok(()) }
49//! # #[async_test]
50//! # async fn test1() -> Result<(), String> {
51//! #     let deadline = std::time::Instant::now();
52//! safina_timer::start_timer_thread();
53//! let data = match select_ab(read_data(), safina_timer::sleep_until(deadline)).await {
54//!     OptionAb::A(result) => Ok(result?),
55//!     OptionAb::B(()) => Err("timeout"),
56//! };
57//! #     Ok(())
58//! # }
59//! ```
60//!
61//! # Documentation
62//! <https://docs.rs/safina-select>
63//!
64//! # TO DO - Alternatives
65//! - [`tokio::select`](https://docs.rs/tokio/latest/tokio/macro.select.html)
66//!   - very popular
67//!   - fast
68//!   - internally incredibly complicated
69//!   - full of `unsafe`
70//! - [`futures::select`](https://docs.rs/futures/latest/futures/macro.select.html)
71//!   - very popular
72//!   - proc macro, very complicated
73//!   - contains a little `unsafe` code
74//!
75//! # Changelog
76//! - v0.1.4 - Update docs.
77//! - v0.1.3 - Rename `OptionAB` to `OptionAb`, etc.
78//! - v0.1.2 - Satisfy pedantic clippy
79//! - v0.1.1 - Add badges to readme.  Rename `safina` package to `safina-executor`.
80//! - v0.1.0 - First published version
81//!
82//! # TO DO
83//!
84//! # Release Process
85//! 1. Edit `Cargo.toml` and bump version number.
86//! 1. Run `./release.sh`
87#![forbid(unsafe_code)]
88#![allow(clippy::many_single_char_names)]
89
90mod option_ab;
91mod option_abc;
92mod option_abcd;
93mod option_abcde;
94pub use option_ab::*;
95pub use option_abc::*;
96pub use option_abcd::*;
97pub use option_abcde::*;
98
99use core::future::Future;
100use core::pin::Pin;
101use core::task::{Context, Poll};
102
/// A placeholder future that is never ready.
///
/// The `select_*` functions put it in the unused slots of [`SelectFuture`],
/// so those slots can never be the one that completes.
struct PendingFuture;

impl Future for PendingFuture {
    type Output = ();

    /// Always reports `Pending`.  The context is ignored, so no waker is
    /// ever registered and this future never asks to be polled again.
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

impl Unpin for PendingFuture {}
111
112/// A future that polls two futures
113/// and returns the value from the one that completes first.
114#[must_use = "futures stay idle unless you await them"]
115pub struct SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
116where
117    FutA: Future<Output = A> + Send + Unpin + 'static,
118    FutB: Future<Output = B> + Send + Unpin + 'static,
119    FutC: Future<Output = C> + Send + Unpin + 'static,
120    FutD: Future<Output = D> + Send + Unpin + 'static,
121    FutE: Future<Output = E> + Send + Unpin + 'static,
122{
123    a: FutA,
124    b: FutB,
125    c: FutC,
126    d: FutD,
127    e: FutE,
128}
129
130impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
131    SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
132where
133    FutA: Future<Output = A> + Send + Unpin + 'static,
134    FutB: Future<Output = B> + Send + Unpin + 'static,
135    FutC: Future<Output = C> + Send + Unpin + 'static,
136    FutD: Future<Output = D> + Send + Unpin + 'static,
137    FutE: Future<Output = E> + Send + Unpin + 'static,
138{
139    /// Makes a future that awaits all of the provided futures,
140    /// and returns the value from the one that completes first.
141    ///
142    /// Note that they must be
143    /// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
144    /// Use [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
145    /// to make them Unpin.
146    /// Or use [`pin_utils::pin_mut`](https://docs.rs/pin-utils/latest/pin_utils/macro.pin_mut.html)
147    /// to do it with unsafe code that does not allocate memory.
148    pub fn new(
149        a: FutA,
150        b: FutB,
151        c: FutC,
152        d: FutD,
153        e: FutE,
154    ) -> SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> {
155        SelectFuture { a, b, c, d, e }
156    }
157}
158
159impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> Future
160    for SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
161where
162    FutA: Future<Output = A> + Send + Unpin + 'static,
163    FutB: Future<Output = B> + Send + Unpin + 'static,
164    FutC: Future<Output = C> + Send + Unpin + 'static,
165    FutD: Future<Output = D> + Send + Unpin + 'static,
166    FutE: Future<Output = E> + Send + Unpin + 'static,
167{
168    type Output = OptionAbcde<A, B, C, D, E>;
169
170    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171        let mut_self = self.get_mut();
172        // "Note that on multiple calls to poll, only the Waker from the Context
173        // passed to the most recent call should be scheduled to receive a wakeup."
174        // https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll
175        // There is a race condition between a worker thread calling a waker and
176        // the poll function saving a new waker.
177        //
178        // With SelectFuture, we can potentially have two worker threads calling the
179        // same waker.  The waker could get called multiple times, even after
180        // the future has completed.  The docs don't say whether this is allowed
181        // or not.  If this becomes a problem, we can add code to prevent it.
182        match Pin::new(&mut mut_self.a).poll(cx) {
183            Poll::Ready(value) => return Poll::Ready(OptionAbcde::A(value)),
184            Poll::Pending => {}
185        }
186        match Pin::new(&mut mut_self.b).poll(cx) {
187            Poll::Ready(value) => return Poll::Ready(OptionAbcde::B(value)),
188            Poll::Pending => {}
189        }
190        match Pin::new(&mut mut_self.c).poll(cx) {
191            Poll::Ready(value) => return Poll::Ready(OptionAbcde::C(value)),
192            Poll::Pending => {}
193        }
194        match Pin::new(&mut mut_self.d).poll(cx) {
195            Poll::Ready(value) => return Poll::Ready(OptionAbcde::D(value)),
196            Poll::Pending => {}
197        }
198        match Pin::new(&mut mut_self.e).poll(cx) {
199            Poll::Ready(value) => return Poll::Ready(OptionAbcde::E(value)),
200            Poll::Pending => {}
201        }
202        Poll::Pending
203    }
204}
205
206/// Awaits both futures and returns the value from the one that completes first.
207///
208/// First moves them to the heap, to make them
209/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
210/// Use
211/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
212/// to avoid allocating on the heap.
213#[allow(clippy::missing_panics_doc)]
214pub async fn select_ab<A, B, FutA, FutB>(a: FutA, b: FutB) -> OptionAb<A, B>
215where
216    FutA: Future<Output = A> + Send + 'static,
217    FutB: Future<Output = B> + Send + 'static,
218{
219    match SelectFuture::new(
220        Box::pin(a),
221        Box::pin(b),
222        PendingFuture {},
223        PendingFuture {},
224        PendingFuture {},
225    )
226    .await
227    {
228        OptionAbcde::A(value) => OptionAb::A(value),
229        OptionAbcde::B(value) => OptionAb::B(value),
230        _ => unreachable!(),
231    }
232}
233
234/// Awaits the futures and returns the value from the one that completes first.
235///
236/// First moves them to the heap, to make them
237/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
238/// Use
239/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
240/// to avoid allocating on the heap.
241#[allow(clippy::missing_panics_doc)]
242pub async fn select_abc<A, B, C, FutA, FutB, FutC>(a: FutA, b: FutB, c: FutC) -> OptionAbc<A, B, C>
243where
244    FutA: Future<Output = A> + Send + 'static,
245    FutB: Future<Output = B> + Send + 'static,
246    FutC: Future<Output = C> + Send + 'static,
247{
248    match SelectFuture::new(
249        Box::pin(a),
250        Box::pin(b),
251        Box::pin(c),
252        PendingFuture {},
253        PendingFuture {},
254    )
255    .await
256    {
257        OptionAbcde::A(value) => OptionAbc::A(value),
258        OptionAbcde::B(value) => OptionAbc::B(value),
259        OptionAbcde::C(value) => OptionAbc::C(value),
260        _ => unreachable!(),
261    }
262}
263
264/// Awaits the futures and returns the value from the one that completes first.
265///
266/// First moves them to the heap, to make them
267/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
268/// Use
269/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
270/// to avoid allocating on the heap.
271#[allow(clippy::missing_panics_doc)]
272pub async fn select_abcd<A, B, C, D, FutA, FutB, FutC, FutD>(
273    a: FutA,
274    b: FutB,
275    c: FutC,
276    d: FutD,
277) -> OptionAbcd<A, B, C, D>
278where
279    FutA: Future<Output = A> + Send + 'static,
280    FutB: Future<Output = B> + Send + 'static,
281    FutC: Future<Output = C> + Send + 'static,
282    FutD: Future<Output = D> + Send + 'static,
283{
284    match SelectFuture::new(
285        Box::pin(a),
286        Box::pin(b),
287        Box::pin(c),
288        Box::pin(d),
289        PendingFuture {},
290    )
291    .await
292    {
293        OptionAbcde::A(value) => OptionAbcd::A(value),
294        OptionAbcde::B(value) => OptionAbcd::B(value),
295        OptionAbcde::C(value) => OptionAbcd::C(value),
296        OptionAbcde::D(value) => OptionAbcd::D(value),
297        OptionAbcde::E(_) => unreachable!(),
298    }
299}
300
301/// Awaits the futures and returns the value from the one that completes first.
302///
303/// First moves them to the heap, to make them
304/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
305/// Use
306/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
307/// to avoid allocating on the heap.
308pub async fn select_abcde<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>(
309    a: FutA,
310    b: FutB,
311    c: FutC,
312    d: FutD,
313    e: FutE,
314) -> OptionAbcde<A, B, C, D, E>
315where
316    FutA: Future<Output = A> + Send + 'static,
317    FutB: Future<Output = B> + Send + 'static,
318    FutC: Future<Output = C> + Send + 'static,
319    FutD: Future<Output = D> + Send + 'static,
320    FutE: Future<Output = E> + Send + 'static,
321{
322    SelectFuture::new(
323        Box::pin(a),
324        Box::pin(b),
325        Box::pin(c),
326        Box::pin(d),
327        Box::pin(e),
328    )
329    .await
330}
331
332#[cfg(test)]
333#[allow(clippy::float_cmp)]
334mod tests {
335    use super::*;
336    use core::ops::Range;
337    use core::time::Duration;
338    use safina_async_test::async_test;
339    use safina_timer::sleep_for;
340    use std::time::Instant;
341
342    pub fn expect_elapsed(before: Instant, range_ms: Range<u64>) {
343        assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms);
344        let elapsed = before.elapsed();
345        let duration_range =
346            Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end);
347        assert!(
348            duration_range.contains(&elapsed),
349            "{:?} elapsed, out of range {:?}",
350            elapsed,
351            duration_range
352        );
353    }
354
355    #[async_test]
356    async fn should_return_proper_types() {
357        let _: OptionAb<u8, bool> = select_ab(async { 42_u8 }, async { true }).await;
358        let _: OptionAbc<u8, bool, &'static str> =
359            select_abc(async { 42_u8 }, async { true }, async { "s1" }).await;
360        let _: OptionAbcd<u8, bool, &'static str, usize> =
361            select_abcd(async { 42_u8 }, async { true }, async { "s1" }, async {
362                7_usize
363            })
364            .await;
365        let _: OptionAbcde<u8, bool, &'static str, usize, f32> = select_abcde(
366            async { 42_u8 },
367            async { true },
368            async { "s1" },
369            async { 7_usize },
370            async { 0.99_f32 },
371        )
372        .await;
373    }
374
375    #[async_test]
376    async fn all_complete() {
377        select_ab(async { 42_u8 }, async { true }).await.unwrap_a();
378        select_abc(async { 42_u8 }, async { true }, async { "s1" })
379            .await
380            .unwrap_a();
381        select_abcd(async { 42_u8 }, async { true }, async { "s1" }, async {
382            7_usize
383        })
384        .await
385        .unwrap_a();
386        select_abcde(
387            async { 42_u8 },
388            async { true },
389            async { "s1" },
390            async { 7_usize },
391            async { 0.99_f32 },
392        )
393        .await
394        .unwrap_a();
395    }
396
397    #[async_test]
398    async fn one_complete() {
399        let ready_a = || async { 42_u8 };
400        let ready_b = || async { true };
401        let ready_c = || async { "s1" };
402        let ready_d = || async { 7_usize };
403        let ready_e = || async { 0.99_f32 };
404        let wait_a = || async {
405            sleep_for(Duration::from_millis(10)).await;
406            42_u8
407        };
408        let wait_b = || async {
409            sleep_for(Duration::from_millis(10)).await;
410            true
411        };
412        let wait_c = || async {
413            sleep_for(Duration::from_millis(10)).await;
414            "s1"
415        };
416        let wait_d = || async {
417            sleep_for(Duration::from_millis(10)).await;
418            7_usize
419        };
420        let wait_e = || async {
421            sleep_for(Duration::from_millis(10)).await;
422            0.99_f32
423        };
424
425        assert_eq!(42_u8, select_ab(ready_a(), wait_b()).await.unwrap_a());
426        assert!(select_ab(wait_a(), ready_b()).await.unwrap_b());
427
428        assert_eq!(
429            42_u8,
430            select_abc(ready_a(), wait_b(), wait_c()).await.unwrap_a()
431        );
432        assert!(select_abc(wait_a(), ready_b(), wait_c()).await.unwrap_b());
433        assert_eq!(
434            "s1",
435            select_abc(wait_a(), wait_b(), ready_c()).await.unwrap_c()
436        );
437
438        assert_eq!(
439            42_u8,
440            select_abcd(ready_a(), wait_b(), wait_c(), wait_d())
441                .await
442                .unwrap_a()
443        );
444        assert!(select_abcd(wait_a(), ready_b(), wait_c(), wait_d())
445            .await
446            .unwrap_b());
447        assert_eq!(
448            "s1",
449            select_abcd(wait_a(), wait_b(), ready_c(), wait_d())
450                .await
451                .unwrap_c()
452        );
453        assert_eq!(
454            7_usize,
455            select_abcd(wait_a(), wait_b(), wait_c(), ready_d())
456                .await
457                .unwrap_d()
458        );
459
460        assert_eq!(
461            42_u8,
462            select_abcde(ready_a(), wait_b(), wait_c(), wait_d(), wait_e())
463                .await
464                .unwrap_a()
465        );
466        assert!(
467            select_abcde(wait_a(), ready_b(), wait_c(), wait_d(), wait_e())
468                .await
469                .unwrap_b()
470        );
471        assert_eq!(
472            "s1",
473            select_abcde(wait_a(), wait_b(), ready_c(), wait_d(), wait_e())
474                .await
475                .unwrap_c()
476        );
477        assert_eq!(
478            7_usize,
479            select_abcde(wait_a(), wait_b(), wait_c(), ready_d(), wait_e())
480                .await
481                .unwrap_d()
482        );
483        assert_eq!(
484            0.99_f32,
485            select_abcde(wait_a(), wait_b(), wait_c(), wait_d(), ready_e())
486                .await
487                .unwrap_e()
488        );
489    }
490
491    #[async_test]
492    async fn should_poll_all() {
493        let (sender_a, receiver_a) = std::sync::mpsc::channel::<()>();
494        let (sender_b, receiver_b) = std::sync::mpsc::channel::<()>();
495        let (sender_c, receiver_c) = std::sync::mpsc::channel::<()>();
496        let (sender_d, receiver_d) = std::sync::mpsc::channel::<()>();
497        let (sender_e, receiver_e) = std::sync::mpsc::channel::<()>();
498        let fut_a = async move {
499            sender_a.send(()).unwrap();
500            sleep_for(Duration::from_millis(10)).await;
501        };
502        let fut_b = async move {
503            sender_b.send(()).unwrap();
504            sleep_for(Duration::from_millis(10)).await;
505        };
506        let fut_c = async move {
507            sender_c.send(()).unwrap();
508            sleep_for(Duration::from_millis(10)).await;
509        };
510        let fut_d = async move {
511            sender_d.send(()).unwrap();
512            sleep_for(Duration::from_millis(10)).await;
513        };
514        let fut_e = async move {
515            sender_e.send(()).unwrap();
516            sleep_for(Duration::from_millis(10)).await;
517        };
518        select_abcde(fut_a, fut_b, fut_c, fut_d, fut_e).await;
519        receiver_a.recv().unwrap();
520        receiver_b.recv().unwrap();
521        receiver_c.recv().unwrap();
522        receiver_d.recv().unwrap();
523        receiver_e.recv().unwrap();
524    }
525
526    #[async_test]
527    async fn awaits_a() {
528        let before = Instant::now();
529        select_ab(
530            async move {
531                sleep_for(Duration::from_millis(100)).await;
532                42_u8
533            },
534            async move {
535                sleep_for(Duration::from_millis(200)).await;
536                true
537            },
538        )
539        .await
540        .unwrap_a();
541        expect_elapsed(before, 100..190);
542    }
543
544    #[async_test]
545    async fn awaits_b() {
546        let before = Instant::now();
547        select_ab(
548            async move {
549                sleep_for(Duration::from_millis(200)).await;
550                42_u8
551            },
552            async move {
553                sleep_for(Duration::from_millis(100)).await;
554                true
555            },
556        )
557        .await
558        .unwrap_b();
559        expect_elapsed(before, 100..190);
560    }
561
562    #[async_test]
563    async fn match_expression() {
564        match select_ab(async move { 42_u8 }, async move { true }).await {
565            OptionAb::A(42_u8) => {}
566            _ => unreachable!(),
567        }
568        match select_abcde(
569            async { 42_u8 },
570            async { true },
571            async { "s1" },
572            async { 7_usize },
573            async { 0.99_f32 },
574        )
575        .await
576        {
577            OptionAbcde::A(42_u8) => {}
578            OptionAbcde::B(true) | OptionAbcde::C("s1") | OptionAbcde::D(7_usize) => unreachable!(),
579            OptionAbcde::E(value) if value == 0.99_f32 => unreachable!(),
580            _ => unreachable!(),
581        }
582    }
583}