[][src]Struct stream_router::StreamRouter

pub struct StreamRouter<F, T, A> where
    T: Hash + Eq
{ /* fields omitted */ }

The core Struct of this crate that is capable of dynamically routing values between Streams and Sinks.

A StreamRouter is at it's core a Stream that can take ownership of any number of other Streams and any number of Sinks and dynamically route values yielded from the Streams to any one of the provided Sinks through user-defined routing rules.

Each Sink provided to the StreamRouter is tagged with a user-defined Hashable value. This tag is utilized by the router to identify and differentiate Sinks and is what the user will utilize to reference a specific Sink when defining the routing logic.

Each Stream is provided with a matching closure that consumes the values yielded by the accompanying Stream and returns a Future that will resolve to one of the tags identifying a specific Sink that the yielded value will be forwarded to. If no Sink is found for the returned routing tag the value will be yielded from the StreamRouter itself.

The StreamRouter makes the guarantee that order will be preserved for values yielded from Stream "A" and sent to Sink "B" such that "A" will not attempt to sink any values into "B" until all previous values from "A" sent to "B" have been processed. There are no cross-Stream or cross-Sink timing or ordering guarentees.

Example

The following example is simple.rs from the examples folder. This simple example illustrates the StreamRouter forwarding all even values to the even_chan_tx while all odd numbers are yielded by the StreamRouter itself. A user could decide to provide a second Sink to explicitly consume odd values if desired, in which case the StreamRouter would never yield any values itself.

use futures::{channel::mpsc, future, stream, stream::StreamExt};
use tokio;

#[tokio::main]
async fn main() {
    let mut router = stream_router::StreamRouter::new();
    let nums = stream::iter(0..1_000);
    let (even_chan_tx, mut even_chan_rx) = mpsc::channel(10);

    router.add_source(nums, |x| future::lazy(move |_| (x, x % 2 == 0)));
    router.add_sink(even_chan_tx, true);

    loop {
        tokio::select! {
            v = router.next() => {
                println!("odd number:  {:?}", v.unwrap());
            }
            v = even_chan_rx.next() => {
                println!("even number: {:?}", v.unwrap());
            }
        }
    }
}

Routing Logic

The StreamRouter's routing logic is provided by the user in the form of closures that can map values yielded by a specific Stream into tags that identify specific Sinks. These closures follow the form of Fn(A) -> Future<Output = (A, T)> where A is a value yielded by the Stream and where T is a tag that the user has assigned to one of their Sinks. It should be noted that the closure takes ownership of the values yielded by the stream and is responsible for also returning the values as part of the tuple that contains the Stream tag. This is done to avoid the need to clone() each value but also allows the user to potentially "map" the values if beneficial to their specific use-case. While simple routing (such as shown above) has no real need to utilize the flexibility provided by returning a Future, the option to return a Future allows for more complex state-ful routing. An example of utilizing state-ful routing to dedup an incoming Stream can be found in the dedup.rs example.

Methods

impl<F, T, A> StreamRouter<F, T, A> where
    T: Hash + Eq
[src]

pub fn new() -> StreamRouter<F, T, A>[src]

Creates a new instance of a StreamRouter

pub fn add_source<S, M>(&mut self, stream: S, transform: M) where
    S: Stream<Item = A> + Unpin + 'static,
    M: Fn(A) -> F + 'static,
    F: Future<Output = (A, T)>, 
[src]

Adds a new Stream to the StreamRouter and provides the routing function that will be utilized to assign a tag to each value yielded by the Stream. This tag will determine which Sink, if any, the value will be forwarded to.

The routing function follows the form: Fn(A) -> Future<Output = (A, T)> where A is a value yielded by the Stream and where T is a tag that the user has assigned to one of their Sinks. The returned Future could be as simple as future::ready(tag) or a more complex async block such as:

This example is not tested
async move {
    let a = b.await;
    let c = a.await;
    c.await
}.boxed()

pub fn add_sink<S>(&mut self, sink: S, tag: T) where
    S: Sink<A> + Unpin + Sized + 'static, 
[src]

Adds a new Sink to the StreamRouter and provides the tag that will be used to identify the Sink from within the user-provided routing logic. Tags are intentionally as flexible as possible and only have a couple limitations:

  • All tags have to be the same base type
  • Tags have to implement Hash
  • Tags have to implement Eq
  • Tags have to implement Unpin

Luckily, most of the base types within the Rust std library implement all these. A non-exhaustive list of some built-in types that can be used:

  • Numerics (bool, u8, u16, usize, etc.)
  • Ipv4Addr/Ipv6Addr
  • String/&'static str

But there is also no reason a custom type couldn't be used as long as it meets the above requirements! For example, the following could be used:

#[derive(Hash, Eq, PartialEq)]
enum Color {
    Red,
    Green,
    Blue,
}

Trait Implementations

impl<F, T, A> Stream for StreamRouter<F, T, A> where
    F: Future<Output = (A, T)> + Unpin,
    T: Hash + Eq + Unpin,
    A: Unpin
[src]

type Item = A

Values yielded by the stream.

Auto Trait Implementations

impl<F, T, A> !RefUnwindSafe for StreamRouter<F, T, A>

impl<F, T, A> !Send for StreamRouter<F, T, A>

impl<F, T, A> !Sync for StreamRouter<F, T, A>

impl<F, T, A> Unpin for StreamRouter<F, T, A> where
    A: Unpin,
    F: Unpin,
    T: Unpin

impl<F, T, A> !UnwindSafe for StreamRouter<F, T, A>

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<T> From<T> for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T> StreamExt for T where
    T: Stream + ?Sized
[src]

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<S, T, E> TryStream for S where
    S: Stream<Item = Result<T, E>> + ?Sized
[src]

type Ok = T

The type of successful values yielded by this future

type Error = E

The type of failures yielded by this future

impl<S> TryStreamExt for S where
    S: TryStream + ?Sized
[src]