1pub mod optional;
2
3use crate::common::InnerMap;
4use futures::stream::{FusedStream, SelectAll};
5use futures::{Stream, StreamExt};
6use std::pin::Pin;
7use std::task::{Context, Poll, Waker};
8
9pub struct StreamMap<K, S> {
11 list: SelectAll<InnerMap<K, S>>,
12 empty: bool,
13 waker: Option<Waker>,
14}
15
16impl<K, T> Default for StreamMap<K, T>
17where
18 K: Clone + Unpin,
19 T: Stream + Send + Unpin + 'static,
20{
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl<K, T> StreamMap<K, T>
27where
28 K: Clone + Unpin,
29 T: Stream + Send + Unpin + 'static,
30{
31 pub fn new() -> Self {
33 Self {
34 list: SelectAll::new(),
35 empty: true,
36 waker: None,
37 }
38 }
39}
40
41impl<K, T> StreamMap<K, T>
42where
43 K: Clone + PartialEq + Send + Unpin + 'static,
44 T: Stream + Send + Unpin + 'static,
45{
46 pub fn insert(&mut self, key: K, stream: T) -> bool {
50 if self.contains_key(&key) {
51 return false;
52 }
53
54 let st = InnerMap::new(key, stream);
55 self.list.push(st);
56
57 if let Some(waker) = self.waker.take() {
58 waker.wake();
59 }
60
61 self.empty = false;
62 true
63 }
64
65 pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
69 self.list
70 .iter_mut()
71 .find(|st| st.key().eq(key))
72 .is_some_and(|st| st.set_wake_on_success(wake_on_success))
73 }
74
75 pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
77 self.list.iter().filter_map(|st| st.key_value())
78 }
79
80 pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
82 self.list.iter_mut().filter_map(|st| st.key_value_mut())
83 }
84
85 pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
87 self.list.iter_mut().filter_map(|st| st.key_value_pin())
88 }
89
90 pub fn keys(&self) -> impl Iterator<Item = &K> {
92 self.list.iter().map(|st| st.key())
93 }
94
95 pub fn values(&self) -> impl Iterator<Item = &T> {
97 self.list.iter().filter_map(|st| st.inner())
98 }
99
100 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
102 self.list.iter_mut().filter_map(|st| st.inner_mut())
103 }
104
105 pub fn contains_key(&self, key: &K) -> bool {
107 self.list.iter().any(|st| st.key().eq(key))
108 }
109
110 pub fn clear(&mut self) {
112 self.list.clear();
113 }
114
115 pub fn get(&self, key: &K) -> Option<&T> {
117 self.list
118 .iter()
119 .find(|st| st.key().eq(key))
120 .and_then(|st| st.inner())
121 }
122
123 pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
125 self.list
126 .iter_mut()
127 .find(|st| st.key().eq(key))
128 .and_then(|st| st.inner_mut())
129 }
130
131 pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
133 self.list
134 .iter_mut()
135 .find(|st| st.key().eq(key))
136 .and_then(|st| st.inner_pin())
137 }
138
139 pub fn remove(&mut self, key: &K) -> Option<T> {
141 self.list
142 .iter_mut()
143 .find(|st| st.key().eq(key))
144 .and_then(|st| st.take_inner())
145 }
146
147 pub fn len(&self) -> usize {
149 self.list.iter().filter(|st| st.inner().is_some()).count()
150 }
151
152 pub fn is_empty(&self) -> bool {
154 self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
155 }
156}
157
158impl<K, T> FromIterator<(K, T)> for StreamMap<K, T>
159where
160 K: Clone + PartialEq + Send + Unpin + 'static,
161 T: Stream + Send + Unpin + 'static,
162{
163 fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
164 let mut maps = Self::new();
165 for (key, val) in iter {
166 maps.insert(key, val);
167 }
168 maps
169 }
170}
171
172impl<K, T> Stream for StreamMap<K, T>
173where
174 K: Clone + PartialEq + Send + Unpin + 'static,
175 T: Stream + Unpin + Send + 'static,
176{
177 type Item = (K, T::Item);
178
179 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180 let this = &mut *self;
181
182 if this.list.is_empty() {
183 self.waker = Some(cx.waker().clone());
184 return Poll::Pending;
185 }
186
187 loop {
188 match this.list.poll_next_unpin(cx) {
189 Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
190 Poll::Ready(Some((key, None))) => {
192 this.remove(&key);
193 }
194 Poll::Ready(None) => {
195 if self.empty {
202 self.waker = Some(cx.waker().clone());
203 return Poll::Pending;
204 }
205
206 self.empty = true;
207 return Poll::Ready(None);
208 }
209 Poll::Pending => {
210 self.waker = Some(cx.waker().clone());
212 return Poll::Pending;
213 }
214 }
215 }
216 }
217
218 fn size_hint(&self) -> (usize, Option<usize>) {
219 self.list.size_hint()
220 }
221}
222
223impl<K, T> FusedStream for StreamMap<K, T>
224where
225 K: Clone + PartialEq + Send + Unpin + 'static,
226 T: Stream + Unpin + Send + 'static,
227{
228 fn is_terminated(&self) -> bool {
229 self.list.is_terminated()
230 }
231}