Crate stream_multiplexer

This crate provides natural backpressure to classes of streams.

Streams are gathered into 'channels' that can be polled via recv(). Channels are independent of each other, and each has its own backpressure.
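
To make the idea of per-channel backpressure concrete, here is a minimal sketch that uses only the standard library. It does not use this crate's API: `BoundedChannel`, `offer`, and `independent_backpressure` are hypothetical names, and a bounded `sync_channel` stands in for whatever buffering the multiplexer does internally.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for one multiplexer channel: a bounded queue.
struct BoundedChannel {
    tx: SyncSender<String>,
    rx: Receiver<String>,
}

impl BoundedChannel {
    fn with_capacity(capacity: usize) -> Self {
        let (tx, rx) = sync_channel(capacity);
        BoundedChannel { tx, rx }
    }

    /// Tries to enqueue a message without blocking.
    /// Returns false when the buffer is full, which is the backpressure signal.
    fn offer(&self, message: &str) -> bool {
        match self.tx.try_send(message.to_string()) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => false,
            Err(TrySendError::Disconnected(_)) => false,
        }
    }

    /// Takes the oldest queued message, if any, freeing one slot.
    fn recv(&self) -> Option<String> {
        self.rx.try_recv().ok()
    }
}

/// Fills channel one to capacity and checks that channel two still accepts data.
fn independent_backpressure() -> (bool, bool, bool) {
    let channel_one = BoundedChannel::with_capacity(1);
    let channel_two = BoundedChannel::with_capacity(1);

    let first = channel_one.offer("a"); // accepted: the buffer was empty
    let second = channel_one.offer("b"); // rejected: channel one is full
    let other = channel_two.offer("c"); // accepted: channel two has its own buffer
    (first, second, other)
}
```

In this model a slow consumer on one channel only stalls producers for that channel; draining it with `recv` frees a slot again.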


With a TCP server you may have two different classes of connections: Authenticated and Unauthenticated. By grouping each class of connection into its own channel, you can favor the Authenticated connections over the Unauthenticated ones. This provides a better experience for clients that have been able to authenticate.
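
One way to picture "favoring" a class is strict priority polling: always serve the authenticated queue first and fall back to the unauthenticated one only when it is empty. The sketch below is an illustration under that assumption, not the crate's behavior; `Class`, `ClassQueues`, and `next_message` are hypothetical names.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Class {
    Authenticated,
    Unauthenticated,
}

/// Hypothetical model: one pending-message queue per connection class.
struct ClassQueues {
    authenticated: VecDeque<String>,
    unauthenticated: VecDeque<String>,
}

impl ClassQueues {
    fn new() -> Self {
        ClassQueues {
            authenticated: VecDeque::new(),
            unauthenticated: VecDeque::new(),
        }
    }

    /// Queues a message for the given class.
    fn push(&mut self, class: Class, message: &str) {
        match class {
            Class::Authenticated => self.authenticated.push_back(message.to_string()),
            Class::Unauthenticated => self.unauthenticated.push_back(message.to_string()),
        }
    }

    /// Serves authenticated traffic first; unauthenticated traffic is
    /// only served when no authenticated message is waiting.
    fn next_message(&mut self) -> Option<(Class, String)> {
        if let Some(message) = self.authenticated.pop_front() {
            return Some((Class::Authenticated, message));
        }
        self.unauthenticated
            .pop_front()
            .map(|message| (Class::Unauthenticated, message))
    }
}
```

Strict priority can starve the lower class under sustained load, so a real server might instead poll the favored channel more often rather than exclusively.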

Code Example

use futures_util::stream::StreamExt;
use tokio_util::compat::*;

smol::block_on(async move {
    const CHANNEL_ONE: usize = 1;
    const CHANNEL_TWO: usize = 2;

    // Initialize a multiplexer
    let mut multiplexer = stream_multiplexer::Multiplexer::new();

    // Set up the recognized channels

    // Bind to a random port on localhost
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let local_addr = listener.local_addr().unwrap();

    // Set up a task to add incoming connections into multiplexer
    let mut incoming_multiplexer = multiplexer.clone();
    smol::Task::spawn(async move {
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    // Wrap the socket for async I/O, then frame it as lines
                    let stream = async_io::Async::new(stream).unwrap();
                    let codec = tokio_util::codec::LinesCodec::new();
                    let framed = tokio_util::codec::Framed::new(stream.compat(), codec);
                    let (sink, stream) = framed.split();
                    // Every new connection starts out in channel one
                    let _stream_id = incoming_multiplexer.add_stream_pair(sink, stream, CHANNEL_ONE);
                }
                Err(_) => unimplemented!(),
            }
        }
    })
    .detach(); // assumption: detach so the task keeps running after the handle is dropped

    // test clients to put into channels
    let mut client_1 = std::net::TcpStream::connect(local_addr).unwrap();
    let mut client_2 = std::net::TcpStream::connect(local_addr).unwrap();

    let mut multiplexer_ch_1 = multiplexer.clone();

    // Simple server that echoes the data back to the stream and moves the stream to channel 2.
    smol::Task::spawn(async move {
        while let Ok((stream_id, message)) = multiplexer_ch_1.recv(CHANNEL_ONE).await {
            match message {
                Some(Ok(data)) => {
                    // echo the data back and move it to channel 2
                    multiplexer_ch_1.send(vec![stream_id], data);
                    multiplexer_ch_1.change_stream_channel(stream_id, CHANNEL_TWO);
                }
                Some(Err(_err)) => {
                    // the stream had an error
                }
                None => {
                    // stream_id has been dropped
                }
            }
        }
    })
    .detach(); // assumption: detach so the task keeps running after the handle is dropped
});
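
The example moves a stream from one channel to another after its first message. A rough mental model for that step is a routing table from stream id to channel id. The sketch below is standard-library only and is not the crate's implementation; `Routing`, `add_stream`, `streams_in`, and the `StreamId` and `ChannelId` aliases are hypothetical names chosen for illustration.

```rust
use std::collections::HashMap;

// Hypothetical aliases for this sketch only.
type StreamId = usize;
type ChannelId = usize;

/// Hypothetical routing table: which channel each stream currently belongs to.
struct Routing {
    channel_of: HashMap<StreamId, ChannelId>,
}

impl Routing {
    fn new() -> Self {
        Routing {
            channel_of: HashMap::new(),
        }
    }

    /// Registers a stream in its initial channel.
    fn add_stream(&mut self, stream: StreamId, channel: ChannelId) {
        self.channel_of.insert(stream, channel);
    }

    /// Moves a known stream to another channel.
    /// Returns false if the stream id is not registered.
    fn change_stream_channel(&mut self, stream: StreamId, channel: ChannelId) -> bool {
        match self.channel_of.get_mut(&stream) {
            Some(current) => {
                *current = channel;
                true
            }
            None => false,
        }
    }

    /// Lists the streams currently in a channel, sorted by id.
    fn streams_in(&self, channel: ChannelId) -> Vec<StreamId> {
        let mut ids: Vec<StreamId> = self
            .channel_of
            .iter()
            .filter(|(_, c)| **c == channel)
            .map(|(s, _)| *s)
            .collect();
        ids.sort();
        ids
    }
}
```

After a move, a poll of the old channel no longer sees the stream, which is what lets the echo server above hand authenticated-style traffic to a separate consumer.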





Errors returned by Multiplexer.

Type Definitions


Used when registering channels with Multiplexer.


A value returned by Multiplexer when a stream pair is added.