remoc 0.15.3

🦑 Remote multiplexed objects, channels, observable collections and RPC making remote interactions seamless. Provides multiple remote channels and RPC over TCP, TLS or any other transport.
Documentation
#[cfg(feature = "rch")]
use futures::StreamExt;

#[allow(unused_imports)]
use std::{net::Ipv4Addr, sync::Once};

#[cfg(feature = "rch")]
#[cfg(not(target_family = "wasm"))]
use tokio::net::{TcpListener, TcpStream};

#[cfg(feature = "rch")]
use remoc::{rch::base, RemoteSend};

use remoc::exec;

mod chmux;

#[cfg(feature = "serde")]
mod codec;

#[cfg(feature = "rch")]
mod rch;

#[cfg(feature = "rfn")]
mod rfn;

#[cfg(feature = "robj")]
mod robj;

#[cfg(feature = "robs")]
mod robs;

#[cfg(feature = "rtc")]
mod rtc;

static INIT: Once = Once::new();

pub fn init() {
    INIT.call_once(|| {
        use tracing_subscriber::{filter::EnvFilter, fmt::format::FmtSpan};
        tracing_subscriber::fmt::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .with_span_events(FmtSpan::NEW)
            .init();
    });
}

#[macro_export]
macro_rules! loop_transport {
    ($queue_length:expr, $a_tx:ident, $a_rx:ident, $b_tx:ident, $b_rx:ident) => {
        let ($a_tx, $b_rx) = futures::channel::mpsc::channel::<bytes::Bytes>($queue_length);
        let ($b_tx, $a_rx) = futures::channel::mpsc::channel::<bytes::Bytes>($queue_length);

        let $a_rx = $a_rx.map(Ok::<_, std::io::Error>);
        let $b_rx = $b_rx.map(Ok::<_, std::io::Error>);
    };
}

#[cfg(feature = "rch")]
pub async fn loop_channel<T>() -> ((base::Sender<T>, base::Receiver<T>), (base::Sender<T>, base::Receiver<T>))
where
    T: RemoteSend,
{
    let cfg = remoc::chmux::Cfg::default();
    loop_channel_with_cfg(cfg).await
}

#[cfg(feature = "rch")]
pub async fn droppable_loop_channel<T>(
) -> ((base::Sender<T>, base::Receiver<T>), (base::Sender<T>, base::Receiver<T>), tokio::sync::mpsc::Receiver<()>)
where
    T: RemoteSend,
{
    let cfg = remoc::chmux::Cfg::default();
    droppable_loop_channel_with_cfg(cfg).await
}

#[cfg(feature = "rch")]
pub async fn loop_channel_with_cfg<T>(
    cfg: remoc::chmux::Cfg,
) -> ((base::Sender<T>, base::Receiver<T>), (base::Sender<T>, base::Receiver<T>))
where
    T: RemoteSend,
{
    let (a, b, mut drop_rx) = droppable_loop_channel_with_cfg(cfg).await;
    exec::spawn(async move {
        let _ = drop_rx.recv().await;
    });
    (a, b)
}

#[cfg(feature = "rch")]
pub async fn droppable_loop_channel_with_cfg<T>(
    cfg: remoc::chmux::Cfg,
) -> ((base::Sender<T>, base::Receiver<T>), (base::Sender<T>, base::Receiver<T>), tokio::sync::mpsc::Receiver<()>)
where
    T: RemoteSend,
{
    let (drop_tx, drop_rx) = tokio::sync::mpsc::channel(1);
    loop_transport!(0, transport_a_tx, transport_a_rx, transport_b_tx, transport_b_rx);

    let a_cfg = cfg.clone();
    let a_drop_tx = drop_tx.clone();
    let a = async move {
        let (conn, tx, rx) = remoc::Connect::framed(a_cfg, transport_a_tx, transport_a_rx).await.unwrap();
        exec::spawn(async move {
            tokio::select! {
                _ = conn => (),
                _ = a_drop_tx.closed() => (),
            }
        });
        (tx, rx)
    };

    let b_cfg = cfg.clone();
    let b = async move {
        let (conn, tx, rx) = remoc::Connect::framed(b_cfg, transport_b_tx, transport_b_rx).await.unwrap();
        exec::spawn(async move {
            tokio::select! {
                _ = conn => (),
                _ = drop_tx.closed() => (),
            }
        });
        (tx, rx)
    };

    let (a, b) = tokio::join!(a, b);
    (a, b, drop_rx)
}

#[cfg(feature = "rch")]
#[cfg(not(target_family = "wasm"))]
pub async fn tcp_loop_channel<T>(
    tcp_port: u16,
) -> ((base::Sender<T>, base::Receiver<T>), (base::Sender<T>, base::Receiver<T>))
where
    T: RemoteSend,
{
    let server = async move {
        let listener = TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), tcp_port)).await.unwrap();
        let (socket, _) = listener.accept().await.unwrap();
        let (socket_rx, socket_tx) = socket.into_split();
        let (conn, tx, rx) =
            remoc::Connect::io_buffered(Default::default(), socket_rx, socket_tx, 100_000).await.unwrap();
        exec::spawn(conn);
        (tx, rx)
    };

    let client = async move {
        let socket = TcpStream::connect((Ipv4Addr::new(127, 0, 0, 1), tcp_port)).await.unwrap();
        let (socket_rx, socket_tx) = socket.into_split();
        let (conn, tx, rx) =
            remoc::Connect::io_buffered(Default::default(), socket_rx, socket_tx, 8721).await.unwrap();
        exec::spawn(conn);
        (tx, rx)
    };

    tokio::join!(server, client)
}