1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
#![warn( missing_docs, missing_debug_implementations, missing_copy_implementations, trivial_casts, trivial_numeric_casts, unreachable_pub, unsafe_code, unstable_features, unused_import_braces, unused_qualifications )] /*! This crate provides stream multiplexing with channels. Channels have their own backpressure that does not affect other channels. Incoming streams are by default set to channel 0 and can be moved to other channels via `ControlMessage`s. ```rust # use std::error::Error; use bytes::Bytes; use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::mpsc; use stream_multiplexer::{Multiplexer, HalvesStream, ControlMessage, IncomingPacket, OutgoingPacket}; use futures::stream::StreamExt; # fn main() -> Result<(), Box<dyn Error>> { # tokio::runtime::Builder::new().basic_scheduler().enable_all().build().unwrap().block_on(async move { // 3 channels of incoming streams, 0 is the channel that new streams join. // Backpressure is per channel. Streams can be moved between channels by // sending an OutgoingPacket::ChangeChannel message. let (channel0_tx, mut channel0_rx) = mpsc::channel(32); let (channel1_tx, mut channel1_rx) = mpsc::channel(32); let (channel2_tx, mut channel2_rx) = mpsc::channel(32); // A Stream for outgoing messages. let (mut outgoing_tx, outgoing_rx) = mpsc::channel::<OutgoingPacket<Bytes>>(32); // Construct the multiplexer, giving it the OutgoingPacket stream, and a vector of incoming // streams. The backlog controls how much of an internal buffer each WriteHalf (TcpSocket in this example) can have. let outgoing_streams_backlog = 128; let multiplexer = Multiplexer::new( outgoing_streams_backlog, outgoing_rx, vec![channel0_tx, channel1_tx, channel2_tx], ); // Bind to a random port on localhost let socket = TcpListener::bind("127.0.0.1:0").await?; let local_addr = socket.local_addr()?; // Use the HalvesStream utility struct to map the stream of new sockets. // It will use LengthDelimitedCodec with 2 bytes as the packet size. let halves = HalvesStream::new(socket, 2); // Control channel for shutting down the multiplexer let (control_write, control_read) = mpsc::unbounded_channel(); let mp_joinhandle = tokio::task::spawn(multiplexer.run(halves, control_read)); // Make a test connection: let mut client = tokio::net::TcpStream::connect(local_addr).await?; // Listen for the channel join announcement let message = channel0_rx.recv().await.expect("Should have connected."); matches::assert_matches!(message, IncomingPacket::StreamConnected(_)); // Send 'a message' let mut data = Bytes::from("\x00\x09a message"); client.write_buf(&mut data).await?; client.flush(); // Receive 'a message' on channel 0 let incoming_packet = channel0_rx.recv().await.unwrap(); assert_eq!( incoming_packet .value() .expect("should have a value") .as_ref() .unwrap(), &Bytes::from("a message") ); // Move the client to channel 1 outgoing_tx .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 1)) .await?; // Listen for the channel join announcement let message = channel1_rx.recv().await.expect("Should have connected."); matches::assert_matches!(message, IncomingPacket::StreamConnected(_)); // Send 'a message' again, on channel 1 this time. let mut data = Bytes::from("\x00\x09a message"); client.write_buf(&mut data).await?; client.flush(); // Receive 'a message' on channel 1 let incoming_packet = channel1_rx.recv().await.unwrap(); assert_eq!( incoming_packet .value() .expect("should have a value") .as_ref() .unwrap(), &Bytes::from("a message") ); // Move the client to channel 2 outgoing_tx .send(OutgoingPacket::ChangeChannel(vec![incoming_packet.id()], 2)) .await?; // Listen for the channel join announcement let message = channel2_rx.recv().await.expect("Should have connected."); matches::assert_matches!(message, IncomingPacket::StreamConnected(_)); // Send 'a message' again, on channel 2 this time. let mut data = Bytes::from("\x00\x09a message"); client.write_buf(&mut data).await?; client.flush(); // Receive 'a message' on channel 2 let incoming_packet = channel2_rx.recv().await.unwrap(); assert_eq!( incoming_packet .value() .expect("should have a value") .as_ref() .unwrap(), &Bytes::from("a message") ); // Tell multiplexer to shut down control_write.send(ControlMessage::Shutdown)?; mp_joinhandle.await.unwrap(); # Ok::<_, Box<dyn Error>>(()) # }); # Ok(()) # } ``` */ mod error; mod halt; mod halves_stream; mod id_gen; mod multiplexer; mod multiplexer_senders; mod packets; mod send_all_own; mod sender; mod stream_mover; use futures::prelude::*; pub use error::*; use halt::*; pub use halves_stream::*; pub use id_gen::*; pub use multiplexer::*; use multiplexer_senders::*; pub use packets::*; use send_all_own::*; use sender::*; use stream_mover::*; type StreamId = usize; type ChannelId = usize; /// Container for an incoming stream's Read and Write halves. #[derive(Debug)] pub struct IncomingStream<ReadSt, WriteSi> where ReadSt: Stream + Unpin, ReadSt::Item: std::fmt::Debug, { /// Channel that the stream should be placed in. channel: ChannelId, /// Write half of a connection. write_sink: WriteSi, /// Read half of a connection. read_stream: ReadSt, } impl<ReadSt, WriteSi> IncomingStream<ReadSt, WriteSi> where ReadSt: Stream + Unpin, ReadSt::Item: std::fmt::Debug, { /// Creates a new IncomingStream pub fn new(channel: ChannelId, write_sink: WriteSi, read_stream: ReadSt) -> Self { Self { channel, write_sink, read_stream, } } } /// To control the multiplexer, `ControlMessage` can be sent to the `control` channel passed into /// `Multiplexer.run()` #[derive(Copy, Clone, PartialEq, Debug)] pub enum ControlMessage { /// Shut down the `Multiplexer` Shutdown, } #[cfg(test)] mod tests { use super::*; #[allow(dead_code)] pub(crate) fn init_logging() { use tracing_subscriber::FmtSubscriber; let subscriber = FmtSubscriber::builder() .with_max_level(tracing::Level::TRACE) .finish(); tracing::subscriber::set_global_default(subscriber) .expect("setting default subscriber failed"); } pub(crate) fn sender_reader<St, Si>(sink: Si, stream: St) -> (Sender<Si>, HaltAsyncRead<St>) where St: Stream + Unpin, Si: Unpin, { // Wrap the reader so that it can be retrieved let (halt, reader) = HaltRead::wrap(stream); // Give the Sender the other end of the ejection channel let sender = Sender::new(sink, halt); (sender, reader) } }