tarpc-cat 0.1.0

RPC framework built on comp-cat-rs: typed effects, no async, categorical foundations
Documentation
//! Transport abstraction for sending and receiving envelopes.
//!
//! A [`Transport`] is a bidirectional channel that sends and receives
//! [`Envelope`] values.  It uses linear state-threading: each operation
//! consumes `self` and returns it, ensuring exclusive ownership without
//! mutation.
//!
//! Two implementations are provided:
//!
//! - [`TcpTransport`]: wraps a [`std::net::TcpStream`] with
//!   length-delimited JSON framing.
//! - [`ChannelTransport`]: wraps [`std::sync::mpsc`] channels for
//!   in-memory testing.

use std::net::{SocketAddr, TcpStream};
use std::sync::mpsc;

use comp_cat_rs::effect::io::Io;

use crate::codec;
use crate::error::Error;
use crate::protocol::Envelope;

/// A bidirectional transport for [`Envelope`] messages.
///
/// Uses linear state-threading: the transport is consumed and
/// returned on each operation, ensuring exclusive ownership.
pub trait Transport: Sized + Send + 'static {
    /// Send an envelope, returning the transport for further use.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Serialize`] or [`Error::Io`] on failure.
    fn send(self, envelope: Envelope) -> Io<Error, Self>;

    /// Receive an envelope, returning both the envelope and the
    /// transport for further use.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Deserialize`], [`Error::Io`], or
    /// [`Error::ConnectionClosed`] on failure.
    fn recv(self) -> Io<Error, (Envelope, Self)>;
}

/// TCP transport using length-delimited JSON framing.
///
/// Wraps a [`TcpStream`] and uses the [`codec`] module for
/// serialization.
pub struct TcpTransport {
    stream: TcpStream,
}

impl TcpTransport {
    /// Connect to a remote address.
    ///
    /// Returns an [`Io`] that, when run, establishes the TCP connection.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Io`] if the connection cannot be established.
    #[must_use]
    pub fn connect(addr: SocketAddr) -> Io<Error, Self> {
        Io::suspend(move || {
            TcpStream::connect(addr)
                .map(|stream| Self { stream })
                .map_err(Error::from)
        })
    }

    /// Wrap an existing [`TcpStream`].
    #[must_use]
    pub fn from_stream(stream: TcpStream) -> Self {
        Self { stream }
    }
}

impl Transport for TcpTransport {
    fn send(self, envelope: Envelope) -> Io<Error, Self> {
        Io::suspend(move || {
            let mut writer: &TcpStream = &self.stream;
            codec::encode(&mut writer, &envelope)?;
            Ok(self)
        })
    }

    fn recv(self) -> Io<Error, (Envelope, Self)> {
        Io::suspend(move || {
            let mut reader: &TcpStream = &self.stream;
            let envelope: Envelope = codec::decode(&mut reader)?;
            Ok((envelope, self))
        })
    }
}

/// In-memory channel transport for testing.
///
/// Uses [`std::sync::mpsc`] channels with serialized bytes to match
/// the behavior of [`TcpTransport`] (including serialization round-trips).
pub struct ChannelTransport {
    sender: mpsc::Sender<Vec<u8>>,
    receiver: mpsc::Receiver<Vec<u8>>,
}

impl ChannelTransport {
    /// Create a pair of connected in-memory transports.
    ///
    /// Messages sent on one transport are received by the other.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let (client_side, server_side) = ChannelTransport::pair();
    /// ```
    #[must_use]
    pub fn pair() -> (Self, Self) {
        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();
        (
            Self {
                sender: tx1,
                receiver: rx2,
            },
            Self {
                sender: tx2,
                receiver: rx1,
            },
        )
    }
}

impl Transport for ChannelTransport {
    fn send(self, envelope: Envelope) -> Io<Error, Self> {
        Io::suspend(move || {
            let mut buf = Vec::new();
            codec::encode(&mut buf, &envelope)?;
            self.sender
                .send(buf)
                .map_err(|_| Error::ConnectionClosed)?;
            Ok(self)
        })
    }

    fn recv(self) -> Io<Error, (Envelope, Self)> {
        Io::suspend(move || {
            let buf = self
                .receiver
                .recv()
                .map_err(|_| Error::ConnectionClosed)?;
            let mut cursor = std::io::Cursor::new(buf);
            let envelope: Envelope = codec::decode(&mut cursor)?;
            Ok((envelope, self))
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::RequestId;

    #[test]
    fn channel_transport_round_trip() -> Result<(), Error> {
        let (a, b) = ChannelTransport::pair();
        let envelope = Envelope::Request {
            id: RequestId::new(7),
            payload: r#""hello""#.to_owned(),
        };

        let a = a.send(envelope).run()?;
        let (received, b) = b.recv().run()?;

        match received {
            Envelope::Request { id, payload } => {
                assert_eq!(id.value(), 7);
                assert_eq!(payload, r#""hello""#);
            }
            _ => return Err(Error::Server {
                message: "wrong variant".to_owned(),
            }),
        }

        let reply = Envelope::Response {
            id: RequestId::new(7),
            payload: r#""world""#.to_owned(),
        };
        let _b = b.send(reply).run()?;
        let (received, _a) = a.recv().run()?;

        match received {
            Envelope::Response { id, payload } => {
                assert_eq!(id.value(), 7);
                assert_eq!(payload, r#""world""#);
            }
            _ => return Err(Error::Server {
                message: "wrong variant".to_owned(),
            }),
        }

        Ok(())
    }
}