# RustRabbit 🐰
A **high-performance, production-ready** RabbitMQ client library for Rust with **zero-configuration** simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.
## **Quick Start - One Line Magic!**
```rust
use rust_rabbit::{
    config::RabbitConfig,
    connection::ConnectionManager,
    consumer::{Consumer, ConsumerOptions},
    retry::RetryPolicy,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Connection
    let config = RabbitConfig::builder()
        .connection_string("amqp://user:pass@localhost:5672/vhost")
        .build();
    let connection = ConnectionManager::new(config).await?;

    // 2. Consumer with retry (one line!)
    let options = ConsumerOptions {
        auto_ack: false,
        retry_policy: Some(RetryPolicy::fast()),
        ..Default::default()
    };

    // 3. Start consuming
    let consumer = Consumer::new(connection, options).await?;
    consumer.consume("queue_name", |delivery| async move {
        // `your_processing_logic` and `is_retryable` are placeholders for application code.
        match your_processing_logic(&delivery.data).await {
            Ok(_) => delivery.ack(Default::default()).await?,
            Err(e) if is_retryable(&e) => delivery.nack(Default::default()).await?,
            Err(_) => delivery.reject(Default::default()).await?,
        }
        Ok(())
    }).await?;

    Ok(())
}
```
**What `RetryPolicy::fast()` creates automatically:**
- ✅ **5 retries**: 200ms → 300ms → 450ms → 675ms → ~1s (each delay capped at 10s)
- ✅ **Dead Letter Queue**: Automatic DLX/DLQ setup for failed messages
- ✅ **Backoff + Jitter**: Exponential backoff (1.5x multiplier) with randomized jitter
- ✅ **Production Ready**: Sensible defaults for most use cases
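
The delay sequence listed above follows from a 200ms initial delay, a 1.5x multiplier, and a 10s cap. The sketch below reproduces that arithmetic in plain Rust. `backoff_schedule` is a hypothetical helper written for illustration, not part of the rust-rabbit API, and jitter is left out so the numbers stay reproducible.

```rust
use std::time::Duration;

/// Builds an exponential backoff schedule: each delay is the previous one
/// times `multiplier`, and no delay exceeds `max_ms`.
/// Hypothetical helper for illustration only.
fn backoff_schedule(initial_ms: u64, multiplier: f64, max_ms: u64, retries: u32) -> Vec<Duration> {
    let mut delay = initial_ms as f64;
    let mut schedule = Vec::with_capacity(retries as usize);
    for _ in 0..retries {
        // Apply the cap before storing, then grow the uncapped delay.
        let capped = delay.min(max_ms as f64);
        schedule.push(Duration::from_millis(capped as u64));
        delay *= multiplier;
    }
    schedule
}

fn main() {
    // Values taken from the list above: 200ms initial, 1.5x, capped at 10s, 5 retries.
    let schedule = backoff_schedule(200, 1.5, 10_000, 5);
    for (attempt, delay) in schedule.iter().enumerate() {
        println!("retry {}: {} ms", attempt + 1, delay.as_millis());
    }
}
```

The fifth delay comes out as 1012ms, which the list rounds to 1s. With these parameters the 10s cap only comes into play from the eleventh retry onward.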
## 📦 **Installation**
```toml
[dependencies]
rust-rabbit = "0.3.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```
## 💡 **Core Features**
### 🎯 **Smart Automation**
- **One-Line Setup**: `RetryPolicy::fast()` configures everything
- **Auto Infrastructure**: Queues, exchanges, and bindings created automatically
- **Intelligent Defaults**: Production-ready settings without configuration
- **Dead Letter Automation**: Automatic failure recovery and monitoring
### **Advanced Retry System**
- **Multiple Presets**: Fast, slow, linear, and custom patterns
- **Exponential Backoff**: Smart delay calculations with jitter
- **Delayed Message Exchange**: RabbitMQ x-delayed-message integration
- **Dead Letter Integration**: Seamless failure handling
### **Enterprise Messaging Patterns**
- **Request-Response**: RPC-style messaging with correlation IDs and timeouts
- **Saga Pattern**: Distributed transaction coordination with compensation actions
- **Event Sourcing**: CQRS implementation with event store and aggregate management
- **Message Deduplication**: Multiple strategies for duplicate message detection
- **Priority Queues**: Configurable priority-based message processing
### **Production Observability**
- **Prometheus Metrics**: Comprehensive metrics for throughput, latency, errors
- **Health Monitoring**: Real-time connection health with auto-recovery
- **Circuit Breaker**: Automatic failure detection and graceful degradation
- **Structured Logging**: Distributed tracing with correlation IDs
### 🛡️ **Reliability & Performance**
- **Connection Pooling**: Automatic connection management with health monitoring
- **Graceful Shutdown**: Multi-phase shutdown with signal handling
- **Error Recovery**: Comprehensive error handling with recovery strategies
- **Type Safety**: Strongly typed message handling with serde integration
- **Minutes Retry Preset**: One-line setup for business-critical operations
- **Intelligent Defaults**: Production-ready settings out of the box
- **Dead Letter Handling**: Automatic failure recovery and monitoring
### **Intelligent Retry Patterns**
```rust
use std::time::Duration;

// Quick presets for common scenarios
RetryPolicy::fast();                // 200ms, 300ms, 450ms, 675ms, ~1s (transient failures)
RetryPolicy::slow();                // 1s, 2s, 4s (resource-heavy)
RetryPolicy::aggressive();          // 15 retries with exponential backoff
RetryPolicy::minutes_exponential(); // 1min, 2min, 4min, 8min, 16min (business-critical)

// Custom builder
let policy = RetryPolicy::builder()
    .max_retries(5)
    .initial_delay(Duration::from_secs(30))
    .backoff_multiplier(1.5)
    .jitter(0.2) // 20% randomization prevents thundering herd
    .dead_letter_exchange("failed.orders")
    .build();
```
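
To make the `jitter(0.2)` setting concrete, here is a minimal sketch of one common way to randomize a delay. It assumes symmetric jitter, meaning the delay is scaled by a factor between 0.8 and 1.2; how rust-rabbit actually applies its jitter value is not stated here. `apply_jitter` is a hypothetical helper, and the random sample is passed in by the caller so the example stays deterministic.

```rust
use std::time::Duration;

/// Scales `base` by a factor in [1 - jitter, 1 + jitter].
/// `sample` is a random number in [0, 1] supplied by the caller.
/// Assumption: symmetric jitter; hypothetical helper, not the library's code.
fn apply_jitter(base: Duration, jitter: f64, sample: f64) -> Duration {
    let jitter = jitter.clamp(0.0, 1.0);
    let sample = sample.clamp(0.0, 1.0);
    // sample = 0.0 gives the shortest delay, 0.5 the base delay, 1.0 the longest.
    let factor = 1.0 + jitter * (2.0 * sample - 1.0);
    base.mul_f64(factor)
}

fn main() {
    let base = Duration::from_secs(30);
    // Fixed samples stand in for a random number generator.
    for sample in [0.0, 0.25, 0.5, 0.75, 1.0] {
        let delay = apply_jitter(base, 0.2, sample);
        println!("sample {:.2} -> {:.1}s", sample, delay.as_secs_f64());
    }
}
```

Spreading retries over a range like this keeps many consumers that failed at the same moment from retrying at the same moment.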
### **Enterprise Messaging Patterns** *(Phase 2 - NEW)*
- **Request-Response**: RPC-style messaging with correlation IDs and timeouts
- **Saga Pattern**: Distributed transaction coordination with compensation
- **Event Sourcing**: CQRS implementation with event store and snapshots
- **Message Deduplication**: Multiple strategies for duplicate detection
- **Priority Queues**: Configurable priority-based message processing
### **Production Observability**
- **Prometheus Metrics**: Throughput, latency, error rates, queue depths
- **Health Monitoring**: Real-time connection health with auto-recovery
- **Circuit Breaker**: Automatic failure detection and graceful degradation
- **Structured Logging**: Distributed tracing with correlation IDs
## **Usage Examples**
## ⚡ **Retry Patterns & Setup**
### **Quick Retry Presets**
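| Preset | Retries | Initial Delay | Max Delay | Multiplier | Use Case |
|--------|---------|---------------|-----------|------------|----------|
| `RetryPolicy::fast()` | 5 | 200ms | 10s | 1.5x | API calls, DB queries |
| `RetryPolicy::slow()` | 3 | 1s | 1min | 2.0x | Heavy processing |
| `RetryPolicy::linear(500ms, 3)` | 3 | 500ms | 500ms | 1.0x | Fixed intervals |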
### **Consumer Setup Patterns**
```rust
use std::time::Duration;

// Super fast: copy/paste setup
let config = RabbitConfig::builder()
    .connection_string("amqp://user:pass@localhost:5672/vhost")
    .build();
let connection = ConnectionManager::new(config).await?;

let options = ConsumerOptions {
    auto_ack: false,
    retry_policy: Some(RetryPolicy::fast()),
    prefetch_count: Some(10),
    ..Default::default()
};
let consumer = Consumer::new(connection, options).await?;

// Custom, starting from a preset
let retry = RetryPolicy::builder()
    .fast_preset()  // Use the preset as a base
    .max_retries(3) // Override the number of retries
    .build();

// Fully custom
let retry = RetryPolicy::builder()
    .max_retries(5)
    .initial_delay(Duration::from_millis(100))
    .backoff_multiplier(2.0)
    .jitter(0.1)
    .dead_letter_exchange("my.dlx")
    .build();
```
### **Message Handling Pattern**
```rust
consumer.consume("queue_name", |delivery| async move {
    match process_message(&delivery.data).await {
        Ok(_) => {
            // ✅ Success -> ACK
            delivery.ack(Default::default()).await?;
        }
        Err(e) if should_retry(&e) => {
            // ⚠️ Retryable error -> NACK (will retry)
            delivery.nack(Default::default()).await?;
        }
        Err(_) => {
            // ❌ Fatal error -> REJECT (send to DLQ)
            delivery.reject(Default::default()).await?;
        }
    }
    Ok(())
}).await?;

fn should_retry(error: &MyError) -> bool {
    match error {
        MyError::NetworkTimeout => true,  // Retry
        MyError::ApiRateLimit => true,    // Retry
        MyError::TempUnavailable => true, // Retry
        MyError::InvalidData => false,    // Don't retry
        MyError::Unauthorized => false,   // Don't retry
    }
}
```
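
The ACK/NACK/REJECT decision above can be pulled into a pure function, which makes it testable without a broker. The following is a sketch under assumptions: the `MyError` definition is inferred from the variants used in `should_retry`, and `Disposition` and `disposition` are hypothetical names introduced for this example rather than rust-rabbit types.

```rust
/// Error variants as used in the handler above (definition assumed).
#[derive(Debug, PartialEq)]
enum MyError {
    NetworkTimeout,
    ApiRateLimit,
    TempUnavailable,
    InvalidData,
    Unauthorized,
}

/// What the handler should tell the broker (hypothetical type).
#[derive(Debug, PartialEq)]
enum Disposition {
    Ack,    // processed successfully
    Nack,   // transient failure, retry
    Reject, // fatal failure, route to the DLQ
}

fn should_retry(error: &MyError) -> bool {
    matches!(
        error,
        MyError::NetworkTimeout | MyError::ApiRateLimit | MyError::TempUnavailable
    )
}

/// Maps a processing result to the acknowledgment to send.
fn disposition(result: &Result<(), MyError>) -> Disposition {
    match result {
        Ok(()) => Disposition::Ack,
        Err(e) if should_retry(e) => Disposition::Nack,
        Err(_) => Disposition::Reject,
    }
}

fn main() {
    let outcomes = [
        Ok(()),
        Err(MyError::ApiRateLimit),
        Err(MyError::InvalidData),
    ];
    for outcome in &outcomes {
        println!("{:?} -> {:?}", outcome, disposition(outcome));
    }
}
```

Keeping the classification separate from the `delivery` calls means that adding an error variant only touches `should_retry`, and the mapping can be covered by plain unit tests.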
## **Prefetch Count Debugging**
**Common issue**: `prefetch_count` seems to have no effect?
### ❌ **Wrong** (`prefetch_count` is ignored):
```rust
ConsumerOptions {
    auto_ack: true,          // Wrong: messages are ACKed immediately
    prefetch_count: Some(5), // Has no effect
    ..Default::default()
}
```
### ✅ **Correct** (`prefetch_count` takes effect):
```rust
ConsumerOptions {
    auto_ack: false,         // Correct: messages must be ACKed manually
    prefetch_count: Some(5), // At most 5 unacknowledged messages
    ..Default::default()
}
```
**Why?**
- `prefetch_count` only limits messages that have been delivered but not yet ACKed
- `auto_ack: true` → messages are ACKed on delivery → no backpressure
- `auto_ack: false` → messages wait for a manual ACK → QoS limits apply
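
The interaction between `auto_ack` and `prefetch_count` can be illustrated with a toy model of a single consumer channel. This is a simplified simulation for intuition only: `ChannelModel` is a hypothetical type, not the broker's or rust-rabbit's implementation, and `None` stands for "no prefetch limit".

```rust
use std::collections::VecDeque;

/// Toy model of one consumer channel (hypothetical, for illustration).
struct ChannelModel {
    ready: VecDeque<u32>,    // messages waiting in the queue
    unacked: Vec<u32>,       // delivered but not yet acknowledged
    prefetch: Option<usize>, // limit on unacknowledged messages
    auto_ack: bool,
}

impl ChannelModel {
    fn new(messages: u32, prefetch: Option<usize>, auto_ack: bool) -> Self {
        Self {
            ready: (1..=messages).collect(),
            unacked: Vec::new(),
            prefetch,
            auto_ack,
        }
    }

    /// Delivers as many messages as the prefetch window allows and returns them.
    fn deliver(&mut self) -> Vec<u32> {
        let mut delivered = Vec::new();
        while !self.ready.is_empty() {
            // The window only exists in manual-ack mode.
            let window_full = match self.prefetch {
                Some(limit) if !self.auto_ack => self.unacked.len() >= limit,
                _ => false,
            };
            if window_full {
                break;
            }
            let msg = self.ready.pop_front().expect("queue checked non-empty");
            if !self.auto_ack {
                self.unacked.push(msg);
            }
            delivered.push(msg);
        }
        delivered
    }

    /// Acknowledges one message, freeing a slot in the window.
    fn ack(&mut self, msg: u32) {
        self.unacked.retain(|m| *m != msg);
    }
}

fn main() {
    let mut manual = ChannelModel::new(20, Some(5), false);
    println!("manual ack, first batch: {:?}", manual.deliver());
    manual.ack(1);
    manual.ack(2);
    println!("after two ACKs: {:?}", manual.deliver());

    let mut auto = ChannelModel::new(20, Some(5), true);
    println!("auto ack delivers {} messages at once", auto.deliver().len());
}
```

In manual-ack mode the consumer never holds more than five messages, and each ACK frees exactly one slot. With auto-ack nothing is ever counted as unacknowledged, so the limit never triggers.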
### **Debug Commands**
```bash
# Compile check
cargo check
# Run with RabbitMQ
cargo run --example fast_consumer_template
# Test prefetch behavior
cargo run --example prefetch_verification
```
### **Smart Publisher with Auto-Declare**
```rust
use rust_rabbit::publisher::{Publisher, PublishOptions};
let publisher = rabbit.publisher();
// Auto-declare exchange when publishing
let options = PublishOptions::builder()
.auto_declare_exchange()
.durable()
.build();
publisher.publish_to_exchange(
"orders.processing", // exchange (auto-created)
"orders.processing", // routing key
&order_message,
Some(options)
).await?;
```
### **Advanced Retry Configuration**
```rust
// Business-critical with custom settings
let custom_retry = RetryPolicy::builder()
.max_retries(3)
.initial_delay(Duration::from_secs(30))
.backoff_multiplier(1.5)
.jitter(0.2) // 20% randomization
.dead_letter_exchange("failed.orders.dlx")
.dead_letter_queue("failed.orders.dlq")
.build();
// Use with consumer
let options = ConsumerOptions::builder("orders.processing")
.auto_declare_queue()
.auto_declare_exchange()
.retry_policy(custom_retry)
.prefetch_count(1) // Reliable processing
.manual_ack() // Explicit acknowledgment
.build();
```
## **Advanced Patterns** *(Phase 2 - NEW)*
### **Request-Response (RPC)**
```rust
use rust_rabbit::patterns::request_response::*;
// Server side
let server = RequestResponseServer::new(rabbit.clone(), "calc_queue".to_string());
server.handle_requests(|req: CalculateRequest| async move {
Ok(CalculateResponse { result: req.x + req.y })
}).await?;
// Client side
let client = RequestResponseClient::new(rabbit, "calc_queue".to_string());
let response: CalculateResponse = client
.send_request(&CalculateRequest { x: 5, y: 3 })
.with_timeout(Duration::from_secs(30))
.await?;
```
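
Request-response over a message queue depends on matching each reply to the request that caused it. The sketch below shows that idea with standard-library channels only. `PendingReplies` is a hypothetical helper, the numeric IDs stand in for AMQP correlation IDs, and none of this is claimed to mirror how `RequestResponseClient` is implemented.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

/// Tracks in-flight requests by correlation ID (hypothetical helper).
struct PendingReplies {
    next_id: u64,
    waiting: HashMap<u64, Sender<i64>>,
}

impl PendingReplies {
    fn new() -> Self {
        Self { next_id: 1, waiting: HashMap::new() }
    }

    /// Registers a new request; returns its correlation ID and the
    /// receiver on which the caller waits for the reply.
    fn register(&mut self) -> (u64, Receiver<i64>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Routes a reply to its waiting caller. Returns false for unknown
    /// IDs, for example a duplicate or late reply.
    fn resolve(&mut self, correlation_id: u64, reply: i64) -> bool {
        match self.waiting.remove(&correlation_id) {
            Some(tx) => tx.send(reply).is_ok(),
            None => false,
        }
    }
}

fn main() {
    let mut pending = PendingReplies::new();
    let (sum_id, sum_rx) = pending.register();
    let (other_id, other_rx) = pending.register();

    // Replies may arrive in any order; the ID decides who receives which one.
    pending.resolve(other_id, 42);
    pending.resolve(sum_id, 8);

    let timeout = Duration::from_millis(100);
    println!("reply for {}: {:?}", sum_id, sum_rx.recv_timeout(timeout));
    println!("reply for {}: {:?}", other_id, other_rx.recv_timeout(timeout));

    // A request that never gets a reply surfaces as a timeout.
    let (_lost_id, lost_rx) = pending.register();
    let lost = lost_rx.recv_timeout(Duration::from_millis(10));
    assert_eq!(lost, Err(RecvTimeoutError::Timeout));
}
```

The timeout is the counterpart of `with_timeout` in the client example: without it, a lost reply would block the caller indefinitely.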
### **Saga Pattern (Distributed Transactions)**
```rust
use rust_rabbit::patterns::saga::*;
// Define the step and its compensation logic
async fn reserve_inventory(order_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    println!("✅ Reserved inventory for order {}", order_id);
    Ok(())
}

async fn compensate_inventory(order_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    println!("Released inventory for order {}", order_id);
    Ok(())
}

// Execute saga
let mut coordinator = SagaCoordinator::new(rabbit);
let mut saga = SagaInstance::new("order-saga-123".to_string());
saga.add_step(
    "reserve_inventory",
    |data| Box::pin(reserve_inventory(&data)),
    |data| Box::pin(compensate_inventory(&data)),
);

match coordinator.execute_saga(saga, "order-456".to_string()).await {
    Ok(_) => println!("✅ Saga completed successfully"),
    Err(e) => println!("❌ Saga failed, compensation completed: {}", e),
}
```
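
The core rule of a saga is that when a step fails, every step that already completed is compensated in reverse order. Below is a minimal synchronous sketch of that rule. `Step`, `run_saga`, and the payment step functions are hypothetical names for this example, and the real `SagaCoordinator` may behave differently, for instance in how it treats failed compensations.

```rust
/// Signature shared by actions and compensations in this sketch.
type StepFn = fn(&str) -> Result<(), String>;

/// One saga step: an action plus the compensation that undoes it.
struct Step {
    name: &'static str,
    action: StepFn,
    compensate: StepFn,
}

/// Runs steps in order. On the first failure, compensates the steps that
/// already completed, newest first, and returns the original error.
fn run_saga(steps: &[Step], order_id: &str, log: &mut Vec<String>) -> Result<(), String> {
    for (index, step) in steps.iter().enumerate() {
        match (step.action)(order_id) {
            Ok(()) => log.push(format!("done:{}", step.name)),
            Err(err) => {
                log.push(format!("failed:{}", step.name));
                for done in steps[..index].iter().rev() {
                    // In this sketch a failed compensation is only logged.
                    match (done.compensate)(order_id) {
                        Ok(()) => log.push(format!("compensated:{}", done.name)),
                        Err(_) => log.push(format!("compensation-failed:{}", done.name)),
                    }
                }
                return Err(err);
            }
        }
    }
    Ok(())
}

fn reserve_inventory(_order_id: &str) -> Result<(), String> { Ok(()) }
fn release_inventory(_order_id: &str) -> Result<(), String> { Ok(()) }
fn charge_payment(order_id: &str) -> Result<(), String> {
    Err(format!("card declined for {}", order_id))
}
fn refund_payment(_order_id: &str) -> Result<(), String> { Ok(()) }

/// Two-step order flow in which the second step always fails.
fn order_steps() -> Vec<Step> {
    vec![
        Step { name: "reserve_inventory", action: reserve_inventory, compensate: release_inventory },
        Step { name: "charge_payment", action: charge_payment, compensate: refund_payment },
    ]
}

fn main() {
    let mut log = Vec::new();
    let result = run_saga(&order_steps(), "order-456", &mut log);
    println!("result: {:?}", result);
    println!("log: {:?}", log);
}
```

Note that the failed step itself is not compensated, only the steps before it. Whether a partially executed step needs cleanup is a design decision each step has to make for itself.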
### **Event Sourcing (CQRS)**
```rust
use rust_rabbit::patterns::event_sourcing::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct BankAccount {
    id: AggregateId,
    sequence: EventSequence,
    balance: f64,
}

impl AggregateRoot for BankAccount {
    // Rebuilds state by applying one stored event at a time.
    fn apply_event(&mut self, event: &DomainEvent) -> Result<()> {
        match event.event_type.as_str() {
            "MoneyDeposited" => {
                let amount: f64 = serde_json::from_slice(&event.event_data)?;
                self.balance += amount;
            }
            "MoneyWithdrawn" => {
                let amount: f64 = serde_json::from_slice(&event.event_data)?;
                self.balance -= amount;
            }
            _ => {} // Unknown event types are ignored
        }
        Ok(())
    }
}

// Usage
let event_store = Arc::new(InMemoryEventStore::new());
let repository = EventSourcingRepository::<BankAccount>::new(event_store);
let mut account = BankAccount::new(AggregateId::new());
account.deposit(100.0)?; // Generates MoneyDeposited event
repository.save(&mut account).await?;
```
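
The idea behind `apply_event` is that current state is never stored directly: it is derived by replaying the event log from the beginning. The following standalone sketch shows that replay with a typed event enum. `AccountEvent` and `Account` are hypothetical types for this example, and amounts are kept in integer cents to sidestep floating-point rounding, which differs from the `f64` balance used above.

```rust
/// Events in the account's history (hypothetical type; amounts in cents).
#[derive(Debug, Clone, PartialEq)]
enum AccountEvent {
    Deposited(u64),
    Withdrawn(u64),
}

/// State derived from the event log.
#[derive(Debug, Default, PartialEq)]
struct Account {
    balance_cents: i64,
    version: u64, // number of events applied so far
}

impl Account {
    /// Applies a single event to the current state.
    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Deposited(amount) => self.balance_cents += *amount as i64,
            AccountEvent::Withdrawn(amount) => self.balance_cents -= *amount as i64,
        }
        self.version += 1;
    }

    /// Rebuilds the state from scratch by replaying every event in order.
    fn replay(events: &[AccountEvent]) -> Self {
        let mut account = Account::default();
        for event in events {
            account.apply(event);
        }
        account
    }
}

fn main() {
    let history = vec![
        AccountEvent::Deposited(10_000),
        AccountEvent::Withdrawn(2_500),
        AccountEvent::Deposited(500),
    ];
    let account = Account::replay(&history);
    println!("balance: {} cents after {} events", account.balance_cents, account.version);
}
```

Because `apply` is deterministic, replaying a prefix of the log and then applying the remaining events gives the same result as a full replay, which is the property that makes snapshots possible.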
## **Production Features**
### **Health Monitoring**
```rust
use rust_rabbit::health::HealthChecker;
let health_checker = HealthChecker::new(connection_manager.clone());
match health_checker.check_health().await {
Ok(status) => println!("Connection healthy: {:?}", status),
Err(e) => println!("Connection issues: {}", e),
}
```
### **Prometheus Metrics**
```rust
use rust_rabbit::metrics::PrometheusMetrics;

let metrics = PrometheusMetrics::new();

// Metrics are automatically collected:
// - rust_rabbit_messages_published_total
// - rust_rabbit_messages_consumed_total
// - rust_rabbit_message_processing_duration_seconds
// - rust_rabbit_connection_health
// - rust_rabbit_queue_depth

// Expose metrics endpoint
warp::serve(metrics.metrics_handler())
    .run(([0, 0, 0, 0], 9090))
    .await;
```
## **Examples**
The library includes comprehensive examples for all features:
### **Core Examples**
- **`consumer_example.rs`** - Basic consumer setup and message handling
- **`publisher_example.rs`** - Publishing messages with different options
- **`fast_consumer_template.rs`** - Production-ready consumer template
### **Retry & Error Handling**
- **`quick_retry_consumer.rs`** - All retry preset demonstrations
- **`retry_example.rs`** - Custom retry logic implementation
- **`retry_policy_presets.rs`** - Retry policy patterns
### **Advanced Patterns**
- **`event_sourcing_example.rs`** - CQRS and event sourcing implementation
- **`saga_example.rs`** - Distributed transaction coordination
- **`phase2_patterns_example.rs`** - Enterprise messaging patterns
### **Production Features**
- **`health_monitoring_example.rs`** - Health checks and monitoring
- **`metrics_example.rs`** - Prometheus metrics integration
- **`comprehensive_demo.rs`** - Full-featured production example
### **Integration**
- **`actix_web_api_example.rs`** - Web API integration
- **`builder_pattern_example.rs`** - Configuration patterns
Run any example:
```bash
cargo run --example consumer_example
cargo run --example fast_consumer_template
```
## 🧪 **Testing**
### **Integration Testing with Docker**
RustRabbit supports real RabbitMQ integration testing:
```bash
# Start RabbitMQ with required plugins
docker-compose -f docker-compose.test.yml up -d
# Run integration tests
cargo test --test integration_example -- --test-threads=1
# Or use make
make test-integration
```
### **Unit Tests**
```bash
# Run unit tests
cargo test
# Run with all features enabled
cargo test --all-features
# Test specific modules
cargo test consumer::tests
cargo test retry::tests
```
## 🚨 **Common Mistakes & Best Practices**
### ❌ **Common Mistakes**
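1. **Wrong prefetch_count setup**:
```rust
auto_ack: true, // prefetch_count is ignored when messages are ACKed automatically
```
2. **Not handling retry errors**:
```rust
Err(_) => delivery.nack(Default::default()).await?, // every error is retried, even fatal ones
```
3. **Missing manual ACK**:
```rust
// With auto_ack: false, a handler that never calls delivery.ack(...)
// leaves its messages unacknowledged.
```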
### ✅ **Best Practices**
1. **Always use auto_ack: false for production**:
```rust
ConsumerOptions {
    auto_ack: false,
    retry_policy: Some(RetryPolicy::fast()),
    ..Default::default()
}
```
2. **Implement smart retry logic**:
```rust
match error {
    ApiError::Timeout => delivery.nack(Default::default()).await?,        // retryable
    ApiError::Unauthorized => delivery.reject(Default::default()).await?, // fatal
}
```
3. **Set appropriate prefetch_count** (pick one):
```rust
prefetch_count: Some(cpu_cores * 2), // I/O-bound handlers
prefetch_count: Some(cpu_cores),     // CPU-bound handlers
prefetch_count: Some(1),             // one message at a time
```
4. **Always handle graceful shutdown**:
```rust
tokio::select! {
    _ = consumer.consume("queue", handler) => {}
    _ = tokio::signal::ctrl_c() => {
        println!("Shutting down gracefully...");
    }
}
```
## 🔧 **Configuration**
### **Connection Configuration**
```rust
let config = RabbitConfig::builder()
.connection_string("amqp://user:pass@localhost:5672/vhost")
.connection_timeout(Duration::from_secs(30))
.heartbeat(Duration::from_secs(10))
.retry_config(RetryConfig::default())
.health_check_interval(Duration::from_secs(30))
.pool_config(PoolConfig::new(5, 10)) // min 5, max 10 connections
.build();
```
### **Consumer Configuration**
```rust
ConsumerOptions {
auto_ack: false, // Manual ACK for reliability
prefetch_count: Some(10), // QoS prefetch limit
retry_policy: Some(RetryPolicy::fast()), // Retry configuration
concurrent_limit: Some(50), // Max concurrent messages
..Default::default()
}
```
## **Performance Tips**
- **prefetch_count**: Set to `CPU cores × 2-5` for I/O bound tasks
- **concurrent_limit**: 50-200 for I/O bound, fewer for CPU bound
- **Connection pooling**: Use 5-10 connections per application
- **DLQ monitoring**: Always monitor dead letter queues
- **Metrics**: Enable Prometheus for production monitoring
## 🤝 **Contributing**
We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.
1. Fork the repository
2. Create a feature branch: `git checkout -b my-feature`
3. Make changes and add tests
4. Run tests: `cargo test`
5. Submit a pull request
## **License**
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## **Links**
- **Documentation**: [docs.rs/rust-rabbit](https://docs.rs/rust-rabbit)
- **Crates.io**: [crates.io/crates/rust-rabbit](https://crates.io/crates/rust-rabbit)
- **GitHub**: [github.com/nghiaphamln/rust-rabbit](https://github.com/nghiaphamln/rust-rabbit)
- **Issues**: [github.com/nghiaphamln/rust-rabbit/issues](https://github.com/nghiaphamln/rust-rabbit/issues)
---
**Made with ❤️ for the Rust community**
### **Metrics Collection**
```rust
use rust_rabbit::metrics::RustRabbitMetrics;

let metrics = RustRabbitMetrics::new()?;
let rabbit = RustRabbit::with_metrics(config, metrics.clone()).await?;

// Automatic metrics collection:
// - Message throughput (published/consumed per second)
// - Processing latency (P50, P90, P99)
// - Error rates (failed messages, connection errors)
// - Queue depths (pending messages)
// - Connection health (active, reconnections)
```
### **Circuit Breaker**
```rust
use rust_rabbit::circuit_breaker::CircuitBreakerConfig;
let config = RabbitConfig::builder()
.connection_string("amqp://localhost:5672")
.circuit_breaker(CircuitBreakerConfig {
failure_threshold: 5,
failure_window: Duration::from_secs(60),
recovery_timeout: Duration::from_secs(30),
success_threshold: 3,
half_open_max_requests: 5,
})
.build();
// Circuit breaker automatically handles connection failures
```
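
The thresholds in `CircuitBreakerConfig` describe a three-state machine: closed, open, and half-open. Here is a simplified sketch of how such a breaker can behave. It is an assumption-laden illustration rather than the library's implementation: `Breaker` and `State` are hypothetical, `failure_window` and `half_open_max_requests` are left out, and a success in the closed state simply resets the failure count.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed,                  // calls flow normally
    Open { since: Instant }, // calls are refused
    HalfOpen,                // trial calls decide whether to close again
}

/// Simplified circuit breaker (hypothetical, for illustration).
struct Breaker {
    state: State,
    failures: u32,
    successes: u32,
    failure_threshold: u32,
    success_threshold: u32,
    recovery_timeout: Duration,
}

impl Breaker {
    fn new(failure_threshold: u32, success_threshold: u32, recovery_timeout: Duration) -> Self {
        Self {
            state: State::Closed,
            failures: 0,
            successes: 0,
            failure_threshold,
            success_threshold,
            recovery_timeout,
        }
    }

    /// Returns whether a call may proceed at `now`. An open breaker moves
    /// to half-open once the recovery timeout has elapsed.
    fn allow(&mut self, now: Instant) -> bool {
        if let State::Open { since } = self.state {
            if now.saturating_duration_since(since) < self.recovery_timeout {
                return false;
            }
            self.state = State::HalfOpen;
            self.successes = 0;
        }
        true
    }

    fn record_success(&mut self) {
        match self.state {
            State::Closed => self.failures = 0,
            State::HalfOpen => {
                self.successes += 1;
                if self.successes >= self.success_threshold {
                    self.state = State::Closed;
                    self.failures = 0;
                }
            }
            State::Open { .. } => {}
        }
    }

    fn record_failure(&mut self, now: Instant) {
        match self.state {
            State::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.state = State::Open { since: now };
                }
            }
            // Any failure during a trial reopens the breaker.
            State::HalfOpen => self.state = State::Open { since: now },
            State::Open { .. } => {}
        }
    }
}

fn main() {
    let start = Instant::now();
    // Same thresholds as the configuration above.
    let mut breaker = Breaker::new(5, 3, Duration::from_secs(30));
    for _ in 0..5 {
        breaker.record_failure(start);
    }
    println!("allowed after 10s: {}", breaker.allow(start + Duration::from_secs(10)));
    println!("allowed after 30s: {}", breaker.allow(start + Duration::from_secs(30)));
    println!("state: {:?}", breaker.state);
}
```

Passing `now` in explicitly keeps the state machine free of hidden clock reads, so the timing behavior can be tested without sleeping.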
## **Performance**
**RustRabbit v0.3.0 Benchmarks:**
| Metric | Value | Change |
|--------|-------|--------|
| **Throughput** | 75,000+ msgs/sec | +50% |
| **Latency (P99)** | < 8ms | -20% |
| **Memory Usage** | < 45MB baseline | -10% |
| **Connection Pool** | 10-100 connections | Stable |
**Advanced Pattern Performance:**
| Pattern | Throughput | Memory Overhead | Use Case |
|---------|------------|-----------------|----------|
| **Request-Response** | 25,000 req/sec | +5MB | RPC, API calls |
| **Saga** | 10,000 flows/sec | +8MB | Distributed transactions |
| **Event Sourcing** | 50,000 events/sec | +15MB | CQRS, audit trails |
| **Priority Queue** | 60,000 msgs/sec | +2MB | Task prioritization |
*Benchmarks: Intel i7-10700K, 32GB RAM, RabbitMQ 3.12*
## 🛠️ **Configuration**
### **Builder Pattern Configuration**
```rust
use rust_rabbit::{RabbitConfig, consumer::ConsumerOptions};
// Environment-specific configs
let prod_config = RabbitConfig::builder()
.connection_string("amqp://prod-server:5672")
.connection_timeout(Duration::from_secs(30))
.retry(|retry| retry.aggressive())
.health(|health| health.frequent())
.pool(|pool| pool.high_throughput())
.build();
// Consumer configurations
let reliable_options = ConsumerOptions::builder("critical-orders")
.consumer_tag("critical-processor")
.minutes_retry() // Auto-configure for reliability
.prefetch_count(1) // Process one at a time
.build();
let high_throughput_options = ConsumerOptions::builder("bulk-orders")
.consumer_tag("bulk-processor")
.high_throughput() // Optimize for speed
.auto_declare_queue()
.build();
```
## 🧪 **Testing**
RustRabbit includes comprehensive test coverage:
```bash
# Unit tests (58 tests)
cargo test --lib
# Integration tests with real RabbitMQ
docker-compose -f docker-compose.test.yml up -d
cargo test --test integration_example -- --test-threads=1
# Examples compilation
cargo check --examples
# Performance benchmarks
cargo bench
```
**Test Coverage:**
- ✅ End-to-end message flows
- ✅ Retry mechanisms with delayed exchange
- ✅ Health monitoring and recovery
- ✅ All advanced patterns (Phase 2)
- ✅ Concurrent processing scenarios
- ✅ Error handling and edge cases
## **Examples**
Comprehensive examples in the `examples/` directory:
```bash
# Core features
cargo run --example minutes_retry_preset # NEW: One-line retry setup
cargo run --example simple_auto_consumer_example
cargo run --example retry_policy_demo
# Advanced patterns (Phase 2)
cargo run --example phase2_patterns_example # Comprehensive demo
cargo run --example saga_example # E-commerce workflow
cargo run --example event_sourcing_example # Bank account CQRS
# Comparison examples
cargo run --example before_vs_after_setup # Shows complexity reduction
```
## 🗺️ **Roadmap**
### ✅ **Phase 1 (v0.2.0) - COMPLETED**
- Prometheus metrics integration
- Circuit breaker pattern
- Health monitoring
- Connection pooling
### ✅ **Phase 2 (v0.3.0) - COMPLETED**
- Request-Response pattern
- Saga pattern for distributed transactions
- Event sourcing with CQRS
- Message deduplication
- Priority queues
- **Minutes retry preset** - Zero-config production setup
### 🔮 **Phase 3 (v0.4.0) - Enterprise**
- Multi-broker support with failover
- Message encryption at rest
- Schema registry integration
- Advanced routing patterns
- Performance optimizations
## 🤝 **Contributing**
We welcome contributions! Areas for improvement:
- Bug fixes and performance improvements
- Documentation and examples
- New features from the roadmap
- Additional test coverage
- Benchmarks and optimizations
## **Support**
- [Documentation](https://docs.rs/rust-rabbit)
- [GitHub Discussions](https://github.com/nghiaphamln/rust-rabbit/discussions)
- [Issue Tracker](https://github.com/nghiaphamln/rust-rabbit/issues)
- Email: nghiaphamln3@gmail.com
## **License**
MIT License - see [LICENSE](LICENSE) file for details.
## **Acknowledgments**
- Inspired by [MassTransit](https://masstransit-project.com/) for .NET
- Built on [lapin](https://github.com/amqp-rs/lapin) for AMQP protocol
- Powered by [Prometheus](https://prometheus.io/) for metrics
---
<div align="center">
**⭐ Star us on GitHub if RustRabbit helps your project! ⭐**
[GitHub](https://github.com/nghiaphamln/rust-rabbit) • [Crates.io](https://crates.io/crates/rust-rabbit) • [Docs.rs](https://docs.rs/rust-rabbit)
*Built with ❤️ for the Rust community*
</div>