[−][src]Crate stream_multiplexer
This crate provides stream multiplexing with channels.
Channels have their own backpressure that does not affect other channels.
Incoming streams are by default set to channel 0 and can be moved to other channels via ControlMessage
s.
use bytes::Bytes; use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::mpsc; use stream_multiplexer::{Multiplexer, HalvesStream, ControlMessage, IncomingPacket, OutgoingPacket}; use futures::stream::StreamExt; // 3 channels of incoming streams, 0 is the channel that new streams join. // Backpressure is per channel. Streams can be moved between channels by // sending an OutgoingPacket::ChangeChannel message. let (channel0_tx, mut channel0_rx) = mpsc::channel(32); let (channel1_tx, mut channel1_rx) = mpsc::channel(32); let (channel2_tx, mut channel2_rx) = mpsc::channel(32); // A Stream for outgoing messages. let (mut outgoing_tx, outgoing_rx) = mpsc::channel::<OutgoingPacket<Bytes>>(32); // Construct the multiplexer, giving it the OutgoingPacket stream, and a vector of incoming // streams. The backlog controls how much of an internal buffer each WriteHalf (TcpSocket in this example) can have. let outgoing_streams_backlog = 128; let multiplexer = Multiplexer::new( outgoing_streams_backlog, outgoing_rx, vec![channel0_tx, channel1_tx, channel2_tx], ); // Bind to a random port on localhost let socket = TcpListener::bind("127.0.0.1:0").await?; let local_addr = socket.local_addr()?; // Use the HalvesStream utility struct to map the stream of new sockets. // It will use LengthDelimitedCodec with 2 bytes as the packet size. let halves = HalvesStream::new(socket, 2); // Control channel for shutting down the multiplexer let (control_write, control_read) = mpsc::unbounded_channel(); let mp_joinhandle = tokio::task::spawn(multiplexer.run(halves, control_read)); // Make a test connection: let mut client = tokio::net::TcpStream::connect(local_addr).await?; // Send 'a message' let mut data = Bytes::from("\x00\x09a message"); client.write_buf(&mut data).await?; client.flush(); // Receive 'a message' on channel 0 let incoming_packet = channel0_rx.recv().await.unwrap(); assert_eq!( incoming_packet .value() .expect("should have a value") .as_ref() .unwrap(), &Bytes::from("a message") ); // Move the client to channel 1 outgoing_tx .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 1)) .await?; // Send 'a message' again, on channel 1 this time. let mut data = Bytes::from("\x00\x09a message"); client.write_buf(&mut data).await?; client.flush(); // Receive 'a message' on channel 1 let incoming_packet = channel1_rx.recv().await.unwrap(); assert_eq!( incoming_packet .value() .expect("should have a value") .as_ref() .unwrap(), &Bytes::from("a message") ); // Move the client to channel 2 outgoing_tx .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 2)) .await?; // Send 'a message' again, on channel 2 this time. let mut data = Bytes::from("\x00\x09a message"); client.write_buf(&mut data).await?; client.flush(); // Receive 'a message' on channel 2 let incoming_packet = channel2_rx.recv().await.unwrap(); assert_eq!( incoming_packet .value() .expect("should have a value") .as_ref() .unwrap(), &Bytes::from("a message") ); // Tell multiplexer to shut down control_write.send(ControlMessage::Shutdown)?; mp_joinhandle.await.unwrap();
Structs
HalvesStream | Takes a Stream<Item=AsyncRead + AsyncWrite> and provides a Stream<Item=( FramedWrite<WriteHalf, LengthDelimitedCodec>, FramedRead<ReadHalf, LengthDelimitedCodec>)> |
IncomingMessage | Produced by the incoming stream |
IncrementIdGen | Generates IDs for incoming streams. Is the default |
Multiplexer | Manages incoming streams of data and the enqueueing of outgoing data. |
OutgoingMessage | The payload of an OutgoingPacket |
Enums
ControlMessage | To control the multiplexer, |
IncomingPacket | A packet representing a message from a stream. |
MultiplexerError | A collection of errors that can be returned. |
OutgoingPacket | For sending a message or causing the stream to change to a different channel |
Traits
IdGen | Provided to MultiplexerSenders to override the default incrementing generator |