Skip to main content

stream_router/
lib.rs

1use futures::future::Future;
2use futures::sink::{Sink, SinkExt};
3use futures::stream::Stream;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9mod rand;
10mod tagger;
11
/// Where a single source stream currently is in the routing pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Waiting for the next value from the source stream.
    StreamActive,
    /// A value was handed to the routing future, which has not yet resolved to `(value, tag)`.
    TaggerActive,
    /// A value is waiting for its target sink to report readiness via `poll_ready`.
    SinkPending,
    /// The target sink reported ready; the value is about to be passed to `start_send`.
    SinkActive,
    /// The value was sent; the sink may still need flushing before the source continues.
    SinkFlushing,
}
19
20struct StreamManager<F, A, T> {
21    tagger: tagger::StreamTagger<F, A>,
22    state: StreamState,
23    pending_sink_tag: Option<T>,
24    pending_item: Option<A>,
25    stream: Box<dyn Stream<Item = A> + Unpin>,
26}
27
28impl<F, A, T> StreamManager<F, A, T> {
29    fn new(
30        tagger: tagger::StreamTagger<F, A>,
31        stream: Box<dyn Stream<Item = A> + Unpin>,
32    ) -> StreamManager<F, A, T> {
33        StreamManager {
34            tagger,
35            state: StreamState::StreamActive,
36            pending_sink_tag: None,
37            pending_item: None,
38            stream,
39        }
40    }
41}
42
43/// The core Struct of this crate that is capable of dynamically routing values between
44/// [`Stream`s](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) and [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html).
45///
46/// A `StreamRouter` is at it's core a [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
47/// that can take ownership of any number of other [`Stream`s](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
48/// and any number of [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) and dynamically route
49/// values yielded from the [`Stream`s](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) to any one of the
50/// provided [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) through user-defined routing rules.
51///
52/// Each [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) provided to the `StreamRouter`
53/// is tagged with a user-defined [`Hash`able](https://doc.rust-lang.org/std/hash/trait.Hash.html) value.
54/// This tag is utilized by the router to identify and differentiate [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
55/// and is what the user will utilize to reference a specific [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
56/// when defining the routing logic.
57///
58/// Each [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) is provided with a matching closure
59/// that consumes the values yielded by the accompanying [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
60/// and returns a [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html) that will resolve to one of the tags
61/// identifying a specific [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) that the yielded value will be
62/// forwarded to. If no [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) is found for the returned routing tag
63/// the value will be yielded from the `StreamRouter` itself.
64///
65/// The `StreamRouter` makes the guarantee that order will be preserved for values yielded from [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
66/// "A" and sent to [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) "B" such that "A" will not attempt to sink any values into "B" until all
67/// previous values from "A" sent to "B" have been processed. There are no cross-Stream or cross-Sink timing or ordering guarentees.
68///
69/// # Example
70///
71/// The following example is [`simple.rs`](https://github.com/BroderickCarlin/stream_router/blob/master/examples/simple.rs)
72/// from the [examples](https://github.com/BroderickCarlin/stream_router/tree/master/examples) folder. This simple example
73/// illustrates the `StreamRouter` forwarding all even values to the `even_chan_tx` while all odd numbers are yielded by
74/// the `StreamRouter` itself. A user could decide to provide a second [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
75/// to explicitly consume odd values if desired, in which case the `StreamRouter` would never yield any values itself.
76///
77///
78/// ```should_panic
79/// use futures::{channel::mpsc, future, stream, stream::StreamExt};
80/// use tokio;
81///
82/// #[tokio::main]
83/// async fn main() {
84///     let mut router = stream_router::StreamRouter::new();
85///     let nums = stream::iter(0..1_000);
86///     let (even_chan_tx, mut even_chan_rx) = mpsc::channel(10);
87///
88///     router.add_source(nums, |x| future::lazy(move |_| (x, x % 2 == 0)));
89///     router.add_sink(even_chan_tx, true);
90///
91///     loop {
92///         tokio::select! {
93///             v = router.next() => {
94///                 println!("odd number:  {:?}", v.unwrap());
95///             }
96///             v = even_chan_rx.next() => {
97///                 println!("even number: {:?}", v.unwrap());
98///             }
99///         }
100///     }
101/// }
102/// ```
103///
104/// # Routing Logic
105///
106/// The `StreamRouter`'s routing logic is provided by the user in the form of closures that can map values yielded by
107/// a specific [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) into tags that identify
108/// specific [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html). These closures follow the form of
109/// `Fn(A) -> Future<Output = (A, T)>` where `A` is a value yielded by the [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
110/// and where `T` is a tag that the user has assigned to one of their [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html).
111/// It should be noted that the closure takes ownership of the values yielded by the stream and is responsible for also
112/// returning the values as part of the tuple that contains the  [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) tag.
113/// This is done to avoid the need to `clone()` each value but also allows the user to potentially "map" the values if
114/// beneficial to their specific use-case. While simple routing (such as shown above) has no real need to utilize the flexibility provided by returning a
115/// [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html), the option to return a
116/// [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html) allows for more complex state-ful routing.
117/// An example of utilizing state-ful routing to dedup an incoming [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
118/// can be found in the [`dedup.rs`](https://github.com/BroderickCarlin/stream_router/blob/master/examples/dedup.rs) example.
119pub struct StreamRouter<F, T, A>
120where
121    T: Hash + Eq,
122{
123    streams: Vec<StreamManager<F, A, T>>,
124    sinks: HashMap<T, (usize, Box<dyn Sink<A, Error = ()> + Unpin>)>,
125}
126
127impl<F, T, A> StreamRouter<F, T, A>
128where
129    T: Hash + Eq,
130{
131    /// Creates a new instance of a `StreamRouter`
132    pub fn new() -> StreamRouter<F, T, A> {
133        StreamRouter {
134            streams: vec![],
135            sinks: HashMap::new(),
136        }
137    }
138
139    /// Adds a new [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) to
140    /// the `StreamRouter` and provides the routing function that will be utilized to assign a
141    /// tag to each value yielded by the [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html).
142    /// This tag will determine which [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html), if any, the
143    /// value will be forwarded to.
144    ///
145    /// The routing function follows the form: `Fn(A) -> Future<Output = (A, T)>` where `A` is a value yielded by the
146    /// [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) and where `T` is a tag that the user
147    /// has assigned to one of their [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html). The returned
148    /// [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html) could be as simple as
149    /// [`future::ready(tag)`](https://docs.rs/futures/0.3.4/futures/future/fn.ready.html) or a more complex `async` block
150    /// such as:
151    /// ```ignore
152    /// async move {
153    ///     let a = b.await;
154    ///     let c = a.await;
155    ///     c.await
156    /// }.boxed()
157    /// ```
158    pub fn add_source<S, M>(&mut self, stream: S, transform: M)
159    where
160        S: Stream<Item = A> + Unpin + 'static,
161        M: Fn(A) -> F + 'static,
162        F: Future<Output = (A, T)>,
163    {
164        let tagger = tagger::StreamTagger::new(Box::new(transform));
165        self.streams
166            .push(StreamManager::new(tagger, Box::new(stream)));
167    }
168
169    /// Adds a new [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) to the
170    /// `StreamRouter` and provides the tag that will be used to identify the [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
171    /// from within the user-provided routing logic. Tags are intentionally as flexible as possible and
172    /// only have a couple limitations:
173    /// - All tags have to be the same base type
174    /// - Tags have to implement [`Hash`](https://doc.rust-lang.org/std/hash/trait.Hash.html)
175    /// - Tags have to implement [`Eq`](https://doc.rust-lang.org/std/cmp/trait.Eq.html)
176    /// - Tags have to implement [`Unpin`](https://doc.rust-lang.org/std/marker/trait.Unpin.html)
177    ///
178    /// Luckily, most of the base types within the Rust std library implement all these. A non-exhaustive list of some built-in types
179    /// that can be used:
180    /// - Numerics (`bool`, `u8`, `u16`, `usize`, etc.)
181    /// - `Ipv4Addr`/`Ipv6Addr`
182    /// - `String`/`&'static str`
183    ///
184    /// But there is also no reason a custom type couldn't be used as long as it meets the above requirements!
185    /// For example, the following could be used:
186    /// ```
187    /// #[derive(Hash, Eq, PartialEq)]
188    /// enum Color {
189    ///     Red,
190    ///     Green,
191    ///     Blue,
192    /// }
193    /// ```
194    pub fn add_sink<S>(&mut self, sink: S, tag: T)
195    where
196        S: Sink<A> + Unpin + Sized + 'static,
197    {
198        self.sinks
199            .insert(tag, (0, Box::new(sink.sink_map_err(|_| ()))));
200    }
201}
202
203impl<F, T, A> StreamRouter<F, T, A>
204where
205    F: Future<Output = (A, T)> + Unpin,
206    T: Hash + Eq + Unpin,
207    A: Unpin,
208{
209    fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<A>> {
210        use Poll::*;
211
212        let start = rand::thread_rng_n(self.streams.len() as u32) as usize;
213        let mut idx = start;
214
215        'outterLoop: for _ in 0..self.streams.len() {
216            'innerLoop: loop {
217                match self.streams[idx].state {
218                    StreamState::StreamActive => {
219                        match Pin::new(&mut self.streams[idx].stream).poll_next(cx) {
220                            Ready(Some(val)) => {
221                                self.streams[idx].state = StreamState::TaggerActive;
222                                self.streams[idx].tagger.start_map(val);
223                                continue 'innerLoop;
224                            }
225                            Ready(None) => {
226                                self.streams.swap_remove(idx);
227                                continue 'outterLoop;
228                            }
229                            Pending => {
230                                break 'innerLoop;
231                            }
232                        }
233                    }
234                    StreamState::TaggerActive => {
235                        match Pin::new(&mut self.streams[idx].tagger).poll(cx) {
236                            Ready((val, tag)) => {
237                                if let Some((ref_count, _sink)) = self.sinks.get_mut(&tag) {
238                                    // We have a sink for this val!
239                                    self.streams[idx].pending_sink_tag = Some(tag);
240                                    self.streams[idx].pending_item = Some(val);
241                                    if *ref_count == 0 {
242                                        // Nobody is currently sinking items, so we need to setup the sink
243                                        self.streams[idx].state = StreamState::SinkPending;
244                                        continue 'innerLoop;
245                                    } else {
246                                        self.streams[idx].state = StreamState::SinkActive;
247                                        *ref_count += 1;
248                                        continue 'innerLoop;
249                                    }
250                                } else {
251                                    // We do not have a sink for this, yield it from us!
252                                    self.streams[idx].state = StreamState::StreamActive;
253                                    return Ready(Some(val));
254                                }
255                            }
256                            Pending => {
257                                break 'innerLoop;
258                            }
259                        }
260                    }
261                    StreamState::SinkPending => {
262                        let tag = self.streams[idx].pending_sink_tag.take().unwrap();
263                        if let Some((ref_count, sink)) = self.sinks.get_mut(&tag) {
264                            if *ref_count != 0 {
265                                // Another stream is actively sending to this sink
266                                // so we can just immedietly start sinking
267                                self.streams[idx].pending_sink_tag = Some(tag);
268                                self.streams[idx].state = StreamState::SinkActive;
269                                *ref_count += 1;
270                                continue 'innerLoop;
271                            }
272
273                            match Pin::new(sink).poll_ready(cx) {
274                                Ready(Ok(())) => {
275                                    self.streams[idx].pending_sink_tag = Some(tag);
276                                    self.streams[idx].state = StreamState::SinkActive;
277                                    *ref_count += 1;
278                                    continue 'innerLoop;
279                                }
280                                Ready(Err(_)) => {
281                                    // TODO: properly handle sink errors as the sink is most likely dead
282                                    self.streams[idx].pending_item = None;
283                                    self.streams[idx].state = StreamState::StreamActive;
284                                    break 'innerLoop;
285                                }
286                                Pending => {
287                                    self.streams[idx].pending_sink_tag = Some(tag);
288                                    break 'innerLoop;
289                                }
290                            }
291                        } else {
292                            // The sink we were going to send to is no longer active
293                            // so we will drop the value
294                            self.streams[idx].state = StreamState::StreamActive;
295                            break 'innerLoop;
296                        }
297                    }
298                    StreamState::SinkActive => {
299                        let tag = self.streams[idx].pending_sink_tag.take().unwrap();
300                        if let Some((ref_count, sink)) = self.sinks.get_mut(&tag) {
301                            if Pin::new(sink)
302                                .start_send(self.streams[idx].pending_item.take().unwrap())
303                                .is_ok()
304                            {
305                                self.streams[idx].pending_sink_tag = Some(tag);
306                                self.streams[idx].state = StreamState::SinkFlushing;
307                                continue 'innerLoop;
308                            } else {
309                                // TODO: properly handle sink errors as the sink is most likely dead
310                                self.streams[idx].state = StreamState::StreamActive;
311                                *ref_count -= 1;
312                                break 'innerLoop;
313                            }
314                        }
315                    }
316                    StreamState::SinkFlushing => {
317                        let tag = self.streams[idx].pending_sink_tag.take().unwrap();
318                        if let Some((ref_count, sink)) = self.sinks.get_mut(&tag) {
319                            if *ref_count > 1 {
320                                // Someone else is sinking to this sink, so don't flush yet
321                                *ref_count -= 1;
322                                self.streams[idx].state = StreamState::StreamActive;
323                                continue 'innerLoop;
324                            } else {
325                                // We are the last person trying to sink here! So flush away
326                                match Pin::new(sink).poll_flush(cx) {
327                                    Ready(Ok(())) => {
328                                        self.streams[idx].state = StreamState::StreamActive;
329                                        *ref_count -= 1;
330                                        continue 'innerLoop;
331                                    }
332                                    Ready(Err(_)) => {
333                                        // TODO: properly handle sink errors as the sink is most likely dead
334                                        self.streams[idx].state = StreamState::StreamActive;
335                                        *ref_count -= 1;
336                                        continue 'innerLoop;
337                                    }
338                                    Pending => {
339                                        self.streams[idx].pending_sink_tag = Some(tag);
340                                        break 'innerLoop;
341                                    }
342                                }
343                            }
344                        }
345                    }
346                }
347            }
348
349            idx = idx.wrapping_add(1) % self.streams.len();
350        }
351
352        // If the map is empty, then the stream is complete.
353        if self.streams.is_empty() {
354            Ready(None)
355        } else {
356            Pending
357        }
358    }
359}
360
361#[must_use = "streams do nothing unless you `.await` or poll them"]
362impl<F, T, A> Stream for StreamRouter<F, T, A>
363where
364    F: Future<Output = (A, T)> + Unpin,
365    T: Hash + Eq + Unpin,
366    A: Unpin,
367{
368    type Item = A;
369
370    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371        match self.poll_next_entry(cx) {
372            Poll::Ready(Some(val)) => Poll::Ready(Some(val)),
373            Poll::Ready(None) => Poll::Ready(None),
374            Poll::Pending => Poll::Pending,
375        }
376    }
377
378    fn size_hint(&self) -> (usize, Option<usize>) {
379        let mut ret = (0, Some(0));
380
381        for stream_manager in &self.streams {
382            let hint = stream_manager.stream.size_hint();
383
384            ret.0 += hint.0;
385
386            match (ret.1, hint.1) {
387                (Some(a), Some(b)) => ret.1 = Some(a + b),
388                (Some(_), None) => ret.1 = None,
389                _ => {}
390            }
391        }
392
393        ret
394    }
395}