1pub mod optional;
2pub mod ordered;
3
4use crate::common::InnerMap;
5use futures::stream::{FusedStream, FuturesUnordered};
6use futures::{Stream, StreamExt};
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll, Waker};
10
11pub struct FutureMap<K, S> {
12 list: FuturesUnordered<InnerMap<K, S>>,
13 empty: bool,
14 waker: Option<Waker>,
15}
16
17impl<K, T> Default for FutureMap<K, T> {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl<K, T> FutureMap<K, T> {
24 pub fn new() -> Self {
26 Self {
27 list: FuturesUnordered::new(),
28 empty: true,
29 waker: None,
30 }
31 }
32}
33
34impl<K, T> FutureMap<K, T>
35where
36 K: Clone + PartialEq + Send + Unpin + 'static,
37 T: Future + Send + Unpin + 'static,
38{
39 pub fn insert(&mut self, key: K, fut: T) -> bool {
43 if self.contains_key(&key) {
44 return false;
45 }
46
47 let st = InnerMap::new(key, fut);
48 self.list.push(st);
49
50 if let Some(waker) = self.waker.take() {
51 waker.wake();
52 }
53
54 self.empty = false;
55 true
56 }
57
58 pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
62 self.list
63 .iter_mut()
64 .find(|st| st.key().eq(key))
65 .is_some_and(|st| st.set_wake_on_success(wake_on_success))
66 }
67
68 pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
70 self.list.iter().filter_map(|st| st.key_value())
71 }
72
73 pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
75 self.list.iter_mut().filter_map(|st| st.key_value_mut())
76 }
77
78 pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
80 self.list.iter_mut().filter_map(|st| st.key_value_pin())
81 }
82
83 pub fn keys(&self) -> impl Iterator<Item = &K> {
85 self.list.iter().map(|st| st.key())
86 }
87
88 pub fn values(&self) -> impl Iterator<Item = &T> {
90 self.list.iter().filter_map(|st| st.inner())
91 }
92
93 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
95 self.list.iter_mut().filter_map(|st| st.inner_mut())
96 }
97
98 pub fn contains_key(&self, key: &K) -> bool {
100 self.list.iter().any(|st| st.key().eq(key))
101 }
102
103 pub fn clear(&mut self) {
105 self.list.clear();
106 }
107
108 pub fn get(&self, key: &K) -> Option<&T> {
110 self.list
111 .iter()
112 .find(|st| st.key().eq(key))
113 .and_then(|st| st.inner())
114 }
115
116 pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
118 self.list
119 .iter_mut()
120 .find(|st| st.key().eq(key))
121 .and_then(|st| st.inner_mut())
122 }
123
124 pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
126 self.list
127 .iter_mut()
128 .find(|st| st.key().eq(key))
129 .and_then(|st| st.inner_pin())
130 }
131
132 pub fn remove(&mut self, key: &K) -> Option<T> {
134 self.list
135 .iter_mut()
136 .find(|st| st.key().eq(key))
137 .and_then(|st| st.take_inner())
138 }
139
140 pub fn len(&self) -> usize {
142 self.list.iter().filter(|st| st.inner().is_some()).count()
143 }
144
145 pub fn is_empty(&self) -> bool {
147 self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
148 }
149}
150
151impl<K, T> FromIterator<(K, T)> for FutureMap<K, T>
152where
153 K: Clone + PartialEq + Send + Unpin + 'static,
154 T: Future + Send + Unpin + 'static,
155{
156 fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
157 let mut maps = Self::new();
158 for (key, val) in iter {
159 maps.insert(key, val);
160 }
161 maps
162 }
163}
164
165impl<K, T> Stream for FutureMap<K, T>
166where
167 K: Clone + PartialEq + Send + Unpin + 'static,
168 T: Future + Unpin + Send + 'static,
169{
170 type Item = (K, T::Output);
171
172 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
173 let this = &mut *self;
174 loop {
175 match this.list.poll_next_unpin(cx) {
176 Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
177 Poll::Ready(Some((key, None))) => {
179 this.remove(&key);
180 }
181 Poll::Ready(None) => {
182 if self.empty {
189 self.waker = Some(cx.waker().clone());
190 return Poll::Pending;
191 }
192
193 self.empty = true;
194 return Poll::Ready(None);
195 }
196 Poll::Pending => {
197 self.waker = Some(cx.waker().clone());
199 return Poll::Pending;
200 }
201 }
202 }
203 }
204
205 fn size_hint(&self) -> (usize, Option<usize>) {
206 self.list.size_hint()
207 }
208}
209
210impl<K, T> FusedStream for FutureMap<K, T>
211where
212 K: Clone + PartialEq + Send + Unpin + 'static,
213 T: Future + Unpin + Send + 'static,
214{
215 fn is_terminated(&self) -> bool {
216 self.list.is_terminated()
217 }
218}