stream_router/lib.rs
1use futures::future::Future;
2use futures::sink::{Sink, SinkExt};
3use futures::stream::Stream;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9mod rand;
10mod tagger;
11
/// The phase a single source is in while one of its items travels through the router.
///
/// A source handles exactly one item at a time: the item is pulled from the stream,
/// tagged, and then either yielded by the router or pushed into the matching sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// No item is in flight; the underlying stream is polled for the next value.
    StreamActive,
    /// An item was handed to the tagger and its routing future is being polled.
    TaggerActive,
    /// An item is tagged for a sink and is waiting for that sink to be set up.
    SinkPending,
    /// The item is about to be handed to the sink via `start_send`.
    SinkActive,
    /// The item was handed to the sink; the sink still has to be flushed.
    SinkFlushing,
}
19
20struct StreamManager<F, A, T> {
21 tagger: tagger::StreamTagger<F, A>,
22 state: StreamState,
23 pending_sink_tag: Option<T>,
24 pending_item: Option<A>,
25 stream: Box<dyn Stream<Item = A> + Unpin>,
26}
27
28impl<F, A, T> StreamManager<F, A, T> {
29 fn new(
30 tagger: tagger::StreamTagger<F, A>,
31 stream: Box<dyn Stream<Item = A> + Unpin>,
32 ) -> StreamManager<F, A, T> {
33 StreamManager {
34 tagger,
35 state: StreamState::StreamActive,
36 pending_sink_tag: None,
37 pending_item: None,
38 stream,
39 }
40 }
41}
42
43/// The core Struct of this crate that is capable of dynamically routing values between
44/// [`Stream`s](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) and [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html).
45///
/// A `StreamRouter` is at its core a [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
47/// that can take ownership of any number of other [`Stream`s](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
48/// and any number of [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) and dynamically route
49/// values yielded from the [`Stream`s](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) to any one of the
50/// provided [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) through user-defined routing rules.
51///
52/// Each [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) provided to the `StreamRouter`
53/// is tagged with a user-defined [`Hash`able](https://doc.rust-lang.org/std/hash/trait.Hash.html) value.
54/// This tag is utilized by the router to identify and differentiate [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
55/// and is what the user will utilize to reference a specific [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
56/// when defining the routing logic.
57///
58/// Each [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) is provided with a matching closure
59/// that consumes the values yielded by the accompanying [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
60/// and returns a [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html) that will resolve to one of the tags
61/// identifying a specific [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) that the yielded value will be
62/// forwarded to. If no [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) is found for the returned routing tag
63/// the value will be yielded from the `StreamRouter` itself.
64///
65/// The `StreamRouter` makes the guarantee that order will be preserved for values yielded from [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
66/// "A" and sent to [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) "B" such that "A" will not attempt to sink any values into "B" until all
/// previous values from "A" sent to "B" have been processed. There are no cross-Stream or cross-Sink timing or ordering guarantees.
68///
69/// # Example
70///
71/// The following example is [`simple.rs`](https://github.com/BroderickCarlin/stream_router/blob/master/examples/simple.rs)
72/// from the [examples](https://github.com/BroderickCarlin/stream_router/tree/master/examples) folder. This simple example
73/// illustrates the `StreamRouter` forwarding all even values to the `even_chan_tx` while all odd numbers are yielded by
74/// the `StreamRouter` itself. A user could decide to provide a second [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
75/// to explicitly consume odd values if desired, in which case the `StreamRouter` would never yield any values itself.
76///
77///
78/// ```should_panic
79/// use futures::{channel::mpsc, future, stream, stream::StreamExt};
80/// use tokio;
81///
82/// #[tokio::main]
83/// async fn main() {
84/// let mut router = stream_router::StreamRouter::new();
85/// let nums = stream::iter(0..1_000);
86/// let (even_chan_tx, mut even_chan_rx) = mpsc::channel(10);
87///
88/// router.add_source(nums, |x| future::lazy(move |_| (x, x % 2 == 0)));
89/// router.add_sink(even_chan_tx, true);
90///
91/// loop {
92/// tokio::select! {
93/// v = router.next() => {
94/// println!("odd number: {:?}", v.unwrap());
95/// }
96/// v = even_chan_rx.next() => {
97/// println!("even number: {:?}", v.unwrap());
98/// }
99/// }
100/// }
101/// }
102/// ```
103///
104/// # Routing Logic
105///
106/// The `StreamRouter`'s routing logic is provided by the user in the form of closures that can map values yielded by
107/// a specific [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) into tags that identify
108/// specific [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html). These closures follow the form of
109/// `Fn(A) -> Future<Output = (A, T)>` where `A` is a value yielded by the [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
110/// and where `T` is a tag that the user has assigned to one of their [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html).
111/// It should be noted that the closure takes ownership of the values yielded by the stream and is responsible for also
/// returning the values as part of the tuple that contains the [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) tag.
113/// This is done to avoid the need to `clone()` each value but also allows the user to potentially "map" the values if
114/// beneficial to their specific use-case. While simple routing (such as shown above) has no real need to utilize the flexibility provided by returning a
115/// [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html), the option to return a
116/// [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html) allows for more complex state-ful routing.
117/// An example of utilizing state-ful routing to dedup an incoming [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html)
118/// can be found in the [`dedup.rs`](https://github.com/BroderickCarlin/stream_router/blob/master/examples/dedup.rs) example.
119pub struct StreamRouter<F, T, A>
120where
121 T: Hash + Eq,
122{
123 streams: Vec<StreamManager<F, A, T>>,
124 sinks: HashMap<T, (usize, Box<dyn Sink<A, Error = ()> + Unpin>)>,
125}
126
127impl<F, T, A> StreamRouter<F, T, A>
128where
129 T: Hash + Eq,
130{
131 /// Creates a new instance of a `StreamRouter`
132 pub fn new() -> StreamRouter<F, T, A> {
133 StreamRouter {
134 streams: vec![],
135 sinks: HashMap::new(),
136 }
137 }
138
139 /// Adds a new [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) to
140 /// the `StreamRouter` and provides the routing function that will be utilized to assign a
141 /// tag to each value yielded by the [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html).
142 /// This tag will determine which [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html), if any, the
143 /// value will be forwarded to.
144 ///
145 /// The routing function follows the form: `Fn(A) -> Future<Output = (A, T)>` where `A` is a value yielded by the
146 /// [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) and where `T` is a tag that the user
147 /// has assigned to one of their [`Sink`s](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html). The returned
148 /// [`Future`](https://docs.rs/futures/0.3.4/futures/prelude/trait.Future.html) could be as simple as
149 /// [`future::ready(tag)`](https://docs.rs/futures/0.3.4/futures/future/fn.ready.html) or a more complex `async` block
150 /// such as:
151 /// ```ignore
152 /// async move {
153 /// let a = b.await;
154 /// let c = a.await;
155 /// c.await
156 /// }.boxed()
157 /// ```
158 pub fn add_source<S, M>(&mut self, stream: S, transform: M)
159 where
160 S: Stream<Item = A> + Unpin + 'static,
161 M: Fn(A) -> F + 'static,
162 F: Future<Output = (A, T)>,
163 {
164 let tagger = tagger::StreamTagger::new(Box::new(transform));
165 self.streams
166 .push(StreamManager::new(tagger, Box::new(stream)));
167 }
168
169 /// Adds a new [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html) to the
170 /// `StreamRouter` and provides the tag that will be used to identify the [`Sink`](https://docs.rs/futures/0.3.4/futures/sink/trait.Sink.html)
171 /// from within the user-provided routing logic. Tags are intentionally as flexible as possible and
172 /// only have a couple limitations:
173 /// - All tags have to be the same base type
174 /// - Tags have to implement [`Hash`](https://doc.rust-lang.org/std/hash/trait.Hash.html)
175 /// - Tags have to implement [`Eq`](https://doc.rust-lang.org/std/cmp/trait.Eq.html)
176 /// - Tags have to implement [`Unpin`](https://doc.rust-lang.org/std/marker/trait.Unpin.html)
177 ///
178 /// Luckily, most of the base types within the Rust std library implement all these. A non-exhaustive list of some built-in types
179 /// that can be used:
180 /// - Numerics (`bool`, `u8`, `u16`, `usize`, etc.)
181 /// - `Ipv4Addr`/`Ipv6Addr`
182 /// - `String`/`&'static str`
183 ///
184 /// But there is also no reason a custom type couldn't be used as long as it meets the above requirements!
185 /// For example, the following could be used:
186 /// ```
187 /// #[derive(Hash, Eq, PartialEq)]
188 /// enum Color {
189 /// Red,
190 /// Green,
191 /// Blue,
192 /// }
193 /// ```
194 pub fn add_sink<S>(&mut self, sink: S, tag: T)
195 where
196 S: Sink<A> + Unpin + Sized + 'static,
197 {
198 self.sinks
199 .insert(tag, (0, Box::new(sink.sink_map_err(|_| ()))));
200 }
201}
202
203impl<F, T, A> StreamRouter<F, T, A>
204where
205 F: Future<Output = (A, T)> + Unpin,
206 T: Hash + Eq + Unpin,
207 A: Unpin,
208{
209 fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<A>> {
210 use Poll::*;
211
212 let start = rand::thread_rng_n(self.streams.len() as u32) as usize;
213 let mut idx = start;
214
215 'outterLoop: for _ in 0..self.streams.len() {
216 'innerLoop: loop {
217 match self.streams[idx].state {
218 StreamState::StreamActive => {
219 match Pin::new(&mut self.streams[idx].stream).poll_next(cx) {
220 Ready(Some(val)) => {
221 self.streams[idx].state = StreamState::TaggerActive;
222 self.streams[idx].tagger.start_map(val);
223 continue 'innerLoop;
224 }
225 Ready(None) => {
226 self.streams.swap_remove(idx);
227 continue 'outterLoop;
228 }
229 Pending => {
230 break 'innerLoop;
231 }
232 }
233 }
234 StreamState::TaggerActive => {
235 match Pin::new(&mut self.streams[idx].tagger).poll(cx) {
236 Ready((val, tag)) => {
237 if let Some((ref_count, _sink)) = self.sinks.get_mut(&tag) {
238 // We have a sink for this val!
239 self.streams[idx].pending_sink_tag = Some(tag);
240 self.streams[idx].pending_item = Some(val);
241 if *ref_count == 0 {
242 // Nobody is currently sinking items, so we need to setup the sink
243 self.streams[idx].state = StreamState::SinkPending;
244 continue 'innerLoop;
245 } else {
246 self.streams[idx].state = StreamState::SinkActive;
247 *ref_count += 1;
248 continue 'innerLoop;
249 }
250 } else {
251 // We do not have a sink for this, yield it from us!
252 self.streams[idx].state = StreamState::StreamActive;
253 return Ready(Some(val));
254 }
255 }
256 Pending => {
257 break 'innerLoop;
258 }
259 }
260 }
261 StreamState::SinkPending => {
262 let tag = self.streams[idx].pending_sink_tag.take().unwrap();
263 if let Some((ref_count, sink)) = self.sinks.get_mut(&tag) {
264 if *ref_count != 0 {
265 // Another stream is actively sending to this sink
266 // so we can just immedietly start sinking
267 self.streams[idx].pending_sink_tag = Some(tag);
268 self.streams[idx].state = StreamState::SinkActive;
269 *ref_count += 1;
270 continue 'innerLoop;
271 }
272
273 match Pin::new(sink).poll_ready(cx) {
274 Ready(Ok(())) => {
275 self.streams[idx].pending_sink_tag = Some(tag);
276 self.streams[idx].state = StreamState::SinkActive;
277 *ref_count += 1;
278 continue 'innerLoop;
279 }
280 Ready(Err(_)) => {
281 // TODO: properly handle sink errors as the sink is most likely dead
282 self.streams[idx].pending_item = None;
283 self.streams[idx].state = StreamState::StreamActive;
284 break 'innerLoop;
285 }
286 Pending => {
287 self.streams[idx].pending_sink_tag = Some(tag);
288 break 'innerLoop;
289 }
290 }
291 } else {
292 // The sink we were going to send to is no longer active
293 // so we will drop the value
294 self.streams[idx].state = StreamState::StreamActive;
295 break 'innerLoop;
296 }
297 }
298 StreamState::SinkActive => {
299 let tag = self.streams[idx].pending_sink_tag.take().unwrap();
300 if let Some((ref_count, sink)) = self.sinks.get_mut(&tag) {
301 if Pin::new(sink)
302 .start_send(self.streams[idx].pending_item.take().unwrap())
303 .is_ok()
304 {
305 self.streams[idx].pending_sink_tag = Some(tag);
306 self.streams[idx].state = StreamState::SinkFlushing;
307 continue 'innerLoop;
308 } else {
309 // TODO: properly handle sink errors as the sink is most likely dead
310 self.streams[idx].state = StreamState::StreamActive;
311 *ref_count -= 1;
312 break 'innerLoop;
313 }
314 }
315 }
316 StreamState::SinkFlushing => {
317 let tag = self.streams[idx].pending_sink_tag.take().unwrap();
318 if let Some((ref_count, sink)) = self.sinks.get_mut(&tag) {
319 if *ref_count > 1 {
320 // Someone else is sinking to this sink, so don't flush yet
321 *ref_count -= 1;
322 self.streams[idx].state = StreamState::StreamActive;
323 continue 'innerLoop;
324 } else {
325 // We are the last person trying to sink here! So flush away
326 match Pin::new(sink).poll_flush(cx) {
327 Ready(Ok(())) => {
328 self.streams[idx].state = StreamState::StreamActive;
329 *ref_count -= 1;
330 continue 'innerLoop;
331 }
332 Ready(Err(_)) => {
333 // TODO: properly handle sink errors as the sink is most likely dead
334 self.streams[idx].state = StreamState::StreamActive;
335 *ref_count -= 1;
336 continue 'innerLoop;
337 }
338 Pending => {
339 self.streams[idx].pending_sink_tag = Some(tag);
340 break 'innerLoop;
341 }
342 }
343 }
344 }
345 }
346 }
347 }
348
349 idx = idx.wrapping_add(1) % self.streams.len();
350 }
351
352 // If the map is empty, then the stream is complete.
353 if self.streams.is_empty() {
354 Ready(None)
355 } else {
356 Pending
357 }
358 }
359}
360
361#[must_use = "streams do nothing unless you `.await` or poll them"]
362impl<F, T, A> Stream for StreamRouter<F, T, A>
363where
364 F: Future<Output = (A, T)> + Unpin,
365 T: Hash + Eq + Unpin,
366 A: Unpin,
367{
368 type Item = A;
369
370 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371 match self.poll_next_entry(cx) {
372 Poll::Ready(Some(val)) => Poll::Ready(Some(val)),
373 Poll::Ready(None) => Poll::Ready(None),
374 Poll::Pending => Poll::Pending,
375 }
376 }
377
378 fn size_hint(&self) -> (usize, Option<usize>) {
379 let mut ret = (0, Some(0));
380
381 for stream_manager in &self.streams {
382 let hint = stream_manager.stream.size_hint();
383
384 ret.0 += hint.0;
385
386 match (ret.1, hint.1) {
387 (Some(a), Some(b)) => ret.1 = Some(a + b),
388 (Some(_), None) => ret.1 = None,
389 _ => {}
390 }
391 }
392
393 ret
394 }
395}