pollable_map/
futures.rs

1pub mod optional;
2pub mod ordered;
3pub mod set;
4pub mod timeout_map;
5pub mod timeout_set;
6
7use crate::common::InnerMap;
8use futures::stream::{FusedStream, FuturesUnordered};
9use futures::{Stream, StreamExt};
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll, Waker};
13
14pub struct FutureMap<K, S> {
15    list: FuturesUnordered<InnerMap<K, S>>,
16    empty: bool,
17    waker: Option<Waker>,
18}
19
20impl<K, T> Default for FutureMap<K, T> {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl<K, T> FutureMap<K, T> {
27    /// Creates an empty [`FutureMap`]
28    pub fn new() -> Self {
29        Self {
30            list: FuturesUnordered::new(),
31            empty: true,
32            waker: None,
33        }
34    }
35}
36
37impl<K, T> FutureMap<K, T>
38where
39    K: Clone + PartialEq + Send + Unpin + 'static,
40    T: Future + Send + Unpin + 'static,
41{
42    /// Insert a future into the map with a unique key.
43    /// The function will return true if the map does not have the key present,
44    /// otherwise it will return false
45    pub fn insert(&mut self, key: K, fut: T) -> bool {
46        if self.contains_key(&key) {
47            return false;
48        }
49
50        let st = InnerMap::new(key, fut);
51        self.list.push(st);
52
53        if let Some(waker) = self.waker.take() {
54            waker.wake();
55        }
56
57        self.empty = false;
58        true
59    }
60
61    /// Mark future with assigned key to wake up on successful yield.
62    /// Will return false if future does not exist or if value is the same as
63    /// previously set.
64    pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
65        self.list
66            .iter_mut()
67            .find(|st| st.key().eq(key))
68            .is_some_and(|st| st.set_wake_on_success(wake_on_success))
69    }
70
71    /// An iterator visiting all key-value pairs in arbitrary order.
72    pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
73        self.list.iter().filter_map(|st| st.key_value())
74    }
75
76    /// An iterator visiting all key-value pairs mutably in arbitrary order.
77    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
78        self.list.iter_mut().filter_map(|st| st.key_value_mut())
79    }
80
81    /// An iterator visiting all key-value pairs with a pinned valued in arbitrary order
82    pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
83        self.list.iter_mut().filter_map(|st| st.key_value_pin())
84    }
85
86    /// Returns an iterator visiting all keys in arbitrary order.
87    pub fn keys(&self) -> impl Iterator<Item = &K> {
88        self.list.iter().map(|st| st.key())
89    }
90
91    /// An iterator visiting all values in arbitrary order.
92    pub fn values(&self) -> impl Iterator<Item = &T> {
93        self.list.iter().filter_map(|st| st.inner())
94    }
95
96    /// An iterator visiting all values mutably in arbitrary order.
97    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
98        self.list.iter_mut().filter_map(|st| st.inner_mut())
99    }
100
101    /// Returns `true` if the map contains a future for the specified key.
102    pub fn contains_key(&self, key: &K) -> bool {
103        self.list.iter().any(|st| st.key().eq(key))
104    }
105
106    /// Clears the map.
107    pub fn clear(&mut self) {
108        self.list.clear();
109    }
110
111    /// Returns a reference to the future corresponding to the key.
112    pub fn get(&self, key: &K) -> Option<&T> {
113        self.list
114            .iter()
115            .find(|st| st.key().eq(key))
116            .and_then(|st| st.inner())
117    }
118
119    /// Returns a mutable future to the value corresponding to the key.
120    pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
121        self.list
122            .iter_mut()
123            .find(|st| st.key().eq(key))
124            .and_then(|st| st.inner_mut())
125    }
126
127    /// Returns a muable future or default value if it does not exist.
128    pub fn get_mut_or_default(&mut self, key: &K) -> &mut T
129    where
130        T: Default,
131    {
132        self.insert(key.clone(), T::default());
133        self.get_mut(key).expect("valid entry")
134    }
135
136    /// Returns a pinned future corresponding to the key.
137    pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
138        self.list
139            .iter_mut()
140            .find(|st| st.key().eq(key))
141            .and_then(|st| st.inner_pin())
142    }
143
144    /// Removes a key from the map, returning the future.
145    pub fn remove(&mut self, key: &K) -> Option<T> {
146        self.list
147            .iter_mut()
148            .find(|st| st.key().eq(key))
149            .and_then(|st| st.take_inner())
150    }
151
152    /// Returns the number of futures in the map.
153    pub fn len(&self) -> usize {
154        self.list.iter().filter(|st| st.inner().is_some()).count()
155    }
156
157    /// Return `true` map contains no elements.
158    pub fn is_empty(&self) -> bool {
159        self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
160    }
161}
162
163impl<K, T> FromIterator<(K, T)> for FutureMap<K, T>
164where
165    K: Clone + PartialEq + Send + Unpin + 'static,
166    T: Future + Send + Unpin + 'static,
167{
168    fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
169        let mut maps = Self::new();
170        for (key, val) in iter {
171            maps.insert(key, val);
172        }
173        maps
174    }
175}
176
177impl<K, T> Stream for FutureMap<K, T>
178where
179    K: Clone + PartialEq + Send + Unpin + 'static,
180    T: Future + Unpin + Send + 'static,
181{
182    type Item = (K, T::Output);
183
184    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185        let this = &mut *self;
186        loop {
187            match this.list.poll_next_unpin(cx) {
188                Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
189                // We continue in case there is any progress on the set of streams
190                Poll::Ready(Some((key, None))) => {
191                    this.remove(&key);
192                }
193                Poll::Ready(None) => {
194                    // While we could allow the stream to continue to be pending, it would make more sense to notify that the stream
195                    // is empty without needing to explicitly check while polling the actual "map" itself
196                    // So we would mark a field to notify that the state is finished and return `Poll::Ready(None)` so the stream
197                    // can be terminated while on the next poll, we could let it be return pending.
198                    // We do this so that we are not returning `Poll::Ready(None)` each time the map is polled
199                    // as that may be seen as UB and may cause an increase in cpu usage
200                    if self.empty {
201                        self.waker = Some(cx.waker().clone());
202                        return Poll::Pending;
203                    }
204
205                    self.empty = true;
206                    return Poll::Ready(None);
207                }
208                Poll::Pending => {
209                    // Returning `None` does not mean the stream is actually terminated
210                    self.waker = Some(cx.waker().clone());
211                    return Poll::Pending;
212                }
213            }
214        }
215    }
216
217    fn size_hint(&self) -> (usize, Option<usize>) {
218        self.list.size_hint()
219    }
220}
221
222impl<K, T> FusedStream for FutureMap<K, T>
223where
224    K: Clone + PartialEq + Send + Unpin + 'static,
225    T: Future + Unpin + Send + 'static,
226{
227    fn is_terminated(&self) -> bool {
228        self.list.is_terminated()
229    }
230}
231
#[cfg(test)]
mod test {
    use super::FutureMap;
    use futures::future::{pending, ready};
    use futures::StreamExt;
    use std::task::Poll;

    /// Inserting under a key that is already tracked must be rejected.
    #[test]
    fn existing_key() {
        let mut map = FutureMap::new();
        let first_insert = map.insert(1, pending::<()>());
        assert!(first_insert);
        let second_insert = map.insert(1, pending::<()>());
        assert!(!second_insert);
    }

    /// Ready futures are yielded with their keys, followed by a single `None`, after which the
    /// map stays pending.
    #[test]
    fn poll_multiple_keyed_streams() {
        let mut map = FutureMap::new();
        for (key, value) in [(1, 10), (2, 20), (3, 30)] {
            map.insert(key, ready(value));
        }

        futures::executor::block_on(async move {
            for expected in [(1, 10), (2, 20), (3, 30)] {
                assert_eq!(map.next().await, Some(expected));
            }
            assert_eq!(map.next().await, None);

            let still_pending = futures::future::poll_fn(|cx| {
                let polled = map.poll_next_unpin(cx);
                Poll::Ready(polled.is_pending())
            })
            .await;
            assert!(still_pending);
        })
    }
}