futures_concurrency/future/future_group.rs
1use alloc::collections::BTreeSet;
2use core::fmt::{self, Debug};
3use core::ops::{Deref, DerefMut};
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use futures_core::stream::Stream;
7use futures_core::Future;
8use slab::Slab;
9
10use crate::utils::{PollState, PollVec, WakerVec};
11
12/// A growable group of futures which act as a single unit.
13///
14/// # Example
15///
16/// **Basic example**
17///
18/// ```rust
19/// use futures_concurrency::future::FutureGroup;
20/// use futures_lite::StreamExt;
21/// use std::future;
22///
23/// # futures_lite::future::block_on(async {
24/// let mut group = FutureGroup::new();
25/// group.insert(future::ready(2));
26/// group.insert(future::ready(4));
27///
28/// let mut out = 0;
29/// while let Some(num) = group.next().await {
30/// out += num;
31/// }
32/// assert_eq!(out, 6);
33/// # });
34/// ```
35///
36/// **Update the group on every iteration**
37///
38/// ```
39/// use futures_concurrency::future::FutureGroup;
40/// use lending_stream::prelude::*;
41/// use std::future;
42///
43/// # fn main() { futures_lite::future::block_on(async {
44/// let mut group = FutureGroup::new();
45/// group.insert(future::ready(4));
46///
47/// let mut index = 3;
48/// let mut out = 0;
49/// let mut group = group.lend_mut();
50/// while let Some((group, num)) = group.next().await {
51/// if index != 0 {
52/// group.insert(future::ready(index));
53/// index -= 1;
54/// }
55/// out += num;
56/// }
57/// assert_eq!(out, 10);
58/// # });}
59/// ```
60#[must_use = "`FutureGroup` does nothing if not iterated over"]
61#[pin_project::pin_project]
62pub struct FutureGroup<F> {
63 #[pin]
64 futures: Slab<F>,
65 wakers: WakerVec,
66 states: PollVec,
67 keys: BTreeSet<usize>,
68 capacity: usize,
69}
70
71impl<T: Debug> Debug for FutureGroup<T> {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("FutureGroup")
74 .field("slab", &"[..]")
75 .field("len", &self.len())
76 .field("capacity", &self.capacity)
77 .finish()
78 }
79}
80
81impl<T> Default for FutureGroup<T> {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl<F> FutureGroup<F> {
88 /// Create a new instance of `FutureGroup`.
89 ///
90 /// # Example
91 ///
92 /// ```rust
93 /// use futures_concurrency::future::FutureGroup;
94 ///
95 /// let group = FutureGroup::new();
96 /// # let group: FutureGroup<usize> = group;
97 /// ```
98 pub fn new() -> Self {
99 Self::with_capacity(0)
100 }
101
102 /// Create a new instance of `FutureGroup` with a given capacity.
103 ///
104 /// # Example
105 ///
106 /// ```rust
107 /// use futures_concurrency::future::FutureGroup;
108 ///
109 /// let group = FutureGroup::with_capacity(2);
110 /// # let group: FutureGroup<usize> = group;
111 /// ```
112 pub fn with_capacity(capacity: usize) -> Self {
113 Self {
114 futures: Slab::with_capacity(capacity),
115 wakers: WakerVec::new(capacity),
116 states: PollVec::new(capacity),
117 keys: BTreeSet::new(),
118 capacity,
119 }
120 }
121
122 /// Return the number of futures currently active in the group.
123 ///
124 /// # Example
125 ///
126 /// ```rust
127 /// use futures_concurrency::future::FutureGroup;
128 /// use futures_lite::StreamExt;
129 /// use std::future;
130 ///
131 /// let mut group = FutureGroup::with_capacity(2);
132 /// assert_eq!(group.len(), 0);
133 /// group.insert(future::ready(12));
134 /// assert_eq!(group.len(), 1);
135 /// ```
136 #[inline(always)]
137 pub fn len(&self) -> usize {
138 self.futures.len()
139 }
140
141 /// Return the capacity of the `FutureGroup`.
142 ///
143 /// # Example
144 ///
145 /// ```rust
146 /// use futures_concurrency::future::FutureGroup;
147 /// use futures_lite::stream;
148 ///
149 /// let group = FutureGroup::with_capacity(2);
150 /// assert_eq!(group.capacity(), 2);
151 /// # let group: FutureGroup<usize> = group;
152 /// ```
153 pub fn capacity(&self) -> usize {
154 self.capacity
155 }
156
157 /// Returns true if there are no futures currently active in the group.
158 ///
159 /// # Example
160 ///
161 /// ```rust
162 /// use futures_concurrency::future::FutureGroup;
163 /// use std::future;
164 ///
165 /// let mut group = FutureGroup::with_capacity(2);
166 /// assert!(group.is_empty());
167 /// group.insert(future::ready(12));
168 /// assert!(!group.is_empty());
169 /// ```
170 pub fn is_empty(&self) -> bool {
171 self.futures.is_empty()
172 }
173
174 /// Removes a stream from the group. Returns whether the value was present in
175 /// the group.
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// use futures_concurrency::future::FutureGroup;
181 /// use std::future;
182 ///
183 /// # futures_lite::future::block_on(async {
184 /// let mut group = FutureGroup::new();
185 /// let key = group.insert(future::ready(4));
186 /// assert_eq!(group.len(), 1);
187 /// group.remove(key);
188 /// assert_eq!(group.len(), 0);
189 /// # })
190 /// ```
191 pub fn remove(&mut self, key: Key) -> bool {
192 let is_present = self.keys.remove(&key.0);
193 if is_present {
194 self.states[key.0].set_none();
195 self.futures.remove(key.0);
196 }
197 is_present
198 }
199
200 /// Returns `true` if the `FutureGroup` contains a value for the specified key.
201 ///
202 /// # Example
203 ///
204 /// ```
205 /// use futures_concurrency::future::FutureGroup;
206 /// use std::future;
207 ///
208 /// # futures_lite::future::block_on(async {
209 /// let mut group = FutureGroup::new();
210 /// let key = group.insert(future::ready(4));
211 /// assert!(group.contains_key(key));
212 /// group.remove(key);
213 /// assert!(!group.contains_key(key));
214 /// # })
215 /// ```
216 pub fn contains_key(&mut self, key: Key) -> bool {
217 self.keys.contains(&key.0)
218 }
219
220 /// Reserves capacity for `additional` more futures to be inserted.
221 /// Does nothing if the capacity is already sufficient.
222 ///
223 /// # Example
224 ///
225 /// ```rust
226 /// use futures_concurrency::future::FutureGroup;
227 /// use std::future::Ready;
228 /// # futures_lite::future::block_on(async {
229 /// let mut group: FutureGroup<Ready<usize>> = FutureGroup::with_capacity(0);
230 /// assert_eq!(group.capacity(), 0);
231 /// group.reserve(10);
232 /// assert_eq!(group.capacity(), 10);
233 ///
234 /// // does nothing if capacity is sufficient
235 /// group.reserve(5);
236 /// assert_eq!(group.capacity(), 10);
237 /// # })
238 /// ```
239 pub fn reserve(&mut self, additional: usize) {
240 if self.len() + additional < self.capacity {
241 return;
242 }
243 let new_cap = self.capacity + additional;
244 self.wakers.resize(new_cap);
245 self.states.resize(new_cap);
246 self.futures.reserve_exact(additional);
247 self.capacity = new_cap;
248 }
249}
250
251impl<F: Future> FutureGroup<F> {
252 /// Insert a new future into the group.
253 ///
254 /// # Example
255 ///
256 /// ```rust
257 /// use futures_concurrency::future::FutureGroup;
258 /// use std::future;
259 ///
260 /// let mut group = FutureGroup::with_capacity(2);
261 /// group.insert(future::ready(12));
262 /// ```
263 pub fn insert(&mut self, future: F) -> Key
264 where
265 F: Future,
266 {
267 if self.capacity <= self.len() {
268 self.reserve(self.capacity * 2 + 1);
269 }
270
271 let index = self.futures.insert(future);
272 self.keys.insert(index);
273
274 // Set the corresponding state
275 self.states[index].set_pending();
276 self.wakers.readiness().set_ready(index);
277
278 Key(index)
279 }
280
281 #[allow(unused)]
282 /// Insert a value into a pinned `FutureGroup`
283 ///
284 /// This method is private because it serves as an implementation detail for
285 /// `ConcurrentStream`. We should never expose this publicly, as the entire
286 /// point of this crate is that we abstract the futures poll machinery away
287 /// from end-users.
288 pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key
289 where
290 F: Future,
291 {
292 let mut this = self.project();
293 // SAFETY: inserting a value into the futures slab does not ever move
294 // any of the existing values.
295 let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future);
296 this.keys.insert(index);
297 let key = Key(index);
298
299 // If our slab allocated more space we need to
300 // update our tracking structures along with it.
301 let max_len = this.futures.as_ref().capacity().max(index);
302 this.wakers.resize(max_len);
303 this.states.resize(max_len);
304
305 // Set the corresponding state
306 this.states[index].set_pending();
307 let mut readiness = this.wakers.readiness();
308 readiness.set_ready(index);
309
310 key
311 }
312
313 /// Create a stream which also yields the key of each item.
314 ///
315 /// # Example
316 ///
317 /// ```rust
318 /// use futures_concurrency::future::FutureGroup;
319 /// use futures_lite::StreamExt;
320 /// use std::future;
321 ///
322 /// # futures_lite::future::block_on(async {
323 /// let mut group = FutureGroup::new();
324 /// group.insert(future::ready(2));
325 /// group.insert(future::ready(4));
326 ///
327 /// let mut out = 0;
328 /// let mut group = group.keyed();
329 /// while let Some((_key, num)) = group.next().await {
330 /// out += num;
331 /// }
332 /// assert_eq!(out, 6);
333 /// # });
334 /// ```
335 pub fn keyed(self) -> Keyed<F> {
336 Keyed { group: self }
337 }
338}
339
340impl<F: Future> FutureGroup<F> {
341 fn poll_next_inner(
342 self: Pin<&mut Self>,
343 cx: &Context<'_>,
344 ) -> Poll<Option<(Key, <F as Future>::Output)>> {
345 let mut this = self.project();
346
347 // Short-circuit if we have no futures to iterate over
348 if this.futures.is_empty() {
349 return Poll::Ready(None);
350 }
351
352 // Set the top-level waker and check readiness
353 let mut readiness = this.wakers.readiness();
354 readiness.set_waker(cx.waker());
355 if !readiness.any_ready() {
356 // Nothing is ready yet
357 return Poll::Pending;
358 }
359
360 // Setup our futures state
361 let mut ret = Poll::Pending;
362 let states = this.states;
363
364 // SAFETY: We unpin the future group so we can later individually access
365 // single futures. Either to read from them or to drop them.
366 let futures = unsafe { this.futures.as_mut().get_unchecked_mut() };
367
368 for index in this.keys.iter().cloned() {
369 if states[index].is_pending() && readiness.clear_ready(index) {
370 // unlock readiness so we don't deadlock when polling
371 #[allow(clippy::drop_non_drop)]
372 drop(readiness);
373
374 // Obtain the intermediate waker.
375 let mut cx = Context::from_waker(this.wakers.get(index).unwrap());
376
377 // SAFETY: this future here is a projection from the futures
378 // vec, which we're reading from.
379 let future = unsafe { Pin::new_unchecked(&mut futures[index]) };
380 match future.poll(&mut cx) {
381 Poll::Ready(item) => {
382 // Set the return type for the function
383 ret = Poll::Ready(Some((Key(index), item)));
384
385 // Remove all associated data with the future
386 // The only data we can't remove directly is the key entry.
387 states[index] = PollState::None;
388 futures.remove(index);
389
390 break;
391 }
392 // Keep looping if there is nothing for us to do
393 Poll::Pending => {}
394 };
395
396 // Lock readiness so we can use it again
397 readiness = this.wakers.readiness();
398 }
399 }
400
401 // Now that we're no longer borrowing `this.keys` we can remove
402 // the current key from the set
403 if let Poll::Ready(Some((key, _))) = ret {
404 this.keys.remove(&key.0);
405 }
406
407 ret
408 }
409}
410
411impl<F: Future> Stream for FutureGroup<F> {
412 type Item = <F as Future>::Output;
413
414 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
415 match self.poll_next_inner(cx) {
416 Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)),
417 Poll::Ready(None) => Poll::Ready(None),
418 Poll::Pending => Poll::Pending,
419 }
420 }
421}
422
423impl<F: Future> Extend<F> for FutureGroup<F> {
424 fn extend<T: IntoIterator<Item = F>>(&mut self, iter: T) {
425 let iter = iter.into_iter();
426 let len = iter.size_hint().1.unwrap_or_default();
427 self.reserve(len);
428
429 for future in iter {
430 self.insert(future);
431 }
432 }
433}
434
435impl<F: Future> FromIterator<F> for FutureGroup<F> {
436 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
437 let mut this = Self::new();
438 this.extend(iter);
439 this
440 }
441}
442
443/// A key used to index into the `FutureGroup` type.
444#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
445pub struct Key(usize);
446
447/// Iterate over items in the futures group with their associated keys.
448#[derive(Debug)]
449#[pin_project::pin_project]
450pub struct Keyed<F: Future> {
451 #[pin]
452 group: FutureGroup<F>,
453}
454
455impl<F: Future> Deref for Keyed<F> {
456 type Target = FutureGroup<F>;
457
458 fn deref(&self) -> &Self::Target {
459 &self.group
460 }
461}
462
463impl<F: Future> DerefMut for Keyed<F> {
464 fn deref_mut(&mut self) -> &mut Self::Target {
465 &mut self.group
466 }
467}
468
469impl<F: Future> Stream for Keyed<F> {
470 type Item = (Key, <F as Future>::Output);
471
472 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
473 let mut this = self.project();
474 this.group.as_mut().poll_next_inner(cx)
475 }
476}
477
478#[cfg(test)]
479mod test {
480 use super::FutureGroup;
481 use core::future;
482 use futures_lite::prelude::*;
483
484 #[test]
485 fn smoke() {
486 futures_lite::future::block_on(async {
487 let mut group = FutureGroup::new();
488 group.insert(future::ready(2));
489 group.insert(future::ready(4));
490
491 let mut out = 0;
492 while let Some(num) = group.next().await {
493 out += num;
494 }
495 assert_eq!(out, 6);
496 assert_eq!(group.len(), 0);
497 assert!(group.is_empty());
498 });
499 }
500
501 #[test]
502 fn capacity_grow_on_insert() {
503 futures_lite::future::block_on(async {
504 let mut group = FutureGroup::new();
505 let cap = group.capacity();
506
507 group.insert(future::ready(1));
508
509 assert!(group.capacity() > cap);
510 });
511 }
512}