[][src]Crate stream_multiplexer

This crate provides stream multiplexing with channels.

Channels have their own backpressure that does not affect other channels.

Incoming streams are by default set to channel 0 and can be moved to other channels via ControlMessages.

use bytes::Bytes;
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use stream_multiplexer::{Multiplexer, HalvesStream, ControlMessage, IncomingPacket, OutgoingPacket};
use futures::stream::StreamExt;

// 3 channels of incoming streams, 0 is the channel that new streams join.
// Backpressure is per channel. Streams can be moved between channels by
// sending an OutgoingPacket::ChangeChannel message.
let (channel0_tx, mut channel0_rx) = mpsc::channel(32);
let (channel1_tx, mut channel1_rx) = mpsc::channel(32);
let (channel2_tx, mut channel2_rx) = mpsc::channel(32);

// A Stream for outgoing messages.
let (mut outgoing_tx, outgoing_rx) = mpsc::channel::<OutgoingPacket<Bytes>>(32);

// Construct the multiplexer, giving it the OutgoingPacket stream, and a vector of incoming
// streams. The backlog controls how much of an internal buffer each WriteHalf (TcpSocket in this example) can have.
let outgoing_streams_backlog = 128;
let multiplexer = Multiplexer::new(
    outgoing_streams_backlog,
    outgoing_rx,
    vec![channel0_tx, channel1_tx, channel2_tx],
);

// Bind to a random port on localhost
let socket = TcpListener::bind("127.0.0.1:0").await?;

let local_addr = socket.local_addr()?;

// Use the HalvesStream utility struct to map the stream of new sockets.
// It will use LengthDelimitedCodec with 2 bytes as the packet size.
let halves = HalvesStream::new(socket, 2);

// Control channel for shutting down the multiplexer
let (control_write, control_read) = mpsc::unbounded_channel();
let mp_joinhandle = tokio::task::spawn(multiplexer.run(halves, control_read));

// Make a test connection:
let mut client = tokio::net::TcpStream::connect(local_addr).await?;

// Listen for the channel join announcement
let message = channel0_rx.recv().await.expect("Should have connected.");
matches::assert_matches!(message, IncomingPacket::StreamConnected(_));

// Send 'a message'
let mut data = Bytes::from("\x00\x09a message");
client.write_buf(&mut data).await?;
client.flush();

// Receive 'a message' on channel 0
let incoming_packet = channel0_rx.recv().await.unwrap();
assert_eq!(
    incoming_packet
        .value()
        .expect("should have a value")
        .as_ref()
        .unwrap(),
    &Bytes::from("a message")
);

// Move the client to channel 1
outgoing_tx
    .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 1))
    .await?;

// Listen for the channel join announcement
let message = channel1_rx.recv().await.expect("Should have connected.");
matches::assert_matches!(message, IncomingPacket::StreamConnected(_));

// Send 'a message' again, on channel 1 this time.
let mut data = Bytes::from("\x00\x09a message");
client.write_buf(&mut data).await?;
client.flush();

// Receive 'a message' on channel 1
let incoming_packet = channel1_rx.recv().await.unwrap();
assert_eq!(
    incoming_packet
        .value()
        .expect("should have a value")
        .as_ref()
        .unwrap(),
    &Bytes::from("a message")
);

// Move the client to channel 2
outgoing_tx
    .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 2))
    .await?;

// Listen for the channel join announcement
let message = channel2_rx.recv().await.expect("Should have connected.");
matches::assert_matches!(message, IncomingPacket::StreamConnected(_));

// Send 'a message' again, on channel 2 this time.
let mut data = Bytes::from("\x00\x09a message");
client.write_buf(&mut data).await?;
client.flush();

// Receive 'a message' on channel 2
let incoming_packet = channel2_rx.recv().await.unwrap();
assert_eq!(
    incoming_packet
        .value()
        .expect("should have a value")
        .as_ref()
        .unwrap(),
    &Bytes::from("a message")
);

// Tell multiplexer to shut down
control_write.send(ControlMessage::Shutdown)?;

mp_joinhandle.await.unwrap();

Structs

HalvesStream

Takes a Stream<Item=AsyncRead + AsyncWrite> and provides a Stream<Item=( FramedWrite<WriteHalf, LengthDelimitedCodec>, FramedRead<ReadHalf, LengthDelimitedCodec>)>

IncomingMessage

Produced by the incoming stream

IncrementIdGen

Generates IDs for incoming streams. Is the default IdGen for MultiplexerSenders. This implementation simply increments and wraps at the usize boundary.

Iter

An iterator for Stream IDs and Values

Multiplexer

Manages incoming streams of data and the enqueueing of outgoing data.

OutgoingMessage

The payload of an OutgoingPacket

Enums

ControlMessage

To control the multiplexer, ControlMessage can be sent to the control channel passed into Multiplexer.run()

DisconnectReason

The reason why a stream was removed from a channel.

IncomingPacket

A packet representing a message from a stream.

MultiplexerError

A collection of errors that can be returned.

OutgoingPacket

For sending a message or causing the stream to change to a different channel

Traits

IdGen

Provided to MultiplexerSenders to override the default incrementing generator