// ocapn-netlayer 0.1.4
//
// OCapN transport layer interfaces and types
// Documentation
use tokio::io::{
    copy, split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf,
};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::{select, spawn};
use tokio_util::sync::CancellationToken;

use ocapn_syrup::Value;

use crate::{Error, Result};

/// A bidirectional FIFO channel between two peers on the network.
///
/// Messages are [`Value`]s carried over a pair of buffered mpsc channels, one
/// per direction. Two cancellation tokens coordinate shutdown: `cancel` asks
/// the connection to stop and `done` reports back; see [`Connection::close`].
pub struct Connection {
    /// Sending half of the outgoing channel: values pushed here are destined
    /// for the peer.
    to_peer: mpsc::Sender<Value>,

    /// Receiving half of the incoming channel: values that arrived from the
    /// peer are popped from here.
    from_peer: mpsc::Receiver<Value>,

    /// Cancellation token used to initiate termination of the connection and
    /// releasing of resources. `close` cancels it.
    cancel: CancellationToken,

    /// Cancellation token used to indicate confirmation that resources have
    /// been released. `close` waits for it to be cancelled.
    done: CancellationToken,
}

impl Connection {
    /// Queue `value` on the outgoing channel for delivery to the peer.
    ///
    /// Waits for capacity if the channel's buffer is currently full.
    ///
    /// # Errors
    ///
    /// Returns an error if the outgoing channel can no longer accept
    /// messages because its receiving half is gone.
    pub async fn send(&self, value: Value) -> Result<()> {
        // TODO: check if connection OK, return error otherwise
        // TODO: is an acknowledged send necessary?
        self.to_peer.send(value).await?;
        Ok(())
    }

    /// Wait for the next message from the peer.
    ///
    /// # Errors
    ///
    /// Returns a "connection terminated" error once the incoming channel has
    /// been closed and drained.
    pub async fn recv(&mut self) -> Result<Value> {
        // Build the error lazily: it is only needed once the channel is
        // closed, not on every successfully received message.
        self.from_peer
            .recv()
            .await
            .ok_or_else(|| anyhow::format_err!("connection terminated"))
    }

    /// Request shutdown and wait for it to be acknowledged.
    ///
    /// Cancels the `cancel` token, then waits until the `done` token has been
    /// cancelled in turn. Currently always returns `Ok(())`.
    pub async fn close(&mut self) -> Result<()> {
        self.cancel.cancel();
        // TODO: timeout waiting for shutdown?
        self.done.cancelled().await;
        Ok(())
    }
}

impl From<TcpStream> for Connection {
    /// Build a connection on top of a TCP stream.
    ///
    /// The stream is divided into its read and write halves with
    /// `tokio::io::split`, and the pair is handed to the conversion from
    /// `(ReadHalf<T>, WriteHalf<T>)`.
    fn from(stream: TcpStream) -> Self {
        let halves = split(stream);
        Self::from(halves)
    }
}

impl<T: AsyncRead + AsyncWrite + Send + 'static> From<(ReadHalf<T>, WriteHalf<T>)> for Connection {
    fn from(split_stream: (ReadHalf<T>, WriteHalf<T>)) -> Self {
        let (mut rd_from_stream, mut wr_to_stream) = split_stream;
        let (tx_to_stream, mut rx_to_stream) = mpsc::channel::<Value>(32);
        let (tx_from_stream, rx_from_stream) = mpsc::channel::<Value>(32);

        let cancel = CancellationToken::new(); // Used to initiate shutdown
        let done = CancellationToken::new(); // Used to acknowledge shutdown complete

        let cancel_read_task = cancel.clone();
        let done_read_task = done.clone();
        let cancel_write_task = cancel.clone();
        let done_write_task = done.clone();

        // Read from the stream, deserializing onto the from_stream channel.
        spawn(async move {
            let result: Result<()> = async {
                let mut rd_pending_buf: Vec<u8> = vec![];
                loop {
                    let mut rd_buf = vec![0; 32768];
                    select! {
                        _ = cancel_read_task.cancelled() => {
                            return Ok(())
                        }
                        res = rd_from_stream.read(&mut rd_buf) => {
                            let rd = res? as usize;
                            rd_pending_buf.extend_from_slice(&rd_buf[0..rd]);
                        }
                    }
                    loop {
                        match ocapn_syrup::parse_value(&rd_pending_buf[..]) {
                            Ok((rest, value)) => {
                                tx_from_stream.send(value).await?;
                                rd_pending_buf = rest;
                            }
                            Err(nom::Err::Incomplete(_)) => break,
                            Err(e) => return Err(e.into()),
                        };
                    }
                }
            }
            .await;
            done_read_task.cancel();
            result
        });

        // Write to the stream, serializing values from the to_stream channel.
        spawn(async move {
            let result: Result<()> = async {
                loop {
                    select! {
                        _ = cancel_write_task.cancelled() => {
                            return Ok(())
                        }
                        value = rx_to_stream.recv() => {
                            let mut offset: usize = 0;
                            let wr_buf = value.ok_or(Error::msg("channel closed"))?.to_vec();
                            loop {
                                if offset >= wr_buf.len() {
                                    break
                                }
                                let wr = copy(&mut &wr_buf[offset..], &mut wr_to_stream).await?;
                                offset += wr as usize;
                            }
                            wr_to_stream.flush().await?;
                        }
                    }
                }
            }
            .await;
            done_write_task.cancel();
            result
        });

        Connection {
            cancel,
            done,
            from_peer: rx_from_stream,
            to_peer: tx_to_stream,
        }
    }
}

