multiplexed-connection 0.4.0

Creates a connection with multiple data channels over a single async data stream.
Documentation
use cs_utils::constants::time::{DAY_MS, SECOND_MS};
use cs_utils::futures::wait;
use cs_utils::random_str;

use multiplexed_connection::{MultiplexedConnection, Connected, Channel};

use tokio::io::{duplex, split, AsyncReadExt, AsyncWriteExt};
use tokio::try_join;

/// Drives a single data channel: logs everything received on it and, when
/// `should_send_data` is set, keeps writing random 8-character strings to it,
/// pausing `SECOND_MS` between sends.
///
/// * `connection_name` - name printed in the log lines to tell the peers apart.
/// * `channel` - the channel to drive; it is split into a read and a write half
///   that are serviced by two separately spawned tasks.
/// * `should_send_data` - when `false`, the writer task finishes immediately
///   and only the reader keeps running.
///
/// # Panics
///
/// Panics if either spawned task fails to join cleanly, e.g. because it
/// panicked on non-UTF-8 input or on a failed write.
async fn handle_channel(
    connection_name: &'static str,
    channel: Box<dyn Channel>,
    should_send_data: bool,
) {
    // The label has to be captured before `split` takes ownership of the channel.
    let reader_label = channel.label().clone();
    let writer_label = reader_label.clone();
    let (mut source, mut sink) = split(channel);

    try_join!(
        tokio::spawn(async move {
            let mut buf = [0u8; 1024];

            loop {
                let bytes_read = match source.read(&mut buf).await {
                    // `Ok(0)` means the stream reached EOF; without this arm the
                    // loop would spin forever printing empty strings.
                    // Read errors end the reader as well, as they did before.
                    Ok(0) | Err(_) => break,
                    Ok(bytes_read) => bytes_read,
                };

                let data_str = std::str::from_utf8(&buf[..bytes_read])
                    .expect("Cannot parse UTF8 string.");

                println!("<< {:?} received {:?} on channel {:?}", &connection_name, &data_str, &reader_label);
            }
        }),
        tokio::spawn(async move {
            // The flag never changes, so it doubles as the loop condition.
            while should_send_data {
                let data_str = random_str(8);

                println!(">> {:?} sending {:?} on channel {:?}", &connection_name, &data_str, &writer_label);

                // `write` may accept only part of the buffer; `write_all`
                // guarantees the whole payload is sent.
                sink
                    .write_all(data_str.as_bytes()).await
                    .expect("Cannot send data.");

                wait(SECOND_MS).await;
            }
        }),
    ).unwrap();
}

async fn handle_connection(
    connection_name: &'static str,
    mut connection: Box<dyn Connected>,
) {
    let mut on_remote_channel = connection.on_remote_channel()
        .expect("No `on_remote_channel` listener found.");

    try_join!(
        tokio::spawn(async move {
            while let Some(remote_channel) = on_remote_channel.recv().await {
                tokio::spawn(handle_channel(&connection_name, remote_channel, false));
            }
        }),
        tokio::spawn(async move {
            loop {
                let channel_label = format!("channel-{}", random_str(4));

                let channel = connection
                    .channel(channel_label).await
                    .expect("Failed to open data channel.");

                println!("🚏 {:?} created data channel {:?}", &connection_name, channel.label());
                
                tokio::spawn(handle_channel(&connection_name, channel, true));

                wait(15 * SECOND_MS).await;
            }
        }),
    ).unwrap();
}

#[tokio::main]
async fn main() {
    let (duplex1, duplex2) = duplex(4096);

    let connection1 = MultiplexedConnection::new(duplex1);
    let connection2 = MultiplexedConnection::new(duplex2);

    try_join!(
        tokio::spawn(async move {
            wait(10).await;

            let connected = connection1
                .listen().await
                .expect("Failed to receive incoming connection.");

            println!("🙌 got incoming connection");

            handle_connection(
                "connection1",
                connected,
            ).await
        }),
        tokio::spawn(async move {
            let connected = connection2
                .connect().await
                .expect("Failed to connect.");

            println!("👌 connected");

            handle_connection(
                "connection2",
                connected,
            ).await
        }),
    ).unwrap();

    wait(DAY_MS).await;
}