stream_map_any/lib.rs
1//! Allow merging async Streams of different output type.
2//!
3//! It's very similar to Tokio's [`StreamMap`], except that it doesn't require the streams to have the
4//! same output type.
5//! This can be useful when you don't know what type of streams should be combined, acting as a
6//! runtime dynamic select.
7//!
8//! ## Not a zero-cost-abstraction
9//! Since we don't know what types of outputs the streams will generate, the generated output will
10//! be a `StreamMapAnyVariant`, a newtype around `Box<dyn Any>`. As a result, we rely on dynamic
11//! dispatching to transform it back into the desired output.
12//! Benching shows that it's __2x__ as slow as a [`StreamMap`] or a [`select`] macro implementation.
13//!
14//! [`StreamMap`]: https://docs.rs/tokio/*/tokio/stream/struct.StreamMap.html
15//! [`select`]: https://docs.rs/tokio/*/tokio/macro.select.html
16//!
17//! ## Example
18//!```
19//!# use futures::channel::mpsc::channel;
20//!# use futures::executor::block_on;
21//!# use futures::stream::{self, StreamExt};
22//!# use stream_map_any::StreamMapAny;
23//!
24//!let int_stream = stream::iter(vec![1; 10]);
25//!let (mut tx, rx) = channel::<String>(100);
26//!
27//!let mut merge = StreamMapAny::new();
28//!merge.insert(0, int_stream);
29//!merge.insert(1, rx);
30//!
31//! std::thread::spawn(move || {
32//! tx.try_send("hello world".into()).unwrap();
33//! });
34//!
35//! block_on(async move {
36//! loop {
37//! match merge.next().await {
38//! Some((0, val)) => {
39//! let _val: i32 = val.value().unwrap();
40//! }
41//! Some((1, val)) => {
42//! let _val: String = val.value().unwrap();
43//! }
44//! Some(_) => panic!("unexpected key"),
45//! None => break,
46//! }
47//! }
48//! });
49//!```
50
51use futures::stream::{Stream, StreamExt};
52use std::any::Any;
53use std::borrow::Borrow;
54use std::pin::Pin;
55use std::task::{Context, Poll};
56
57/// Combines streams with different output types into one.
58pub struct StreamMapAny<K> {
59 streams: Vec<(K, BoxedStream)>,
60 last_position: usize,
61}
62
63/// Newtype around a Boxed Any.
64#[derive(Debug)]
65pub struct StreamMapAnyVariant(Box<dyn Any>);
66
67struct BoxedStream(Box<dyn Stream<Item = Box<dyn Any>> + Unpin>);
68
69impl<K> StreamMapAny<K> {
70 pub const fn new() -> Self {
71 Self {
72 streams: Vec::new(),
73 last_position: 0,
74 }
75 }
76
77 /// Insert a new stream into the map with a given key.
78 ///
79 /// If that key is already in use by another stream, that stream will get dropped.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use futures::stream;
85 /// use stream_map_any::StreamMapAny;
86 ///
87 /// let mut map = StreamMapAny::new();
88 ///
89 /// map.insert("foo", stream::iter(vec![1]));
90 /// ```
91 pub fn insert<S>(&mut self, key: K, stream: S)
92 where
93 S: Stream + Unpin + 'static,
94 S::Item: Any,
95 K: Eq,
96 {
97 // if there already is a stream with this key, remove it first
98 self.remove(&key);
99
100 let boxed = BoxedStream::new(stream);
101 self.streams.push((key, boxed));
102 }
103
104 /// Remove a stream from the map with a given key.
105 ///
106 /// The stream will get dropped.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// use futures::stream;
112 /// use stream_map_any::StreamMapAny;
113 ///
114 /// let mut map = StreamMapAny::new();
115 ///
116 /// map.insert(1, stream::iter(vec![1]));
117 /// assert_eq!(map.contains_key(&1), true);
118 ///
119 /// map.remove(&1);
120 /// assert_eq!(map.contains_key(&1), false);
121 /// ```
122 pub fn remove<Q>(&mut self, k: &Q)
123 where
124 K: Borrow<Q>,
125 Q: Eq,
126 {
127 for i in 0..self.streams.len() {
128 if self.streams[i].0.borrow() == k {
129 self.streams.swap_remove(i);
130 }
131 }
132 }
133
134 /// Returns `true` if the map contains a stream for the specified key.
135 ///
136 /// # Examples
137 ///
138 /// ```
139 /// use futures::stream;
140 /// use stream_map_any::StreamMapAny;
141 ///
142 /// let mut map = StreamMapAny::new();
143 ///
144 /// map.insert(1, stream::iter(vec![1]));
145 /// assert_eq!(map.contains_key(&1), true);
146 /// assert_eq!(map.contains_key(&2), false);
147 /// ```
148 pub fn contains_key<Q>(&self, k: &Q) -> bool
149 where
150 K: Borrow<Q>,
151 Q: Eq,
152 {
153 for i in 0..self.streams.len() {
154 if self.streams[i].0.borrow() == k {
155 return true;
156 }
157 }
158
159 false
160 }
161}
162
163impl<K> StreamMapAny<K>
164where
165 K: Unpin + Clone,
166{
167 fn poll_streams(&mut self, cx: &mut Context) -> Poll<Option<(K, StreamMapAnyVariant)>> {
168 let start = self.last_position.wrapping_add(1) % self.streams.len();
169 let mut idx = start;
170 self.last_position = idx;
171
172 for _ in 0..self.streams.len() {
173 let (id, stream) = &mut self.streams[idx];
174
175 match Pin::new(stream).poll_next(cx) {
176 Poll::Ready(Some(data)) => {
177 return Poll::Ready(Some((id.clone(), StreamMapAnyVariant(data))));
178 }
179 Poll::Ready(None) => {
180 self.streams.swap_remove(idx);
181 if idx == self.streams.len() {
182 idx = 0;
183 } else if idx < start && start <= self.streams.len() {
184 idx = idx.wrapping_add(1) % self.streams.len();
185 }
186 }
187 Poll::Pending => {
188 idx = idx.wrapping_add(1) % self.streams.len();
189 }
190 }
191 }
192
193 if self.streams.is_empty() {
194 Poll::Ready(None)
195 } else {
196 Poll::Pending
197 }
198 }
199}
200
201impl<K> Stream for StreamMapAny<K>
202where
203 K: Unpin + Clone,
204{
205 type Item = (K, StreamMapAnyVariant);
206
207 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
208 if self.streams.is_empty() {
209 return Poll::Ready(None);
210 }
211
212 self.poll_streams(cx)
213 }
214}
215
216impl StreamMapAnyVariant {
217 pub fn new<T>(value: T) -> Self
218 where
219 T: Any,
220 {
221 Self(Box::new(value))
222 }
223
224 /// Retrieve the value if the type matches T.
225 ///
226 /// If it doesn't match, the variant will be returned as Err.
227 ///
228 /// # Examples
229 ///
230 /// ```
231 /// use stream_map_any::StreamMapAnyVariant;
232 ///
233 /// let variant = StreamMapAnyVariant::new(8u32);
234 /// let value = variant.value().unwrap();
235 /// assert_eq!(8u32, value);
236 ///
237 /// // call will fail here because it has the wrong type.
238 /// let variant = StreamMapAnyVariant::new(8u32);
239 /// let value: Result<String, _> = variant.value();
240 /// assert!(value.is_err());
241 /// ```
242 pub fn value<T>(self: Self) -> Result<T, Self>
243 where
244 T: Any,
245 {
246 self.0.downcast().map(|v| *v).map_err(Self)
247 }
248
249 /// Retrieve a boxed value if the type matches T.
250 ///
251 /// If it doesn't match, the variant will be returned as Err.
252 pub fn boxed_value<T>(self: Self) -> Result<Box<T>, Self>
253 where
254 T: Any,
255 {
256 self.0.downcast().map_err(Self)
257 }
258
259 /// Retrieve the containing boxed Any.
260 pub fn boxed_any(self: Self) -> Box<dyn Any> {
261 self.0
262 }
263}
264
265impl BoxedStream {
266 fn new<S>(s: S) -> Self
267 where
268 S: Stream + Unpin + 'static,
269 S::Item: Any,
270 {
271 let stream = s.map(|o| {
272 let v: Box<dyn Any> = Box::new(o);
273 v
274 });
275 Self(Box::new(stream))
276 }
277}
278
279impl Stream for BoxedStream {
280 type Item = Box<dyn Any>;
281 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
282 Pin::new(&mut *self.0).poll_next(cx)
283 }
284}