Crate stream_multiplexer
This crate provides natural backpressure to classes of streams.
Streams are gathered into 'channels' that can be polled via recv(). Channels are independent of each other, and each has its own backpressure.
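To make "independent backpressure" concrete, here is a minimal std-only sketch. It does not use this crate and is not how the crate is implemented. It assumes each channel can be modeled as its own bounded queue; the function name `independent_backpressure` and the queue names are hypothetical, chosen only for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a hypothetical "unauthenticated" queue to capacity, then checks
/// whether each queue still accepts one more message.
/// Returns (unauthenticated_accepts, authenticated_accepts).
/// Intended for capacity >= 1; capacity 0 is a rendezvous channel in std.
pub fn independent_backpressure(capacity: usize) -> (bool, bool) {
    // One bounded queue per channel. The receivers are kept alive but
    // never drained, which simulates a slow consumer.
    let (unauth_tx, _unauth_rx) = sync_channel::<u32>(capacity);
    let (auth_tx, _auth_rx) = sync_channel::<u32>(capacity);

    // Saturate the unauthenticated queue.
    for i in 0..capacity as u32 {
        unauth_tx.try_send(i).expect("queue should still have room");
    }

    // A full queue pushes back on its own senders...
    let unauth_accepts = !matches!(unauth_tx.try_send(99), Err(TrySendError::Full(_)));
    // ...while the other queue is unaffected.
    let auth_accepts = auth_tx.try_send(1).is_ok();

    (unauth_accepts, auth_accepts)
}
```

With a capacity of 2, the saturated queue rejects further messages while the other queue still accepts them, which is the property the paragraph above describes for channels.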
Example
With a TCP server you may have two different classes of connections: Authenticated and Unauthenticated. By grouping each class of connection into its own channel, you can favor the Authenticated connections over the Unauthenticated ones. This gives clients that have been able to authenticate a better experience.
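One way to picture "favoring" a class is a strict-priority pick between two queues. The sketch below is std-only and does not use this crate; `Class` and `next_message` are hypothetical names, and modeling each channel as a `VecDeque` is an assumption made purely for illustration.

```rust
use std::collections::VecDeque;

/// Hypothetical label for the two connection classes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Class {
    Authenticated,
    Unauthenticated,
}

/// Picks the next message to handle, always preferring the
/// authenticated queue when it has anything pending.
pub fn next_message(
    authenticated: &mut VecDeque<String>,
    unauthenticated: &mut VecDeque<String>,
) -> Option<(Class, String)> {
    if let Some(msg) = authenticated.pop_front() {
        return Some((Class::Authenticated, msg));
    }
    // Only fall back to unauthenticated traffic when nothing else is waiting.
    unauthenticated
        .pop_front()
        .map(|msg| (Class::Unauthenticated, msg))
}
```

Strict priority like this can starve the lower class under sustained load, so a real server might prefer a weighted scheme instead. With the multiplexer, the equivalent decision would be which channel to poll with recv() and how often.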
Code Example
use futures_util::stream::StreamExt;
use tokio_util::compat::*;

smol::block_on(async move {
    const CHANNEL_ONE: usize = 1;
    const CHANNEL_TWO: usize = 2;

    // Initialize a multiplexer
    let mut multiplexer = stream_multiplexer::Multiplexer::new();

    // Set up the recognized channels
    multiplexer.add_channel(CHANNEL_ONE);
    multiplexer.add_channel(CHANNEL_TWO);

    // Bind to a random port on localhost
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let local_addr = listener.local_addr().unwrap();

    // Set up a task to add incoming connections into multiplexer
    let mut incoming_multiplexer = multiplexer.clone();
    smol::Task::spawn(async move {
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let stream = async_io::Async::new(stream).unwrap();
                    let codec = tokio_util::codec::LinesCodec::new();
                    let framed = tokio_util::codec::Framed::new(stream.compat(), codec);
                    let (sink, stream) = framed.split();
                    let _stream_id = incoming_multiplexer.add_stream_pair(sink, stream, CHANNEL_ONE);
                }
                Err(_) => unimplemented!()
            }
        }
    }).detach();

    // test clients to put into channels
    let mut client_1 = std::net::TcpStream::connect(local_addr).unwrap();
    let mut client_2 = std::net::TcpStream::connect(local_addr).unwrap();

    let mut multiplexer_ch_1 = multiplexer.clone();

    // Simple server that echoes the data back to the stream and moves the stream to channel 2.
    smol::Task::spawn(async move {
        while let Ok((stream_id, message)) = multiplexer_ch_1.recv(CHANNEL_ONE).await {
            match message {
                Some(Ok(data)) => {
                    // echo the data back and move it to channel 2
                    multiplexer_ch_1.send(vec![stream_id], data);
                    multiplexer_ch_1.change_stream_channel(stream_id, CHANNEL_TWO);
                }
                Some(Err(err)) => {
                    // the stream had an error
                }
                None => {
                    // stream_id has been dropped
                }
            }
        }
    }).detach();
});
Structs
Multiplexer |
Enums
MultiplexerError | Errors returned by |
Type Definitions
ChannelId | Used when registering channels with |
StreamId | A value returned by |