// tarpc-cat 0.1.0
//
// RPC framework built on comp-cat-rs: typed effects, no async, categorical foundations.
// Documentation
//! RPC client for calling remote services.
//!
//! Provides two calling styles:
//!
//! - [`call`]: connection-per-request over TCP.  Opens a fresh
//!   connection, sends the request, receives the response, and
//!   closes.  Simple and stateless.
//!
//! - [`call_on`]: transport-agnostic.  Sends a request over any
//!   [`Transport`] and returns both the response and the transport
//!   for further composition.
//!
//! [`Transport`]: crate::transport::Transport

use std::net::SocketAddr;

use comp_cat_rs::effect::io::Io;
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::error::Error;
use crate::protocol::{Envelope, RequestId};
use crate::transport::{TcpTransport, Transport};

/// Address of the remote server that RPC calls are directed at.
///
/// A newtype over [`SocketAddr`].  It is `Copy`, so it can be handed
/// around by value.
#[derive(Debug, Clone, Copy)]
pub struct ServerAddr(SocketAddr);

impl ServerAddr {
    /// Wrap a [`SocketAddr`] as a server address.
    #[must_use]
    pub fn new(addr: SocketAddr) -> Self {
        Self(addr)
    }

    /// Unwrap back to the [`SocketAddr`] this value was built from.
    #[must_use]
    pub fn addr(self) -> SocketAddr {
        let Self(inner) = self;
        inner
    }
}

/// Perform a single RPC round trip on a dedicated TCP connection.
///
/// The returned [`Io`] connects to `addr`, delegates the exchange to
/// [`call_on`], and yields only the deserialized response.  The
/// transport handed back by [`call_on`] is discarded, so every call
/// uses its own connection, which is closed when the [`Io`] finishes.
///
/// # Errors
///
/// - [`Error::Io`] when the connection fails.
/// - [`Error::Serialize`] / [`Error::Deserialize`] on encoding issues
///   with the request or the response.
/// - [`Error::Server`] when the server replies with an error envelope.
///
/// # Examples
///
/// ```rust,ignore
/// use tarpc_cat::client::{call, ServerAddr};
///
/// let addr = ServerAddr::new("127.0.0.1:9000".parse().unwrap());
/// let response: String = call(addr, "ping".to_owned()).run()?;
/// ```
#[must_use]
pub fn call<Req, Resp>(addr: ServerAddr, request: Req) -> Io<Error, Resp>
where
    Req: Serialize + Send + 'static,
    Resp: DeserializeOwned + Send + 'static,
{
    let connecting = TcpTransport::connect(addr.addr());

    connecting.flat_map(move |connection| {
        let exchange = call_on(connection, request);
        // Only the response is of interest here; the transport is not reused.
        exchange.map(|(response, _connection)| response)
    })
}

/// Send an RPC request over an existing [`Transport`].
///
/// Serializes `request` into an envelope, sends it, receives the
/// response envelope, and deserializes the response.  Returns both
/// the response and the transport for further composition.
///
/// # Errors
///
/// Returns [`Error::Serialize`] / [`Error::Deserialize`] on
/// encoding issues, or [`Error::Server`] if the server replies
/// with an error envelope.
#[must_use]
pub fn call_on<T, Req, Resp>(transport: T, request: Req) -> Io<Error, (Resp, T)>
where
    T: Transport,
    Req: Serialize + Send + 'static,
    Resp: DeserializeOwned + Send + 'static,
{
    Io::suspend(move || {
        let payload = serde_json::to_string(&request).map_err(Error::from_serialize)?;
        Ok((payload, transport))
    })
    .flat_map(|(payload, transport)| {
        let envelope = Envelope::Request {
            id: RequestId::new(0),
            payload,
        };
        transport.send(envelope)
    })
    .flat_map(Transport::recv)
    .flat_map(|(envelope, transport)| {
        Io::suspend(move || {
            match envelope {
                Envelope::Response { payload, .. } => {
                    let resp = serde_json::from_str(&payload)
                        .map_err(Error::from_deserialize)?;
                    Ok((resp, transport))
                }
                Envelope::Error { message, .. } => Err(Error::Server { message }),
                Envelope::Request { .. } => Err(Error::Server {
                    message: "unexpected request envelope from server".to_owned(),
                }),
            }
        })
    })
}