Transport-agnostic async RPC over message-oriented middleware.
This library provides a unified RpcBroker type for implementing RPC patterns
over pub/sub systems like MQTT, AMQP, and DDS. It handles correlation ID
generation, request/response matching, timeout handling, and concurrent
request processing.
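To illustrate what correlation-ID matching with a timeout involves, here is a minimal standard-library sketch. It is not mom-rpc's implementation: `PendingRequests` and its methods are hypothetical names invented for this example, and it uses threads and `std::sync::mpsc` rather than an async runtime.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Hypothetical table of in-flight requests, keyed by correlation ID.
#[derive(Clone, Default)]
struct PendingRequests {
    next_id: Arc<AtomicU64>,
    waiting: Arc<Mutex<HashMap<u64, Sender<String>>>>,
}

impl PendingRequests {
    /// Allocate a fresh correlation ID and a receiver for its response.
    fn register(&self) -> (u64, Receiver<String>) {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let (tx, rx) = mpsc::channel();
        self.waiting.lock().unwrap().insert(id, tx);
        (id, rx)
    }

    /// Route an incoming response to its waiter.
    /// Returns false if nobody is waiting for that ID.
    fn complete(&self, id: u64, payload: String) -> bool {
        match self.waiting.lock().unwrap().remove(&id) {
            Some(tx) => tx.send(payload).is_ok(),
            None => false,
        }
    }

    /// Block until the response arrives or the timeout expires.
    /// On timeout the request is forgotten, so a late response is dropped.
    fn wait(&self, id: u64, rx: Receiver<String>, timeout: Duration) -> Option<String> {
        match rx.recv_timeout(timeout) {
            Ok(payload) => Some(payload),
            Err(_) => {
                self.waiting.lock().unwrap().remove(&id);
                None
            }
        }
    }
}

fn main() {
    let pending = PendingRequests::default();
    let (id, rx) = pending.register();

    // Simulate the remote side answering on another thread.
    let responder = pending.clone();
    let worker = std::thread::spawn(move || responder.complete(id, "22.0 C".to_string()));

    let reply = pending.wait(id, rx, Duration::from_secs(1));
    println!("reply for request {id}: {reply:?}");
    worker.join().unwrap();
}
```

Because each request owns its own channel, responses can arrive in any order and concurrent requests do not block one another. Only the shared map needs a lock.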
§Supported Transports
| Transport | Description | Enable Flag |
|---|---|---|
| Memory (default) | In-process testing transport | N/A (always on) |
| AMQP via lapin | RabbitMQ and AMQP 0-9-1 brokers | transport_lapin |
| DDS via dust_dds | Brokerless peer-to-peer transport | transport_dust_dds |
| MQTT via rumqttc | MQTT broker-based transport | transport_rumqttc |
| Redis via redis | Redis Pub/Sub transport | transport_redis |
Note: The logging feature (enabled by default) provides diagnostic output via tracing.
To disable logging, use default-features = false in your Cargo.toml:
[dependencies]
mom-rpc = { version = "0.9", default-features = false, features = ["transport_rumqttc"] }
§Quick Start
use mom_rpc::{TransportBuilder, RpcBrokerBuilder, Result};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct ReadTemperature { unit: TemperatureUnit }
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
enum TemperatureUnit { Celsius, Fahrenheit }
#[derive(Debug, Serialize, Deserialize)]
struct SensorReading { value: f32, unit: String, timestamp_ms: u64 }
#[tokio::main]
async fn main() -> Result<()> {
    // Build an in-memory, full-duplex transport for node "env-sensor-42".
    let transport = TransportBuilder::new()
        .uri("memory://")
        .node_id("env-sensor-42")
        .full_duplex()
        .build()
        .await?;

    // Server side: register the handler, then spawn the broker.
    let server = RpcBrokerBuilder::new(transport.clone()).build()?;
    server.register_rpc_handler("read_temperature", |req: ReadTemperature| async move {
        let celsius = 22.0_f32;
        let (value, unit) = match req.unit {
            TemperatureUnit::Celsius => (celsius, "C"),
            TemperatureUnit::Fahrenheit => (celsius * 9.0 / 5.0 + 32.0, "F"),
        };
        Ok(SensorReading { value, unit: unit.to_string(), timestamp_ms: 0 })
    })?;
    let _handle = server.spawn()?;

    // Client side: send a request to the node and await the typed response.
    let client = RpcBrokerBuilder::new(transport).build()?;
    let resp: SensorReading = client
        .request_to("env-sensor-42", "read_temperature", ReadTemperature {
            unit: TemperatureUnit::Celsius,
        })
        .await?;
    println!("Temperature: {} {}", resp.value, resp.unit);
    Ok(())
}
§Examples
See the examples/ directory:
- examples/sensor_client.rs
- examples/sensor_fullduplex.rs
- examples/sensor_memory.rs
- examples/sensor_server.rs
Structs§
- Address
- A transport address.
- CorrelationId
- Unique correlation identifier used to match RPC requests and responses.
- Envelope
- An opaque message envelope.
- MemoryHub
- Shared message bus for the in-memory transport.
- RpcBroker
- Unified RPC broker supporting client, server, and full-duplex modes.
- RpcBrokerBuilder
- Builder for creating RPC broker instances.
- Subscription
- A subscription identifier.
- SubscriptionHandle
- Handle returned from a successful subscription.
- TransportBase
- Shared base state for all transport implementations.
- TransportBuilder
- Builder for creating transport instances.
- TransportConfig
- Configuration for creating a transport instance.
Enums§
- BrokerMode
- Operational mode of an RPC broker.
- RpcError
- Errors surfaced by the RPC layer.
- TransportMode
- Operational mode of a transport.
Traits§
- Transport
- Transport abstraction.
Functions§
- create_memory_transport_with_hub
- Create a new in-memory transport using the provided hub.
Type Aliases§
- Result
- Crate-wide result type.
- TransportPtr
- Shared transport pointer.