remoc 0.18.3

🦑 Remote multiplexed objects, channels, observable collections and RPC making remote interactions seamless. Provides multiple remote channels and RPC over TCP, TLS or any other transport.
Documentation
use futures::{channel::oneshot, future::try_join, stream::StreamExt};
use std::time::Duration;
use tracing::Instrument;

#[cfg(feature = "js")]
use wasm_bindgen_test::wasm_bindgen_test;

use crate::loop_transport;
use remoc::{
    chmux::{self, PortsExhausted, ReceiverStream, SendError},
    exec,
    exec::time::sleep,
};

fn cfg() -> chmux::Cfg {
    chmux::Cfg {
        connection_timeout: Some(Duration::from_secs(1)),
        max_ports: 20,
        ports_exhausted: PortsExhausted::Fail,
        max_data_size: 1_000_000,
        max_received_ports: 100,
        chunk_size: 9,
        receive_buffer: 4,
        shared_send_queue: 3,
        connect_queue: 2,
        ..Default::default()
    }
}

fn cfg2() -> chmux::Cfg {
    chmux::Cfg {
        connection_timeout: Some(Duration::from_secs(1)),
        max_ports: 20,
        ports_exhausted: PortsExhausted::Wait(Some(Duration::from_secs(5))),
        max_data_size: 1_000_000,
        max_received_ports: 100,
        chunk_size: 4,
        receive_buffer: 4,
        shared_send_queue: 1,
        connect_queue: 1,
        ..Default::default()
    }
}

#[cfg_attr(not(feature = "js"), tokio::test)]
#[cfg_attr(feature = "js", wasm_bindgen_test)]
async fn basic() {
    crate::init();

    println!("Connecting...");
    loop_transport!(0, a_tx, a_rx, b_tx, b_rx);
    let ((a_mux, a_client, a_server), (b_mux, b_client, mut b_server)) =
        try_join(chmux::ChMux::new(cfg(), a_tx, a_rx), chmux::ChMux::new(cfg2(), b_tx, b_rx)).await.unwrap();
    println!("Connected: a_mux={:?}, b_mux={:?}", &a_mux, &b_mux);

    let (a_mux_done_tx, a_mux_done_rx) = oneshot::channel();
    exec::spawn(
        async move {
            println!("A mux run");
            a_mux.run().await.expect("a_mux");
            let _ = a_mux_done_tx.send(());
        }
        .instrument(tracing::info_span!("A mux")),
    );

    let (b_mux_done_tx, b_mux_done_rx) = oneshot::channel();
    exec::spawn(
        async move {
            println!("B mux run");
            b_mux.run().await.expect("b_mux");
            let _ = b_mux_done_tx.send(());
        }
        .instrument(tracing::info_span!("B mux")),
    );

    const N_MSG: usize = 500;

    let (server_done_tx, server_done_rx) = oneshot::channel();
    exec::spawn(async move {
        println!("B server start");
        while let Some((mut tx, mut rx)) = b_server.accept().await.unwrap() {
            exec::spawn(async move {
                while let Some(msg) = rx.recv().await.unwrap() {
                    println!("Server received: {}", String::from_utf8(msg.into()).unwrap());
                }
                println!("Server receiver ended");
            });

            for i in 0..N_MSG {
                let msg = format!("message no {i}");
                match tx.send(msg.clone().into()).await {
                    Ok(()) => (),
                    Err(SendError::Closed { gracefully }) => {
                        println!("Client closed connection with gracefully={gracefully}");
                        break;
                    }
                    other => other.unwrap(),
                }
                println!("Server sent: {msg}");
            }

            drop(tx);
            println!("Server dropped transmitter");
        }

        println!("B Server quit");
        drop(b_client);
        let _ = server_done_tx.send(());
    });

    {
        println!("A client connecting 1...");
        let ret: Result<(chmux::Sender, chmux::Receiver), _> = a_client.connect().await;
        println!("A client connect result: {:?}", &ret);
    }

    println!("Delay test...");
    sleep(Duration::from_secs(3)).await;

    println!("A client connecting 2...");
    let (mut tx, mut rx): (chmux::Sender, chmux::Receiver) = a_client.connect().await.unwrap();
    println!("A client connected.");

    let mut n_recv = 0;
    while let Some(msg) = rx.recv().await.unwrap() {
        let s = String::from_utf8(msg.into()).unwrap();
        println!("A client received: {}", &s);
        n_recv += 1;

        println!("A client replying...");
        tx.send(format!("Reply: {}", &s).into()).await.unwrap();
        println!("A client replied");
    }
    if n_recv != N_MSG {
        panic!("received not equal number messages than sent");
    }
    println!("A client receiver closed");

    drop(tx);
    drop(rx);
    drop(a_client);

    println!("Waiting for server");
    server_done_rx.await.unwrap();

    drop(a_server);

    println!("Waiting for multiplexers");
    a_mux_done_rx.await.unwrap();
    b_mux_done_rx.await.unwrap();
}

