mom-rpc
Transport-agnostic async RPC over message-oriented middleware.
This crate provides a clean, strongly-typed RPC abstraction on top of pub/sub systems (such as MQTT or AMQP), without baking transport details into your application code.
It is designed to avoid exposing message broker details from application level code.
Binary Footprint
While this crate supports multiple transport implementations, applications only compile the transports they enable. The crate size shown on crates.io is the total of all transport implementations combined, but thanks to Cargo features, your application will only include the code for the transports you actually use. A typical application using a single transport will compile to approximately 45-55 KiB of mom-rpc code regardless of how many total transports the library supports.
Motivation
Message-oriented middleware (pub/sub systems, etc.) is great for decoupling — but insufficent for request/response workflows:
- no native RPC semantics
- no correlation handling
- unclear routing vs dispatch rules
- awkward client/server lifecycles
This crate solves that by providing:
- async RPC semantics (request / response)
- correlation management
- method dispatch
- pluggable transports
- predictable behavior, even over unreliable systems
Design Rationale
Unlike RPC libraries that target a single broker, mom-rpc provides transport abstraction — write your RPC code once, run it on AMQP, DDS, MQTT, Redis, or in-memory by changing feature flags, not code. The same application works unchanged across development, staging, and production regardless of broker choice.
Key properties
-
Transport-agnostic Works over multiple backends via a small transport trait.
-
Strongly typed RPC Typed request and response payloads using
serde. -
Async-first Built for Tokio, uses
async/awaitthroughout. -
Cross-platform Pure Rust with no platform-specific dependencies. Suitable for embedded/edge deployments.
-
No callbacks required Client APIs return futures; servers use async handlers.
-
Explicit invariants Correlation, routing, and dispatch rules are enforced by types.
Usage
In-Memory Transport (Testing)
Suitable for testing and single-process applications - no broker required:
use ;
use ;
async
Run it:
Testing with RabbitMQ
See scripts/manual-tests/README.md for automated test scripts.
==> Checking
==> Starting
)
)
==> Building
==> Starting
==> Running
==> Cleaning
)
MQTT Transport (Production)
For distributed deployments with an MQTT broker:
Cargo.toml:
[]
= { = "0.9", = ["transport_rumqttc"] }
Server:
use ;
let transport = new
.uri
.node_id
.server_mode
.build
.await?;
let server = new.build?;
server.register?;
// Run blocks until shutdown
server.run.await?;
transport.close.await?;
Client:
use ;
use Duration;
let transport = new
.uri
.node_id
.client_mode
.build
.await?;
let client = new
.retry_max_attempts
.retry_initial_delay
.retry_max_delay
.request_total_timeout
.build?;
let resp: SensorReading = client
.request_to
.await?;
println!;
transport.close.await?;
See complete working examples:
examples/sensor_client.rsexamples/sensor_fullduplex.rsexamples/sensor_memory.rsexamples/sensor_server.rs
Logging
By default, mom-rpc emits structured logs via the tracing crate.
Reduce Verbosity
Add a subscriber in your application:
[]
= { = "0.3", = ["env-filter"] }
use ;
fmt
.with_env_filter
.init;
Runtime Control
You can control logging dynamically via the RUST_LOG environment variable:
# For just mom-rpc debug logs
RUST_LOG=mom_rpc=debug
# For all debug logs
RUST_LOG=debug
# For specific module
RUST_LOG=mom_rpc::retry=debug
If you are using a transport backend, you can configure multiple modules:
RUST_LOG=mom_rpc=debug,dust_dds=warn
Disable Logging Entirely
Disable the logging feature:
[]
= { = "0.9", = false, = ["transport_rumqttc"] }
mom-rpc does not install a global subscriber. The application is responsible for configuring tracing.
Transports
The crate includes a memory transport by default.
The memory transport:
- requires no broker
- is always enabled
- is used for integration testing
- is in-process only
- requires all participants to share the same transport instance
It provides a deterministic loopback environment for testing and examples. It does not model an external broker.
Broker-backed transports (e.g. MQTT) are implemented behind feature flags and run out-of-process, with shared state managed by the broker itself. All transports conform to the same RPC contract and approximate the in-memory transport's delivery semantics as closely as the underlying system allows.
Supported Transports
mom-rpc provides multiple transport backends. Each is feature-gated so you only compile what you use:
The memory transport is always available - no feature flag required.
You can enable multiple transports and choose at runtime:
[]
= { = "0.9", = ["transport_rumqttc", "transport_lapin"] }
// TransportBuilder automatically tries enabled transports in order
let transport = new
.uri // e.g., "mqtt://localhost:1883" or "amqp://localhost:5672/%2f"
.node_id
.transport_type // or "rumqttc"
.client_mode
.build
.await?;
The builder tries transports in this order: dust_dds → rumqttc → lapin → memory. The first compatible transport succeeds. For explicit control, use .transport_type("rumqttc").
Applications can also run multiple transports concurrently (e.g., MQTT for IoT devices and AMQP for backend services) by creating separate transport instances.
Transport implementation sizes (as of v0.9.2):
| Transport | Feature Flag | SLOC | Use Case |
|---|---|---|---|
| In-memory | (always available) | 107 | Testing, single-process |
| AMQP | transport_lapin |
313 | RabbitMQ, enterprise messaging |
| MQTT | transport_rumqttc |
418 | IoT, lightweight pub/sub |
| DDS | transport_dust_dds |
703 | Real-time, mission-critical |
| Redis | transport_redis |
368 | In-memory pub/sub, low-latency messaging |
Notes:
- Core library: 1,402 lines, including In-memory.
- Total: 3,204 lines.
- SLOC measured using
tokei(crates.io methodology).
Example: An application using only the MQTT transport compiles 1402 + 418 = 1820 lines of mom-rpc code.
With both MQTT and AMQP enabled: 1402 + 418 + 313 = 2133 lines.
Overriding Default Timeout
request_total_timeout is a total wall-clock budget for the entire request, including all retry attempts. It is distinct from retry_max_delay, which caps the interval between individual retries.
The actual elapsed time may be less than the total timeout if the retry sequence exhausts its attempts first — the first limit reached takes precedence.
Configure timeouts per-request or at the broker level:
use Duration;
// Configure default timeout on the broker
let client = new
.request_total_timeout // global default timeout.
.build?;
// Per-request timeout (overrides the broker default)
let response: MyResponse = client
.request_to_with_timeout
.await?;
// Returns RpcError::Timeout if request exceeds timeout
Full-Duplex Applications
For applications that both send and receive RPC calls (like device↔agent communication), use full-duplex mode:
use ;
let transport = new
.uri
.node_id
.full_duplex // ← Both request and response queues
.build
.await?;
let broker = new.build?;
// Register handlers (acts as server)
broker.register?;
broker.spawn?;
// Make requests (acts as client)
let resp = broker.request_to.await?;
See examples/sensor_fullduplex.rs for a complete full-duplex example.
None-Goals
This crate intentionally does not provide:
- exactly-once delivery
- durable message replay
- transactional guarantees
- broker configuration
- distributed consensus
This crate focuses narrowly on RPC semantics. The underlying transport is expected to provide any required distributed-systems features.
Architecture
The design separates:
- transport mechanics
- RPC semantics
- user-facing APIs
RPC Delivery Semantics
mom-rpc provides:
- At-most-one response delivered to the caller (first response wins).
- No exactly-once guarantees.
Handler invocation depends on the reliability of the underlying transport. In failure or retry scenarios, a handler may be invoked more than once or not at all.
Applications requiring exactly-once effects must ensure idempotency or implement deduplication keyed by correlation_id.
Transport-Specific Considerations
Broker-based transports (AMQP, MQTT, REDIS):
Due to the star topology, there's a potential race condition during startup where
clients may publish before servers have subscribed. The unified broker API (0.9+) includes
built-in retry with exponential backoff to handle these races gracefully. Configure via
RpcBrokerBuilder::retry_max_attempts() and related methods.
Peer-to-peer transports (DDS): Direct peer-to-peer discovery eliminates the startup race through explicit reader/writer matching. DDS also eliminates the single point of failure and serialization bottleneck inherent in broker-based architectures.
Security
The rumqttc and lapin transports support TLS but delegate certificate validation and connection security to the broker. Transport security is intentionally treated as an external concern to avoid coupling RPC semantics to cryptographic policy.
TLS/SSL
For production deployments:
- Use TLS-enabled brokers (port 8883)
- Configure broker authentication (username/password, certificates)
- Terminate TLS at the broker or use mTLS
Authentication
This library does not handle authentication. Delegate to:
- Broker-level auth (username/password, client certificates)
- Network-level security (VPN, firewall rules)
- Message-level encryption (application responsibility)
Documentation
- Complete API reference on docs.rs
- Design patterns and module structure
- Development guide and standards
- Release notes
Status
- Early development / API unstable
- Issues and contributions are tracked via GitHub
License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in this crate by you shall be dual licensed as above, without any additional terms or conditions.