stream_map_any/
lib.rs

//! Allows merging async streams with different output types.
//!
//! It's very similar to Tokio's [`StreamMap`], except that it doesn't require the streams to have
//! the same output type.
//! This can be useful when you don't know which types of streams will have to be combined, acting
//! as a runtime dynamic select.
//!
//! ## Not a zero-cost abstraction
//! Since we don't know what types of outputs the streams will generate, the generated output will
//! be a `StreamMapAnyVariant`, a newtype around `Box<dyn Any>`. As a result, we rely on dynamic
//! dispatch to transform it back into the desired output.
//! Benchmarks show that it's __2x__ as slow as a [`StreamMap`] or a [`select`] macro
//! implementation.
//!
//! [`StreamMap`]: https://docs.rs/tokio/*/tokio/stream/struct.StreamMap.html
//! [`select`]: https://docs.rs/tokio/*/tokio/macro.select.html
//!
//! ## Example
//! ```
//! # use futures::channel::mpsc::channel;
//! # use futures::executor::block_on;
//! # use futures::stream::{self, StreamExt};
//! # use stream_map_any::StreamMapAny;
//!
//! let int_stream = stream::iter(vec![1; 10]);
//! let (mut tx, rx) = channel::<String>(100);
//!
//! let mut merge = StreamMapAny::new();
//! merge.insert(0, int_stream);
//! merge.insert(1, rx);
//!
//! std::thread::spawn(move || {
//!     tx.try_send("hello world".into()).unwrap();
//! });
//!
//! block_on(async move {
//!     loop {
//!         match merge.next().await {
//!             Some((0, val)) => {
//!                 let _val: i32 = val.value().unwrap();
//!             }
//!             Some((1, val)) => {
//!                 let _val: String = val.value().unwrap();
//!             }
//!             Some(_) => panic!("unexpected key"),
//!             None => break,
//!         }
//!     }
//! });
//! ```
50
51use futures::stream::{Stream, StreamExt};
52use std::any::Any;
53use std::borrow::Borrow;
54use std::pin::Pin;
55use std::task::{Context, Poll};
56
57/// Combines streams with different output types into one.
58pub struct StreamMapAny<K> {
59    streams: Vec<(K, BoxedStream)>,
60    last_position: usize,
61}
62
/// Type-erased item produced by a `StreamMapAny`.
///
/// This is a newtype around a boxed `Any`. Use `value`, `boxed_value` or `boxed_any` to get the
/// payload back out.
#[derive(Debug)]
pub struct StreamMapAnyVariant(Box<dyn Any>);
66
67struct BoxedStream(Box<dyn Stream<Item = Box<dyn Any>> + Unpin>);
68
69impl<K> StreamMapAny<K> {
70    pub const fn new() -> Self {
71        Self {
72            streams: Vec::new(),
73            last_position: 0,
74        }
75    }
76
77    /// Insert a new stream into the map with a given key.
78    ///
79    /// If that key is already in use by another stream, that stream will get dropped.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use futures::stream;
85    /// use stream_map_any::StreamMapAny;
86    ///
87    /// let mut map = StreamMapAny::new();
88    ///
89    /// map.insert("foo", stream::iter(vec![1]));
90    /// ```
91    pub fn insert<S>(&mut self, key: K, stream: S)
92    where
93        S: Stream + Unpin + 'static,
94        S::Item: Any,
95        K: Eq,
96    {
97        // if there already is a stream with this key, remove it first
98        self.remove(&key);
99
100        let boxed = BoxedStream::new(stream);
101        self.streams.push((key, boxed));
102    }
103
104    /// Remove a stream from the map with a given key.
105    ///
106    /// The stream will get dropped.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use futures::stream;
112    /// use stream_map_any::StreamMapAny;
113    ///
114    /// let mut map = StreamMapAny::new();
115    ///
116    /// map.insert(1, stream::iter(vec![1]));
117    /// assert_eq!(map.contains_key(&1), true);
118    ///
119    /// map.remove(&1);
120    /// assert_eq!(map.contains_key(&1), false);
121    /// ```
122    pub fn remove<Q>(&mut self, k: &Q)
123    where
124        K: Borrow<Q>,
125        Q: Eq,
126    {
127        for i in 0..self.streams.len() {
128            if self.streams[i].0.borrow() == k {
129                self.streams.swap_remove(i);
130            }
131        }
132    }
133
134    /// Returns `true` if the map contains a stream for the specified key.
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// use futures::stream;
140    /// use stream_map_any::StreamMapAny;
141    ///
142    /// let mut map = StreamMapAny::new();
143    ///
144    /// map.insert(1, stream::iter(vec![1]));
145    /// assert_eq!(map.contains_key(&1), true);
146    /// assert_eq!(map.contains_key(&2), false);
147    /// ```
148    pub fn contains_key<Q>(&self, k: &Q) -> bool
149    where
150        K: Borrow<Q>,
151        Q: Eq,
152    {
153        for i in 0..self.streams.len() {
154            if self.streams[i].0.borrow() == k {
155                return true;
156            }
157        }
158
159        false
160    }
161}
162
163impl<K> StreamMapAny<K>
164where
165    K: Unpin + Clone,
166{
167    fn poll_streams(&mut self, cx: &mut Context) -> Poll<Option<(K, StreamMapAnyVariant)>> {
168        let start = self.last_position.wrapping_add(1) % self.streams.len();
169        let mut idx = start;
170        self.last_position = idx;
171
172        for _ in 0..self.streams.len() {
173            let (id, stream) = &mut self.streams[idx];
174
175            match Pin::new(stream).poll_next(cx) {
176                Poll::Ready(Some(data)) => {
177                    return Poll::Ready(Some((id.clone(), StreamMapAnyVariant(data))));
178                }
179                Poll::Ready(None) => {
180                    self.streams.swap_remove(idx);
181                    if idx == self.streams.len() {
182                        idx = 0;
183                    } else if idx < start && start <= self.streams.len() {
184                        idx = idx.wrapping_add(1) % self.streams.len();
185                    }
186                }
187                Poll::Pending => {
188                    idx = idx.wrapping_add(1) % self.streams.len();
189                }
190            }
191        }
192
193        if self.streams.is_empty() {
194            Poll::Ready(None)
195        } else {
196            Poll::Pending
197        }
198    }
199}
200
201impl<K> Stream for StreamMapAny<K>
202where
203    K: Unpin + Clone,
204{
205    type Item = (K, StreamMapAnyVariant);
206
207    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
208        if self.streams.is_empty() {
209            return Poll::Ready(None);
210        }
211
212        self.poll_streams(cx)
213    }
214}
215
216impl StreamMapAnyVariant {
217    pub fn new<T>(value: T) -> Self
218    where
219        T: Any,
220    {
221        Self(Box::new(value))
222    }
223
224    /// Retrieve the value if the type matches T.
225    ///
226    /// If it doesn't match, the variant will be returned as Err.
227    ///
228    /// # Examples
229    ///
230    /// ```
231    /// use stream_map_any::StreamMapAnyVariant;
232    ///
233    /// let variant = StreamMapAnyVariant::new(8u32);
234    /// let value = variant.value().unwrap();
235    /// assert_eq!(8u32, value);
236    ///
237    /// // call will fail here because it has the wrong type.
238    /// let variant = StreamMapAnyVariant::new(8u32);
239    /// let value: Result<String, _> = variant.value();
240    /// assert!(value.is_err());
241    /// ```
242    pub fn value<T>(self: Self) -> Result<T, Self>
243    where
244        T: Any,
245    {
246        self.0.downcast().map(|v| *v).map_err(Self)
247    }
248
249    /// Retrieve a boxed value if the type matches T.
250    ///
251    /// If it doesn't match, the variant will be returned as Err.
252    pub fn boxed_value<T>(self: Self) -> Result<Box<T>, Self>
253    where
254        T: Any,
255    {
256        self.0.downcast().map_err(Self)
257    }
258
259    /// Retrieve the containing boxed Any.
260    pub fn boxed_any(self: Self) -> Box<dyn Any> {
261        self.0
262    }
263}
264
265impl BoxedStream {
266    fn new<S>(s: S) -> Self
267    where
268        S: Stream + Unpin + 'static,
269        S::Item: Any,
270    {
271        let stream = s.map(|o| {
272            let v: Box<dyn Any> = Box::new(o);
273            v
274        });
275        Self(Box::new(stream))
276    }
277}
278
279impl Stream for BoxedStream {
280    type Item = Box<dyn Any>;
281    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
282        Pin::new(&mut *self.0).poll_next(cx)
283    }
284}