async_select_all/lib.rs
1// Copyright 2020 CoD Technologies Corp.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Select over a list of futures.
16//!
17//! ## Usage
18//!
19//! ```
20//! use async_select_all::SelectAll;
21//! use futures::executor::block_on;
22//!
23//! async fn inc(i: i32) -> i32 {
24//! i + 1
25//! }
26//!
27//! fn main() {
28//! let futures = vec![inc(10), inc(5)];
29//! let mut select_all = SelectAll::from(futures);
30//! let vec = block_on(async {
31//! let mut vec = Vec::with_capacity(select_all.len());
32//! while !select_all.is_empty() {
33//! let val = select_all.select().await;
34//! vec.push(val)
35//! }
36//! vec.sort();
37//! vec
38//! });
39//! assert_eq!(vec, vec![6, 11]);
40//! }
41//! ```
42
43use pin_project_lite::pin_project;
44use std::future::Future;
45use std::pin::Pin;
46use std::task::{Context, Poll};
47
48pin_project! {
49 struct SelectFuture<'a, F: Future> {
50 #[pin] futures: &'a mut Vec<F> ,
51 }
52}
53
54impl<'a, F: Future> SelectFuture<'a, F> {
55 #[inline]
56 fn new(futures: &'a mut Vec<F>) -> Self {
57 Self { futures }
58 }
59}
60
61impl<'a, F: Future> Future for SelectFuture<'a, F> {
62 type Output = F::Output;
63
64 #[inline]
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let mut ready = None;
67
68 let mut this = self.project();
69 for (i, f) in this.futures.iter_mut().enumerate() {
70 let p = unsafe { Pin::new_unchecked(f) };
71 if let Poll::Ready(output) = p.poll(cx) {
72 ready = Some((i, Poll::Ready(output)));
73 break;
74 }
75 }
76
77 if let Some((id, r)) = ready {
78 this.futures.swap_remove(id);
79 return r;
80 }
81
82 Poll::Pending
83 }
84}
85
86/// An unbounded set of futures.
87pub struct SelectAll<F: Future> {
88 futures: Vec<F>,
89}
90
91impl<I> From<I> for SelectAll<I::Item>
92where
93 I: IntoIterator,
94 I::Item: Future,
95{
96 #[inline]
97 fn from(iter: I) -> Self {
98 Self {
99 futures: iter.into_iter().collect(),
100 }
101 }
102}
103
104impl<F: Future> SelectAll<F> {
105 /// Constructs a new, empty `SelectAll`.
106 /// The returned `SelectAll` does not contain any futures.
107 #[allow(clippy::new_without_default)]
108 #[inline]
109 pub fn new() -> Self {
110 SelectAll {
111 futures: Vec::new(),
112 }
113 }
114
115 /// Returns the number of futures contained in the set.
116 /// This represents the total number of in-flight futures.
117 #[inline]
118 pub fn len(&self) -> usize {
119 self.futures.len()
120 }
121
122 /// Returns true if the set contains no futures.
123 #[inline]
124 pub fn is_empty(&self) -> bool {
125 self.futures.is_empty()
126 }
127
128 /// Push a future into the set.
129 /// This function submits the given future to the set for managing.
130 /// This function will not call poll on the submitted future.
131 /// The caller must ensure that `SelectAll::select` is called in order to receive task notifications.
132 #[inline]
133 pub fn push(&mut self, future: F) {
134 self.futures.push(future);
135 }
136
137 /// Select over a list of futures.
138 ///
139 /// Upon completion the item resolved will be returned.
140 ///
141 /// There are no guarantees provided on the order of the list with the remaining futures.
142 /// They might be swapped around, reversed, or completely random.
143 ///
144 /// # Panics
145 /// This function will panic if the `SelectAll` specified contains no items.
146 #[inline]
147 pub async fn select(&mut self) -> F::Output {
148 assert!(!self.futures.is_empty());
149 SelectFuture::new(&mut self.futures).await
150 }
151}