pollable_map/futures/
set.rs

1use std::future::Future;
2use std::pin::Pin;
3
4use super::FutureMap;
5use futures::stream::FusedStream;
6use futures::{Stream, StreamExt};
7use std::task::{Context, Poll};
8
9pub struct FutureSet<S> {
10    id: i64,
11    map: FutureMap<i64, S>,
12}
13
14impl<S> Default for FutureSet<S>
15where
16    S: Future + Send + Unpin + 'static,
17{
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl<S> FutureSet<S>
24where
25    S: Future + Send + Unpin + 'static,
26{
27    /// Creates an empty ['FutureSet`]
28    pub fn new() -> Self {
29        Self {
30            id: 0,
31            map: FutureMap::default(),
32        }
33    }
34
35    /// Insert a future into the set of futures.
36    pub fn insert(&mut self, fut: S) -> bool {
37        self.id = self.id.wrapping_add(1);
38        self.map.insert(self.id, fut)
39    }
40
41    /// An iterator visiting all futures in arbitrary order.
42    pub fn iter(&self) -> impl Iterator<Item = &S> {
43        self.map.iter().map(|(_, st)| st)
44    }
45
46    /// An iterator visiting all futures mutably in arbitrary order.
47    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut S> {
48        self.map.iter_mut().map(|(_, st)| st)
49    }
50
51    /// An iterator visiting all futures pinned valued in arbitrary order
52    pub fn iter_pin(&mut self) -> impl Iterator<Item = Pin<&mut S>> {
53        self.map.iter_pin().map(|(_, st)| st)
54    }
55
56    /// Clears the set.
57    pub fn clear(&mut self) {
58        self.map.clear();
59    }
60
61    /// Returns the number of futures in the set.
62    pub fn len(&self) -> usize {
63        self.map.len()
64    }
65
66    /// Return `true` map contains no elements.
67    pub fn is_empty(&self) -> bool {
68        self.map.is_empty()
69    }
70}
71
72impl<S> FromIterator<S> for FutureSet<S>
73where
74    S: Future + Send + Unpin + 'static,
75{
76    fn from_iter<I: IntoIterator<Item = S>>(iter: I) -> Self {
77        let mut maps = Self::new();
78        for st in iter {
79            maps.insert(st);
80        }
81        maps
82    }
83}
84
85impl<S> Stream for FutureSet<S>
86where
87    S: Future + Send + Unpin + 'static,
88{
89    type Item = S::Output;
90
91    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92        self.map
93            .poll_next_unpin(cx)
94            .map(|output| output.map(|(_, item)| item))
95    }
96
97    fn size_hint(&self) -> (usize, Option<usize>) {
98        self.map.size_hint()
99    }
100}
101
102impl<S> FusedStream for FutureSet<S>
103where
104    S: Future + Send + Unpin + 'static,
105{
106    fn is_terminated(&self) -> bool {
107        self.map.is_terminated()
108    }
109}
110
111#[cfg(test)]
112mod test {
113    use crate::futures::set::FutureSet;
114    use futures::StreamExt;
115
116    #[test]
117    fn valid_future_set() {
118        let mut list = FutureSet::new();
119        assert!(list.insert(futures::future::ready(0)));
120        assert!(list.insert(futures::future::ready(1)));
121
122        futures::executor::block_on(async move {
123            let val = list.next().await;
124            assert_eq!(val, Some(0));
125            let val = list.next().await;
126            assert_eq!(val, Some(1));
127        });
128    }
129}