futures_concurrency/future/
future_group.rs

1use alloc::collections::BTreeSet;
2use core::fmt::{self, Debug};
3use core::ops::{Deref, DerefMut};
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use futures_core::stream::Stream;
7use futures_core::Future;
8use slab::Slab;
9
10use crate::utils::{PollState, PollVec, WakerVec};
11
12/// A growable group of futures which act as a single unit.
13///
14/// # Example
15///
16/// **Basic example**
17///
18/// ```rust
19/// use futures_concurrency::future::FutureGroup;
20/// use futures_lite::StreamExt;
21/// use std::future;
22///
23/// # futures_lite::future::block_on(async {
24/// let mut group = FutureGroup::new();
25/// group.insert(future::ready(2));
26/// group.insert(future::ready(4));
27///
28/// let mut out = 0;
29/// while let Some(num) = group.next().await {
30///     out += num;
31/// }
32/// assert_eq!(out, 6);
33/// # });
34/// ```
35///
36/// **Update the group on every iteration**
37///
38/// ```
39/// use futures_concurrency::future::FutureGroup;
40/// use lending_stream::prelude::*;
41/// use std::future;
42///
43/// # fn main() { futures_lite::future::block_on(async {
44/// let mut group = FutureGroup::new();
45/// group.insert(future::ready(4));
46///
47/// let mut index = 3;
48/// let mut out = 0;
49/// let mut group = group.lend_mut();
50/// while let Some((group, num)) = group.next().await {
51///     if index != 0 {
52///         group.insert(future::ready(index));
53///         index -= 1;
54///     }
55///     out += num;
56/// }
57/// assert_eq!(out, 10);
58/// # });}
59/// ```
60#[must_use = "`FutureGroup` does nothing if not iterated over"]
61#[pin_project::pin_project]
62pub struct FutureGroup<F> {
63    #[pin]
64    futures: Slab<F>,
65    wakers: WakerVec,
66    states: PollVec,
67    keys: BTreeSet<usize>,
68    capacity: usize,
69}
70
71impl<T: Debug> Debug for FutureGroup<T> {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("FutureGroup")
74            .field("slab", &"[..]")
75            .field("len", &self.len())
76            .field("capacity", &self.capacity)
77            .finish()
78    }
79}
80
81impl<T> Default for FutureGroup<T> {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl<F> FutureGroup<F> {
88    /// Create a new instance of `FutureGroup`.
89    ///
90    /// # Example
91    ///
92    /// ```rust
93    /// use futures_concurrency::future::FutureGroup;
94    ///
95    /// let group = FutureGroup::new();
96    /// # let group: FutureGroup<usize> = group;
97    /// ```
98    pub fn new() -> Self {
99        Self::with_capacity(0)
100    }
101
102    /// Create a new instance of `FutureGroup` with a given capacity.
103    ///
104    /// # Example
105    ///
106    /// ```rust
107    /// use futures_concurrency::future::FutureGroup;
108    ///
109    /// let group = FutureGroup::with_capacity(2);
110    /// # let group: FutureGroup<usize> = group;
111    /// ```
112    pub fn with_capacity(capacity: usize) -> Self {
113        Self {
114            futures: Slab::with_capacity(capacity),
115            wakers: WakerVec::new(capacity),
116            states: PollVec::new(capacity),
117            keys: BTreeSet::new(),
118            capacity,
119        }
120    }
121
122    /// Return the number of futures currently active in the group.
123    ///
124    /// # Example
125    ///
126    /// ```rust
127    /// use futures_concurrency::future::FutureGroup;
128    /// use futures_lite::StreamExt;
129    /// use std::future;
130    ///
131    /// let mut group = FutureGroup::with_capacity(2);
132    /// assert_eq!(group.len(), 0);
133    /// group.insert(future::ready(12));
134    /// assert_eq!(group.len(), 1);
135    /// ```
136    #[inline(always)]
137    pub fn len(&self) -> usize {
138        self.futures.len()
139    }
140
141    /// Return the capacity of the `FutureGroup`.
142    ///
143    /// # Example
144    ///
145    /// ```rust
146    /// use futures_concurrency::future::FutureGroup;
147    /// use futures_lite::stream;
148    ///
149    /// let group = FutureGroup::with_capacity(2);
150    /// assert_eq!(group.capacity(), 2);
151    /// # let group: FutureGroup<usize> = group;
152    /// ```
153    pub fn capacity(&self) -> usize {
154        self.capacity
155    }
156
157    /// Returns true if there are no futures currently active in the group.
158    ///
159    /// # Example
160    ///
161    /// ```rust
162    /// use futures_concurrency::future::FutureGroup;
163    /// use std::future;
164    ///
165    /// let mut group = FutureGroup::with_capacity(2);
166    /// assert!(group.is_empty());
167    /// group.insert(future::ready(12));
168    /// assert!(!group.is_empty());
169    /// ```
170    pub fn is_empty(&self) -> bool {
171        self.futures.is_empty()
172    }
173
174    /// Removes a stream from the group. Returns whether the value was present in
175    /// the group.
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// use futures_concurrency::future::FutureGroup;
181    /// use std::future;
182    ///
183    /// # futures_lite::future::block_on(async {
184    /// let mut group = FutureGroup::new();
185    /// let key = group.insert(future::ready(4));
186    /// assert_eq!(group.len(), 1);
187    /// group.remove(key);
188    /// assert_eq!(group.len(), 0);
189    /// # })
190    /// ```
191    pub fn remove(&mut self, key: Key) -> bool {
192        let is_present = self.keys.remove(&key.0);
193        if is_present {
194            self.states[key.0].set_none();
195            self.futures.remove(key.0);
196        }
197        is_present
198    }
199
200    /// Returns `true` if the `FutureGroup` contains a value for the specified key.
201    ///
202    /// # Example
203    ///
204    /// ```
205    /// use futures_concurrency::future::FutureGroup;
206    /// use std::future;
207    ///
208    /// # futures_lite::future::block_on(async {
209    /// let mut group = FutureGroup::new();
210    /// let key = group.insert(future::ready(4));
211    /// assert!(group.contains_key(key));
212    /// group.remove(key);
213    /// assert!(!group.contains_key(key));
214    /// # })
215    /// ```
216    pub fn contains_key(&mut self, key: Key) -> bool {
217        self.keys.contains(&key.0)
218    }
219
220    /// Reserves capacity for `additional` more futures to be inserted.
221    /// Does nothing if the capacity is already sufficient.
222    ///
223    /// # Example
224    ///
225    /// ```rust
226    /// use futures_concurrency::future::FutureGroup;
227    /// use std::future::Ready;
228    /// # futures_lite::future::block_on(async {
229    /// let mut group: FutureGroup<Ready<usize>> = FutureGroup::with_capacity(0);
230    /// assert_eq!(group.capacity(), 0);
231    /// group.reserve(10);
232    /// assert_eq!(group.capacity(), 10);
233    ///
234    /// // does nothing if capacity is sufficient
235    /// group.reserve(5);
236    /// assert_eq!(group.capacity(), 10);
237    /// # })
238    /// ```
239    pub fn reserve(&mut self, additional: usize) {
240        if self.len() + additional < self.capacity {
241            return;
242        }
243        let new_cap = self.capacity + additional;
244        self.wakers.resize(new_cap);
245        self.states.resize(new_cap);
246        self.futures.reserve_exact(additional);
247        self.capacity = new_cap;
248    }
249}
250
251impl<F: Future> FutureGroup<F> {
252    /// Insert a new future into the group.
253    ///
254    /// # Example
255    ///
256    /// ```rust
257    /// use futures_concurrency::future::FutureGroup;
258    /// use std::future;
259    ///
260    /// let mut group = FutureGroup::with_capacity(2);
261    /// group.insert(future::ready(12));
262    /// ```
263    pub fn insert(&mut self, future: F) -> Key
264    where
265        F: Future,
266    {
267        if self.capacity <= self.len() {
268            self.reserve(self.capacity * 2 + 1);
269        }
270
271        let index = self.futures.insert(future);
272        self.keys.insert(index);
273
274        // Set the corresponding state
275        self.states[index].set_pending();
276        self.wakers.readiness().set_ready(index);
277
278        Key(index)
279    }
280
281    #[allow(unused)]
282    /// Insert a value into a pinned `FutureGroup`
283    ///
284    /// This method is private because it serves as an implementation detail for
285    /// `ConcurrentStream`. We should never expose this publicly, as the entire
286    /// point of this crate is that we abstract the futures poll machinery away
287    /// from end-users.
288    pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key
289    where
290        F: Future,
291    {
292        let mut this = self.project();
293        // SAFETY: inserting a value into the futures slab does not ever move
294        // any of the existing values.
295        let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future);
296        this.keys.insert(index);
297        let key = Key(index);
298
299        // If our slab allocated more space we need to
300        // update our tracking structures along with it.
301        let max_len = this.futures.as_ref().capacity().max(index);
302        this.wakers.resize(max_len);
303        this.states.resize(max_len);
304
305        // Set the corresponding state
306        this.states[index].set_pending();
307        let mut readiness = this.wakers.readiness();
308        readiness.set_ready(index);
309
310        key
311    }
312
313    /// Create a stream which also yields the key of each item.
314    ///
315    /// # Example
316    ///
317    /// ```rust
318    /// use futures_concurrency::future::FutureGroup;
319    /// use futures_lite::StreamExt;
320    /// use std::future;
321    ///
322    /// # futures_lite::future::block_on(async {
323    /// let mut group = FutureGroup::new();
324    /// group.insert(future::ready(2));
325    /// group.insert(future::ready(4));
326    ///
327    /// let mut out = 0;
328    /// let mut group = group.keyed();
329    /// while let Some((_key, num)) = group.next().await {
330    ///     out += num;
331    /// }
332    /// assert_eq!(out, 6);
333    /// # });
334    /// ```
335    pub fn keyed(self) -> Keyed<F> {
336        Keyed { group: self }
337    }
338}
339
340impl<F: Future> FutureGroup<F> {
341    fn poll_next_inner(
342        self: Pin<&mut Self>,
343        cx: &Context<'_>,
344    ) -> Poll<Option<(Key, <F as Future>::Output)>> {
345        let mut this = self.project();
346
347        // Short-circuit if we have no futures to iterate over
348        if this.futures.is_empty() {
349            return Poll::Ready(None);
350        }
351
352        // Set the top-level waker and check readiness
353        let mut readiness = this.wakers.readiness();
354        readiness.set_waker(cx.waker());
355        if !readiness.any_ready() {
356            // Nothing is ready yet
357            return Poll::Pending;
358        }
359
360        // Setup our futures state
361        let mut ret = Poll::Pending;
362        let states = this.states;
363
364        // SAFETY: We unpin the future group so we can later individually access
365        // single futures. Either to read from them or to drop them.
366        let futures = unsafe { this.futures.as_mut().get_unchecked_mut() };
367
368        for index in this.keys.iter().cloned() {
369            if states[index].is_pending() && readiness.clear_ready(index) {
370                // unlock readiness so we don't deadlock when polling
371                #[allow(clippy::drop_non_drop)]
372                drop(readiness);
373
374                // Obtain the intermediate waker.
375                let mut cx = Context::from_waker(this.wakers.get(index).unwrap());
376
377                // SAFETY: this future here is a projection from the futures
378                // vec, which we're reading from.
379                let future = unsafe { Pin::new_unchecked(&mut futures[index]) };
380                match future.poll(&mut cx) {
381                    Poll::Ready(item) => {
382                        // Set the return type for the function
383                        ret = Poll::Ready(Some((Key(index), item)));
384
385                        // Remove all associated data with the future
386                        // The only data we can't remove directly is the key entry.
387                        states[index] = PollState::None;
388                        futures.remove(index);
389
390                        break;
391                    }
392                    // Keep looping if there is nothing for us to do
393                    Poll::Pending => {}
394                };
395
396                // Lock readiness so we can use it again
397                readiness = this.wakers.readiness();
398            }
399        }
400
401        // Now that we're no longer borrowing `this.keys` we can remove
402        // the current key from the set
403        if let Poll::Ready(Some((key, _))) = ret {
404            this.keys.remove(&key.0);
405        }
406
407        ret
408    }
409}
410
411impl<F: Future> Stream for FutureGroup<F> {
412    type Item = <F as Future>::Output;
413
414    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
415        match self.poll_next_inner(cx) {
416            Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)),
417            Poll::Ready(None) => Poll::Ready(None),
418            Poll::Pending => Poll::Pending,
419        }
420    }
421}
422
423impl<F: Future> Extend<F> for FutureGroup<F> {
424    fn extend<T: IntoIterator<Item = F>>(&mut self, iter: T) {
425        let iter = iter.into_iter();
426        let len = iter.size_hint().1.unwrap_or_default();
427        self.reserve(len);
428
429        for future in iter {
430            self.insert(future);
431        }
432    }
433}
434
435impl<F: Future> FromIterator<F> for FutureGroup<F> {
436    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
437        let mut this = Self::new();
438        this.extend(iter);
439        this
440    }
441}
442
443/// A key used to index into the `FutureGroup` type.
444#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
445pub struct Key(usize);
446
447/// Iterate over items in the futures group with their associated keys.
448#[derive(Debug)]
449#[pin_project::pin_project]
450pub struct Keyed<F: Future> {
451    #[pin]
452    group: FutureGroup<F>,
453}
454
455impl<F: Future> Deref for Keyed<F> {
456    type Target = FutureGroup<F>;
457
458    fn deref(&self) -> &Self::Target {
459        &self.group
460    }
461}
462
463impl<F: Future> DerefMut for Keyed<F> {
464    fn deref_mut(&mut self) -> &mut Self::Target {
465        &mut self.group
466    }
467}
468
469impl<F: Future> Stream for Keyed<F> {
470    type Item = (Key, <F as Future>::Output);
471
472    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
473        let mut this = self.project();
474        this.group.as_mut().poll_next_inner(cx)
475    }
476}
477
478#[cfg(test)]
479mod test {
480    use super::FutureGroup;
481    use core::future;
482    use futures_lite::prelude::*;
483
484    #[test]
485    fn smoke() {
486        futures_lite::future::block_on(async {
487            let mut group = FutureGroup::new();
488            group.insert(future::ready(2));
489            group.insert(future::ready(4));
490
491            let mut out = 0;
492            while let Some(num) = group.next().await {
493                out += num;
494            }
495            assert_eq!(out, 6);
496            assert_eq!(group.len(), 0);
497            assert!(group.is_empty());
498        });
499    }
500
501    #[test]
502    fn capacity_grow_on_insert() {
503        futures_lite::future::block_on(async {
504            let mut group = FutureGroup::new();
505            let cap = group.capacity();
506
507            group.insert(future::ready(1));
508
509            assert!(group.capacity() > cap);
510        });
511    }
512}