use cs_utils::constants::time::{DAY_MS, SECOND_MS};
use cs_utils::futures::wait;
use cs_utils::random_str;
use multiplexed_connection::{MultiplexedConnection, Connected, Channel};
use tokio::io::{duplex, split, AsyncReadExt, AsyncWriteExt};
use tokio::try_join;
async fn handle_channel(
connection_name: &'static str,
channel: Box<dyn Channel>,
should_send_data: bool,
) {
let channel_label = channel.label().clone();
let (mut source, mut sink) = split(channel);
let channel_label1 = channel_label.clone();
let channel_label2 = channel_label.clone();
try_join!(
tokio::spawn(async move {
let mut buf = [0; 1024];
while let Ok(bytes_read) = source.read(&mut buf).await {
let data_str = std::str::from_utf8(&buf[..bytes_read])
.expect("Cannot parse UTF8 string.");
println!("<< {:?} received {:?} on channel {:?}", &connection_name, &data_str, &channel_label1);
}
}),
tokio::spawn(async move {
loop {
if !should_send_data {
break;
}
let data_str = random_str(8);
println!(">> {:?} sending {:?} on channel {:?}", &connection_name, &data_str, &channel_label2);
sink
.write(data_str.as_bytes()).await
.expect("Cannot send data.");
wait(1 * SECOND_MS).await;
}
}),
).unwrap();
}
async fn handle_connection(
connection_name: &'static str,
mut connection: Box<dyn Connected>,
) {
let mut on_remote_channel = connection.on_remote_channel()
.expect("No `on_remote_channel` listener found.");
try_join!(
tokio::spawn(async move {
while let Some(remote_channel) = on_remote_channel.recv().await {
tokio::spawn(handle_channel(&connection_name, remote_channel, false));
}
}),
tokio::spawn(async move {
loop {
let channel_label = format!("channel-{}", random_str(4));
let channel = connection
.channel(channel_label).await
.expect("Failed to open data channel.");
println!("🚏 {:?} created data channel {:?}", &connection_name, channel.label());
tokio::spawn(handle_channel(&connection_name, channel, true));
wait(15 * SECOND_MS).await;
}
}),
).unwrap();
}
#[tokio::main]
async fn main() {
let (duplex1, duplex2) = duplex(4096);
let connection1 = MultiplexedConnection::new(duplex1);
let connection2 = MultiplexedConnection::new(duplex2);
try_join!(
tokio::spawn(async move {
wait(10).await;
let connected = connection1
.listen().await
.expect("Failed to receive incoming connection.");
println!("🙌 got incoming connection");
handle_connection(
"connection1",
connected,
).await
}),
tokio::spawn(async move {
let connected = connection2
.connect().await
.expect("Failed to connect.");
println!("👌 connected");
handle_connection(
"connection2",
connected,
).await
}),
).unwrap();
wait(DAY_MS).await;
}