1pub mod optional;
2pub mod ordered;
3pub mod set;
4pub mod timeout_map;
5pub mod timeout_set;
6
7use crate::common::InnerMap;
8use futures::stream::{FusedStream, FuturesUnordered};
9use futures::{Stream, StreamExt};
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll, Waker};
13
14pub struct FutureMap<K, S> {
15 list: FuturesUnordered<InnerMap<K, S>>,
16 empty: bool,
17 waker: Option<Waker>,
18}
19
20impl<K, T> Default for FutureMap<K, T> {
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl<K, T> FutureMap<K, T> {
27 pub fn new() -> Self {
29 Self {
30 list: FuturesUnordered::new(),
31 empty: true,
32 waker: None,
33 }
34 }
35}
36
37impl<K, T> FutureMap<K, T>
38where
39 K: Clone + PartialEq + Send + Unpin + 'static,
40 T: Future + Send + Unpin + 'static,
41{
42 pub fn insert(&mut self, key: K, fut: T) -> bool {
46 if self.contains_key(&key) {
47 return false;
48 }
49
50 let st = InnerMap::new(key, fut);
51 self.list.push(st);
52
53 if let Some(waker) = self.waker.take() {
54 waker.wake();
55 }
56
57 self.empty = false;
58 true
59 }
60
61 pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
65 self.list
66 .iter_mut()
67 .find(|st| st.key().eq(key))
68 .is_some_and(|st| st.set_wake_on_success(wake_on_success))
69 }
70
71 pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
73 self.list.iter().filter_map(|st| st.key_value())
74 }
75
76 pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
78 self.list.iter_mut().filter_map(|st| st.key_value_mut())
79 }
80
81 pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
83 self.list.iter_mut().filter_map(|st| st.key_value_pin())
84 }
85
86 pub fn keys(&self) -> impl Iterator<Item = &K> {
88 self.list.iter().map(|st| st.key())
89 }
90
91 pub fn values(&self) -> impl Iterator<Item = &T> {
93 self.list.iter().filter_map(|st| st.inner())
94 }
95
96 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
98 self.list.iter_mut().filter_map(|st| st.inner_mut())
99 }
100
101 pub fn contains_key(&self, key: &K) -> bool {
103 self.list.iter().any(|st| st.key().eq(key))
104 }
105
106 pub fn clear(&mut self) {
108 self.list.clear();
109 }
110
111 pub fn get(&self, key: &K) -> Option<&T> {
113 self.list
114 .iter()
115 .find(|st| st.key().eq(key))
116 .and_then(|st| st.inner())
117 }
118
119 pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
121 self.list
122 .iter_mut()
123 .find(|st| st.key().eq(key))
124 .and_then(|st| st.inner_mut())
125 }
126
127 pub fn get_mut_or_default(&mut self, key: &K) -> &mut T
129 where
130 T: Default,
131 {
132 self.insert(key.clone(), T::default());
133 self.get_mut(key).expect("valid entry")
134 }
135
136 pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
138 self.list
139 .iter_mut()
140 .find(|st| st.key().eq(key))
141 .and_then(|st| st.inner_pin())
142 }
143
144 pub fn remove(&mut self, key: &K) -> Option<T> {
146 self.list
147 .iter_mut()
148 .find(|st| st.key().eq(key))
149 .and_then(|st| st.take_inner())
150 }
151
152 pub fn len(&self) -> usize {
154 self.list.iter().filter(|st| st.inner().is_some()).count()
155 }
156
157 pub fn is_empty(&self) -> bool {
159 self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
160 }
161}
162
163impl<K, T> FromIterator<(K, T)> for FutureMap<K, T>
164where
165 K: Clone + PartialEq + Send + Unpin + 'static,
166 T: Future + Send + Unpin + 'static,
167{
168 fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
169 let mut maps = Self::new();
170 for (key, val) in iter {
171 maps.insert(key, val);
172 }
173 maps
174 }
175}
176
177impl<K, T> Stream for FutureMap<K, T>
178where
179 K: Clone + PartialEq + Send + Unpin + 'static,
180 T: Future + Unpin + Send + 'static,
181{
182 type Item = (K, T::Output);
183
184 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 let this = &mut *self;
186 loop {
187 match this.list.poll_next_unpin(cx) {
188 Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
189 Poll::Ready(Some((key, None))) => {
191 this.remove(&key);
192 }
193 Poll::Ready(None) => {
194 if self.empty {
201 self.waker = Some(cx.waker().clone());
202 return Poll::Pending;
203 }
204
205 self.empty = true;
206 return Poll::Ready(None);
207 }
208 Poll::Pending => {
209 self.waker = Some(cx.waker().clone());
211 return Poll::Pending;
212 }
213 }
214 }
215 }
216
217 fn size_hint(&self) -> (usize, Option<usize>) {
218 self.list.size_hint()
219 }
220}
221
222impl<K, T> FusedStream for FutureMap<K, T>
223where
224 K: Clone + PartialEq + Send + Unpin + 'static,
225 T: Future + Unpin + Send + 'static,
226{
227 fn is_terminated(&self) -> bool {
228 self.list.is_terminated()
229 }
230}
231
#[cfg(test)]
mod test {
    use crate::futures::FutureMap;
    use futures::future::pending;
    use futures::StreamExt;
    use std::task::Poll;

    /// A second insert under an occupied key is rejected.
    #[test]
    fn existing_key() {
        let mut map = FutureMap::new();

        let first = map.insert(1, pending::<()>());
        assert!(first);

        let second = map.insert(1, pending::<()>());
        assert!(!second);
    }

    /// Ready futures come out keyed and in insertion order, then the stream
    /// ends once and parks afterwards.
    #[test]
    fn poll_multiple_keyed_streams() {
        let mut map = FutureMap::new();
        for (key, value) in [(1, 10), (2, 20), (3, 30)] {
            map.insert(key, futures::future::ready(value));
        }

        futures::executor::block_on(async move {
            for expected in [(1, 10), (2, 20), (3, 30)] {
                assert_eq!(map.next().await, Some(expected));
            }
            assert_eq!(map.next().await, None);

            // After reporting the end once, the map waits for new inserts.
            let parked =
                futures::future::poll_fn(|cx| Poll::Ready(map.poll_next_unpin(cx).is_pending()))
                    .await;
            assert!(parked);
        })
    }
}