xrpc-rs 0.2.2

async RPC library optimized for shared memory communication
Documentation

xRPC-rs

Crates.io Documentation License

High-performance local IPC library for Rust with seamless transport flexibility.

Start with in-process channels for development, scale to shared memory for production IPC, extend to TCP for network deployment—same interface.

Status: Alpha - core features complete, API may change.

Quick Start

use xrpc::{
    RpcClient, RpcServer, MessageChannelAdapter,
    SharedMemoryFrameTransport, SharedMemoryConfig,
};
use serde::{Serialize, Deserialize};
use std::sync::Arc;

#[derive(Serialize, Deserialize)]
struct AddRequest { a: i32, b: i32 }

#[derive(Serialize, Deserialize)]
struct AddResponse { result: i32 }

// =================== Server
let transport = SharedMemoryFrameTransport::create_server("my_service", SharedMemoryConfig::default())?;
let channel = Arc::new(MessageChannelAdapter::new(transport));

let server = RpcServer::new();
server.register_typed("add", |req: AddRequest| async move {
    Ok(AddResponse { result: req.a + req.b })
});
server.serve(channel).await?;

// =================== Client
let transport = SharedMemoryFrameTransport::connect_client("my_service")?;
let channel = MessageChannelAdapter::new(transport);

let client = RpcClient::new(channel);
let _handle = client.start();
let resp: AddResponse = client.call("add", &AddRequest { a: 1, b: 2 }).await?;
assert_eq!(resp.result, 3);

Installation

[dependencies]
xrpc-rs = "0.2"

# Optional codecs: codec-messagepack, codec-cbor, codec-postcard, codec-all
xrpc-rs = { version = "0.2", features = ["codec-messagepack"] }

Status

Feature Status
FrameTransport Layer (TCP, Unix, SharedMemory, Channel) ✅ Completed
MessageChannel with compression ✅ Completed
RPC Client/Server ✅ Completed
Streaming ✅ Completed
Connection Pooling ✅ Completed
Service Discovery & Load Balancing ✅ Completed
Docs & Examples In Progress

Architecture

xRPC follows a layered architecture:

Layer Trait/Module Description
Layer 1 FrameTransport Low-level byte transmission with framing
Layer 2 MessageChannel Message-aware channel with compression
Layer 3 RpcClient/RpcServer RPC with method dispatch, streaming
Layer 4 LoadBalancer Service discovery, load balancing
Application
    ↓
LoadBalancedClient (Layer 4)
    ↓
RpcClient/RpcServer (Layer 3)
    ↓
MessageChannel (Layer 2)
    ↓
FrameTransport (Layer 1)
    ↓
Network/IPC

Transport Comparison

Transport Use Case Cross-Process Serialization
SharedMemoryFrameTransport Production IPC Yes Yes
TcpFrameTransport Network / Remote Yes Yes
UnixFrameTransport Local IPC (Unix) Yes Yes
ChannelFrameTransport Same-process / Testing No Yes
ArcFrameTransport Same-process fast path No No (zero-copy)

Load Balancing

Distribute requests across multiple server instances:

use xrpc::{
    LoadBalancedClient, LoadBalancer, ClientFactory, Endpoint,
    StaticDiscovery, RoundRobin, RpcClient, MessageChannelAdapter,
    TcpFrameTransport, TcpConfig, BincodeCodec, RpcError,
};
use async_trait::async_trait;
use std::sync::Arc;

struct TcpFactory;

#[async_trait]
impl ClientFactory<MessageChannelAdapter<TcpFrameTransport>, BincodeCodec> for TcpFactory {
    async fn create(&self, endpoint: &Endpoint) 
        -> Result<RpcClient<MessageChannelAdapter<TcpFrameTransport>, BincodeCodec>, RpcError> 
    {
        let Endpoint::Tcp(addr) = endpoint else {
            return Err(RpcError::ClientError("Expected TCP".into()));
        };
        let transport = TcpFrameTransport::connect(*addr, TcpConfig::default())
            .await.map_err(RpcError::Transport)?;
        Ok(RpcClient::new(MessageChannelAdapter::new(transport)))
    }
}

// Setup load balancer
let endpoints = vec![
    Endpoint::tcp_from_str("127.0.0.1:8001")?,
    Endpoint::tcp_from_str("127.0.0.1:8002")?,
    Endpoint::tcp_from_str("127.0.0.1:8003")?,
];

let discovery = Arc::new(StaticDiscovery::new(endpoints));
let lb = Arc::new(LoadBalancer::new(discovery, RoundRobin::new()));
let client = LoadBalancedClient::new(lb, Arc::new(TcpFactory));
client.init().await?;

// Calls automatically distributed across servers with failover
let response: Response = client.call("method", &request).await?;

Strategies: RoundRobin, Random, LeastConnections, WeightedRoundRobin, ScoreBased

Features:

  • Automatic failover on server failure
  • Stream affinity (streaming calls stay on same server)
  • Health tracking with configurable failure thresholds
  • DNS-based discovery with refresh

Documentation

Planned:

  • Transport layer guide
  • Client/Server usage guide
  • Streaming guide
  • Discovery & load balancing guide
  • Codec guide (Bincode, JSON, MessagePack, CBOR, Postcard)
  • Architecture overview & getting started

Examples

RPC Client/Server

# Terminal 1
cargo run --example rpc_client_server -- server

# Terminal 2
cargo run --example rpc_client_server -- client

Load Balancing

cargo run --example load_balancing

SharedMemory Transport

# Terminal 1
cargo run --example message_transport_shm -- server

# Terminal 2
cargo run --example message_transport_shm -- client

Byte-level Transports

cargo run --example byte_transports

Planned:

  • examples/streaming.rs - Server streaming RPC
  • examples/compression.rs - LZ4/Zstd compression
  • examples/custom_codec.rs - JSON/MessagePack codec usage

License

MIT License - see LICENSE for details.