# rust-rabbit
[CI](https://github.com/nghiaphamln/rust-rabbit/actions) | [crates.io](https://crates.io/crates/rust-rabbit) | [docs.rs](https://docs.rs/rust-rabbit) | [License: MIT](https://opensource.org/licenses/MIT)

A **simple, reliable** RabbitMQ client library for Rust. Easy to use with minimal configuration and flexible retry mechanisms.
## **Key Features**
- **Simple API**: Just `Publisher` and `Consumer` with essential methods
- **Flexible Retry**: Exponential, linear, or custom retry mechanisms
- **Auto-Setup**: Automatic queue/exchange declaration and binding
- **Built-in Reliability**: Default ACK behavior with intelligent error handling
- **Zero Complexity**: No enterprise patterns, no metrics - just core messaging
## **Quick Start**
Add to your `Cargo.toml`:
```toml
[dependencies]
rust-rabbit = "1.1"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```
## **Basic Usage**
### Publisher - Send Messages
```rust
use rust_rabbit::{Connection, Publisher};
use serde::Serialize;

#[derive(Serialize)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to RabbitMQ
    let connection = Connection::new("amqp://localhost:5672").await?;
    let publisher = Publisher::new(connection);

    let order = Order { id: 123, amount: 99.99 };

    // Publish to exchange (with routing)
    publisher.publish_to_exchange("orders", "new.order", &order, None).await?;

    // Publish directly to queue (simple)
    publisher.publish_to_queue("order_queue", &order, None).await?;

    Ok(())
}
```
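
The difference between the two publish calls can be pictured in plain AMQP terms. The sketch below uses only the standard library and does not call rust-rabbit; `Route`, `exchange_route`, and `queue_route` are hypothetical names, and it assumes that publishing "directly to a queue" goes through the AMQP default exchange (empty name), which routes to the queue whose name equals the routing key.

```rust
/// Where a message is addressed, in AMQP terms (illustrative type only).
#[derive(Debug, PartialEq)]
struct Route<'a> {
    exchange: &'a str,
    routing_key: &'a str,
}

/// Publishing to an exchange: the broker picks queues from the exchange's bindings.
fn exchange_route<'a>(exchange: &'a str, routing_key: &'a str) -> Route<'a> {
    Route { exchange, routing_key }
}

/// Publishing to a queue: AMQP 0-9-1 has no direct queue publish, so this
/// sketch assumes the default exchange ("") with the queue name as routing key.
fn queue_route<'a>(queue: &'a str) -> Route<'a> {
    Route { exchange: "", routing_key: queue }
}
```

With the names from the example above, `exchange_route("orders", "new.order")` targets the `orders` exchange, while `queue_route("order_queue")` targets the default exchange with routing key `order_queue`.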
### Consumer - Receive Messages with Retry
```rust
use rust_rabbit::{Connection, Consumer, RetryConfig};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;

    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default()) // 1s->2s->4s->8s->16s
        .bind_to_exchange("orders", "new.order")
        .with_prefetch(5)
        .build();

    consumer.consume(|msg: rust_rabbit::Message<Order>| async move {
        println!("Processing order {}: ${}", msg.data.id, msg.data.amount);

        // Your business logic here
        if msg.data.amount > 1000.0 {
            return Err("Amount too high".into()); // Will retry
        }

        Ok(()) // ACK message
    }).await?;

    Ok(())
}
```
## **Retry Configurations**
### Built-in Retry Patterns
```rust
use rust_rabbit::RetryConfig;
use std::time::Duration;

// Exponential: 1s → 2s → 4s → 8s → 16s (5 retries)
let exponential = RetryConfig::exponential_default();

// Custom exponential: 2s → 4s → 8s → 16s → 32s (max 60s)
let custom_exp = RetryConfig::exponential(5, Duration::from_secs(2), Duration::from_secs(60));

// Linear: 10s → 10s → 10s (3 retries)
let linear = RetryConfig::linear(3, Duration::from_secs(10));

// Custom delays: 1s → 5s → 30s
let custom = RetryConfig::custom(vec![
    Duration::from_secs(1),
    Duration::from_secs(5),
    Duration::from_secs(30),
]);

// No retries - fail immediately
let no_retry = RetryConfig::no_retry();
```
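
To make the delay sequences in the comments above concrete, here is a minimal standard-library sketch of how such schedules can be computed. It is not the crate's implementation: `exponential_delays` and `linear_delays` are hypothetical helpers, and the doubling factor and the cap behaviour are assumptions inferred from the sequences shown.

```rust
use std::time::Duration;

/// Delay before each retry: `initial` doubled per attempt, capped at `max`.
fn exponential_delays(max_retries: u32, initial: Duration, max: Duration) -> Vec<Duration> {
    (0..max_retries)
        .map(|attempt| {
            // 2^attempt, saturating so large attempt counts cannot overflow.
            let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
            // If the multiplication overflows, fall back to the cap.
            initial.checked_mul(factor).unwrap_or(max).min(max)
        })
        .collect()
}

/// Linear schedule: the same delay for every retry.
fn linear_delays(max_retries: u32, delay: Duration) -> Vec<Duration> {
    vec![delay; max_retries as usize]
}
```

Calling `exponential_delays(5, Duration::from_secs(2), Duration::from_secs(60))` yields 2s, 4s, 8s, 16s, 32s; with seven retries the last two delays are clamped to the 60s cap.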
### How Retry Works
```rust
// Failed messages are automatically:
// 1. Sent to retry queue with delay (e.g., orders.retry.1)
// 2. After delay, returned to original queue for retry
// 3. If max retries exceeded, sent to dead letter queue (e.g., orders.dlq)
```
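
The three steps above amount to a small routing decision per failed message. The sketch below models it with the standard library only; `NextHop` and `next_hop` are hypothetical, and the `<queue>.retry.<n>` and `<queue>.dlq` naming is taken from the examples in the comments, not from the crate's source.

```rust
/// Where a failed message goes next (illustrative type only).
#[derive(Debug, PartialEq)]
enum NextHop {
    /// Wait in a retry queue, then return to the original queue.
    RetryQueue(String),
    /// Retries exhausted: park the message in the dead letter queue.
    DeadLetterQueue(String),
}

/// `retries_so_far` is the number of retries already performed
/// (0 when the message fails for the first time).
fn next_hop(queue: &str, retries_so_far: u32, max_retries: u32) -> NextHop {
    if retries_so_far < max_retries {
        NextHop::RetryQueue(format!("{}.retry.{}", queue, retries_so_far + 1))
    } else {
        NextHop::DeadLetterQueue(format!("{}.dlq", queue))
    }
}
```

For a queue named `orders` with three retries allowed, the first failure is routed to `orders.retry.1`, and a failure after the third retry is routed to `orders.dlq`.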
### Delay Strategies (TTL vs DelayedExchange)
rust-rabbit supports two strategies for implementing message delays:
#### 1. **TTL Strategy (Default)** - No Plugin Required
Uses RabbitMQ's TTL feature with temporary retry queues. Simple but less precise.
```rust
use rust_rabbit::{DelayStrategy, RetryConfig};

let retry_config = RetryConfig::exponential_default()
    .with_delay_strategy(DelayStrategy::TTL); // Default, no plugin needed

// Failed messages are sent to: orders.retry.1 (with TTL set)
// After the TTL expires, they are automatically routed back to: orders
```
**Pros:**
- No external plugin required
- Works out-of-the-box with standard RabbitMQ
- Simple setup
**Cons:**
- Less precise timing (TTL granularity depends on RabbitMQ configuration)
- Creates many temporary queues (one per retry attempt)
- Requires cleanup of old retry queues
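
For readers unfamiliar with the TTL pattern, the sketch below shows the standard RabbitMQ queue arguments that make a retry queue hand messages back after a delay. It is an assumption that rust-rabbit declares its retry queues this way; `retry_queue_args` is a hypothetical helper, and values are kept as strings for simplicity (a real declaration would send the TTL as a number).

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Queue arguments for a TTL-based retry queue (sketch, not the crate's code).
fn retry_queue_args(original_queue: &str, delay: Duration) -> BTreeMap<&'static str, String> {
    let mut args = BTreeMap::new();
    // Messages expire after `delay`; RabbitMQ expects milliseconds.
    args.insert("x-message-ttl", delay.as_millis().to_string());
    // Expired messages are dead-lettered via the default exchange ("")...
    args.insert("x-dead-letter-exchange", String::new());
    // ...with the original queue's name as routing key, so they return to it.
    args.insert("x-dead-letter-routing-key", original_queue.to_string());
    args
}
```

With a 2-second delay for `orders`, the retry queue would carry `x-message-ttl = 2000` and dead-letter back to `orders`. One queue per distinct delay is what leads to the "many temporary queues" drawback listed above.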
#### 2. **DelayedExchange Strategy** - Better Precision
Uses the `rabbitmq_delayed_message_exchange` plugin for server-side delay management. More reliable and cleaner architecture.
```rust
use rust_rabbit::{DelayStrategy, RetryConfig};

let retry_config = RetryConfig::exponential_default()
    .with_delay_strategy(DelayStrategy::DelayedExchange);

// Messages are published to: orders.delay (delay exchange)
// with the x-delay header set to the desired delay time.
// The exchange automatically routes them back to: orders after the delay
```
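
As a rough illustration of the comments above, the sketch below builds the pieces of a delayed publish: the delay exchange name, the routing key, and the `x-delay` value in milliseconds. `DelayedPublish` and `delayed_publish` are hypothetical; the `<queue>.delay` name comes from the comment above, while using the queue name as routing key and storing the delay as `u32` are assumptions of this sketch.

```rust
use std::time::Duration;

/// What would be sent to the delay exchange (illustrative type only).
#[derive(Debug, PartialEq)]
struct DelayedPublish {
    exchange: String,
    routing_key: String,
    /// Value of the `x-delay` header, in milliseconds.
    x_delay_ms: u32,
}

fn delayed_publish(queue: &str, delay: Duration) -> DelayedPublish {
    // Clamp instead of truncating if the delay exceeds the u32 range.
    let millis = delay.as_millis().min(u32::MAX as u128) as u32;
    DelayedPublish {
        exchange: format!("{}.delay", queue),
        routing_key: queue.to_string(),
        x_delay_ms: millis,
    }
}
```

A 5-second retry delay for `orders` therefore becomes a publish to `orders.delay` with `x-delay: 5000`.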
**Setup Requirements:**
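1. Install the plugin on RabbitMQ. It is a community plugin that is not bundled with the broker, so first download the release matching your RabbitMQ version into the broker's plugins directory, then enable it: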
```bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
sudo systemctl restart rabbitmq-server
```
2. Use in code:
```rust
let retry_config = RetryConfig::linear(3, Duration::from_secs(5))
    .with_delay_strategy(DelayStrategy::DelayedExchange);

let consumer = Consumer::builder(connection, "orders")
    .with_retry(retry_config)
    .build();
```
**⚠️ Important: Plugin is Required**
If you use `DelayStrategy::DelayedExchange` without the plugin enabled on RabbitMQ:
- The delay exchange declaration will fail, because the broker does not recognize the `x-delayed-message` exchange type
- Retries that depend on the delay exchange will fail with an error instead of being delayed
- RabbitMQ typically reports this as `COMMAND_INVALID - unknown exchange type 'x-delayed-message'`
**Always ensure the `rabbitmq_delayed_message_exchange` plugin is installed and enabled before deploying code that uses `DelayStrategy::DelayedExchange`.**
**Pros:**
- More precise timing (`x-delay` is specified in milliseconds)
- Cleaner architecture (a single delay exchange instead of one retry queue per attempt)
- No retry queues to create or clean up
**Cons:**
- Requires external plugin installation
- Plugin adds complexity to RabbitMQ setup
- Small performance overhead for delay management
- The plugin's documentation cautions against very large backlogs of delayed messages (hundreds of thousands or more)
**Flow Diagram:**
```
DelayedExchange Strategy Flow:

┌──────────┐
│  Queue   │  (e.g., "orders")
└────┬─────┘
     │
     └─> Handler fails
              │
         Publish to
        Delay Exchange
              │
      ┌───────▼──────┐
      │ orders.delay │  (x-delay: 5000ms)
      └───────┬──────┘
              │
         After delay
              │
      ┌───────▼──────┐
      │    Queue     │
      │   (orders)   │
      └──────────────┘
```
**Example:**
See [delayed_exchange_example.rs](examples/delayed_exchange_example.rs) for a complete working example.
## **Dead Letter Queue (DLQ) with Auto-Cleanup**
When a message exceeds the maximum number of retries, rust-rabbit automatically sends it to a Dead Letter Queue (DLQ). You can set a TTL on the DLQ so that RabbitMQ removes old messages automatically:
```rust
let retry_config = RetryConfig::exponential_default()
    .with_dlq_ttl(Duration::from_secs(86400)); // Auto-cleanup after 1 day

let consumer = Consumer::builder(connection, "orders")
    .with_retry(retry_config)
    .build();
```
**Flow:**
```
Original Queue (orders)
    ↓
Retries exhausted
    ↓
Message → DLQ (orders.dlq) [TTL: 86400s]
    ↓
After 1 day: Message auto-deleted by RabbitMQ
    ↓
✓ No manual cleanup needed!
```
**Configuration Options:**
```rust
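use rust_rabbit::RetryConfig;
use std::time::Duration;

// 1 hour (short-lived failed messages)
let hourly = RetryConfig::exponential_default().with_dlq_ttl(Duration::from_secs(3600));

// 1 day (typical retention)
let daily = RetryConfig::exponential_default().with_dlq_ttl(Duration::from_secs(86400));

// 1 week (long retention for analysis)
let weekly = RetryConfig::exponential_default().with_dlq_ttl(Duration::from_secs(604800));

// No TTL (the default): don't call .with_dlq_ttl(); messages stay until removed manually
let manual = RetryConfig::exponential_default();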
```
**Monitoring DLQ:**
1. Open RabbitMQ Management: http://localhost:15672
2. Go to "Queues" tab
3. Find "orders.dlq" queue
4. Check "x-message-ttl" in queue details
5. Monitor "Message count" to see failed messages
See [dlq_ttl_example.rs](examples/dlq_ttl_example.rs) for a complete example.
## **Advanced Features**
### MessageEnvelope System
For advanced retry tracking and error handling, use the MessageEnvelope system:
```rust
use rust_rabbit::{Connection, Publisher, Consumer, MessageEnvelope, RetryConfig};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Clone)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;

    // Publisher with envelope
    let publisher = Publisher::new(connection.clone());
    let order = Order { id: 123, amount: 99.99 };
    let envelope = MessageEnvelope::new(order, "order_queue")
        .with_max_retries(3);
    publisher.publish_envelope_to_queue("order_queue", &envelope, None).await?;

    // Consumer with envelope processing
    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default())
        .build();

    consumer.consume_envelopes(|envelope: MessageEnvelope<Order>| async move {
        println!("Processing order {} (attempt {})",
            envelope.payload.id,
            envelope.metadata.retry_attempt + 1);

        // Access retry metadata
        if !envelope.is_first_attempt() {
            println!("This is a retry. Last error: {:?}", envelope.last_error());
        }

        Ok(())
    }).await?;

    Ok(())
}
```
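
To show what "retry metadata" travelling with a message can look like, here is a simplified standard-library model. It is not the crate's `MessageEnvelope`: the `Envelope` type, its fields, and `into_retry` are hypothetical, and only the `is_first_attempt` and `last_error` names mirror the calls in the example above.

```rust
/// Hypothetical, simplified stand-in for an envelope with retry metadata.
#[derive(Debug, Clone)]
struct Envelope<T> {
    payload: T,
    retry_attempt: u32,
    max_retries: u32,
    errors: Vec<String>,
}

impl<T> Envelope<T> {
    fn new(payload: T, max_retries: u32) -> Self {
        Envelope { payload, retry_attempt: 0, max_retries, errors: Vec::new() }
    }

    /// True until the message has been retried at least once.
    fn is_first_attempt(&self) -> bool {
        self.retry_attempt == 0
    }

    /// Most recent failure recorded on this envelope, if any.
    fn last_error(&self) -> Option<&str> {
        self.errors.last().map(|e| e.as_str())
    }

    /// Record a failure; returns `None` once retries are exhausted.
    fn into_retry(mut self, error: &str) -> Option<Self> {
        self.errors.push(error.to_string());
        if self.retry_attempt >= self.max_retries {
            return None;
        }
        self.retry_attempt += 1;
        Some(self)
    }
}
```

Because the attempt counter and error history live inside the message, a handler can inspect them without any external state, which is the idea behind the `is_first_attempt()` and `last_error()` calls shown earlier.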
### Connection Options
```rust
use rust_rabbit::Connection;
// Simple connection
let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;
// Different connection URLs
let local = Connection::new("amqp://localhost:5672").await?;
let remote = Connection::new("amqp://user:pass@remote-host:5672/vhost").await?;
let secure = Connection::new("amqps://user:pass@secure-host:5671").await?;
```
### Publisher Options
```rust
use rust_rabbit::PublishOptions;
let options = PublishOptions::new()
    .mandatory()  // Make delivery mandatory
    .priority(5); // Message priority (0-255)

publisher.publish_to_queue("orders", &message, Some(options)).await?;
```
### Consumer Options
```rust
let consumer = Consumer::builder(connection, "order_queue")
    .with_retry(RetryConfig::exponential_default())
    .bind_to_exchange("order_exchange", "new.order") // Exchange binding with routing key
    .with_prefetch(10) // Up to 10 unacknowledged messages in flight at once
    .build();
```
## **Documentation**
For detailed guides and advanced topics:
- **[Retry Configuration Guide](docs/retry-guide.md)** - Detailed retry patterns and configuration
- **[Exchange & Queue Management](docs/queues-exchanges.md)** - Queue binding, exchange types, and best practices
- **[Error Handling](docs/error-handling.md)** - Error types and handling strategies
- **[Best Practices](docs/best-practices.md)** - Production tips and patterns
## **Examples**
See the [`examples/`](examples/) directory for complete working examples:
- **[Basic Publisher](examples/basic_publisher.rs)** - Simple message publishing
- **[Basic Consumer](examples/basic_consumer.rs)** - Simple message consumption
- **[Retry Examples](examples/retry_examples.rs)** - Different retry configurations
- **[Delayed Exchange Example](examples/delayed_exchange_example.rs)** - Using rabbitmq_delayed_message_exchange plugin
- **[DLQ TTL Example](examples/dlq_ttl_example.rs)** - Auto-cleanup Dead Letter Queue with TTL
- **[Production Setup](examples/production_setup.rs)** - Production-ready configuration
## **Testing**
Run the tests:
```bash
cargo test
```
For integration tests with real RabbitMQ:
```bash
# Start RabbitMQ with Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Run integration tests
cargo test --test integration
```
## **Requirements**
- **Rust**: 1.70+
- **RabbitMQ**: 3.8+
- **Tokio**: Async runtime
## **Migration from v0.x**
If you're upgrading from the complex v0.x version:
```rust
// OLD (v0.x) - Complex
let consumer = Consumer::new(connection_manager, ConsumerOptions {
    auto_ack: false,
    retry_policy: Some(RetryPolicy::fast()),
    prefetch_count: Some(10),
    // ... many more options
}).await?;

// NEW (v1.0) - Simple
let consumer = Consumer::builder(connection, "queue")
    .with_retry(RetryConfig::exponential_default())
    .with_prefetch(10)
    .build();
```
**Major Changes:**
- ✅ Simplified API with just `Publisher` and `Consumer`
- ✅ Removed enterprise patterns (saga, event sourcing, request-response)
- ✅ Removed metrics and health monitoring
- ✅ Unified retry system with flexible mechanisms
- ✅ Auto-declare queues and exchanges by default
## **Design Philosophy**
rust-rabbit v1.0 follows these principles:
1. **Simplicity First**: Only essential features, no bloat
2. **Reliability Built-in**: Automatic retry and error handling
3. **Easy Configuration**: Sensible defaults, minimal setup
4. **Production Ready**: Persistent messages, proper ACK handling
5. **Developer Friendly**: Clear errors, good documentation
## **Roadmap**
See [ROADMAP.md](ROADMAP.md) for planned features:
- Connection pooling and load balancing
- Monitoring and metrics integration
- Advanced retry patterns and policies
- Performance optimizations
- Additional messaging patterns
## **License**
MIT License - see [LICENSE](LICENSE) for details.
## **Contributing**
Contributions welcome! Please read our [contributing guide](CONTRIBUTING.md) and submit pull requests.
## **Support**
- **Issues**: [GitHub Issues](https://github.com/nghiaphamln/rust-rabbit/issues)
- **Discussions**: [GitHub Discussions](https://github.com/nghiaphamln/rust-rabbit/discussions)
- **Documentation**: [docs.rs](https://docs.rs/rust-rabbit)
---
Made with ❤️ by the rust-rabbit team