actrpc-transport 0.1.0

Transport abstractions and implementations for ActRPC.
Documentation
use crate::TransportError;
use actrpc_core::{error::CodecError, json_rpc::JsonRpcMessage};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};

pub async fn write_message<W>(
    writer: &mut W,
    message: &JsonRpcMessage,
) -> Result<(), TransportError>
where
    W: AsyncWrite + Unpin + Send,
{
    let payload = serde_json::to_string(message)
        .map_err(|source| CodecError::Serialize(source.to_string()))?;

    writer
        .write_all(payload.as_bytes())
        .await
        .map_err(|source| TransportError::Io {
            message: format!("failed to write newline-delimited JSON-RPC payload: {source}"),
        })?;

    writer
        .write_all(b"\n")
        .await
        .map_err(|source| TransportError::Io {
            message: format!(
                "failed to write newline-delimited JSON-RPC frame delimiter: {source}"
            ),
        })?;

    writer.flush().await.map_err(|source| TransportError::Io {
        message: format!("failed to flush newline-delimited JSON-RPC frame: {source}"),
    })?;

    Ok(())
}

pub async fn read_message<R>(reader: &mut R) -> Result<JsonRpcMessage, TransportError>
where
    R: AsyncBufRead + Unpin + Send,
{
    let mut line = String::new();

    let bytes_read = reader
        .read_line(&mut line)
        .await
        .map_err(|source| TransportError::Io {
            message: format!("failed to read newline-delimited JSON-RPC frame: {source}"),
        })?;

    if bytes_read == 0 {
        return Err(TransportError::Connection {
            message: "peer closed connection while reading newline-delimited JSON-RPC frame"
                .to_owned(),
        });
    }

    let trimmed = line.trim();

    if trimmed.is_empty() {
        return Err(TransportError::Codec(CodecError::Deserialize(
            "received empty newline-delimited JSON-RPC frame".to_owned(),
        )));
    }

    serde_json::from_str::<JsonRpcMessage>(trimmed)
        .map_err(|source| CodecError::Deserialize(source.to_string()).into())
}