pollable_map/
futures.rs

1pub mod optional;
2pub mod ordered;
3
4use crate::common::InnerMap;
5use futures::stream::{FusedStream, FuturesUnordered};
6use futures::{Stream, StreamExt};
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll, Waker};
10
11pub struct FutureMap<K, S> {
12    list: FuturesUnordered<InnerMap<K, S>>,
13    empty: bool,
14    waker: Option<Waker>,
15}
16
17impl<K, T> Default for FutureMap<K, T> {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl<K, T> FutureMap<K, T> {
24    /// Creates an empty [`FutureMap`]
25    pub fn new() -> Self {
26        Self {
27            list: FuturesUnordered::new(),
28            empty: true,
29            waker: None,
30        }
31    }
32}
33
34impl<K, T> FutureMap<K, T>
35where
36    K: Clone + PartialEq + Send + Unpin + 'static,
37    T: Future + Send + Unpin + 'static,
38{
39    /// Insert a future into the map with a unique key.
40    /// The function will return true if the map does not have the key present,
41    /// otherwise it will return false
42    pub fn insert(&mut self, key: K, fut: T) -> bool {
43        if self.contains_key(&key) {
44            return false;
45        }
46
47        let st = InnerMap::new(key, fut);
48        self.list.push(st);
49
50        if let Some(waker) = self.waker.take() {
51            waker.wake();
52        }
53
54        self.empty = false;
55        true
56    }
57
58    /// Mark future with assigned key to wake up on successful yield.
59    /// Will return false if future does not exist or if value is the same as
60    /// previously set.
61    pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
62        self.list
63            .iter_mut()
64            .find(|st| st.key().eq(key))
65            .is_some_and(|st| st.set_wake_on_success(wake_on_success))
66    }
67
68    /// An iterator visiting all key-value pairs in arbitrary order.
69    pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
70        self.list.iter().filter_map(|st| st.key_value())
71    }
72
73    /// An iterator visiting all key-value pairs mutably in arbitrary order.
74    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
75        self.list.iter_mut().filter_map(|st| st.key_value_mut())
76    }
77
78    /// An iterator visiting all key-value pairs with a pinned valued in arbitrary order
79    pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
80        self.list.iter_mut().filter_map(|st| st.key_value_pin())
81    }
82
83    /// Returns an iterator visiting all keys in arbitrary order.
84    pub fn keys(&self) -> impl Iterator<Item = &K> {
85        self.list.iter().map(|st| st.key())
86    }
87
88    /// An iterator visiting all values in arbitrary order.
89    pub fn values(&self) -> impl Iterator<Item = &T> {
90        self.list.iter().filter_map(|st| st.inner())
91    }
92
93    /// An iterator visiting all values mutably in arbitrary order.
94    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
95        self.list.iter_mut().filter_map(|st| st.inner_mut())
96    }
97
98    /// Returns `true` if the map contains a future for the specified key.
99    pub fn contains_key(&self, key: &K) -> bool {
100        self.list.iter().any(|st| st.key().eq(key))
101    }
102
103    /// Clears the map.
104    pub fn clear(&mut self) {
105        self.list.clear();
106    }
107
108    /// Returns a reference to the future corresponding to the key.
109    pub fn get(&self, key: &K) -> Option<&T> {
110        self.list
111            .iter()
112            .find(|st| st.key().eq(key))
113            .and_then(|st| st.inner())
114    }
115
116    /// Returns a mutable future to the value corresponding to the key.
117    pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
118        self.list
119            .iter_mut()
120            .find(|st| st.key().eq(key))
121            .and_then(|st| st.inner_mut())
122    }
123
124    /// Returns a pinned future corresponding to the key.
125    pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
126        self.list
127            .iter_mut()
128            .find(|st| st.key().eq(key))
129            .and_then(|st| st.inner_pin())
130    }
131
132    /// Removes a key from the map, returning the future.
133    pub fn remove(&mut self, key: &K) -> Option<T> {
134        self.list
135            .iter_mut()
136            .find(|st| st.key().eq(key))
137            .and_then(|st| st.take_inner())
138    }
139
140    /// Returns the number of futures in the map.
141    pub fn len(&self) -> usize {
142        self.list.iter().filter(|st| st.inner().is_some()).count()
143    }
144
145    /// Return `true` map contains no elements.
146    pub fn is_empty(&self) -> bool {
147        self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
148    }
149}
150
151impl<K, T> FromIterator<(K, T)> for FutureMap<K, T>
152where
153    K: Clone + PartialEq + Send + Unpin + 'static,
154    T: Future + Send + Unpin + 'static,
155{
156    fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
157        let mut maps = Self::new();
158        for (key, val) in iter {
159            maps.insert(key, val);
160        }
161        maps
162    }
163}
164
165impl<K, T> Stream for FutureMap<K, T>
166where
167    K: Clone + PartialEq + Send + Unpin + 'static,
168    T: Future + Unpin + Send + 'static,
169{
170    type Item = (K, T::Output);
171
172    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
173        let this = &mut *self;
174        loop {
175            match this.list.poll_next_unpin(cx) {
176                Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
177                // We continue in case there is any progress on the set of streams
178                Poll::Ready(Some((key, None))) => {
179                    this.remove(&key);
180                }
181                Poll::Ready(None) => {
182                    // While we could allow the stream to continue to be pending, it would make more sense to notify that the stream
183                    // is empty without needing to explicitly check while polling the actual "map" itself
184                    // So we would mark a field to notify that the state is finished and return `Poll::Ready(None)` so the stream
185                    // can be terminated while on the next poll, we could let it be return pending.
186                    // We do this so that we are not returning `Poll::Ready(None)` each time the map is polled
187                    // as that may be seen as UB and may cause an increase in cpu usage
188                    if self.empty {
189                        self.waker = Some(cx.waker().clone());
190                        return Poll::Pending;
191                    }
192
193                    self.empty = true;
194                    return Poll::Ready(None);
195                }
196                Poll::Pending => {
197                    // Returning `None` does not mean the stream is actually terminated
198                    self.waker = Some(cx.waker().clone());
199                    return Poll::Pending;
200                }
201            }
202        }
203    }
204
205    fn size_hint(&self) -> (usize, Option<usize>) {
206        self.list.size_hint()
207    }
208}
209
210impl<K, T> FusedStream for FutureMap<K, T>
211where
212    K: Clone + PartialEq + Send + Unpin + 'static,
213    T: Future + Unpin + Send + 'static,
214{
215    fn is_terminated(&self) -> bool {
216        self.list.is_terminated()
217    }
218}