easy-rmq-rs
Rust AMQP library with connection pool, publisher, subscriber, and dependency injection support.
Features
- Connection Pool: Efficiently manages AMQP connections using deadpool
- Simple Publisher API: Single `publish()` method for all data types (`&str`, `&[u8]`, `&String`, `&Vec<u8>`)
- Subscriber: Receive messages from queues with handlers
- Worker Registry: Register and manage multiple workers with a clean pattern
- Auto Setup: Automatically creates exchanges and queues
- Retry Mechanism: Automatic retry with delay for failed messages
- Single Active Consumer: Ensure only one consumer processes messages at a time
- Prefetch Control: AMQP prefetch (QoS) configuration
- Parallel Processing: Configurable worker concurrency with async/blocking spawn
- Middleware: Custom middleware for logging, metrics, and distributed tracing (static-only)
- Distributed Tracing: Built-in trace ID generation with OpenTelemetry support
- Handler DI: Dependency injection for handlers with `Data<T>` wrapper
- Dependency Injection: Support for trait-based DI pattern
- Type Safe: Strong error handling with thiserror
- Async Handlers: All message handlers must be async for optimal performance
Installation
```toml
[dependencies]
# Dependency key assumed from the crate's module name (`easy_rmq`).
easy-rmq = { path = "./easy-rmq-rs" }
```
Breaking Changes
⚠️ Version 2.0+: All message handlers must be async functions. This change enables better performance and proper async/await support throughout the library.
Migration Guide:
```rust
// Old (sync handlers - NO LONGER SUPPORTED):
// fn handler(payload: &[u8]) -> Result<(), Error>

// New (async handlers - REQUIRED); reconstructed signature, exact types may differ:
// async fn handler(payload: Vec<u8>) -> Result<(), Error>
```
All handler functions now receive `Vec<u8>` (owned) instead of `&[u8]` (borrowed) to support async execution.
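The owned-bytes migration can be sketched with plain std Rust (no library dependencies). `handle` is a hypothetical v2-style handler, and `block_on` is only a toy driver for this example; real code would run under tokio:

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical v2-style handler: an `async fn` taking owned bytes.
// The name, return type, and error type are illustrative.
async fn handle(payload: Vec<u8>) -> Result<usize, String> {
    let text = String::from_utf8(payload).map_err(|e| e.to_string())?;
    Ok(text.len())
}

// Toy executor (std only) just to drive the sketch.
fn block_on<F: Future>(fut: F) -> F::Output {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            raw()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    let waker = unsafe { Waker::from_raw(raw()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    // Owned `Vec<u8>` lets the handler keep the bytes across `.await` points.
    assert_eq!(block_on(handle(b"hello".to_vec())), Ok(5));
}
```

Owning the payload is what makes the `async` signature possible: a borrowed `&[u8]` could not outlive the spawn that drives the future.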
Quick Start
1. Start RabbitMQ (e.g. with Docker: `docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management`)
2. Subscriber Example (Run First!)
Open terminal 1 - the subscriber sets up the queue & binding (e.g. `cargo run --example subscriber`):
3. Publisher Example (Run Second)
Open terminal 2 (e.g. `cargo run --example publisher`):
Press Ctrl+C on subscriber for graceful shutdown.
Architecture & Best Practices
🎯 Simple & Clean:
- Default Exchange: `amq.direct` (RabbitMQ built-in)
- Publisher: Auto-create exchange + send messages
- Subscriber: Auto-create exchange + queue + binding
- Worker Registry: Register multiple workers with clean pattern
- Retry: Automatic retry with delay for failed messages
- Prefetch: AMQP QoS control for message buffering
- Concurrency: Parallel worker processing
- Full Auto-Setup: No manual infrastructure needed
This follows AMQP best practices:
- Producer → Send to exchange (auto-created if not exists)
- Consumer → Auto-create everything + consume
- Registry → Manage multiple workers with consistent pattern
Basic Usage
Creating a Client
```rust
use easy_rmq::AmqpClient; // import path assumed

// The connection URL is illustrative; the original snippet was elided.
async fn run() -> easy_rmq::Result<()> {
    let client = AmqpClient::new("amqp://guest:guest@localhost:5672")?;
    Ok(())
}
```
Publisher
Publisher simple - send to default exchange:
```rust
use easy_rmq::AmqpClient; // import path assumed

// URL and payloads are illustrative; the original arguments were elided.
let client = AmqpClient::new("amqp://guest:guest@localhost:5672")?;
let publisher = client.publisher();

// Publish text
publisher.publish("hello world").await?;

// Publish JSON
let order = Order { id: 1 }; // `Order` is a hypothetical serde-serializable type
// Automatically serialized
let json = serde_json::to_vec(&order)?;
publisher.publish(&json).await?;
```
Publisher accepts multiple data types:
- `&str` - string slices
- `&[u8]` - byte slices
- `&String` - owned strings
- `&Vec<u8>` - byte vectors

```rust
// All of these work (payloads illustrative):
publisher.publish("text").await?;             // &str
publisher.publish(&b"bytes"[..]).await?;      // &[u8]
publisher.publish(&String::from("s")).await?; // &String
publisher.publish(&vec![1u8, 2, 3]).await?;   // &Vec<u8>
```
✅ Auto send to default exchange (amq.direct)
✅ Auto-create exchange if not exists (durable)
✅ No manual setup needed
✅ Simple API - One method for all data types
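One way a single method can accept all four types is a generic bound like `AsRef<[u8]>`. This is a sketch of the pattern, not the crate's actual signature:

```rust
// Sketch: one generic conversion accepting &str, &[u8], &String, and &Vec<u8>.
fn to_payload<T: AsRef<[u8]> + ?Sized>(data: &T) -> Vec<u8> {
    data.as_ref().to_vec()
}

fn main() {
    let s = String::from("hi");
    let v: Vec<u8> = vec![104, 105];
    assert_eq!(to_payload("hi"), b"hi".to_vec());       // &str
    assert_eq!(to_payload(&b"hi"[..]), b"hi".to_vec()); // &[u8]
    assert_eq!(to_payload(&s), b"hi".to_vec());         // &String
    assert_eq!(to_payload(&v), b"hi".to_vec());         // &Vec<u8>
}
```

The `?Sized` bound is what allows the unsized `str` and `[u8]` cases behind a reference.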
Multiple Exchanges
```rust
use easy_rmq::ExchangeKind; // import path assumed

// Exchange names and payloads are illustrative; the original arguments were elided.
let client = AmqpClient::new("amqp://guest:guest@localhost:5672")?;

// Publisher 1 - Direct exchange
let pub1 = client.publisher().with_exchange("orders", ExchangeKind::Direct);
pub1.publish("order created").await?;

// Publisher 2 - Topic exchange
let pub2 = client.publisher().with_exchange("logs", ExchangeKind::Topic);
pub2.publish("log line").await?;

// Publisher 3 - Fanout exchange
let pub3 = client.publisher().with_exchange("events", ExchangeKind::Fanout);
pub3.publish("broadcast").await?;

// Shortcut methods
let pub4 = client.publisher().with_topic("logs");
let pub5 = client.publisher().with_direct("orders");
let pub6 = client.publisher().with_fanout("events");
```
✅ Explicit - exchange type clear from parameters
✅ Flexible - Direct, Topic, Fanout, Headers
✅ Auto-create exchange with appropriate type
Subscriber with Worker Registry
⚠️ Important: All handlers must be `async fn` and receive `Vec<u8>` (owned bytes).
Use `SubscriberRegistry` to manage multiple workers:
```rust
use easy_rmq::{AmqpClient, SubscriberRegistry}; // import paths assumed
use easy_rmq::ExchangeKind;

// Handler bodies and registry wiring were elided in the original;
// the signatures below are illustrative.
async fn handle_orders(payload: Vec<u8>) -> easy_rmq::Result<()> { Ok(()) }
async fn handle_logs(payload: Vec<u8>) -> easy_rmq::Result<()> { Ok(()) }
async fn handle_events(payload: Vec<u8>) -> easy_rmq::Result<()> { Ok(()) }
```
Queue Format per Exchange Type:
| Exchange | Parameter | Queue Name | Routing Key |
|---|---|---|---|
| Direct | `.queue("rk")` | `rk.job` | `rk` |
| Topic | `.routing_key("rk")` + `.queue("q")` | `q` | `rk` |
| Fanout | `.queue("q")` | `q` | `""` |
✅ Auto-created exchange + queue + binding
✅ Direct: queue auto-formatted with `.job` suffix
✅ Topic/Fanout: full control over queue name
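The naming rules from the table can be captured in a small helper. This sketches the documented behavior, not the crate's internal code:

```rust
#[derive(Clone, Copy)]
enum ExchangeKind { Direct, Topic, Fanout }

// Direct: `.queue("rk")` doubles as routing key; the queue gets a `.job` suffix.
// Topic/Fanout: the queue name is used as given; fanout binds with an empty key.
fn queue_and_binding(kind: ExchangeKind, routing_key: &str, queue: &str) -> (String, String) {
    match kind {
        ExchangeKind::Direct => (format!("{routing_key}.job"), routing_key.to_string()),
        ExchangeKind::Topic => (queue.to_string(), routing_key.to_string()),
        ExchangeKind::Fanout => (queue.to_string(), String::new()),
    }
}

fn main() {
    assert_eq!(
        queue_and_binding(ExchangeKind::Direct, "order.created", ""),
        ("order.created.job".to_string(), "order.created".to_string())
    );
    assert_eq!(
        queue_and_binding(ExchangeKind::Topic, "order.*", "api_logs"),
        ("api_logs".to_string(), "order.*".to_string())
    );
    assert_eq!(
        queue_and_binding(ExchangeKind::Fanout, "", "notification_q"),
        ("notification_q".to_string(), String::new())
    );
}
```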
Advanced Worker Configuration
Retry Mechanism
Automatically retry failed messages with delay:
```rust
// Builder arguments are illustrative; only the method names and
// retry values survive in the original snippet.
new(handle_orders)
    .pool(pool)
    .with_exchange("orders", ExchangeKind::Direct)
    .queue("order.created")
    .retry(3, Duration::from_secs(5)) // max 3 retries, 5 second delay
    .build()
```
How it works:
- Failed messages are sent to `{queue}.retry` with a TTL
- After the delay, the message returns to the original queue
- After max retries are exceeded, it is sent to `{queue}.dlq` (Dead Letter Queue)
- Retry count is tracked in the `x-retry-count` message header
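The routing decision above reduces to a comparison against `x-retry-count`. A sketch mirroring the README's naming (not the crate's internals):

```rust
// Decides where a failed message goes next, per the documented retry flow.
fn next_queue(queue: &str, x_retry_count: u32, max_retries: u32) -> String {
    if x_retry_count < max_retries {
        format!("{queue}.retry") // parked with a TTL, then returns to `queue`
    } else {
        format!("{queue}.dlq") // retries exhausted: dead-letter queue
    }
}

fn main() {
    assert_eq!(next_queue("order.created.job", 0, 3), "order.created.job.retry");
    assert_eq!(next_queue("order.created.job", 3, 3), "order.created.job.dlq");
}
```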
Single Active Consumer
Enable single active consumer mode to ensure only one consumer processes messages from a queue at a time. This is crucial for scenarios requiring strict message ordering and avoiding race conditions, such as inventory management:
```rust
// Arguments are illustrative; method names are from the original.
new(handle_stock)
    .pool(pool)
    .with_exchange("inventory", ExchangeKind::Direct)
    .queue("stock.update")
    .single_active_consumer()
    .prefetch(1)    // ⚠️ Must be 1 to avoid race conditions
    .concurrency(1) // ⚠️ Must be 1 to avoid race conditions
    .build()
```
Single Active Consumer behavior:
- Only one consumer actively receives messages from the queue
- Other consumers remain standby and take over if the active consumer fails
- Useful for:
- Inventory/stock updates - prevent overselling by processing sequentially
- Payment processing - ensure transactions are processed in order
- Workflow orchestration - maintain strict execution order
- High availability scenarios with automatic failover
⚠️ Important:
- MUST set `.prefetch(1)` and `.concurrency(1)` to avoid race conditions
- Messages MUST be processed sequentially (one at a time)
- Cannot be changed on existing queues (delete the queue first if needed)
- Requires RabbitMQ 3.8+; single active consumer is a core queue feature (set via the `x-single-active-consumer` queue argument), so no plugin needs to be enabled
Why prefetch(1) and concurrency(1)?
- Single active consumer ensures only ONE consumer is active
- If prefetch > 1: Single consumer buffers multiple messages, risking race conditions
- If concurrency > 1: Single consumer runs parallel workers, breaking message ordering
- Both MUST be 1 to guarantee sequential, ordered processing
Example: Stock Update Race Condition
Without SAC (parallel processing):
Message 1: "Item A stock +10" → Consumer 1 reads stock: 50
Message 2: "Item A stock -5" → Consumer 2 reads stock: 50
Consumer 1 writes: 60
Consumer 2 writes: 45 ❌ Wrong! Should be 55
With SAC (sequential processing):
Message 1: "Item A stock +10" → reads: 50, writes: 60
Message 2: "Item A stock -5" → reads: 60, writes: 55 ✓ Correct!
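The lost update in the diagrams is a stale read-modify-write. This std-only simulation reproduces the arithmetic deterministically (no actual threads involved):

```rust
// Applies deltas one at a time against the current value (the SAC case).
fn final_stock(start: i64, deltas: &[i64]) -> i64 {
    deltas.iter().fold(start, |stock, d| stock + d)
}

fn main() {
    let deltas = [10i64, -5]; // "stock +10", then "stock -5"

    // Without SAC: both consumers read the same stale value before writing.
    let stale = 50i64;
    let consumer1 = stale + deltas[0]; // writes 60
    let consumer2 = stale + deltas[1]; // writes 45, clobbering consumer 1
    assert_eq!(consumer1, 60);
    assert_eq!(consumer2, 45); // ❌ the +10 update is lost

    // With SAC: sequential read-modify-write keeps every update.
    assert_eq!(final_stock(50, &deltas), 55); // ✓ correct
}
```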
Prefetch (QoS) Control
Control how many messages are pre-fetched from the broker:
```rust
// Arguments illustrative; method names from the original.
new(handler)
    .pool(pool)
    .queue("tasks")
    .prefetch(10) // Buffer 10 messages
    .build()
```
Prefetch behavior:
- Without `.concurrency()`: messages are buffered but processed sequentially, one by one
- With `.concurrency()`: the prefetch acts as the buffer size for parallel workers
Parallel Processing
Run multiple workers concurrently with controlled parallelism:
```rust
// Arguments illustrative; method names from the original.
new(handler)
    .pool(pool)
    .queue("tasks")
    .prefetch(50)    // Buffer 50 messages
    .concurrency(10) // Spawn 10 parallel workers
    .parallelize(tokio::task::spawn) // Async tasks
    .build()
```
Configuration breakdown:
- `.prefetch(N)` - AMQP prefetch count (buffer size from broker)
- `.concurrency(N)` - number of parallel worker tasks
- `.parallelize(spawn_fn)` - spawn function for task creation
Spawn function options:
```rust
// Async I/O tasks (default, good for database/HTTP calls)
.parallelize(tokio::task::spawn)

// CPU-intensive or blocking operations
.parallelize(tokio::task::spawn_blocking)
```
Worker model:
- Each worker runs its own consumer loop with unique consumer tag
- Workers compete for messages from the same queue
- Prefetch divides evenly among workers (e.g., prefetch=50, 10 workers → 5 per worker)
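Assuming the even split described above, the per-worker share is simple integer division (a sketch; the crate may distribute any remainder differently):

```rust
// Per-worker prefetch under an even split; `.max(1)` guards division by zero.
fn per_worker_prefetch(prefetch: u16, workers: u16) -> u16 {
    prefetch / workers.max(1)
}

fn main() {
    assert_eq!(per_worker_prefetch(50, 10), 5); // prefetch=50, 10 workers -> 5 each
}
```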
Configuration Comparison:
| Scenario | `.prefetch()` | `.concurrency()` | `.parallelize()` | Behavior |
|---|---|---|---|---|
| Sequential | Not set / 1 | Not set | Not set | 1 message at a time |
| Buffered | 10 | Not set | Not set | Buffer 10, process 1-by-1 |
| Parallel Async | 50 | 10 | `tokio::task::spawn` | 10 workers, async execution |
| Parallel Blocking | 50 | 10 | `tokio::task::spawn_blocking` | 10 workers, blocking threads |
Complete Example with All Features
```rust
// Arguments illustrative; method names and comments from the original.
new(handler)
    .pool(pool)
    .with_exchange("orders", ExchangeKind::Direct)
    .queue("order.created")
    .single_active_consumer()         // Single active consumer mode
    .retry(3, Duration::from_secs(5)) // 3 retries, 5s delay
    .prefetch(1)                      // Must be 1 with SAC
    .concurrency(1)                   // Must be 1 with SAC
    .parallelize(tokio::task::spawn)  // Async execution
    .build()
```
⚠️ Important: `.concurrency()` requires `.parallelize()` to be set
Middleware
Add middleware for logging, metrics, and distributed tracing:
```rust
use easy_rmq::ExchangeKind; // other imports were elided in the original

// Define middleware functions (bodies elided in the original)

// Register with middleware (arguments illustrative)
let worker = new(handler)
    .register(logging_middleware);
worker.run().await?;
```
Middleware execution order:
1. `before()` - called before the handler (for timing, etc.)
2. Handler function executed
3. `after()` - called after the handler (for logging, metrics, etc.)
Built-in middleware available:
- `examples/common/middleware::logging` - log message processing
- `examples/common/middleware::metrics` - track execution metrics with timing
- `examples/common/middleware::tracing` - extract and log trace IDs
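The before/handler/after ordering can be sketched as a plain wrapper (hypothetical names; the crate's actual middleware trait was elided from this README):

```rust
// Appends markers in call order to show before -> handler -> after.
fn run_with_middleware(
    log: &mut Vec<&'static str>,
    handler: impl FnOnce(&mut Vec<&'static str>),
) {
    log.push("before"); // e.g. start a timer
    handler(log);       // the registered message handler
    log.push("after");  // e.g. record metrics, log the outcome
}

fn main() {
    let mut log = Vec::new();
    run_with_middleware(&mut log, |l| l.push("handler"));
    assert_eq!(log, ["before", "handler", "after"]);
}
```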
Exchange Types Detail
Direct Exchange - Queue name auto-formatted with `.job` suffix:
```rust
// Names taken from the binding comment below; other arguments illustrative.
new(handler)
    .pool(pool)
    .with_exchange("order.events", ExchangeKind::Direct)
    .queue("order.created") // routing_key
    .build()

// Queue: "order.created.job"
// Binding: queue_bind("order.created.job", "order.events", "order.created")
```
Topic Exchange - Separate routing key and queue:
```rust
// Names taken from the binding comment below; other arguments illustrative.
new(handler)
    .pool(pool)
    .with_exchange("logs", ExchangeKind::Topic)
    .routing_key("order.*") // routing pattern
    .queue("api_logs")      // queue name
    .build()

// Queue: "api_logs"
// Binding: queue_bind("api_logs", "logs", "order.*")
```
Fanout Exchange - Broadcast to all queues:
```rust
// Names taken from the binding comment below; other arguments illustrative.
new(handler)
    .pool(pool)
    .with_exchange("events", ExchangeKind::Fanout)
    .queue("notification_q")
    .build()

// Queue: "notification_q"
// Binding: queue_bind("notification_q", "events", "")
```
Distributed Tracing
Built-in support for distributed tracing with automatic or custom trace ID generation, perfect for tracking message flows through your system.
Publisher with Trace ID
```rust
use easy_rmq::AmqpClient; // import path assumed

// URL and payloads are illustrative; the original arguments were elided.
let client = AmqpClient::new("amqp://guest:guest@localhost:5672")?;

// Option 1: Auto-generate trace ID (recommended for most cases)
client.publisher()
    .with_auto_trace_id()
    .publish("payload")
    .await?;

// Option 2: Use custom trace ID (e.g., from OpenTelemetry)
client.publisher()
    .with_trace_id("19ca9a5f5e1-5e148b1f5008b7d8")
    .publish("payload")
    .await?;

// Option 3: Generate standalone trace ID
use easy_rmq::generate_trace_id; // import path assumed
let trace_id = generate_trace_id();
client.publisher()
    .with_trace_id(&trace_id)
    .publish("payload")
    .await?;
```
Subscriber: Extract Trace ID
The subscriber automatically stores message headers in thread-local storage, accessible via `easy_rmq::get_headers()`:
```rust
use easy_rmq::Result; // import path assumed

// In your handler or middleware (signature illustrative; exact header API may differ)
async fn handler(payload: Vec<u8>) -> Result<()> {
    if let Some(headers) = easy_rmq::get_headers() {
        // headers carry the publisher's trace ID, retry count, etc.
    }
    Ok(())
}
```
Middleware: Automatic Trace ID Logging
Use the built-in tracing middleware for automatic trace ID extraction and logging:
```rust
use easy_rmq::ExchangeKind; // other imports were elided in the original

// Add tracing middleware (registration arguments illustrative)
let worker = new(handler)
    .register(tracing_middleware);
```
Sample output:
INFO Message processed - trace-id: 19ca9a5f5e1-5e148b1f5008b7d8
WARN Message failed - trace-id: 19ca9a5f5e1-5e148b1f5008b7d8 | error: ...
OpenTelemetry Integration
For production distributed tracing with OpenTelemetry:
```rust
use opentelemetry::trace::TraceContextExt;
use opentelemetry::Context;

// Get trace ID from current OTel context
let context = Context::current();
let span = context.span();
let trace_id = span.span_context().trace_id().to_string();

// Pass trace ID through message pipeline (payload illustrative)
client.publisher()
    .with_trace_id(&trace_id)
    .publish("payload")
    .await?;

// Or auto-generate when no OTel context is available
client.publisher()
    .with_auto_trace_id()
    .publish("payload")
    .await?;
```
Benefits:
- ✅ Track messages across services
- ✅ Correlate logs with trace IDs
- ✅ Debug distributed systems
- ✅ Monitor message flows
- ✅ OTel-compatible
Trace ID format: `{timestamp_hex}-{random_hex}` (e.g., `19ca9a5f5e1-5e148b1f5008b7d8`)
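A std-only sketch matching that format (the crate's `generate_trace_id` may differ; `RandomState` stands in for a real RNG here):

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::{SystemTime, UNIX_EPOCH};

fn generate_trace_id() -> String {
    // Millisecond timestamp in hex, e.g. 19ca9a5f5e1
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_millis();
    // RandomState is randomly seeded per process; good enough for a sketch.
    let mut h = RandomState::new().build_hasher();
    h.write_u128(ts);
    format!("{ts:x}-{:016x}", h.finish())
}

fn main() {
    let id = generate_trace_id();
    let (ts, rnd) = id.split_once('-').expect("dash-separated");
    assert!(u128::from_str_radix(ts, 16).is_ok());
    assert_eq!(rnd.len(), 16);
    println!("{id}");
}
```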
See also:
- `examples/common/middleware.rs` - Built-in middleware implementations
Dependency Injection
This library supports two types of dependency injection:
Handler DI with Data<T> Wrapper
⚠️ Important: DI handlers must be `async fn` and receive `Vec<u8>` (owned bytes).
Inject dependencies into message handlers using the `Data<T>` wrapper:
```rust
use easy_rmq::Data; // import path assumed
use easy_rmq::ExchangeKind;

// Handler with dependency injection
// (`OrderService` and the exact signature are illustrative; the original was elided)
async fn handle_order(payload: Vec<u8>, service: Data<OrderService>) -> easy_rmq::Result<()> {
    service.process(&payload).await
}
```
✅ Clean separation - Services defined separately from handlers
✅ Shared dependencies - Clone Data<T> across multiple workers
✅ Type safe - Compile-time dependency checking
✅ Testable - Easily inject mock services for testing
Publisher Trait-Based DI
Use traits for publisher dependency injection in services:
```rust
use std::sync::Arc;

// Type and constructor names below are hypothetical; the original snippet was elided.
let client = AmqpClient::new("amqp://guest:guest@localhost:5672")?;
let publisher: Arc<dyn MessagePublisher> = Arc::new(client.publisher());
let order_service = OrderService::new(publisher);
```
Complete DI Example: Publisher + Subscriber
Full example with multiple publishers and handlers with DI:
The code for this example was elided here; see `examples/dependency_injection.rs` and `examples/dependency_injection_publisher.rs` for the complete source.
Examples
See examples/ folder for complete usage examples:
Core Examples
- `publisher.rs` - Publisher with auto trace ID generation
- `subscriber.rs` - Multi-worker with middleware, retry, prefetch, concurrency, and SAC
Dependency Injection Examples
- `dependency_injection.rs` - Handler-level DI with `Data<T>`
- `dependency_injection_publisher.rs` - Publisher trait-based DI pattern
Quick Start
```shell
# Commands assume the standard `cargo run --example` layout.

# Terminal 1 - Start subscriber first
cargo run --example subscriber

# Terminal 2 - Then publisher
cargo run --example publisher

# Terminal 1 - Dependency injection subscriber
cargo run --example dependency_injection

# Terminal 2 - Dependency injection publisher
cargo run --example dependency_injection_publisher
```
Common Middleware
Located in examples/common/middleware.rs:
- `logging` - Log message processing results
- `metrics` - Track success/error counts with execution time
- `tracing` - Extract and log trace IDs from message headers
Testing
Requirements
- Rust 1.70+
- RabbitMQ server (or Docker)
License
ISC