Skip to main content

Crate mom_rpc

Crate mom_rpc 

Source
Expand description

Transport-agnostic async RPC over message-oriented middleware.

This library provides a unified RpcBroker type for implementing RPC patterns over pub/sub systems like MQTT, AMQP, and DDS. It handles correlation ID generation, request/response matching, timeout handling, and concurrent request processing.

§Supported Transports

TransportDescriptionEnable Flag
Memory (default)In-process testing transportN/A (always on)
AMQP via lapinRabbitMQ and AMQP 0-9-1 brokerstransport_lapin
DDS via dust_ddsBrokerless peer-to-peer transporttransport_dust_dds
MQTT via rumqttcMQTT broker-based transporttransport_rumqttc
Redis via redisRedis Pub/Sub transporttransport_redis

Note: The logging feature (enabled by default) provides diagnostic output via tracing. To disable logging, use default-features = false in your Cargo.toml:

[dependencies]
mom-rpc = { version = "0.9", default-features = false, features = ["transport_rumqttc"] }

§Quick Start

use mom_rpc::{TransportBuilder, RpcBrokerBuilder, Result};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct ReadTemperature { unit: TemperatureUnit }

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
enum TemperatureUnit { Celsius, Fahrenheit }

#[derive(Debug, Serialize, Deserialize)]
struct SensorReading { value: f32, unit: String, timestamp_ms: u64 }

#[tokio::main]
async fn main() -> Result<()> {
    //
    let transport = TransportBuilder::new()
        .uri("memory://")
        .node_id("env-sensor-42")
        .full_duplex()
        .build()
        .await?;

    let server = RpcBrokerBuilder::new(transport.clone()).build()?;
    server.register_rpc_handler("read_temperature", |req: ReadTemperature| async move {
        let celsius = 22.0_f32;
        let (value, unit) = match req.unit {
            TemperatureUnit::Celsius => (celsius, "C"),
            TemperatureUnit::Fahrenheit => (celsius * 9.0 / 5.0 + 32.0, "F"),
        };
        Ok(SensorReading { value, unit: unit.to_string(), timestamp_ms: 0 })
    })?;
    let _handle = server.spawn()?;

    let client = RpcBrokerBuilder::new(transport).build()?;
    let resp: SensorReading = client
        .request_to("env-sensor-42", "read_temperature", ReadTemperature {
            unit: TemperatureUnit::Celsius,
        }).await?;
    println!("Temperature: {} {}", resp.value, resp.unit);

    Ok(())
}

§Examples

See the examples/

  • examples/sensor_client.rs
  • examples/sensor_fullduplex.rs
  • examples/sensor_memory.rs
  • examples/sensor_server.rs

Structs§

Address
A transport address.
CorrelationId
Unique correlation identifier used to match RPC requests and responses.
Envelope
An opaque message envelope.
MemoryHub
Shared message bus for the in-memory transport.
RpcBroker
Unified RPC broker supporting client, server, and full-duplex modes.
RpcBrokerBuilder
Builder for creating RPC broker instances.
Subscription
A subscription identifier.
SubscriptionHandle
Handle returned from a successful subscription.
TransportBase
Shared base state for all transport implementations.
TransportBuilder
Builder for creating transport instances.
TransportConfig
Configuration for creating a transport instance.

Enums§

BrokerMode
Operational mode of an RPC broker.
RpcError
Errors surfaced by the RPC layer.
TransportMode
Operational mode of a transport.

Traits§

Transport
Transport abstraction.

Functions§

create_memory_transport_with_hub
Create a new in-memory transport using the provided hub.

Type Aliases§

Result
Crate-wide result type.
TransportPtr
Shared transport pointer.