async_select_all/
lib.rs

1// Copyright 2020 CoD Technologies Corp.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Select over a list of futures.
16//!
17//! ## Usage
18//!
19//! ```
20//! use async_select_all::SelectAll;
21//! use futures::executor::block_on;
22//!
23//! async fn inc(i: i32) -> i32 {
24//!     i + 1
25//! }
26//!
27//! fn main() {
28//!     let futures = vec![inc(10), inc(5)];
29//!     let mut select_all = SelectAll::from(futures);
30//!     let vec = block_on(async {
31//!         let mut vec = Vec::with_capacity(select_all.len());
32//!         while !select_all.is_empty() {
33//!             let val = select_all.select().await;
34//!             vec.push(val)
35//!         }
36//!         vec.sort();
37//!         vec
38//!     });
39//!     assert_eq!(vec, vec![6, 11]);
40//! }
41//! ```
42
43use pin_project_lite::pin_project;
44use std::future::Future;
45use std::pin::Pin;
46use std::task::{Context, Poll};
47
/// Future driving a single round of [`SelectAll::select`].
///
/// It borrows the storage of a [`SelectAll`] and resolves with the output of the
/// first stored future that is found ready, removing that future from the set.
///
/// The struct only holds a `&mut Vec<_>` and is therefore `Unpin`; every stored
/// future is pinned individually on the heap, so no pin projection and no `unsafe`
/// is needed to poll them.
struct SelectFuture<'a, F: Future> {
    futures: &'a mut Vec<Pin<Box<F>>>,
}

impl<'a, F: Future> SelectFuture<'a, F> {
    /// Wraps the storage of a `SelectAll` for one selection round.
    #[inline]
    fn new(futures: &'a mut Vec<Pin<Box<F>>>) -> Self {
        Self { futures }
    }
}

impl<'a, F: Future> Future for SelectFuture<'a, F> {
    type Output = F::Output;

    /// Polls the stored futures in order and stops at the first one that is ready.
    ///
    /// Returns `Poll::Ready` with that future's output after removing it from the
    /// set, or `Poll::Pending` if none of the futures is ready yet.
    ///
    /// # Panics
    /// Panics if the borrowed set is empty, because such a selection could never
    /// complete.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Self` is `Unpin` (it only contains a mutable reference), so `get_mut` is safe.
        let futures = &mut *self.get_mut().futures;
        assert!(
            !futures.is_empty(),
            "`SelectAll::select` called on an empty set"
        );

        let ready = futures
            .iter_mut()
            .enumerate()
            .find_map(|(index, future)| match future.as_mut().poll(cx) {
                Poll::Ready(output) => Some((index, output)),
                Poll::Pending => None,
            });

        match ready {
            Some((index, output)) => {
                // `swap_remove` only moves the `Pin<Box<F>>` handles around; the futures
                // themselves stay at their heap addresses, so the pinning guarantee
                // holds for futures that were already polled.
                drop(futures.swap_remove(index));
                Poll::Ready(output)
            }
            None => Poll::Pending,
        }
    }
}

/// An unbounded set of futures.
///
/// Every future is boxed and pinned when it is inserted. This keeps its address
/// stable while the set is reordered or grows, which is required for futures that
/// are not `Unpin` (for example most `async fn` futures).
pub struct SelectAll<F: Future> {
    futures: Vec<Pin<Box<F>>>,
}

impl<I> From<I> for SelectAll<I::Item>
where
    I: IntoIterator,
    I::Item: Future,
{
    /// Builds a set from every future yielded by `iter`.
    /// None of the futures is polled by this conversion.
    #[inline]
    fn from(iter: I) -> Self {
        Self {
            futures: iter.into_iter().map(Box::pin).collect(),
        }
    }
}

impl<F: Future> Default for SelectAll<F> {
    /// Equivalent to [`SelectAll::new`].
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl<F: Future> SelectAll<F> {
    /// Constructs a new, empty `SelectAll`.
    /// The returned `SelectAll` does not contain any futures.
    #[inline]
    pub fn new() -> Self {
        SelectAll {
            futures: Vec::new(),
        }
    }

    /// Returns the number of futures contained in the set.
    /// This represents the total number of in-flight futures.
    #[inline]
    pub fn len(&self) -> usize {
        self.futures.len()
    }

    /// Returns true if the set contains no futures.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.futures.is_empty()
    }

    /// Push a future into the set.
    /// This function submits the given future to the set for managing.
    /// This function will not call poll on the submitted future.
    /// The caller must ensure that `SelectAll::select` is called in order to receive task notifications.
    #[inline]
    pub fn push(&mut self, future: F) {
        self.futures.push(Box::pin(future));
    }

    /// Select over a list of futures.
    ///
    /// Upon completion the item resolved will be returned and the future that
    /// produced it is removed from the set.
    ///
    /// There are no guarantees provided on the order of the list with the remaining futures.
    /// They might be swapped around, reversed, or completely random.
    ///
    /// # Panics
    /// The returned future panics when it is polled if the `SelectAll` contains no items.
    #[inline]
    pub fn select(&mut self) -> impl Future<Output = F::Output> + '_ {
        SelectFuture::new(&mut self.futures)
    }
}