/// Create a pair of connections, directly connected together by buffered mpsc
/// channels.
pub fn new_pipe_connection() -> Result<(Connection, Connection)> {
    // Naming bidirectional channels can be tricky, when you're simultaneously
    // wiring things up on both sides, from the perspectives of both sides of the
    // communication. So these variable names are worth a little explanation,
    // they follow a terse convention that tries to encode the purpose of each
    // sender and receiver.
    //
    // tx_to_peer_1 means "the write side of the channel (tx_) to the other peer (to_), from peer 1's perspective" (peer_1)
    // rx_from_peer_2 means "the read side of the channel (rx_) from the other peer (from_), from peer 2's perspecitve" (peer_2)
    let (tx_to_peer_1, mut rx_to_peer_1) = mpsc::channel::<Value>(32);
    let (tx_from_peer_1, rx_from_peer_1) = mpsc::channel::<Value>(32);

    let (tx_to_peer_2, mut rx_to_peer_2) = mpsc::channel::<Value>(32);
    let (tx_from_peer_2, rx_from_peer_2) = mpsc::channel::<Value>(32);

    // Spawn two concurrent channel receiver-senders, for full duplex
    // communication without a chance of deadlock.

    let cancel = CancellationToken::new(); // Used to initiate shutdown
    let done_1 = CancellationToken::new(); // Used to ack shutdown complete to conn_1
    let done_2 = CancellationToken::new(); // Used to ack shutdown complete to conn_2

    let cancel_1_task = cancel.clone();
    let cancel_2_task = cancel.clone();
    let done_1_task = done_1.clone();
    let done_2_task = done_2.clone();

    // Ferry messages from peer 1 to peer 2.
    spawn(async move {
        let result: Result<()> = async {
            loop {
                select! {
                    _ = cancel_1_task.cancelled() => {
                        return Ok(());
                    }
                    sent_from_1_to_2 = rx_to_peer_1.recv() => {
                        match sent_from_1_to_2 {
                           Some(value) => tx_from_peer_2.send(value).await?,
                          None=> return Ok(()),
                        }
                    }
                }
            }
        }
        .await;
        done_1_task.cancel();
        result
    });

    // Ferry messages from peer 2 to peer 1.
    spawn(async move {
        let result: Result<()> = async {
            loop {
                select! {
                    _ = cancel_2_task.cancelled() => {
                        return Ok(());
                    }
                    sent_from_2_to_1 = rx_to_peer_2.recv() => {
                        match sent_from_2_to_1 {
                           Some(value) => tx_from_peer_1.send(value).await?,
                          None=> return Ok(()),
                        }
                    }
                }
            }
        }
        .await;
        done_2_task.cancel();
        result
    });

    let conn_1 = Connection {
        to_peer: tx_to_peer_1,
        from_peer: rx_from_peer_1,
        cancel: cancel.clone(),
        done: done_1,
    };

    let conn_2 = Connection {
        to_peer: tx_to_peer_2,
        from_peer: rx_from_peer_2,
        cancel: cancel.clone(),
        done: done_2,
    };

    Ok((conn_1, conn_2))
}

// Under consideration:
// loopback netlayer connector -- will be great for testing!
// Tor netlayer connector -- let's use Arti if we can!
// Veilid netlayer connector -- rise and reverberate!
// clearnet TCP netlayer connector -- let a reverse proxy terminate TLS
// Meshtastic netlayer connector -- probably serial, because BLE on Rust doesn't work yet

/// Round-trips one message in each direction over a pipe connection pair,
/// then closes both ends.
#[tokio::test]
async fn test_pipe() {
    let question = "anyone receiving?";
    let answer = "yep still here";

    let (mut alice, mut bob) = new_pipe_connection().expect("pipe");

    // Alice -> Bob.
    alice.send(Value::string(question)).await.expect("send");
    let from_alice = bob.recv().await.expect("recv");

    // Bob -> Alice.
    bob.send(Value::string(answer)).await.expect("send");
    let from_bob = alice.recv().await.expect("recv");

    assert_eq!(from_alice, Value::string(question));
    assert_eq!(from_bob, Value::string(answer));

    alice.close().await.expect("closed");
    bob.close().await.expect("closed");
}