rust-rabbit 0.3.1

A high-performance, production-ready RabbitMQ client library for Rust with zero-configuration simplicity, advanced retry patterns, enterprise messaging patterns, and comprehensive observability. Features one-line setup, automatic retry policies, Request-Response/Saga/Event Sourcing patterns, Prometheus metrics, and circuit breaker resilience.
Documentation

RustRabbit ๐Ÿฐ

Rust Crates.io Documentation License: MIT

A high-performance, production-ready RabbitMQ client library for Rust with zero-configuration simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.

🚀 Quick Start - One Line Magic!

use rust_rabbit::{
    config::RabbitConfig,
    connection::ConnectionManager,
    consumer::{Consumer, ConsumerOptions},
    retry::RetryPolicy,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Connection
    let config = RabbitConfig::builder()
        .connection_string("amqp://user:pass@localhost:5672/vhost")
        .build();
    let connection = ConnectionManager::new(config).await?;

    // 2. Consumer with retry (1 line!)
    let options = ConsumerOptions {
        auto_ack: false,
        retry_policy: Some(RetryPolicy::fast()),
        ..Default::default()
    };

    // 3. Start consuming
    let consumer = Consumer::new(connection, options).await?;
    
    consumer.consume("queue_name", |delivery| async move {
        match your_processing_logic(&delivery.data).await {
            Ok(_) => delivery.ack(Default::default()).await?,
            Err(e) if is_retryable(&e) => delivery.nack(Default::default()).await?,
            Err(_) => delivery.reject(Default::default()).await?,
        }
        Ok(())
    }).await?;

    Ok(())
}

What RetryPolicy::fast() creates automatically:

  • ✅ 5 retries: 200ms → 300ms → 450ms → 675ms → 1s (capped at 10s)
  • ✅ Dead Letter Queue: Automatic DLX/DLQ setup for failed messages
  • ✅ Backoff + Jitter: Intelligent delay with randomization
  • ✅ Production Ready: Optimal settings for most use cases
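The schedule above can be reproduced with a minimal sketch, assuming fast() uses plain exponential backoff (initial delay × multiplier^attempt, hard-capped). `backoff_delay` is a hypothetical helper, not part of rust-rabbit, and jitter is omitted so the numbers are exact:

```rust
use std::time::Duration;

// Hypothetical reconstruction of the fast() schedule: 200ms initial delay,
// 1.5x multiplier, capped at 10s. Jitter is left out for determinism.
fn backoff_delay(attempt: u32, initial_ms: u64, multiplier: f64, cap_ms: u64) -> Duration {
    let raw = initial_ms as f64 * multiplier.powi(attempt as i32);
    Duration::from_millis((raw as u64).min(cap_ms))
}

fn main() {
    for attempt in 0..5 {
        // 200ms, 300ms, 450ms, 675ms, ~1s
        println!("retry {attempt}: {:?}", backoff_delay(attempt, 200, 1.5, 10_000));
    }
}
```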

📦 Installation

[dependencies]
rust-rabbit = "0.3.1"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

💡 Core Features

🎯 Smart Automation

  • One-Line Setup: RetryPolicy::fast() configures everything
  • Auto Infrastructure: Queues, exchanges, and bindings created automatically
  • Intelligent Defaults: Production-ready settings without configuration
  • Dead Letter Automation: Automatic failure recovery and monitoring

🔄 Advanced Retry System

  • Multiple Presets: Fast, slow, linear, and custom patterns
  • Exponential Backoff: Smart delay calculations with jitter
  • Delayed Message Exchange: RabbitMQ x-delayed-message integration
  • Dead Letter Integration: Seamless failure handling

๐Ÿ—๏ธ Enterprise Messaging Patterns

  • Request-Response: RPC-style messaging with correlation IDs and timeouts
  • Saga Pattern: Distributed transaction coordination with compensation actions
  • Event Sourcing: CQRS implementation with event store and aggregate management
  • Message Deduplication: Multiple strategies for duplicate message detection
  • Priority Queues: Configurable priority-based message processing

๐Ÿ” Production Observability

  • Prometheus Metrics: Comprehensive metrics for throughput, latency, errors
  • Health Monitoring: Real-time connection health with auto-recovery
  • Circuit Breaker: Automatic failure detection and graceful degradation
  • Structured Logging: Distributed tracing with correlation IDs

🛡️ Reliability & Performance

  • Connection Pooling: Automatic connection management with health monitoring
  • Graceful Shutdown: Multi-phase shutdown with signal handling
  • Error Recovery: Comprehensive error handling with recovery strategies
  • Type Safety: Strongly typed message handling with serde integration
  • Minutes Retry Preset: One-line setup for business-critical operations

🔄 Intelligent Retry Patterns

// Quick presets for common scenarios
RetryPolicy::fast()               // 5 retries: 200ms, 300ms, 450ms, ... capped at 10s (transient failures)
RetryPolicy::slow()               // 3 retries: 1s, 2s, 4s, capped at 1min (resource-heavy)
RetryPolicy::aggressive()         // 15 retries with exponential backoff
RetryPolicy::minutes_exponential() // 1min, 2min, 4min, 8min, 16min (business-critical)

// Custom builder
RetryPolicy::builder()
    .max_retries(5)
    .initial_delay(Duration::from_secs(30))
    .backoff_multiplier(1.5)
    .jitter(0.2)  // 20% randomization prevents thundering herd
    .dead_letter_exchange("failed.orders")
    .build()
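The jitter(0.2) call above spreads each delay across ±20% of its base value so that many consumers retrying at once do not all wake up simultaneously. A hedged sketch of one common jitter formula (rust-rabbit's exact math may differ); `rand01` stands in for a uniform random sample in [0, 1), injected so the arithmetic is testable:

```rust
// Spread a base delay across [base*(1-jitter), base*(1+jitter)].
// `rand01` is a uniform sample in [0, 1), passed in instead of drawn
// internally so the function is deterministic.
fn jittered_ms(base_ms: f64, jitter: f64, rand01: f64) -> f64 {
    base_ms * (1.0 - jitter + 2.0 * jitter * rand01)
}

fn main() {
    // With 20% jitter, a 1000ms delay lands somewhere in [800ms, 1200ms].
    println!("lower bound: {}", jittered_ms(1000.0, 0.2, 0.0));
    println!("midpoint:    {}", jittered_ms(1000.0, 0.2, 0.5));
}
```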

๐Ÿ—๏ธ Enterprise Messaging Patterns (Phase 2 - NEW)

  • Request-Response: RPC-style messaging with correlation IDs and timeouts
  • Saga Pattern: Distributed transaction coordination with compensation
  • Event Sourcing: CQRS implementation with event store and snapshots
  • Message Deduplication: Multiple strategies for duplicate detection
  • Priority Queues: Configurable priority-based message processing

๐Ÿ” Production Observability

  • Prometheus Metrics: Throughput, latency, error rates, queue depths
  • Health Monitoring: Real-time connection health with auto-recovery
  • Circuit Breaker: Automatic failure detection and graceful degradation
  • Structured Logging: Distributed tracing with correlation IDs

📋 Usage Examples

⚡ Retry Patterns & Setup

Quick Retry Presets

| Preset | Retries | Initial Delay | Max Delay | Backoff | Use Case |
|---|---|---|---|---|---|
| RetryPolicy::fast() | 5 | 200ms | 10s | 1.5x | API calls, DB queries |
| RetryPolicy::slow() | 3 | 1s | 1min | 2.0x | Heavy processing |
| RetryPolicy::linear(500ms, 3) | 3 | 500ms | 500ms | 1.0x | Fixed intervals |

Consumer Setup Patterns

// 🔥 SUPER FAST - Copy/paste setup:
let config = RabbitConfig::builder()
    .connection_string("amqp://user:pass@localhost:5672/vhost")
    .build();
let connection = ConnectionManager::new(config).await?;
let options = ConsumerOptions {
    auto_ack: false,
    retry_policy: Some(RetryPolicy::fast()),
    prefetch_count: Some(10),
    ..Default::default()
};
let consumer = Consumer::new(connection, options).await?;

// ⚡ Custom with builder
let retry = RetryPolicy::builder()
    .fast_preset()          // Use preset as base
    .max_retries(3)         // Override retry count
    .build();

// 🛠 Ultra custom
let retry = RetryPolicy::builder()
    .max_retries(5)
    .initial_delay(Duration::from_millis(100))
    .backoff_multiplier(2.0)
    .jitter(0.1)
    .dead_letter_exchange("my.dlx")
    .build();

Message Handling Pattern

consumer.consume("queue_name", |delivery| async move {
    match process_message(&delivery.data).await {
        Ok(_) => {
            // ✅ Success -> ACK
            delivery.ack(Default::default()).await?;
        }
        Err(e) if should_retry(&e) => {
            // โš ๏ธ Retryable error -> NACK (will retry)
            delivery.nack(Default::default()).await?;
        }
        Err(_) => {
            // โŒ Fatal error -> REJECT (send to DLQ)
            delivery.reject(Default::default()).await?;
        }
    }
    Ok(())
}).await?;

fn should_retry(error: &MyError) -> bool {
    match error {
        MyError::NetworkTimeout => true,     // Retry
        MyError::ApiRateLimit => true,       // Retry
        MyError::TempUnavailable => true,    // Retry
        MyError::InvalidData => false,       // Don't retry
        MyError::Unauthorized => false,      // Don't retry
    }
}

🔄 Consumer Retry Configuration & Message Handling

📋 Consumer Setup with Retry Policy

use rust_rabbit::{
    consumer::{Consumer, ConsumerOptions, MessageHandler, MessageResult, MessageContext},
    retry::RetryPolicy,
    connection::ConnectionManager,
    config::RabbitConfig,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{error, info, warn};

// 1. Define your message type
#[derive(Debug, Deserialize, Serialize)]
struct OrderMessage {
    order_id: String,
    customer_id: String,
    total: f64,
}

// 2. Create your message handler
struct OrderHandler;

#[async_trait]
impl MessageHandler<OrderMessage> for OrderHandler {
    async fn handle(&self, message: OrderMessage, context: MessageContext) -> MessageResult {
        info!("Processing order: {}", message.order_id);
        
        match process_order(&message).await {
            Ok(_) => {
                info!("✅ Order {} processed successfully", message.order_id);
                MessageResult::Ack  // Success -> Acknowledge message
            }
            Err(e) => {
                match classify_error(&e) {
                    ErrorType::Retryable => {
                        warn!("⚠️ Retryable error for order {}: {}", message.order_id, e);
                        MessageResult::Retry  // Will trigger retry mechanism
                    }
                    ErrorType::Fatal => {
                        error!("โŒ Fatal error for order {}: {}", message.order_id, e);
                        MessageResult::Reject  // Send to dead letter queue
                    }
                }
            }
        }
    }
}

// 3. Setup consumer with retry policy
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = RabbitConfig::builder()
        .connection_string("amqp://admin:password@localhost:5672/")
        .build();
    
    let connection_manager = ConnectionManager::new(config).await?;
    
    // Configure retry policy
    let retry_policy = RetryPolicy::builder()
        .max_retries(3)                              // Max 3 retry attempts
        .initial_delay(Duration::from_millis(500))   // Start with 500ms delay
        .max_delay(Duration::from_secs(30))          // Cap at 30 seconds
        .backoff_multiplier(2.0)                     // Double delay each retry
        .jitter(0.1)                                 // Add 10% randomization
        .dead_letter_exchange("orders.dlx".to_string())  // Failed messages go here
        .dead_letter_queue("orders.dlq".to_string())     // Dead letter queue
        .build();
    
    // Setup consumer with retry configuration
    let consumer_options = ConsumerOptions::builder("orders.processing")
        .auto_declare_queue()        // Create queue if not exists
        .auto_declare_exchange()     // Create exchange if not exists  
        .retry_policy(retry_policy)  // Enable retry mechanism
        .prefetch_count(10)          // Process up to 10 messages concurrently
        .build();
    
    let consumer = Consumer::new(connection_manager, consumer_options).await?;
    let handler = std::sync::Arc::new(OrderHandler);
    
    // Start consuming with retry support
    consumer.consume(handler).await?;
    
    Ok(())
}

🎯 MessageResult Types - Action Guide

| MessageResult | Action | Description | Use Case |
|---|---|---|---|
| MessageResult::Ack | ✅ Acknowledge | Message processed successfully | Successful processing |
| MessageResult::Retry | 🔄 Retry | Retry message with delay | Temporary failures (network, rate limits) |
| MessageResult::Reject | ❌ Dead Letter | Send to DLQ, don't retry | Permanent failures (invalid data) |
| MessageResult::Requeue | 🔃 Requeue | Put back in queue immediately | Temporary resource unavailable |

โš ๏ธ Error Classification Strategy

#[derive(Debug)]
enum ErrorType {
    Retryable,  // Can retry
    Fatal,      // Don't retry
}

fn classify_error(error: &ProcessingError) -> ErrorType {
    match error {
        // 🔄 RETRYABLE - Temporary issues
        ProcessingError::NetworkTimeout => ErrorType::Retryable,
        ProcessingError::ApiRateLimit => ErrorType::Retryable,
        ProcessingError::DatabaseConnectionLost => ErrorType::Retryable,
        ProcessingError::TemporaryUnavailable => ErrorType::Retryable,
        ProcessingError::ConcurrencyLimit => ErrorType::Retryable,
        
        // โŒ FATAL - Permanent issues
        ProcessingError::InvalidData => ErrorType::Fatal,
        ProcessingError::AuthenticationFailed => ErrorType::Fatal,
        ProcessingError::PermissionDenied => ErrorType::Fatal,
        ProcessingError::RecordNotFound => ErrorType::Fatal,
        ProcessingError::ValidationFailed(_) => ErrorType::Fatal,
    }
}

async fn process_order(message: &OrderMessage) -> Result<(), ProcessingError> {
    // Your business logic here
    validate_order(message)?;      // Can throw ValidationFailed (Fatal)
    call_payment_api(message).await?;  // Can throw NetworkTimeout (Retryable)  
    update_inventory(message).await?;  // Can throw DatabaseConnectionLost (Retryable)
    send_confirmation(message).await?; // Can throw ApiRateLimit (Retryable)
    
    Ok(())
}

๐Ÿ› ๏ธ Advanced Handler Patterns

Pattern 1: Conditional Retry with Context

#[async_trait]
impl MessageHandler<OrderMessage> for OrderHandler {
    async fn handle(&self, message: OrderMessage, context: MessageContext) -> MessageResult {
        // Check retry count from context
        let retry_count = context.headers
            .get("x-retry-count")
            .and_then(|v| v.as_long_int())
            .unwrap_or(0);
            
        if retry_count >= 2 {
            warn!("Order {} failed after {} retries, sending to manual review", 
                  message.order_id, retry_count);
            return MessageResult::Reject;  // Send to DLQ for manual review
        }
        
        match process_order(&message).await {
            Ok(_) => MessageResult::Ack,
            Err(e) if is_worth_retrying(&e, retry_count) => MessageResult::Retry,
            Err(_) => MessageResult::Reject,
        }
    }
}

Pattern 2: Circuit Breaker with Handler

use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

struct OrderHandlerWithCircuitBreaker {
    failure_count: AtomicU32,
    circuit_open: AtomicBool,
}

#[async_trait]
impl MessageHandler<OrderMessage> for OrderHandlerWithCircuitBreaker {
    async fn handle(&self, message: OrderMessage, _context: MessageContext) -> MessageResult {
        // Check circuit breaker
        if self.circuit_open.load(Ordering::Relaxed) {
            warn!("Circuit breaker open, rejecting message");
            return MessageResult::Requeue;  // Try again later
        }
        
        match process_order(&message).await {
            Ok(_) => {
                self.failure_count.store(0, Ordering::Relaxed);  // Reset on success
                MessageResult::Ack
            }
            Err(e) => {
                let failures = self.failure_count.fetch_add(1, Ordering::Relaxed);
                if failures >= 5 {
                    self.circuit_open.store(true, Ordering::Relaxed);
                    warn!("Circuit breaker triggered after {} failures", failures);
                }
                
                if is_retryable(&e) {
                    MessageResult::Retry
                } else {
                    MessageResult::Reject
                }
            }
        }
    }
}

📊 Retry Policy Examples by Use Case

Fast API Calls (Network hiccups)

let api_retry = RetryPolicy::builder()
    .max_retries(5)
    .initial_delay(Duration::from_millis(100))
    .max_delay(Duration::from_secs(5))
    .backoff_multiplier(1.5)
    .jitter(0.2)
    .build();

Database Operations (Connection issues)

let db_retry = RetryPolicy::builder()
    .max_retries(3)
    .initial_delay(Duration::from_millis(500))
    .max_delay(Duration::from_secs(30))
    .backoff_multiplier(2.0)
    .jitter(0.1)
    .build();

Business Critical (Financial transactions)

let critical_retry = RetryPolicy::builder()
    .max_retries(3)
    .initial_delay(Duration::from_secs(30))   // Wait longer initially
    .max_delay(Duration::from_secs(300))      // Max 5 minutes
    .backoff_multiplier(1.2)                  // Gentle backoff
    .jitter(0.0)                              // No randomization for predictability
    .dead_letter_exchange("financial.dlx".to_string())
    .dead_letter_queue("financial.manual.review".to_string())
    .build();

๐Ÿ” Debugging Tips

Log Retry Information

#[async_trait]
impl MessageHandler<OrderMessage> for OrderHandler {
    async fn handle(&self, message: OrderMessage, context: MessageContext) -> MessageResult {
        let retry_count = context.get_retry_count();
        let original_timestamp = context.get_original_timestamp();
        
        info!("Processing order {} (retry: {}, age: {:?})", 
              message.order_id, retry_count, 
              Utc::now() - original_timestamp);
        
        // Your processing logic...
        MessageResult::Ack
    }
}

Monitor Retry Queue

# Check retry exchange bindings
sudo rabbitmqctl list_bindings | grep retry

# Monitor message flow
sudo rabbitmqctl list_queues name messages

# Check dead letter queue
sudo rabbitmqctl list_queues | grep dlq

๐Ÿ› Prefetch Count Debugging

Common Issue: prefetch_count not working?

โŒ WRONG (prefetch_count ignored):

ConsumerOptions {
    auto_ack: true,         // โ† Wrong! Messages are ACKed immediately
    prefetch_count: Some(5), // โ†’ No effect
    ..Default::default()
}

✅ CORRECT (prefetch_count works):

ConsumerOptions {
    auto_ack: false,        // ← Correct! Messages must be ACKed manually
    prefetch_count: Some(5), // → Limit of 5 unACKed messages
    ..Default::default()
}

Why?

  • prefetch_count only limits the number of "unACKed" messages in flight
  • auto_ack: true → messages are ACKed immediately → no backpressure
  • auto_ack: false → messages wait for manual ACK → QoS limits apply
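This interaction can be seen in a toy, broker-free simulation (purely illustrative, no AMQP involved): with manual acks the broker stops delivering once `prefetch` messages are outstanding, while with auto-ack nothing is ever outstanding, so the limit never engages.

```rust
// Toy model of basic.qos: the broker only delivers while the unACKed count
// is below `prefetch`. Returns the peak number of in-flight messages seen.
fn peak_in_flight(total: usize, prefetch: usize, auto_ack: bool) -> usize {
    if prefetch == 0 {
        return 0; // prefetch of 0 means "unlimited" in AMQP; not modeled here
    }
    let mut in_flight = 0;
    let mut peak = 0;
    for _ in 0..total {
        if auto_ack {
            // delivered and immediately ACKed: never counts as outstanding
            continue;
        }
        if in_flight == prefetch {
            in_flight -= 1; // consumer ACKs one message, freeing a QoS slot
        }
        in_flight += 1;
        peak = peak.max(in_flight);
    }
    peak
}

fn main() {
    println!("manual ack: {}", peak_in_flight(100, 5, false)); // capped at 5
    println!("auto ack:   {}", peak_in_flight(100, 5, true));  // limit never engages
}
```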

Debug Commands

# Compile check
cargo check

# Run with RabbitMQ
cargo run --example fast_consumer_template

# Test prefetch behavior
cargo run --example prefetch_verification

Smart Publisher with Auto-Declare

use rust_rabbit::publisher::{Publisher, PublishOptions};

let publisher = rabbit.publisher();

// Auto-declare exchange when publishing
let options = PublishOptions::builder()
    .auto_declare_exchange()
    .durable()
    .build();

publisher.publish_to_exchange(
    "orders.processing",  // exchange (auto-created)
    "orders.processing",  // routing key  
    &order_message,
    Some(options)
).await?;

Advanced Retry Configuration

// Business-critical with custom settings
let custom_retry = RetryPolicy::builder()
    .max_retries(3)
    .initial_delay(Duration::from_secs(30))
    .backoff_multiplier(1.5)
    .jitter(0.2)  // 20% randomization
    .dead_letter_exchange("failed.orders.dlx")
    .dead_letter_queue("failed.orders.dlq")
    .build();

// Use with consumer
let options = ConsumerOptions::builder("orders.processing")
    .auto_declare_queue()
    .auto_declare_exchange()
    .retry_policy(custom_retry)
    .prefetch_count(1)    // Reliable processing
    .manual_ack()         // Explicit acknowledgment
    .build();

๐Ÿ—๏ธ Advanced Patterns (Phase 2 - NEW)

Request-Response (RPC)

use rust_rabbit::patterns::request_response::*;

// Server side
let server = RequestResponseServer::new(rabbit.clone(), "calc_queue".to_string());
server.handle_requests(|req: CalculateRequest| async move {
    Ok(CalculateResponse { result: req.x + req.y })
}).await?;

// Client side
let client = RequestResponseClient::new(rabbit, "calc_queue".to_string());
let response: CalculateResponse = client
    .send_request(&CalculateRequest { x: 5, y: 3 })
    .with_timeout(Duration::from_secs(30))
    .await?;

Saga Pattern (Distributed Transactions)

use rust_rabbit::patterns::saga::*;

// Define compensation logic
async fn reserve_inventory(order_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    println!("✅ Reserved inventory for order {}", order_id);
    Ok(())
}

async fn compensate_inventory(order_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    println!("🔄 Released inventory for order {}", order_id);
    Ok(())
}

// Execute saga
let mut coordinator = SagaCoordinator::new(rabbit);
let mut saga = SagaInstance::new("order-saga-123".to_string());

saga.add_step(
    "reserve_inventory",
    |data| Box::pin(reserve_inventory(&data)),
    |data| Box::pin(compensate_inventory(&data)),
);

match coordinator.execute_saga(saga, "order-456".to_string()).await {
    Ok(_) => println!("✅ Saga completed successfully"),
    Err(e) => println!("❌ Saga failed, compensation completed: {}", e),
}

Event Sourcing (CQRS)

use rust_rabbit::patterns::event_sourcing::*;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct BankAccount {
    id: AggregateId,
    sequence: EventSequence,
    balance: f64,
}

impl AggregateRoot for BankAccount {
    fn apply_event(&mut self, event: &DomainEvent) -> Result<()> {
        match event.event_type.as_str() {
            "MoneyDeposited" => {
                let amount: f64 = serde_json::from_slice(&event.event_data)?;
                self.balance += amount;
            }
            "MoneyWithdrawn" => {
                let amount: f64 = serde_json::from_slice(&event.event_data)?;
                self.balance -= amount;
            }
            _ => {}
        }
        Ok(())
    }
}

// Usage
let event_store = Arc::new(InMemoryEventStore::new());
let repository = EventSourcingRepository::<BankAccount>::new(event_store);

let mut account = BankAccount::new(AggregateId::new());
account.deposit(100.0)?; // Generates MoneyDeposited event
repository.save(&mut account).await?;

๐Ÿ” Production Features

Health Monitoring

use rust_rabbit::health::HealthChecker;

let health_checker = HealthChecker::new(connection_manager.clone());

match health_checker.check_health().await {
    Ok(status) => println!("Connection healthy: {:?}", status),
    Err(e) => println!("Connection issues: {}", e),
}

Prometheus Metrics

use rust_rabbit::metrics::PrometheusMetrics;

let metrics = PrometheusMetrics::new();

// Metrics are automatically collected:
// - rust_rabbit_messages_published_total
// - rust_rabbit_messages_consumed_total
// - rust_rabbit_message_processing_duration_seconds
// - rust_rabbit_connection_health
// - rust_rabbit_queue_depth

// Expose metrics endpoint
warp::serve(metrics.metrics_handler())
    .run(([0, 0, 0, 0], 9090))
    .await;


## 📁 **Examples**

The library includes comprehensive examples for all features:

### **Core Examples**
- **`consumer_example.rs`** - Basic consumer setup and message handling
- **`publisher_example.rs`** - Publishing messages with different options
- **`fast_consumer_template.rs`** - Production-ready consumer template

### **Retry & Error Handling**
- **`quick_retry_consumer.rs`** - All retry preset demonstrations
- **`retry_example.rs`** - Custom retry logic implementation
- **`retry_policy_presets.rs`** - Retry policy patterns

### **Advanced Patterns**
- **`event_sourcing_example.rs`** - CQRS and event sourcing implementation
- **`saga_example.rs`** - Distributed transaction coordination
- **`phase2_patterns_example.rs`** - Enterprise messaging patterns

### **Production Features**
- **`health_monitoring_example.rs`** - Health checks and monitoring
- **`metrics_example.rs`** - Prometheus metrics integration
- **`comprehensive_demo.rs`** - Full-featured production example

### **Integration**
- **`actix_web_api_example.rs`** - Web API integration
- **`builder_pattern_example.rs`** - Configuration patterns

Run any example:
```bash
cargo run --example consumer_example
cargo run --example fast_consumer_template
```

🧪 Testing

Integration Testing with Docker

RustRabbit supports real RabbitMQ integration testing:

# Start RabbitMQ with required plugins
docker-compose -f docker-compose.test.yml up -d

# Run integration tests
cargo test --test integration_example -- --test-threads=1

# Or use make
make test-integration

Unit Tests

# Run unit tests
cargo test

# Run with all features
cargo test --all-features

# Test specific modules
cargo test consumer::tests
cargo test retry::tests

🚨 Common Mistakes & Best Practices

❌ Common Mistakes

  1. Wrong prefetch_count setup:

    auto_ack: true,  // ← Wrong! prefetch_count won't work
    
  2. Not handling retry errors:

    // โŒ Not distinguishing retryable vs non-retryable errors
    Err(_) => delivery.nack(Default::default()).await?,
    
  3. Missing manual ACK:

    // โŒ Forgot to ACK message
    // Message will be stuck in unACK'd state
    

✅ Best Practices

  1. Always use auto_ack: false for production:

    ConsumerOptions {
        auto_ack: false,  // Required for retry and backpressure
        retry_policy: Some(RetryPolicy::fast()),
        ..Default::default()
    }
    
  2. Implement smart retry logic:

    match error {
        ApiError::Timeout => delivery.nack(Default::default()).await?, // Retry
        ApiError::Unauthorized => delivery.reject(Default::default()).await?, // DLQ
    }
    
  3. Set appropriate prefetch_count:

    prefetch_count: Some(cpu_cores * 2), // For I/O bound
    prefetch_count: Some(cpu_cores),     // For CPU bound
    prefetch_count: Some(1),             // For memory-heavy tasks
    
  4. Always handle graceful shutdown:

    tokio::select! {
        _ = consumer.consume("queue", handler) => {}
        _ = tokio::signal::ctrl_c() => {
            println!("Shutting down gracefully...");
        }
    }
    

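The prefetch_count rule of thumb above can be wired to the actual machine at startup. `prefetch_for` is a hypothetical helper (not part of rust-rabbit); `std::thread::available_parallelism` is the standard way to read the core count:

```rust
use std::thread;

// Hypothetical sizing helper following the rule of thumb above:
// cores x 2 for I/O-bound work, cores for CPU-bound work.
fn prefetch_for(cores: usize, io_bound: bool) -> u16 {
    let n = if io_bound { cores * 2 } else { cores };
    n.min(u16::MAX as usize) as u16 // AMQP prefetch is a 16-bit field
}

fn main() {
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    println!("I/O-bound prefetch_count: {}", prefetch_for(cores, true));
    println!("CPU-bound prefetch_count: {}", prefetch_for(cores, false));
}
```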
🔧 Configuration

Connection Configuration

let config = RabbitConfig::builder()
    .connection_string("amqp://user:pass@localhost:5672/vhost")
    .connection_timeout(Duration::from_secs(30))
    .heartbeat(Duration::from_secs(10))
    .retry_config(RetryConfig::default())
    .health_check_interval(Duration::from_secs(30))
    .pool_config(PoolConfig::new(5, 10)) // min 5, max 10 connections
    .build();

Consumer Configuration

ConsumerOptions {
    auto_ack: false,                        // Manual ACK for reliability
    prefetch_count: Some(10),               // QoS prefetch limit
    retry_policy: Some(RetryPolicy::fast()), // Retry configuration
    concurrent_limit: Some(50),             // Max concurrent messages
    ..Default::default()
}

🚀 Performance Tips

  • prefetch_count: Set to CPU cores × 2-5 for I/O bound tasks
  • concurrent_limit: 50-200 for I/O bound, fewer for CPU bound
  • Connection pooling: Use 5-10 connections per application
  • DLQ monitoring: Always monitor dead letter queues
  • Metrics: Enable Prometheus for production monitoring

๐Ÿค Contributing

We welcome contributions! Please see our Contributing Guide for details.

  1. Fork the repository
  2. Create a feature branch: git checkout -b my-feature
  3. Make changes and add tests
  4. Run tests: cargo test
  5. Submit a pull request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🔗 Links


Made with ❤️ for the Rust community

Metrics Integration

use rust_rabbit::metrics::RustRabbitMetrics;

let metrics = RustRabbitMetrics::new()?;
let rabbit = RustRabbit::with_metrics(config, metrics.clone()).await?;

// Automatic metrics collection:
// - Message throughput (published/consumed per second)
// - Processing latency (P50, P90, P99)
// - Error rates (failed messages, connection errors)
// - Queue depths (pending messages)
// - Connection health (active, reconnections)


### **Circuit Breaker**

```rust
use rust_rabbit::circuit_breaker::CircuitBreakerConfig;

let config = RabbitConfig::builder()
    .connection_string("amqp://localhost:5672")
    .circuit_breaker(CircuitBreakerConfig {
        failure_threshold: 5,
        failure_window: Duration::from_secs(60),
        recovery_timeout: Duration::from_secs(30),
        success_threshold: 3,
        half_open_max_requests: 5,
    })
    .build();

// Circuit breaker automatically handles connection failures
```

📊 Performance

RustRabbit v0.3.0 Benchmarks:

| Metric | Value | Improvement vs v0.2.0 |
|---|---|---|
| Throughput | 75,000+ msgs/sec | +50% |
| Latency (P99) | < 8ms | -20% |
| Memory Usage | < 45MB baseline | -10% |
| Connection Pool | 10-100 connections | Stable |

Advanced Pattern Performance:

| Pattern | Throughput | Memory Overhead | Best Use Case |
|---|---|---|---|
| Request-Response | 25,000 req/sec | +5MB | RPC, API calls |
| Saga | 10,000 flows/sec | +8MB | Distributed transactions |
| Event Sourcing | 50,000 events/sec | +15MB | CQRS, audit trails |
| Priority Queue | 60,000 msgs/sec | +2MB | Task prioritization |

Benchmarks: Intel i7-10700K, 32GB RAM, RabbitMQ 3.12

๐Ÿ› ๏ธ Configuration

Builder Pattern Configuration

use rust_rabbit::{RabbitConfig, consumer::ConsumerOptions};

// Environment-specific configs
let prod_config = RabbitConfig::builder()
    .connection_string("amqp://prod-server:5672")
    .connection_timeout(Duration::from_secs(30))
    .retry(|retry| retry.aggressive())
    .health(|health| health.frequent())
    .pool(|pool| pool.high_throughput())
    .build();

// Consumer configurations
let reliable_options = ConsumerOptions::builder("critical-orders")
    .consumer_tag("critical-processor")
    .minutes_retry()      // Auto-configure for reliability
    .prefetch_count(1)    // Process one at a time
    .build();

let high_throughput_options = ConsumerOptions::builder("bulk-orders")
    .consumer_tag("bulk-processor")
    .high_throughput()    // Optimize for speed
    .auto_declare_queue()
    .build();

🧪 Testing

RustRabbit includes comprehensive test coverage:

# Unit tests (58 tests)
cargo test --lib

# Integration tests with real RabbitMQ
docker-compose -f docker-compose.test.yml up -d
cargo test --test integration_example -- --test-threads=1

# Examples compilation
cargo check --examples

# Performance benchmarks
cargo bench

Test Coverage:

  • ✅ End-to-end message flows
  • ✅ Retry mechanisms with delayed exchange
  • ✅ Health monitoring and recovery
  • ✅ All advanced patterns (Phase 2)
  • ✅ Concurrent processing scenarios
  • ✅ Error handling and edge cases

📚 Examples

Comprehensive examples in the examples/ directory:

# Core features
cargo run --example minutes_retry_preset        # NEW: one-line retry setup
cargo run --example simple_auto_consumer_example
cargo run --example retry_policy_demo

# Advanced patterns (Phase 2)
cargo run --example phase2_patterns_example     # Comprehensive demo
cargo run --example saga_example                # E-commerce workflow
cargo run --example event_sourcing_example      # Bank account CQRS

# Comparison examples
cargo run --example before_vs_after_setup       # Shows complexity reduction

✅ Retry Best Practices & Common Pitfalls

🚀 Production-Ready Patterns

1. Idempotent Message Processing

use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;

struct IdempotentOrderHandler {
    processed_ids: Arc<Mutex<HashSet<String>>>,
}

#[async_trait]
impl MessageHandler<OrderMessage> for IdempotentOrderHandler {
    async fn handle(&self, message: OrderMessage, _context: MessageContext) -> MessageResult {
        // Check if already processed
        {
            let processed = self.processed_ids.lock().await;
            if processed.contains(&message.order_id) {
                warn!("Order {} already processed, skipping", message.order_id);
                return MessageResult::Ack;  // Don't process again
            }
        }
        
        match process_order(&message).await {
            Ok(_) => {
                // Mark as processed
                self.processed_ids.lock().await.insert(message.order_id.clone());
                MessageResult::Ack
            }
            Err(e) => {
                if is_retryable(&e) {
                    MessageResult::Retry
                } else {
                    MessageResult::Reject
                }
            }
        }
    }
}
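One caveat with the handler above: its HashSet grows without bound. A sketch of a capacity-bounded variant (hypothetical, not part of rust-rabbit) that evicts the oldest IDs first-in-first-out:

```rust
use std::collections::{HashSet, VecDeque};

// Remembers at most `cap` message IDs; when full, the oldest ID is evicted.
// Trade-off: an evicted ID can be re-admitted, so very late duplicates
// slip through. Pick `cap` larger than your realistic redelivery window.
struct BoundedDedup {
    seen: HashSet<String>,
    order: VecDeque<String>,
    cap: usize,
}

impl BoundedDedup {
    fn new(cap: usize) -> Self {
        Self { seen: HashSet::new(), order: VecDeque::new(), cap }
    }

    /// Returns true if `id` was already recorded; otherwise records it.
    fn check_and_insert(&mut self, id: &str) -> bool {
        if self.seen.contains(id) {
            return true;
        }
        if self.order.len() == self.cap {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.order.push_back(id.to_string());
        self.seen.insert(id.to_string());
        false
    }
}

fn main() {
    let mut dedup = BoundedDedup::new(2);
    println!("{}", dedup.check_and_insert("a")); // first sighting
    println!("{}", dedup.check_and_insert("a")); // duplicate
    println!("{}", dedup.check_and_insert("b"));
    println!("{}", dedup.check_and_insert("c")); // evicts "a"
    println!("{}", dedup.check_and_insert("a")); // "a" was evicted, admitted again
}
```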

2. Graceful Shutdown with Retry

use tokio::signal;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let shutdown_flag_clone = shutdown_flag.clone();
    
    // Setup graceful shutdown
    tokio::spawn(async move {
        signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
        info!("Shutdown signal received, finishing current messages...");
        shutdown_flag_clone.store(true, Ordering::Relaxed);
    });
    
    let handler = GracefulOrderHandler { shutdown_flag };
    // `connection_manager` and `consumer_options` are assumed to be built as in the Quick Start
    let consumer = Consumer::new(connection_manager, consumer_options).await?;
    
    consumer.consume(handler).await?;
    Ok(())
}

struct GracefulOrderHandler {
    shutdown_flag: Arc<AtomicBool>,
}

#[async_trait]
impl MessageHandler<OrderMessage> for GracefulOrderHandler {
    async fn handle(&self, message: OrderMessage, _context: MessageContext) -> MessageResult {
        // Check shutdown flag
        if self.shutdown_flag.load(Ordering::Relaxed) {
            warn!("Shutdown in progress, requeuing message {}", message.order_id);
            return MessageResult::Requeue;  // Let another instance handle it
        }
        
        // Normal processing
        match process_order(&message).await {
            Ok(_) => MessageResult::Ack,
            Err(e) if is_retryable(&e) => MessageResult::Retry,
            Err(_) => MessageResult::Reject,
        }
    }
}

โš ๏ธ Common Pitfalls to Avoid

โŒ Pitfall 1: Auto-ACK with Retry Policy

// ๐Ÿšซ WRONG - Retry won't work!
let options = ConsumerOptions::builder("orders")
    .auto_ack(true)           // โ† This breaks retry mechanism
    .retry_policy(retry)      // โ† Will be ignored!
    .build();

// โœ… CORRECT - Manual ACK enables retry
let options = ConsumerOptions::builder("orders")
    .auto_ack(false)          // โ† Must be false for retry to work
    .retry_policy(retry)      // โ† Now retry works properly
    .build();

โŒ Pitfall 2: Poison Messages with Infinite Retry

// ๐Ÿšซ WRONG - Can create poison messages
#[async_trait]
impl MessageHandler<OrderMessage> for BadHandler {
    async fn handle(&self, message: OrderMessage, _context: MessageContext) -> MessageResult {
        match serde_json::from_str::<OrderMessage>(&message.data) {
            Ok(order) => match process_order(order).await {
                Ok(_) => MessageResult::Ack,
                Err(_) => MessageResult::Retry,
            },
            Err(_) => MessageResult::Retry,  // ← Will retry forever on invalid JSON!
        }
    }
}

// ✅ CORRECT - Classify errors properly
#[async_trait]
impl MessageHandler<OrderMessage> for GoodHandler {
    async fn handle(&self, message: OrderMessage, _context: MessageContext) -> MessageResult {
        match serde_json::from_str::<OrderMessage>(&message.data) {
            Ok(order) => {
                match process_order(order).await {
                    Ok(_) => MessageResult::Ack,
                    Err(e) if is_retryable(&e) => MessageResult::Retry,
                    Err(_) => MessageResult::Reject,
                }
            }
            Err(_) => {
                error!("Invalid JSON format, sending to DLQ");
                MessageResult::Reject  // โ† Don't retry parse errors
            }
        }
    }
}
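The handlers above lean on an `is_retryable` helper that the snippets don't define. A minimal sketch of one, using a hypothetical application error type (`ProcessError` is illustrative, not part of rust_rabbit):

```rust
// Hypothetical application error type; classify each variant once, centrally.
#[allow(dead_code)]
#[derive(Debug)]
enum ProcessError {
    Timeout,            // transient - worth retrying
    ServiceUnavailable, // transient - worth retrying
    InvalidPayload,     // permanent - straight to the DLQ
    DuplicateOrder,     // permanent - ack/skip, never retry
}

fn is_retryable(e: &ProcessError) -> bool {
    matches!(e, ProcessError::Timeout | ProcessError::ServiceUnavailable)
}

fn main() {
    assert!(is_retryable(&ProcessError::Timeout));
    assert!(!is_retryable(&ProcessError::InvalidPayload));
    println!("classification ok");
}
```

Centralizing the classification keeps every handler's Retry/Reject decision consistent.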

โŒ Pitfall 3: Blocking Operations in Handler

// ๐Ÿšซ WRONG - Blocks event loop
#[async_trait]
impl MessageHandler<OrderMessage> for BlockingHandler {
    async fn handle(&self, message: OrderMessage, _context: MessageContext) -> MessageResult {
        // This blocks the async runtime!
        std::thread::sleep(Duration::from_secs(10));
        MessageResult::Ack
    }
}

// ✅ CORRECT - Use async operations
#[async_trait]  
impl MessageHandler<OrderMessage> for AsyncHandler {
    async fn handle(&self, message: OrderMessage, _context: MessageContext) -> MessageResult {
        // Non-blocking async delay
        tokio::time::sleep(Duration::from_secs(10)).await;
        MessageResult::Ack
    }
}

โŒ Pitfall 4: Missing Dead Letter Queue

// ๐Ÿšซ WRONG - Failed messages lost forever
let retry = RetryPolicy::builder()
    .max_retries(3)
    .initial_delay(Duration::from_millis(500))
    // Missing DLQ configuration!
    .build();

// ✅ CORRECT - Always configure DLQ
let retry = RetryPolicy::builder()
    .max_retries(3)
    .initial_delay(Duration::from_millis(500))
    .dead_letter_exchange("orders.dlx".to_string())
    .dead_letter_queue("orders.dlq".to_string())  // ← Failed messages go here
    .build();
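For intuition, the delay schedule such a policy produces can be sketched as exponential backoff with a cap. The growth factor and the helper below are illustrative, not rust_rabbit internals; real policies also add random jitter on top:

```rust
use std::time::Duration;

// Attempt 0 waits `initial`; each further attempt multiplies by `factor`,
// capped at `cap`. Deterministic here for clarity (no jitter).
fn retry_delay(initial: Duration, factor: f64, cap: Duration, attempt: u32) -> Duration {
    let base = initial.as_millis() as f64 * factor.powi(attempt as i32);
    Duration::from_millis(base.min(cap.as_millis() as f64) as u64)
}

fn main() {
    let initial = Duration::from_millis(500);
    for attempt in 0..6 {
        // 500ms, 1s, 2s, 4s, 8s, then capped at 10s
        println!(
            "attempt {attempt}: {:?}",
            retry_delay(initial, 2.0, Duration::from_secs(10), attempt)
        );
    }
}
```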

📊 Performance Tuning Tips

Optimize Prefetch Count

// High throughput, low latency messages
let options = ConsumerOptions::builder("fast_queue")
    .prefetch_count(100)    // Batch more messages
    .build();

// Heavy processing, reliable delivery
let options = ConsumerOptions::builder("heavy_queue")
    .prefetch_count(1)      // Process one at a time
    .build();
    
// Balanced approach
let options = ConsumerOptions::builder("balanced_queue")
    .prefetch_count(10)     // Good default for most cases
    .build();
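A useful rule of thumb when picking these numbers is Little's law: in-flight messages ≈ target throughput × per-message processing time. A tiny sketch (the helper name and figures are illustrative):

```rust
// prefetch ≈ messages/sec × seconds/message, never below 1
fn suggested_prefetch(msgs_per_sec: f64, processing_secs: f64) -> u16 {
    ((msgs_per_sec * processing_secs).ceil() as u16).max(1)
}

fn main() {
    // 200 msg/s at 50ms each -> keep ~10 messages in flight
    println!("{}", suggested_prefetch(200.0, 0.05));
    // slow 2s handler at 0.5 msg/s -> prefetch 1
    println!("{}", suggested_prefetch(0.5, 2.0));
}
```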

Monitor Retry Metrics

use rust_rabbit::metrics::RustRabbitMetrics;
use std::time::Instant;

let metrics = RustRabbitMetrics::new();

// Monitor in your handler
#[async_trait]
impl MessageHandler<OrderMessage> for MetricsHandler {
    async fn handle(&self, message: OrderMessage, context: MessageContext) -> MessageResult {
        let start = Instant::now();
        let retry_count = context.get_retry_count();
        
        // Track retry counts
        metrics.retry_attempts.with_label_values(&[&retry_count.to_string()]).inc();
        
        let result = match process_order(&message).await {
            Ok(_) => {
                metrics.messages_processed_success.inc();
                MessageResult::Ack
            }
            Err(e) if is_retryable(&e) => {
                metrics.messages_retry.inc();
                MessageResult::Retry
            }
            Err(_) => {
                metrics.messages_dead_letter.inc();
                MessageResult::Reject
            }
        };
        
        // Track processing time
        metrics.processing_duration
            .observe(start.elapsed().as_secs_f64());
            
        result
    }
}

🔧 Debugging Checklist

When Retry Doesn't Work:

  1. ✅ Check auto_ack: false in ConsumerOptions
  2. ✅ Verify a retry policy is configured
  3. ✅ Ensure the handler returns MessageResult::Retry
  4. ✅ Check that the RabbitMQ delayed-message plugin (rabbitmq_delayed_message_exchange) is enabled
  5. ✅ Verify exchange bindings exist

When Messages Disappear:

  1. ✅ Check dead letter exchange/queue configuration
  2. ✅ Monitor DLQ for rejected messages
  3. ✅ Verify queue declarations (durable, auto-delete)
  4. ✅ Check network connectivity
  5. ✅ Review consumer exceptions

Performance Issues:

  1. ✅ Tune prefetch_count based on message size/processing time
  2. ✅ Monitor connection pool usage
  3. ✅ Check for blocking operations in handlers
  4. ✅ Review retry delays (too aggressive?)
  5. ✅ Monitor queue depth and consumer lag

๐Ÿ—บ๏ธ Roadmap

โœ… Phase 1 (v0.2.0) - COMPLETED

  • Prometheus metrics integration
  • Circuit breaker pattern
  • Health monitoring
  • Connection pooling

✅ Phase 2 (v0.3.0) - COMPLETED

  • Request-Response pattern
  • Saga pattern for distributed transactions
  • Event sourcing with CQRS
  • Message deduplication
  • Priority queues
  • Minutes retry preset - Zero-config production setup

🔮 Phase 3 (v0.4.0) - Enterprise

  • Multi-broker support with failover
  • Message encryption at rest
  • Schema registry integration
  • Advanced routing patterns
  • Performance optimizations

๐Ÿค Contributing

We welcome contributions! Areas for improvement:

  • ๐Ÿ› Bug fixes and performance improvements
  • ๐Ÿ“š Documentation and examples
  • โœจ New features from roadmap
  • ๐Ÿงช Additional test coverage
  • ๐Ÿ“Š Benchmarks and optimizations

🆘 Support

📄 License

MIT License - see LICENSE file for details.

🙏 Acknowledgments

⭐ Star us on GitHub if RustRabbit helps your project! ⭐

GitHub • Crates.io • Docs.rs

Built with ❤️ for the Rust community