RustRabbit
A high-performance, production-ready RabbitMQ client library for Rust with zero-configuration simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.
Quick Start - One Line Magic!
use ;
async
What RetryPolicy::fast() creates automatically:
- 5 retries: 200ms → 300ms → 450ms → 675ms → ~1s (each delay capped at 10s)
- Dead Letter Queue: Automatic DLX/DLQ setup for failed messages
- Backoff + Jitter: Intelligent delay with randomization
- Production Ready: Optimal settings for most use cases
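The delay sequence listed above follows from the 200ms initial delay and the 1.5x multiplier that the presets table gives for this policy. Here is a minimal sketch of that arithmetic, assuming a plain multiply-then-cap rule with jitter left out. `backoff_schedule` is a hypothetical helper for illustration, not a RustRabbit API.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of RustRabbit): the delay before each retry
/// for an exponential policy whose delays are capped at `cap_ms`.
fn backoff_schedule(retries: u32, initial_ms: f64, multiplier: f64, cap_ms: f64) -> Vec<Duration> {
    let mut delay_ms = initial_ms;
    let mut schedule = Vec::with_capacity(retries as usize);
    for _ in 0..retries {
        // Cap the delay, then store it with microsecond resolution.
        let capped = delay_ms.min(cap_ms);
        schedule.push(Duration::from_micros((capped * 1000.0).round() as u64));
        // Grow the uncapped delay for the next attempt.
        delay_ms *= multiplier;
    }
    schedule
}
```

Calling `backoff_schedule(5, 200.0, 1.5, 10_000.0)` yields 200ms, 300ms, 450ms, 675ms and 1012.5ms, which the list above rounds to 1s. With more retries the delays would stop growing once they reach the 10s cap.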
Installation
[dependencies]
rust-rabbit = "0.3.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
Core Features
Smart Automation
- One-Line Setup: RetryPolicy::fast() configures everything
- Auto Infrastructure: Queues, exchanges, and bindings created automatically
- Intelligent Defaults: Production-ready settings without configuration
- Dead Letter Automation: Automatic failure recovery and monitoring
Advanced Retry System
- Multiple Presets: Fast, slow, linear, and custom patterns
- Exponential Backoff: Smart delay calculations with jitter
- Delayed Message Exchange: RabbitMQ x-delayed-message integration
- Dead Letter Integration: Seamless failure handling
Enterprise Messaging Patterns
- Request-Response: RPC-style messaging with correlation IDs and timeouts
- Saga Pattern: Distributed transaction coordination with compensation actions
- Event Sourcing: CQRS implementation with event store and aggregate management
- Message Deduplication: Multiple strategies for duplicate message detection
- Priority Queues: Configurable priority-based message processing
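To make the deduplication bullet concrete, here is one possible strategy: remember the most recent message IDs in a bounded window and drop anything already seen. This is a sketch under stated assumptions rather than the library's implementation. The `Deduplicator` type and its `first_seen` method are hypothetical names, and the FIFO eviction policy is just one of several reasonable choices.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical bounded deduplication window (not a RustRabbit type).
struct Deduplicator {
    capacity: usize,
    seen: HashSet<String>,
    order: VecDeque<String>, // insertion order, oldest at the front
}

impl Deduplicator {
    fn new(capacity: usize) -> Self {
        Deduplicator {
            capacity: capacity.max(1), // a window of zero would never evict
            seen: HashSet::new(),
            order: VecDeque::new(),
        }
    }

    /// Returns true the first time an ID is seen, false for a duplicate.
    fn first_seen(&mut self, message_id: &str) -> bool {
        if self.seen.contains(message_id) {
            return false;
        }
        // Evict the oldest ID once the window is full.
        if self.order.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.seen.insert(message_id.to_string());
        self.order.push_back(message_id.to_string());
        true
    }
}
```

The trade-off of a bounded window is that an ID can be accepted again after it has been evicted, so the capacity should cover the longest redelivery gap you expect.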
Production Observability
- Prometheus Metrics: Comprehensive metrics for throughput, latency, errors
- Health Monitoring: Real-time connection health with auto-recovery
- Circuit Breaker: Automatic failure detection and graceful degradation
- Structured Logging: Distributed tracing with correlation IDs
Reliability & Performance
- Connection Pooling: Automatic connection management with health monitoring
- Graceful Shutdown: Multi-phase shutdown with signal handling
- Error Recovery: Comprehensive error handling with recovery strategies
- Type Safety: Strongly typed message handling with serde integration
- Minutes Retry Preset: One-line setup for business-critical operations
- Intelligent Defaults: Production-ready settings out of the box
- Dead Letter Handling: Automatic failure recovery and monitoring
Intelligent Retry Patterns
// Quick presets for common scenarios
RetryPolicy::fast() // 1s, 2s, 4s, 8s, 16s (transient failures)
RetryPolicy::slow() // 10s, 20s, 40s, 80s, 160s (resource-heavy)
RetryPolicy::aggressive() // 15 retries with exponential backoff
RetryPolicy::minutes_exponential() // 1min, 2min, 4min, 8min, 16min (business-critical)
// Custom builder
RetryPolicy::builder()
    .max_retries(...)
    .initial_delay(...)
    .backoff_multiplier(...)
    .jitter(...) // 20% randomization prevents thundering herd
    .dead_letter_exchange(...)
    .build()
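The jitter comment above is easier to picture with numbers. The sketch below assumes that "20% randomization" means a symmetric spread of plus or minus 20% around the computed delay; the source does not say how RustRabbit applies jitter, so treat that as an assumption. `apply_jitter` is a hypothetical helper, and the random value is passed in by the caller so the example needs only the standard library.

```rust
use std::time::Duration;

/// Hypothetical helper (not a RustRabbit API). `unit` is a random value in
/// [0.0, 1.0) supplied by the caller, for example from a random number crate.
fn apply_jitter(base: Duration, jitter: f64, unit: f64) -> Duration {
    // Map `unit` from [0, 1) onto a factor in [1 - jitter, 1 + jitter).
    let factor = 1.0 + jitter * (2.0 * unit - 1.0);
    // Guard against a negative factor, which `mul_f64` would reject.
    base.mul_f64(factor.max(0.0))
}
```

With a 1s base delay and `jitter = 0.2`, every result lands between roughly 800ms and 1.2s. Spreading retries this way keeps many consumers that failed at the same moment from retrying at the same moment.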
Enterprise Messaging Patterns (Phase 2 - NEW)
- Request-Response: RPC-style messaging with correlation IDs and timeouts
- Saga Pattern: Distributed transaction coordination with compensation
- Event Sourcing: CQRS implementation with event store and snapshots
- Message Deduplication: Multiple strategies for duplicate detection
- Priority Queues: Configurable priority-based message processing
Usage Examples
Retry Patterns & Setup
Quick Retry Presets
| Preset | Retries | Initial Delay | Max Delay | Backoff | Use Case |
|---|---|---|---|---|---|
| RetryPolicy::fast() | 5 | 200ms | 10s | 1.5x | API calls, DB queries |
| RetryPolicy::slow() | 3 | 1s | 1min | 2.0x | Heavy processing |
| RetryPolicy::linear(500ms, 3) | 3 | 500ms | 500ms | 1.0x | Fixed intervals |
Consumer Setup Patterns
// Super fast: copy/paste setup:
let config = builder
.connection_string
.build;
let connection = new.await?;
let options = ConsumerOptions ;
let consumer = new.await?;
// Custom with builder
let retry = builder
.fast_preset // Use the preset as a base
.max_retries // Override the number of retries
.build;
// Ultra custom
let retry = builder
.max_retries
.initial_delay
.backoff_multiplier
.jitter
.dead_letter_exchange
.build;
Message Handling Pattern
consumer.consume.await?;
Prefetch Count Debugging
Common Issue: prefetch_count not working?
Wrong (prefetch_count is ignored):
ConsumerOptions (auto_ack: true)
Correct (prefetch_count takes effect):
ConsumerOptions (auto_ack: false)
Why?
- prefetch_count only takes effect while there are messages that have not been ACKed yet
- auto_ack: true → messages are ACKed immediately → no backpressure
- auto_ack: false → messages wait for a manual ACK → QoS limits work
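The reasoning above can be checked with a small simulation. This is a toy model, not RabbitMQ or RustRabbit code: the broker delivers as many messages as the prefetch window allows, and the consumer handles one message per tick. `peak_buffered` is a hypothetical function that reports the largest backlog the consumer ever holds.

```rust
/// Toy model (not RustRabbit code): returns the largest number of
/// delivered-but-unhandled messages the consumer holds at any time.
fn peak_buffered(total: usize, prefetch: usize, auto_ack: bool) -> usize {
    // In AMQP a prefetch count of 0 means "no limit".
    let limit = if prefetch == 0 { usize::MAX } else { prefetch };
    let mut remaining = total; // still queued on the broker
    let mut buffered = 0usize; // delivered to the consumer, not yet handled
    let mut peak = 0usize;
    while remaining > 0 || buffered > 0 {
        let deliver = if auto_ack {
            // ACKed on delivery, so nothing ever counts against the limit.
            remaining
        } else {
            // Only the free part of the prefetch window can be filled.
            limit.saturating_sub(buffered).min(remaining)
        };
        remaining -= deliver;
        buffered += deliver;
        peak = peak.max(buffered);
        // The consumer handles one message per tick (and ACKs it in manual mode).
        buffered = buffered.saturating_sub(1);
    }
    peak
}
```

With 100 queued messages and a prefetch of 10, manual ACK keeps the backlog at 10, while auto ACK lets all 100 messages pile up in the consumer at once.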
Debug Commands
# Compile check
# Run with RabbitMQ
# Test prefetch behavior
Smart Publisher with Auto-Declare
use ;
let publisher = rabbit.publisher;
// Auto-declare exchange when publishing
let options = builder
.auto_declare_exchange
.durable
.build;
publisher.publish_to_exchange.await?;
Advanced Retry Configuration
// Business-critical with custom settings
let custom_retry = builder
.max_retries
.initial_delay
.backoff_multiplier
.jitter // 20% randomization
.dead_letter_exchange
.dead_letter_queue
.build;
// Use with consumer
let options = builder
.auto_declare_queue
.auto_declare_exchange
.retry_policy
.prefetch_count // Reliable processing
.manual_ack // Explicit acknowledgment
.build;
Advanced Patterns (Phase 2 - NEW)
Request-Response (RPC)
use *;
// Server side
let server = new;
server.handle_requests.await?;
// Client side
let client = new;
let response: CalculateResponse = client
.send_request
.with_timeout
.await?;
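Behind a request-response client like the one above, replies have to be matched to the requests that are still waiting. The sketch below shows that bookkeeping with standard-library channels only. It is an illustration under stated assumptions, not RustRabbit's implementation: `PendingReplies` and `wait_for_reply` are hypothetical names, and the numeric correlation IDs stand in for whatever ID format the library actually uses.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::time::Duration;

/// Hypothetical table of in-flight requests keyed by correlation ID.
struct PendingReplies {
    next_id: u64,
    waiting: HashMap<u64, Sender<String>>,
}

impl PendingReplies {
    fn new() -> Self {
        PendingReplies { next_id: 0, waiting: HashMap::new() }
    }

    /// Registers an outgoing request and returns its correlation ID together
    /// with the receiver the caller should wait on.
    fn register(&mut self) -> (u64, Receiver<String>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = mpsc::channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Routes an incoming reply to its waiter. Returns false for unknown IDs,
    /// for example a reply that arrives after the request was abandoned.
    fn complete(&mut self, correlation_id: u64, body: String) -> bool {
        match self.waiting.remove(&correlation_id) {
            Some(tx) => tx.send(body).is_ok(),
            None => false,
        }
    }
}

/// Waits for the reply, giving up after `timeout`.
fn wait_for_reply(rx: &Receiver<String>, timeout: Duration) -> Option<String> {
    rx.recv_timeout(timeout).ok()
}
```

A caller would publish the request with the returned ID attached, then block on `wait_for_reply`. A `None` result means the timeout elapsed before any reply with that ID arrived.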
Saga Pattern (Distributed Transactions)
use *;
// Define compensation logic
async
async
// Execute saga
let mut coordinator = new;
let mut saga = new;
saga.add_step;
match coordinator.execute_saga.await
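The core idea of a saga is that when a step fails, the steps that already succeeded are undone in reverse order. Here is a synchronous sketch of that control flow. It is not the library's coordinator: `Step`, `run_saga`, and the step names in the usage note are hypothetical, and a boolean stands in for the real asynchronous action.

```rust
/// Hypothetical saga step; `succeeds` stands in for a real async action.
struct Step {
    name: &'static str,
    succeeds: bool,
}

/// Runs the steps in order. On the first failure, compensates the completed
/// steps newest first and reports failure. Returns the outcome and a log.
fn run_saga(steps: &[Step]) -> (bool, Vec<String>) {
    let mut log = Vec::new();
    let mut done: Vec<&Step> = Vec::new();
    for step in steps {
        if step.succeeds {
            log.push(format!("do {}", step.name));
            done.push(step);
        } else {
            log.push(format!("fail {}", step.name));
            // Undo in reverse order so later effects are rolled back first.
            for prev in done.iter().rev() {
                log.push(format!("undo {}", prev.name));
            }
            return (false, log);
        }
    }
    (true, log)
}
```

For steps named `reserve_inventory`, `charge_payment`, and a failing `ship_order`, the log reads: do reserve_inventory, do charge_payment, fail ship_order, undo charge_payment, undo reserve_inventory.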
Event Sourcing (CQRS)
use *;
// Usage
let event_store = new;
let repository = new;
let mut account = new;
account.deposit?; // Generates MoneyDeposited event
repository.save.await?;
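In event sourcing, a command such as `deposit` records an event, and the aggregate's state is whatever you get by replaying its events. The sketch below shows that loop with an in-memory aggregate. Only the `MoneyDeposited` event name comes from the example above. The `Account` layout, the `apply` and `replay` functions, and the zero-amount validation rule are assumptions made for illustration, not RustRabbit's types.

```rust
#[derive(Debug, Clone, PartialEq)]
enum AccountEvent {
    MoneyDeposited { amount: u64 },
}

/// Hypothetical aggregate: current state plus events not yet persisted.
#[derive(Debug, Default)]
struct Account {
    balance: u64,
    uncommitted: Vec<AccountEvent>,
}

impl Account {
    /// State changes happen only here, so replaying events rebuilds the state.
    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::MoneyDeposited { amount } => self.balance += *amount,
        }
    }

    /// Command: validate, then record and apply the resulting event.
    fn deposit(&mut self, amount: u64) -> Result<(), String> {
        if amount == 0 {
            return Err("deposit must be positive".to_string());
        }
        let event = AccountEvent::MoneyDeposited { amount };
        self.apply(&event);
        self.uncommitted.push(event);
        Ok(())
    }

    /// Rebuilds an aggregate from its stored history.
    fn replay(history: &[AccountEvent]) -> Account {
        let mut account = Account::default();
        for event in history {
            account.apply(event);
        }
        account
    }
}
```

After depositing 100 and then 50, the account holds two uncommitted events and a balance of 150, and replaying those two events into a fresh account gives the same balance.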
Production Features
Health Monitoring
use HealthChecker;
let health_checker = new;
match health_checker.check_health.await
use rust_rabbit::metrics::PrometheusMetrics;
let metrics = PrometheusMetrics::new();
// Metrics are automatically collected:
// - rust_rabbit_messages_published_total
// - rust_rabbit_messages_consumed_total
// - rust_rabbit_message_processing_duration_seconds
// - rust_rabbit_connection_health
// - rust_rabbit_queue_depth
// Expose metrics endpoint
warp::serve(metrics.metrics_handler())
    .run(([0, 0, 0, 0], 9090))
    .await;
## **Examples**
The library includes comprehensive examples for all features:
### **Core Examples**
- **`consumer_example.rs`** - Basic consumer setup and message handling
- **`publisher_example.rs`** - Publishing messages with different options
- **`fast_consumer_template.rs`** - Production-ready consumer template
### **Retry & Error Handling**
- **`quick_retry_consumer.rs`** - All retry preset demonstrations
- **`retry_example.rs`** - Custom retry logic implementation
- **`retry_policy_presets.rs`** - Retry policy patterns
### **Advanced Patterns**
- **`event_sourcing_example.rs`** - CQRS and event sourcing implementation
- **`saga_example.rs`** - Distributed transaction coordination
- **`phase2_patterns_example.rs`** - Enterprise messaging patterns
### **Production Features**
- **`health_monitoring_example.rs`** - Health checks and monitoring
- **`metrics_example.rs`** - Prometheus metrics integration
- **`comprehensive_demo.rs`** - Full-featured production example
### **Integration**
- **`actix_web_api_example.rs`** - Web API integration
- **`builder_pattern_example.rs`** - Configuration patterns
Run any example:
```bash
cargo run --example consumer_example
cargo run --example fast_consumer_template
```
Testing
Integration Testing with Docker
RustRabbit supports real RabbitMQ integration testing:
# Start RabbitMQ with required plugins
# Run integration tests
# Or use make
Unit Tests
# Run unit tests
# Run with coverage
# Test specific modules
Common Mistakes & Best Practices
Common Mistakes
- Wrong prefetch_count setup:
  auto_ack: true, // Wrong! prefetch_count has no effect
- Not handling retry errors:
  // Does not distinguish retryable from non-retryable errors
  Err => delivery.nack.await?,
- Missing manual ACK:
  // Forgot to ACK the message
  // The message will be stuck in the unACKed state
Best Practices
- Always use auto_ack: false for production:
  ConsumerOptions
- Implement smart retry logic:
  match error
- Set appropriate prefetch_count:
  prefetch_count: Some, // For I/O bound
  prefetch_count: Some, // For CPU bound
  prefetch_count: Some, // For memory-heavy tasks
- Always handle graceful shutdown:
  select!
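The "smart retry logic" practice comes down to sorting failures into transient ones worth retrying and permanent ones that should go straight to the dead letter queue. Here is a minimal sketch of such a classifier. The error variants, the `Outcome` enum, and the `classify` function are hypothetical names chosen for illustration and are not part of RustRabbit.

```rust
/// What the consumer should do with the delivery.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Retry,
    DeadLetter,
}

/// Hypothetical handler errors, split by whether a retry can help.
#[derive(Debug)]
enum HandlerError {
    Timeout,                // transient: the dependency may recover
    ConnectionReset,        // transient
    InvalidPayload(String), // permanent: retrying cannot fix bad input
}

/// Decides the outcome for one processing attempt (`attempt` starts at 0).
fn classify(result: Result<(), HandlerError>, attempt: u32, max_retries: u32) -> Outcome {
    match result {
        Ok(()) => Outcome::Ack,
        // Permanent failures skip the retry loop entirely.
        Err(HandlerError::InvalidPayload(_)) => Outcome::DeadLetter,
        // Transient failures are retried until the budget is spent.
        Err(_) if attempt < max_retries => Outcome::Retry,
        Err(_) => Outcome::DeadLetter,
    }
}
```

Routing permanent failures away immediately keeps the retry budget for errors that can actually succeed on a later attempt.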
Configuration
Connection Configuration
let config = builder
.connection_string
.connection_timeout
.heartbeat
.retry_config
.health_check_interval
.pool_config // min 5, max 10 connections
.build;
Consumer Configuration
ConsumerOptions
Performance Tips
- prefetch_count: Set to CPU cores × 2-5 for I/O bound tasks
- concurrent_limit: 50-200 for I/O bound, fewer for CPU bound
- Connection pooling: Use 5-10 connections per application
- DLQ monitoring: Always monitor dead letter queues
- Metrics: Enable Prometheus for production monitoring
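The prefetch rule of thumb from the first tip can be turned into a small helper. This is a sketch under stated assumptions: `suggested_prefetch` and `detected_cores` are hypothetical functions, and the per-core factor is clamped to the 2-5 range quoted above. The result is a `u16` because the AMQP prefetch count is a 16-bit value.

```rust
use std::thread;

/// Hypothetical helper: cores multiplied by a per-core factor, with the
/// factor clamped to the 2-5 range suggested for I/O bound work.
fn suggested_prefetch(cores: usize, per_core: u16) -> u16 {
    let factor = per_core.clamp(2, 5);
    // Treat an unknown core count (0) as a single core.
    let cores = cores.clamp(1, u16::MAX as usize) as u16;
    cores.saturating_mul(factor)
}

/// Number of cores available to this process, falling back to 1.
fn detected_cores() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

On an 8-core machine, `suggested_prefetch(8, 3)` gives 24. Treat the result as a starting point and tune it against measured handler latency.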
Contributing
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create a feature branch: git checkout -b my-feature
- Make changes and add tests
- Run tests: cargo test
- Submit a pull request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Links
- Documentation: docs.rs/rust-rabbit
- Crates.io: crates.io/crates/rust-rabbit
- GitHub: github.com/nghiaphamln/rust-rabbit
- Issues: github.com/nghiaphamln/rust-rabbit/issues
Made with ❤️ for the Rust community
use rust_rabbit::metrics::RustRabbitMetrics;
let metrics = RustRabbitMetrics::new()?;
let rabbit = RustRabbit::with_metrics(config, metrics.clone()).await?;
// Automatic metrics collection:
// - Message throughput (published/consumed per second)
// - Processing latency (P50, P90, P99)
// - Error rates (failed messages, connection errors)
// - Queue depths (pending messages)
// - Connection health (active, reconnections)
### **Circuit Breaker**
```rust
use std::time::Duration;
use rust_rabbit::circuit_breaker::CircuitBreakerConfig;

let config = RabbitConfig::builder()
    .connection_string("amqp://localhost:5672")
    .circuit_breaker(CircuitBreakerConfig {
        failure_threshold: 5,
        failure_window: Duration::from_secs(60),
        recovery_timeout: Duration::from_secs(30),
        success_threshold: 3,
        half_open_max_requests: 5,
    })
    .build();
// Circuit breaker automatically handles connection failures
```
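To show what those settings control, here is a minimal three-state circuit breaker (closed, open, half-open). It is a sketch, not RustRabbit's implementation. It reuses the field names `failure_threshold`, `recovery_timeout`, and `success_threshold` from the configuration above, but the `Breaker` type is hypothetical. It also simplifies two things: it counts consecutive failures instead of failures within `failure_window`, and it ignores `half_open_max_requests`.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed,
    Open { since: Instant },
    HalfOpen,
}

/// Hypothetical breaker; timestamps are passed in so it is easy to test.
struct Breaker {
    failure_threshold: u32,
    success_threshold: u32,
    recovery_timeout: Duration,
    failures: u32,
    successes: u32,
    state: State,
}

impl Breaker {
    fn new(failure_threshold: u32, success_threshold: u32, recovery_timeout: Duration) -> Self {
        Breaker { failure_threshold, success_threshold, recovery_timeout, failures: 0, successes: 0, state: State::Closed }
    }

    /// Returns true if a call may proceed at time `now`.
    fn allow(&mut self, now: Instant) -> bool {
        if let State::Open { since } = self.state {
            // After the recovery timeout, let trial calls through.
            if now.duration_since(since) >= self.recovery_timeout {
                self.state = State::HalfOpen;
                self.successes = 0;
            }
        }
        !matches!(self.state, State::Open { .. })
    }

    /// Records the result of a call that was allowed to proceed.
    fn record(&mut self, ok: bool, now: Instant) {
        match (self.state, ok) {
            (State::Closed, true) => self.failures = 0,
            (State::Closed, false) => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.state = State::Open { since: now };
                }
            }
            (State::HalfOpen, true) => {
                self.successes += 1;
                if self.successes >= self.success_threshold {
                    self.state = State::Closed;
                    self.failures = 0;
                }
            }
            // Any failure during the trial phase reopens the breaker.
            (State::HalfOpen, false) => self.state = State::Open { since: now },
            (State::Open { .. }, _) => {} // calls are rejected while open
        }
    }
}
```

With the values from the configuration above, five consecutive failures open the breaker, calls are rejected for 30 seconds, and three successful trial calls close it again.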
Performance
RustRabbit v0.3.0 Benchmarks:
| Metric | Value | Improvement vs v0.2.0 |
|---|---|---|
| Throughput | 75,000+ msgs/sec | +50% |
| Latency (P99) | < 8ms | -20% |
| Memory Usage | < 45MB baseline | -10% |
| Connection Pool | 10-100 connections | Stable |
Advanced Pattern Performance:
| Pattern | Throughput | Memory Overhead | Best Use Case |
|---|---|---|---|
| Request-Response | 25,000 req/sec | +5MB | RPC, API calls |
| Saga | 10,000 flows/sec | +8MB | Distributed transactions |
| Event Sourcing | 50,000 events/sec | +15MB | CQRS, audit trails |
| Priority Queue | 60,000 msgs/sec | +2MB | Task prioritization |
Benchmarks: Intel i7-10700K, 32GB RAM, RabbitMQ 3.12
Configuration
Builder Pattern Configuration
use ;
// Environment-specific configs
let prod_config = builder
.connection_string
.connection_timeout
.retry
.health
.pool
.build;
// Consumer configurations
let reliable_options = builder
.consumer_tag
.minutes_retry // Auto-configure for reliability
.prefetch_count // Process one at a time
.build;
let high_throughput_options = builder
.consumer_tag
.high_throughput // Optimize for speed
.auto_declare_queue
.build;
Testing
RustRabbit includes comprehensive test coverage:
# Unit tests (58 tests)
# Integration tests with real RabbitMQ
# Examples compilation
# Performance benchmarks
Test Coverage:
- End-to-end message flows
- Retry mechanisms with delayed exchange
- Health monitoring and recovery
- All advanced patterns (Phase 2)
- Concurrent processing scenarios
- Error handling and edge cases
Examples
Comprehensive examples in the examples/ directory:
# Core features
# Advanced patterns (Phase 2)
# Comparison examples
Roadmap
Phase 1 (v0.2.0) - COMPLETED
- Prometheus metrics integration
- Circuit breaker pattern
- Health monitoring
- Connection pooling
Phase 2 (v0.3.0) - COMPLETED
- Request-Response pattern
- Saga pattern for distributed transactions
- Event sourcing with CQRS
- Message deduplication
- Priority queues
- Minutes retry preset - Zero-config production setup
Phase 3 (v0.4.0) - Enterprise
- Multi-broker support with failover
- Message encryption at rest
- Schema registry integration
- Advanced routing patterns
- Performance optimizations
Contributing
We welcome contributions! Areas for improvement:
- Bug fixes and performance improvements
- Documentation and examples
- New features from roadmap
- Additional test coverage
- Benchmarks and optimizations
Support
- Documentation
- GitHub Discussions
- Issue Tracker
- Email: nghiaphamln3@gmail.com
License
MIT License - see LICENSE file for details.
Acknowledgments
- Inspired by MassTransit for .NET
- Built on lapin for AMQP protocol
- Powered by Prometheus for metrics
Star us on GitHub if RustRabbit helps your project!
GitHub • Crates.io • Docs.rs
Built with ❤️ for the Rust community