1pub mod optional;
2pub mod set;
3pub mod timeout_map;
4pub mod timeout_set;
5
6use crate::common::InnerMap;
7use futures::stream::{FusedStream, SelectAll};
8use futures::{Stream, StreamExt};
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11
12pub struct StreamMap<K, S> {
14 list: SelectAll<InnerMap<K, S>>,
15 empty: bool,
16 waker: Option<Waker>,
17}
18
19impl<K, T> Default for StreamMap<K, T>
20where
21 K: Clone + Unpin,
22 T: Stream + Send + Unpin + 'static,
23{
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl<K, T> StreamMap<K, T>
30where
31 K: Clone + Unpin,
32 T: Stream + Send + Unpin + 'static,
33{
34 pub fn new() -> Self {
36 Self {
37 list: SelectAll::new(),
38 empty: true,
39 waker: None,
40 }
41 }
42}
43
44impl<K, T> StreamMap<K, T>
45where
46 K: Clone + PartialEq + Send + Unpin + 'static,
47 T: Stream + Send + Unpin + 'static,
48{
49 pub fn insert(&mut self, key: K, stream: T) -> bool {
53 if self.contains_key(&key) {
54 return false;
55 }
56
57 let st = InnerMap::new(key, stream);
58 self.list.push(st);
59
60 if let Some(waker) = self.waker.take() {
61 waker.wake();
62 }
63
64 self.empty = false;
65 true
66 }
67
68 pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
72 self.list
73 .iter_mut()
74 .find(|st| st.key().eq(key))
75 .is_some_and(|st| st.set_wake_on_success(wake_on_success))
76 }
77
78 pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
80 self.list.iter().filter_map(|st| st.key_value())
81 }
82
83 pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
85 self.list.iter_mut().filter_map(|st| st.key_value_mut())
86 }
87
88 pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
90 self.list.iter_mut().filter_map(|st| st.key_value_pin())
91 }
92
93 pub fn keys(&self) -> impl Iterator<Item = &K> {
95 self.list.iter().map(|st| st.key())
96 }
97
98 pub fn values(&self) -> impl Iterator<Item = &T> {
100 self.list.iter().filter_map(|st| st.inner())
101 }
102
103 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
105 self.list.iter_mut().filter_map(|st| st.inner_mut())
106 }
107
108 pub fn contains_key(&self, key: &K) -> bool {
110 self.list.iter().any(|st| st.key().eq(key))
111 }
112
113 pub fn clear(&mut self) {
115 self.list.clear();
116 }
117
118 pub fn get(&self, key: &K) -> Option<&T> {
120 self.list
121 .iter()
122 .find(|st| st.key().eq(key))
123 .and_then(|st| st.inner())
124 }
125
126 pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
128 self.list
129 .iter_mut()
130 .find(|st| st.key().eq(key))
131 .and_then(|st| st.inner_mut())
132 }
133
134 pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
136 self.list
137 .iter_mut()
138 .find(|st| st.key().eq(key))
139 .and_then(|st| st.inner_pin())
140 }
141
142 pub fn remove(&mut self, key: &K) -> Option<T> {
144 self.list
145 .iter_mut()
146 .find(|st| st.key().eq(key))
147 .and_then(|st| st.take_inner())
148 }
149
150 pub fn len(&self) -> usize {
152 self.list.iter().filter(|st| st.inner().is_some()).count()
153 }
154
155 pub fn is_empty(&self) -> bool {
157 self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
158 }
159}
160
161impl<K, T> FromIterator<(K, T)> for StreamMap<K, T>
162where
163 K: Clone + PartialEq + Send + Unpin + 'static,
164 T: Stream + Send + Unpin + 'static,
165{
166 fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
167 let mut maps = Self::new();
168 for (key, val) in iter {
169 maps.insert(key, val);
170 }
171 maps
172 }
173}
174
175impl<K, T> Stream for StreamMap<K, T>
176where
177 K: Clone + PartialEq + Send + Unpin + 'static,
178 T: Stream + Unpin + Send + 'static,
179{
180 type Item = (K, T::Item);
181
182 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
183 let this = &mut *self;
184
185 if this.list.is_empty() {
186 self.waker = Some(cx.waker().clone());
187 return Poll::Pending;
188 }
189
190 loop {
191 match this.list.poll_next_unpin(cx) {
192 Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
193 Poll::Ready(Some((key, None))) => {
195 this.remove(&key);
196 }
197 Poll::Ready(None) => {
198 if self.empty {
205 self.waker = Some(cx.waker().clone());
206 return Poll::Pending;
207 }
208
209 self.empty = true;
210 return Poll::Ready(None);
211 }
212 Poll::Pending => {
213 self.waker = Some(cx.waker().clone());
215 return Poll::Pending;
216 }
217 }
218 }
219 }
220
221 fn size_hint(&self) -> (usize, Option<usize>) {
222 self.list.size_hint()
223 }
224}
225
226impl<K, T> FusedStream for StreamMap<K, T>
227where
228 K: Clone + PartialEq + Send + Unpin + 'static,
229 T: Stream + Unpin + Send + 'static,
230{
231 fn is_terminated(&self) -> bool {
232 self.list.is_terminated()
233 }
234}