#[cfg_attr(not(feature = "js"), tokio::test)]
#[cfg_attr(feature = "js", wasm_bindgen_test)]
async fn receiver_stream() {
    crate::init();

    loop_transport!(0, a_tx, a_rx, b_tx, b_rx);
    let ((a_mux, a_client, a_server), (b_mux, b_client, mut b_server)) =
        try_join(chmux::ChMux::new(cfg(), a_tx, a_rx), chmux::ChMux::new(cfg2(), b_tx, b_rx)).await.unwrap();

    let (a_mux_done_tx, a_mux_done_rx) = oneshot::channel();
    exec::spawn(
        async move {
            a_mux.run().await.expect("a_mux");
            let _ = a_mux_done_tx.send(());
        }
        .instrument(tracing::info_span!("A mux")),
    );

    let (b_mux_done_tx, b_mux_done_rx) = oneshot::channel();
    exec::spawn(
        async move {
            b_mux.run().await.expect("b_mux");
            let _ = b_mux_done_tx.send(());
        }
        .instrument(tracing::info_span!("B mux")),
    );

    const N_MSG: usize = 100;

    let (server_done_tx, server_done_rx) = oneshot::channel();
    exec::spawn(async move {
        while let Some((mut tx, rx)) = b_server.accept().await.unwrap() {
            let mut n = 0;
            let mut rx = ReceiverStream::from(rx);
            while let Some(msg) = rx.next().await {
                let msg = msg.unwrap();
                tx.send(msg.into()).await.unwrap();

                n += 1;
                if n == N_MSG {
                    rx.close();
                }
            }
        }

        drop(b_client);
        let _ = server_done_tx.send(());
    });

    let (mut tx, mut rx): (chmux::Sender, chmux::Receiver) = a_client.connect().await.unwrap();

    let mut n = 0;
    loop {
        let s = format!("{n}");
        let data = s.as_bytes().to_vec();
        match tx.send(data.clone().into()).await {
            Ok(()) => (),
            Err(err) if err.is_closed() => break,
            Err(err) => panic!("send error: {err}"),
        }

        n += 1;

        let reply = rx.recv().await.unwrap().unwrap();
        let reply_data: Vec<u8> = reply.into();
        assert_eq!(data, reply_data);
    }
    assert!(n > N_MSG);

    drop(tx);
    drop(rx);
    drop(a_client);

    println!("Waiting for server");
    server_done_rx.await.unwrap();

    drop(a_server);

    println!("Waiting for multiplexers");
    a_mux_done_rx.await.unwrap();
    b_mux_done_rx.await.unwrap();
}

#[cfg_attr(
    all(not(feature = "js"), not(target_family = "wasm")),
    tokio::test(flavor = "multi_thread", worker_threads = 8)
)]
#[cfg_attr(all(not(feature = "js"), target_family = "wasm"), tokio::test)]
#[cfg_attr(feature = "js", wasm_bindgen_test)]
async fn hangup() {
    crate::init();

    println!("Connecting...");
    loop_transport!(0, a_tx, a_rx, b_tx, b_rx);
    let ((a_mux, a_client, a_server), (b_mux, b_client, mut b_server)) =
        try_join(chmux::ChMux::new(cfg(), a_tx, a_rx), chmux::ChMux::new(cfg2(), b_tx, b_rx)).await.unwrap();
    println!("Connected: a_mux={:?}, b_mux={:?}", &a_mux, &b_mux);

    let (a_mux_done_tx, a_mux_done_rx) = oneshot::channel();
    exec::spawn(
        async move {
            println!("A mux start");
            a_mux.run().await.unwrap();
            let _ = a_mux_done_tx.send(());
        }
        .instrument(tracing::info_span!("A mux")),
    );

    let (b_mux_done_tx, b_mux_done_rx) = oneshot::channel();
    exec::spawn(
        async move {
            println!("B mux start");
            b_mux.run().await.unwrap();
            let _ = b_mux_done_tx.send(());
        }
        .instrument(tracing::info_span!("B mux")),
    );

    let (server_done_tx, server_done_rx) = oneshot::channel();
    exec::spawn(async move {
        println!("B server start");
        while let Some((mut tx, mut rx)) = b_server.accept().await.unwrap() {
            while let Some(msg) = rx.recv().await.unwrap() {
                let msg = Vec::from(msg);
                println!("Server received: {}", msg[0]);
                if msg[0] == 100 {
                    break;
                }
            }
            println!("Server dropping receiver");
            drop(rx);

            tx.send("Server 1 msg".into()).await.unwrap();
            tx.send("Server 2nd message".into()).await.unwrap();

            println!("Server closed connection");
        }
        println!("B Server quit");
        drop(b_client);
        let _ = server_done_tx.send(());
    });

    println!("A client connecting to service...");
    let (mut tx, mut rx): (chmux::Sender, chmux::Receiver) = a_client.connect().await.unwrap();
    println!("A client connected.");

    let hn = tx.closed();

    let mut i = 0u16;
    loop {
        let msg = vec![i as u8];
        match tx.send(msg.into()).await {
            Ok(()) => println!("A client sent: {i}"),
            Err(err) => {
                println!("A client send error: {err:?}");
                break;
            }
        }
        i += 1;
    }
    if i <= 100 {
        panic!("Send error too early");
    }

    println!("Waiting for hangup notification...");
    hn.await;

    loop {
        match rx.recv().await.unwrap() {
            Some(msg) => {
                println!("Client received: {}", String::from_utf8(msg.into()).unwrap());
            }
            None => {
                println!("Client receive end");
                break;
            }
        }
    }

    println!("A client receiver closed");

    drop(tx);
    drop(rx);
    drop(a_client);

    println!("Waiting for server close");
    server_done_rx.await.unwrap();

    drop(a_server);

    println!("Waiting for multiplexers");
    a_mux_done_rx.await.unwrap();
    b_mux_done_rx.await.unwrap();
}