pollable_map/
stream.rs

1pub mod optional;
2pub mod set;
3pub mod timeout_map;
4pub mod timeout_set;
5
6use crate::common::InnerMap;
7use futures::stream::{FusedStream, SelectAll};
8use futures::{Stream, StreamExt};
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11
12/// Combining multiple streams into one, with each stream having a unique key.
13pub struct StreamMap<K, S> {
14    list: SelectAll<InnerMap<K, S>>,
15    empty: bool,
16    waker: Option<Waker>,
17}
18
19impl<K, T> Default for StreamMap<K, T>
20where
21    K: Clone + Unpin,
22    T: Stream + Send + Unpin + 'static,
23{
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl<K, T> StreamMap<K, T>
30where
31    K: Clone + Unpin,
32    T: Stream + Send + Unpin + 'static,
33{
34    /// Creates an empty [`StreamMap`]
35    pub fn new() -> Self {
36        Self {
37            list: SelectAll::new(),
38            empty: true,
39            waker: None,
40        }
41    }
42}
43
44impl<K, T> StreamMap<K, T>
45where
46    K: Clone + PartialEq + Send + Unpin + 'static,
47    T: Stream + Send + Unpin + 'static,
48{
49    /// Insert a stream into the map with a unique key.
50    /// The function will return true if the map does not have the key present,
51    /// otherwise it will return false
52    pub fn insert(&mut self, key: K, stream: T) -> bool {
53        if self.contains_key(&key) {
54            return false;
55        }
56
57        let st = InnerMap::new(key, stream);
58        self.list.push(st);
59
60        if let Some(waker) = self.waker.take() {
61            waker.wake();
62        }
63
64        self.empty = false;
65        true
66    }
67
68    /// Mark stream with assigned key to wake up on successful yield.
69    /// Will return false if stream does not exist or if value is the same as
70    /// previously set.
71    pub fn set_wake_on_success(&mut self, key: &K, wake_on_success: bool) -> bool {
72        self.list
73            .iter_mut()
74            .find(|st| st.key().eq(key))
75            .is_some_and(|st| st.set_wake_on_success(wake_on_success))
76    }
77
78    /// An iterator visiting all key-value pairs in arbitrary order.
79    pub fn iter(&self) -> impl Iterator<Item = (&K, &T)> {
80        self.list.iter().filter_map(|st| st.key_value())
81    }
82
83    /// An iterator visiting all key-value pairs mutably in arbitrary order.
84    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut T)> {
85        self.list.iter_mut().filter_map(|st| st.key_value_mut())
86    }
87
88    /// An iterator visiting all key-value pairs with a pinned valued in arbitrary order
89    pub fn iter_pin(&mut self) -> impl Iterator<Item = (&K, Pin<&mut T>)> {
90        self.list.iter_mut().filter_map(|st| st.key_value_pin())
91    }
92
93    /// Returns an iterator visiting all keys in arbitrary order.
94    pub fn keys(&self) -> impl Iterator<Item = &K> {
95        self.list.iter().map(|st| st.key())
96    }
97
98    /// An iterator visiting all values in arbitrary order.
99    pub fn values(&self) -> impl Iterator<Item = &T> {
100        self.list.iter().filter_map(|st| st.inner())
101    }
102
103    /// An iterator visiting all values mutably in arbitrary order.
104    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut T> {
105        self.list.iter_mut().filter_map(|st| st.inner_mut())
106    }
107
108    /// Returns `true` if the map contains a stream for the specified key.
109    pub fn contains_key(&self, key: &K) -> bool {
110        self.list.iter().any(|st| st.key().eq(key))
111    }
112
113    /// Clears the map.
114    pub fn clear(&mut self) {
115        self.list.clear();
116    }
117
118    /// Returns a reference to the stream corresponding to the key.
119    pub fn get(&self, key: &K) -> Option<&T> {
120        self.list
121            .iter()
122            .find(|st| st.key().eq(key))
123            .and_then(|st| st.inner())
124    }
125
126    /// Returns a mutable stream to the value corresponding to the key.
127    pub fn get_mut(&mut self, key: &K) -> Option<&mut T> {
128        self.list
129            .iter_mut()
130            .find(|st| st.key().eq(key))
131            .and_then(|st| st.inner_mut())
132    }
133
134    /// Returns a pinned stream corresponding to the key.
135    pub fn get_pinned(&mut self, key: &K) -> Option<Pin<&mut T>> {
136        self.list
137            .iter_mut()
138            .find(|st| st.key().eq(key))
139            .and_then(|st| st.inner_pin())
140    }
141
142    /// Removes a key from the map, returning the stream.
143    pub fn remove(&mut self, key: &K) -> Option<T> {
144        self.list
145            .iter_mut()
146            .find(|st| st.key().eq(key))
147            .and_then(|st| st.take_inner())
148    }
149
150    /// Returns the number of streams in the map.
151    pub fn len(&self) -> usize {
152        self.list.iter().filter(|st| st.inner().is_some()).count()
153    }
154
155    /// Return `true` map contains no elements.
156    pub fn is_empty(&self) -> bool {
157        self.list.is_empty() || self.list.iter().all(|st| st.inner().is_none())
158    }
159}
160
161impl<K, T> FromIterator<(K, T)> for StreamMap<K, T>
162where
163    K: Clone + PartialEq + Send + Unpin + 'static,
164    T: Stream + Send + Unpin + 'static,
165{
166    fn from_iter<I: IntoIterator<Item = (K, T)>>(iter: I) -> Self {
167        let mut maps = Self::new();
168        for (key, val) in iter {
169            maps.insert(key, val);
170        }
171        maps
172    }
173}
174
175impl<K, T> Stream for StreamMap<K, T>
176where
177    K: Clone + PartialEq + Send + Unpin + 'static,
178    T: Stream + Unpin + Send + 'static,
179{
180    type Item = (K, T::Item);
181
182    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
183        let this = &mut *self;
184
185        if this.list.is_empty() {
186            self.waker = Some(cx.waker().clone());
187            return Poll::Pending;
188        }
189
190        loop {
191            match this.list.poll_next_unpin(cx) {
192                Poll::Ready(Some((key, Some(item)))) => return Poll::Ready(Some((key, item))),
193                // We continue in case there is any progress on the set of streams
194                Poll::Ready(Some((key, None))) => {
195                    this.remove(&key);
196                }
197                Poll::Ready(None) => {
198                    // While we could allow the stream to continue to be pending, it would make more sense to notify that the stream
199                    // is empty without needing to explicitly check while polling the actual "map" itself
200                    // So we would mark a field to notify that the state is finished and return `Poll::Ready(None)` so the stream
201                    // can be terminated while on the next poll, we could let it be return pending.
202                    // We do this so that we are not returning `Poll::Ready(None)` each time the map is polled
203                    // as that may be seen as UB and may cause an increase in cpu usage
204                    if self.empty {
205                        self.waker = Some(cx.waker().clone());
206                        return Poll::Pending;
207                    }
208
209                    self.empty = true;
210                    return Poll::Ready(None);
211                }
212                Poll::Pending => {
213                    // Returning `None` does not mean the stream is actually terminated
214                    self.waker = Some(cx.waker().clone());
215                    return Poll::Pending;
216                }
217            }
218        }
219    }
220
221    fn size_hint(&self) -> (usize, Option<usize>) {
222        self.list.size_hint()
223    }
224}
225
226impl<K, T> FusedStream for StreamMap<K, T>
227where
228    K: Clone + PartialEq + Send + Unpin + 'static,
229    T: Stream + Unpin + Send + 'static,
230{
231    fn is_terminated(&self) -> bool {
232        self.list.is_terminated()
233    }
234}