// tropocol 0.1.2
//
// Send and receive serde-compatible objects over TCP (async).
// Documentation
use serde_json::{to_vec, from_slice};
use std::net::SocketAddr;

use super::*;

/// Runs a full-duplex message session over `stream`.
///
/// The stream is cloned so that one handle is given to `transmit` (fed by
/// `await_outgoing`) and the other to `receive` (which forwards every decoded
/// message to `handle_incoming`). The two futures are combined with `or` and
/// the combined outcome is returned to the caller.
///
/// When the `log` feature is enabled, an `Err` outcome is logged at `error`
/// level and an `Ok` outcome at `debug` level, prefixed with the peer address
/// (or `<noaddr>` when the address could not be determined).
pub async fn session<I, O, R, T>(
    stream: TcpStream,
    handle_incoming: R,
    await_outgoing: T,
) -> Result<(), Error>
where
    I: Incoming,
    O: Outgoing,
    R: HandleIncoming<I>,
    T: GetOutgoing<O>,
{
    // A failed address lookup is tolerated; it is only used to label log lines.
    let peer_addr = stream.peer_addr().ok();

    let outcome = {
        let writer = stream.clone();
        let reader = stream;

        or(
            transmit(writer, await_outgoing, peer_addr),
            receive(reader, handle_incoming, peer_addr),
        )
        .await
    };

    #[cfg(feature = "log")]
    match (&outcome, peer_addr) {
        (Err(error), Some(addr)) => log::error!("{}: {}", addr, error),
        (Err(error), None) => log::error!("<noaddr>: {}", error),
        (Ok(_), Some(addr)) => log::debug!("{}: Connection Closed", addr),
        (Ok(_), None) => log::debug!("<noaddr>: Connection Closed"),
    }

    outcome
}

#[allow(unused_variables)]
async fn receive<I, R>(
    mut rx_tcp: TcpStream,
    mut handler: R,
    peer_addr: Option<SocketAddr>,
) -> Result<(), Error>
where
    I: Incoming,
    R: HandleIncoming<I>,
{
    loop {
        let mut len = [0u8; 8];

        if rx_tcp.read_exact(&mut len).await.is_err() {
            break Ok((/* Connection Closed (Clean) */));
        }

        let len = u64::from_be_bytes(len) as usize;
        let mut buffer = vec![0u8; len];

        if rx_tcp.read_exact(&mut buffer).await.is_err() {
            break Err(Error::UnexpectedEof);
        }

        let incoming = from_slice(&buffer).map_err(Error::Codec)?;

        #[cfg(feature = "log")]
        match peer_addr {
            Some(addr) => log::debug!("{}: Received {:?}", addr, incoming),
            None => log::debug!("<noaddr>: Received {:?}", incoming),
        }

        handler.handle_incoming(incoming).await;
    }
}

#[allow(unused_variables)]
async fn transmit<O, T>(
    mut tx_tcp: TcpStream,
    mut source: T,
    peer_addr: Option<SocketAddr>,
) -> Result<(), Error>
where
    O: Outgoing,
    T: GetOutgoing<O>,
{
    while let Some(outgoing) = source.get_outgoing().await {
        #[cfg(feature = "log")]
        match peer_addr {
            Some(addr) => log::debug!("{}: Sending {:?}", addr, outgoing),
            None => log::debug!("<noaddr>: Sending {:?}", outgoing),
        }

        let bytes = to_vec(&outgoing).map_err(Error::Codec)?;
        let len = (bytes.len() as u64).to_be_bytes();

        let mut tx_fail = false;
        tx_fail |= tx_tcp.write_all(&len).await.is_err();
        tx_fail |= tx_tcp.write_all(&bytes).await.is_err();

        if tx_fail {
            return Err(Error::UnexpectedEof);
        }
    }

    Ok(())
}