pollable_map/
stream.rs

1pub mod optional;
2
3use crate::common::InnerMap;
4use futures::stream::{FusedStream, SelectAll};
5use futures::{Stream, StreamExt};
6use std::pin::Pin;
7use std::task::{Context, Poll, Waker};
8
/// Combines multiple streams into one, with each stream identified by a unique key.
///
/// Items yielded by the combined stream are paired with the key of the stream
/// that produced them.
pub struct StreamMap<K, S> {
    /// The keyed streams, each wrapped in an [`InnerMap`] and merged through [`SelectAll`].
    list: SelectAll<InnerMap<K, S>>,
    /// `true` for a freshly created map and after polling has reported `Poll::Ready(None)`;
    /// reset to `false` whenever a stream is inserted.
    empty: bool,
    /// Waker saved when polling returns `Poll::Pending`; taken and woken when a stream is inserted.
    waker: Option<Waker>,
}
15
16impl<K, T> Default for StreamMap<K, T>
17where
18    K: Clone + Unpin,
19    T: Stream + Send + Unpin + 'static,
20{
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl<K, T> StreamMap<K, T>
27where
28    K: Clone + Unpin,
29    T: Stream + Send + Unpin + 'static,
30{
31    /// Creates an empty [`StreamMap`]
32    pub fn new() -> Self {
33        Self {
34            list: SelectAll::new(),
35            empty: true,
36            waker: None,
37        }
38    }
39}
40
41impl<K, T> StreamMap<K, T>
42where
43    K: Clone + PartialEq + Send + Unpin + 'static,
44    T: Stream + Send + Unpin + 'static,
45{
46    /// Insert a stream into the map with a unique key.
47    /// The function will return true if the map does not have the key present,
48    /// otherwise it will return false
49    pub fn insert(&mut self, key: K, stream: T) -> bool {
50        if self.contains_key(&key) {
51            return false;
52        }
53
54        let st = InnerMap::new(key, stream);
55        self.list.push(st);
56
57        if let Some(waker) = self.waker.take() {
58            waker.wake();
59        }
60
61        self.empty = false;
62        true
63    }
64
65    /// Mark stream with assigned key to wake up on successful yield.
66    /// Will return false if stream does not exist or if value is the same as
67    /// previously set.
68    pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
69        self.list
70            .iter_mut()
71            .find(|st| st.key().eq(key))
72            .is_some_and(|st| st.set_wake_on_success(wake_on_success))
73    }
74
75    /// An iterator visiting all key-value pairs in arbitrary order.
76    pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
77        self.list.iter().filter_map(|st| st.key_value())
78    }
79
80    /// An iterator visiting all key-value pairs mutably in arbitrary order.
81    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
82        self.list.iter_mut().filter_map(|st| st.key_value_mut())
83    }
84
85    /// An iterator visiting all key-value pairs with a pinned valued in arbitrary order
86    pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
87        self.list.iter_mut().filter_map(|st| st.key_value_pin())
88    }
89
90    /// Returns an iterator visiting all keys in arbitrary order.
91    pub fn keys(&self) -> impl Iterator<Item = &K> {
92        self.list.iter().map(|st| st.key())
93    }
94
95    /// An iterator visiting all values in arbitrary order.
96    pub fn values(&self) -> impl Iterator<Item = &T> {
97        self.list.iter().filter_map(|st| st.inner())
98    }
99
100    /// An iterator visiting all values mutably in arbitrary order.
101    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
102        self.list.iter_mut().filter_map(|st| st.inner_mut())
103    }
104
105    /// Returns `true` if the map contains a stream for the specified key.
106    pub fn contains_key(&self, key: &K) -> bool {
107        self.list.iter().any(|st| st.key().eq(key))
108    }
109
110    /// Clears the map.
111    pub fn clear(&mut self) {
112        self.list.clear();
113    }
114
115    /// Returns a reference to the stream corresponding to the key.
116    pub fn get(&self, key: &K) -> Option<&T> {
117        self.list
118            .iter()
119            .find(|st| st.key().eq(key))
120            .and_then(|st| st.inner())
121    }
122
123    /// Returns a mutable stream to the value corresponding to the key.
124    pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
125        self.list
126            .iter_mut()
127            .find(|st| st.key().eq(key))
128            .and_then(|st| st.inner_mut())
129    }
130
131    /// Returns a pinned stream corresponding to the key.
132    pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
133        self.list
134            .iter_mut()
135            .find(|st| st.key().eq(key))
136            .and_then(|st| st.inner_pin())
137    }
138
139    /// Removes a key from the map, returning the stream.
140    pub fn remove(&mut self, key: &K) -> Option<T> {
141        self.list
142            .iter_mut()
143            .find(|st| st.key().eq(key))
144            .and_then(|st| st.take_inner())
145    }
146
147    /// Returns the number of streams in the map.
148    pub fn len(&self) -> usize {
149        self.list.iter().filter(|st| st.inner().is_some()).count()
150    }
151
152    /// Return `true` map contains no elements.
153    pub fn is_empty(&self) -> bool {
154        self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
155    }
156}
157
158impl<K, T> FromIterator<(K, T)> for StreamMap<K, T>
159where
160    K: Clone + PartialEq + Send + Unpin + 'static,
161    T: Stream + Send + Unpin + 'static,
162{
163    fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
164        let mut maps = Self::new();
165        for (key, val) in iter {
166            maps.insert(key, val);
167        }
168        maps
169    }
170}
171
172impl<K, T> Stream for StreamMap<K, T>
173where
174    K: Clone + PartialEq + Send + Unpin + 'static,
175    T: Stream + Unpin + Send + 'static,
176{
177    type Item = (K, T::Item);
178
179    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180        let this = &mut *self;
181
182        if this.list.is_empty() {
183            self.waker = Some(cx.waker().clone());
184            return Poll::Pending;
185        }
186
187        loop {
188            match this.list.poll_next_unpin(cx) {
189                Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
190                // We continue in case there is any progress on the set of streams
191                Poll::Ready(Some((key, None))) => {
192                    this.remove(&key);
193                }
194                Poll::Ready(None) => {
195                    // While we could allow the stream to continue to be pending, it would make more sense to notify that the stream
196                    // is empty without needing to explicitly check while polling the actual "map" itself
197                    // So we would mark a field to notify that the state is finished and return `Poll::Ready(None)` so the stream
198                    // can be terminated while on the next poll, we could let it be return pending.
199                    // We do this so that we are not returning `Poll::Ready(None)` each time the map is polled
200                    // as that may be seen as UB and may cause an increase in cpu usage
201                    if self.empty {
202                        self.waker = Some(cx.waker().clone());
203                        return Poll::Pending;
204                    }
205
206                    self.empty = true;
207                    return Poll::Ready(None);
208                }
209                Poll::Pending => {
210                    // Returning `None` does not mean the stream is actually terminated
211                    self.waker = Some(cx.waker().clone());
212                    return Poll::Pending;
213                }
214            }
215        }
216    }
217
218    fn size_hint(&self) -> (usize, Option<usize>) {
219        self.list.size_hint()
220    }
221}
222
223impl<K, T> FusedStream for StreamMap<K, T>
224where
225    K: Clone + PartialEq + Send + Unpin + 'static,
226    T: Stream + Unpin + Send + 'static,
227{
228    fn is_terminated(&self) -> bool {
229        self.list.is_terminated()
230    }
231}