rust-rabbit 1.2.0

rust-rabbit 🐰

A simple, reliable RabbitMQ client library for Rust. Easy to use with minimal configuration and flexible retry mechanisms.

🚀 Key Features

  • Simple API: Just Publisher and Consumer with essential methods
  • Flexible Retry: Exponential, linear, or custom retry mechanisms
  • Auto-Setup: Automatic queue/exchange declaration and binding
  • Built-in Reliability: Default ACK behavior with intelligent error handling
  • Zero Complexity: No enterprise patterns, no metrics - just core messaging

📦 Quick Start

Add to your Cargo.toml:

[dependencies]
rust-rabbit = "1.2"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

🎯 Basic Usage

Publisher - Send Messages

use rust_rabbit::{Connection, Publisher}; // PublishOptions is only needed when passing Some(options)
use serde::Serialize;

#[derive(Serialize)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to RabbitMQ
    let connection = Connection::new("amqp://localhost:5672").await?;
    let publisher = Publisher::new(connection);
    
    let order = Order { id: 123, amount: 99.99 };
    
    // Publish to exchange (with routing)
    publisher.publish_to_exchange("orders", "new.order", &order, None).await?;
    
    // Publish directly to queue (simple)
    publisher.publish_to_queue("order_queue", &order, None).await?;
    
    Ok(())
}

Consumer - Receive Messages with Retry

use rust_rabbit::{Connection, Consumer, RetryConfig};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
    
    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default()) // 1s->2s->4s->8s->16s
        .bind_to_exchange("orders", "new.order")
        .with_prefetch(5)
        .build();
    
    // Handler receives just the payload type (no wrapper)
    consumer.consume(|msg: Order| async move {
        println!("Processing order {}: ${}", msg.id, msg.amount);
        
        // Your business logic here
        if msg.amount > 1000.0 {
            return Err("Amount too high".into()); // Will retry
        }
        
        Ok(()) // ACK message
    }).await?;
    
    Ok(())
}

🔄 Retry Configurations

Built-in Retry Patterns

use rust_rabbit::RetryConfig;
use std::time::Duration;

// Exponential: 1s → 2s → 4s → 8s → 16s (5 retries)
let exponential = RetryConfig::exponential_default();

// Custom exponential: 2s → 4s → 8s → 16s → 32s (max 60s)
let custom_exp = RetryConfig::exponential(5, Duration::from_secs(2), Duration::from_secs(60));

// Linear: 10s → 10s → 10s (3 retries)
let linear = RetryConfig::linear(3, Duration::from_secs(10));

// Custom delays: 1s → 5s → 30s
let custom = RetryConfig::custom(vec![
    Duration::from_secs(1),
    Duration::from_secs(5), 
    Duration::from_secs(30),
]);

// No retries - fail immediately
let no_retry = RetryConfig::no_retry();
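
To make the schedules in the comments above concrete, here is a minimal standalone sketch of how such delay sequences could be computed. It uses only the standard library; `exponential_delays` and `linear_delays` are hypothetical helpers written for illustration and are not part of rust-rabbit, whose internal implementation may differ.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of rust-rabbit): the delay before each retry,
/// doubling from `initial` and never exceeding `max`.
fn exponential_delays(max_retries: u32, initial: Duration, max: Duration) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(max_retries as usize);
    let mut current = initial.min(max);
    for _ in 0..max_retries {
        delays.push(current);
        // saturating_mul avoids an overflow panic on very long schedules.
        current = current.saturating_mul(2).min(max);
    }
    delays
}

/// Hypothetical helper (not part of rust-rabbit): the same delay for every retry.
fn linear_delays(max_retries: u32, delay: Duration) -> Vec<Duration> {
    vec![delay; max_retries as usize]
}
```

With 5 retries, a 2s initial delay and a 60s cap, the first helper yields 2s, 4s, 8s, 16s, 32s, matching the "custom exponential" comment; further retries would be held at the cap.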

How Retry Works

// Failed messages are automatically:
// 1. Sent to retry queue with delay (e.g., orders.retry.1)
// 2. After delay, returned to original queue for retry
// 3. If max retries exceeded, sent to dead letter queue (e.g., orders.dlq)
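
The three steps above amount to a small routing decision. The sketch below illustrates it with a hypothetical `route_failure` function and `Destination` enum (neither is part of rust-rabbit). It assumes the numeric suffix in names like orders.retry.1 is the retry number, which is inferred from the example names rather than confirmed by the library.

```rust
/// Where a failed message goes next (illustrative type, not part of rust-rabbit).
#[derive(Debug, PartialEq)]
enum Destination {
    /// Park the message in a delayed retry queue such as `orders.retry.1`.
    RetryQueue(String),
    /// Retries are exhausted: move the message to the DLQ such as `orders.dlq`.
    DeadLetterQueue(String),
}

/// `retries_so_far` is the number of retries already performed
/// (0 when the message fails for the first time).
fn route_failure(queue: &str, retries_so_far: u32, max_retries: u32) -> Destination {
    if retries_so_far < max_retries {
        // Assumption: the suffix is the 1-based number of the upcoming retry.
        Destination::RetryQueue(format!("{}.retry.{}", queue, retries_so_far + 1))
    } else {
        Destination::DeadLetterQueue(format!("{}.dlq", queue))
    }
}
```

Keeping the decision in a pure function like this makes the retry policy easy to unit-test without a running broker.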

Delay Strategies (TTL vs DelayedExchange)

rust-rabbit supports two strategies for implementing message delays:

1. TTL Strategy (Default) - No Plugin Required

Uses RabbitMQ's TTL feature with temporary retry queues. Simple but less precise.

use rust_rabbit::{RetryConfig, DelayStrategy, Consumer, Connection};
use std::time::Duration;

let retry_config = RetryConfig::exponential_default()
    .with_delay_strategy(DelayStrategy::TTL); // Default, no plugin needed

// Failed messages are sent to: orders.retry.1 (with TTL set)
// After TTL expires, automatically routed back to: orders

Pros:

  • No external plugin required
  • Works out-of-the-box with standard RabbitMQ
  • Simple setup

Cons:

  • Less precise timing (TTL granularity depends on RabbitMQ configuration)
  • Creates many temporary queues (one per retry attempt)
  • Requires cleanup of old retry queues
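
For readers unfamiliar with the TTL pattern, the sketch below shows the standard RabbitMQ queue arguments that typically implement it: x-message-ttl, x-dead-letter-exchange and x-dead-letter-routing-key. The helper `retry_queue_arguments` is hypothetical, and whether rust-rabbit declares its retry queues exactly this way is an assumption.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Arguments for a TTL-based retry queue (illustrative; rust-rabbit's own
/// declaration may differ). Values are kept as strings for simplicity;
/// a real client would send typed AMQP field values.
fn retry_queue_arguments(original_queue: &str, delay: Duration) -> BTreeMap<&'static str, String> {
    let mut args = BTreeMap::new();
    // Messages expire after `delay`, expressed in milliseconds.
    args.insert("x-message-ttl", delay.as_millis().to_string());
    // Expired messages are dead-lettered through the default exchange ("").
    args.insert("x-dead-letter-exchange", String::new());
    // On the default exchange the routing key is the target queue's name.
    args.insert("x-dead-letter-routing-key", original_queue.to_string());
    args
}
```

Because the TTL is a property of the queue in this pattern, each distinct delay needs its own retry queue, which is where the "many temporary queues" drawback comes from.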

2. DelayedExchange Strategy - Better Precision

Uses the rabbitmq_delayed_message_exchange plugin for server-side delay management. More reliable and cleaner architecture.

use rust_rabbit::{RetryConfig, DelayStrategy, Consumer, Connection};
use std::time::Duration;

let retry_config = RetryConfig::exponential_default()
    .with_delay_strategy(DelayStrategy::DelayedExchange);

// Messages are published to: orders.delay (delay exchange)
// With x-delay header set to desired delay time
// Exchange automatically routes back to: orders after delay
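
As a rough illustration of the pieces involved, the sketch below builds the exchange declaration and the x-delay header. The exchange type x-delayed-message and the millisecond x-delay header come from the plugin; the helper names are hypothetical, and the `<queue>.delay` naming is taken from the comment above.

```rust
use std::time::Duration;

/// Exchange type registered by the rabbitmq_delayed_message_exchange plugin.
const DELAYED_EXCHANGE_TYPE: &str = "x-delayed-message";

/// Hypothetical helper: name and type of the delay exchange for a queue,
/// e.g. ("orders.delay", "x-delayed-message").
fn delay_exchange_declaration(queue: &str) -> (String, &'static str) {
    (format!("{}.delay", queue), DELAYED_EXCHANGE_TYPE)
}

/// Hypothetical helper: the `x-delay` header for one retry, in whole milliseconds.
fn x_delay_header(delay: Duration) -> (&'static str, i64) {
    // Clamp rather than wrap if the duration does not fit in an i64.
    let millis = delay.as_millis().min(i64::MAX as u128) as i64;
    ("x-delay", millis)
}
```

Since the delay travels with each message as a header, a single exchange can serve every retry step regardless of its delay.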

Setup Requirements:

  1. Install the plugin on RabbitMQ:

    # Download from: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
    # Place .ez file in RabbitMQ plugins directory
    
    # Enable plugin
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    # Restart RabbitMQ
    sudo systemctl restart rabbitmq-server
    
  2. Use in code:

    let retry_config = RetryConfig::linear(3, Duration::from_secs(5))
        .with_delay_strategy(DelayStrategy::DelayedExchange);
    
    let consumer = Consumer::builder(connection, "orders")
        .with_retry(retry_config)
        .build();
    

⚠️ Important: Plugin is Required

If you use DelayStrategy::DelayedExchange without installing the plugin on RabbitMQ:

  • Your application will crash when trying to send messages to the delay exchange
  • The delay exchange declaration will fail
  • You'll get an error like: NOT_FOUND - operation not permitted on this exchange

Always ensure the rabbitmq_delayed_message_exchange plugin is installed and enabled before deploying code that uses DelayStrategy::DelayedExchange.

Pros:

  • More precise timing (millisecond-level, set per message via the x-delay header)
  • Cleaner architecture (single delay exchange)
  • Better for high-volume scenarios
  • Lower memory footprint on RabbitMQ
  • Built-in reliability

Cons:

  • Requires external plugin installation
  • Plugin adds complexity to RabbitMQ setup
  • Small performance overhead for delay management

Flow Diagram:

DelayedExchange Strategy Flow:
┌──────────┐
│  Queue   │  (e.g., "orders")
└────┬─────┘
     │
     ├─> Handler fails ──┐
     │                   │
     │              Publish to
     │              Delay Exchange
     │                   │
     └─────────── ┌──────▼───────┐
                  │ orders.delay │ (x-delay: 5000ms)
                  └──────┬───────┘
                         │
                    After delay
                         │
                  ┌──────▼──────┐
                  │  Queue      │
                  │  (orders)   │
                  └─────────────┘

Example:

See delayed_exchange_example.rs for a complete working example.

🗑️ Dead Letter Queue (DLQ) with Auto-Cleanup

When a message exceeds its maximum number of retries, rust-rabbit automatically sends it to a Dead Letter Queue (DLQ). You can also set a TTL on the DLQ so that old messages are cleaned up automatically:

let retry_config = RetryConfig::exponential_default()
    .with_dlq_ttl(Duration::from_secs(86400));  // Auto-cleanup after 1 day

let consumer = Consumer::builder(connection, "orders")
    .with_retry(retry_config)
    .build();

Flow:

Original Queue (orders)
    ↓
Retries exhausted
    ↓
Message → DLQ (orders.dlq) [TTL: 86400s]
    ↓
After 1 day: Message auto-deleted by RabbitMQ
    ↓
✓ No manual cleanup needed!

Configuration Options:

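use rust_rabbit::RetryConfig;
use std::time::Duration;

// 1 hour (short retention for fresh failures)
let hourly = RetryConfig::exponential_default().with_dlq_ttl(Duration::from_secs(3600));

// 1 day (typical retention)
let daily = RetryConfig::exponential_default().with_dlq_ttl(Duration::from_secs(86_400));

// 1 week (long retention for analysis)
let weekly = RetryConfig::exponential_default().with_dlq_ttl(Duration::from_secs(604_800));

// No TTL (the default): skip .with_dlq_ttl() and clean up the DLQ manually
let manual = RetryConfig::exponential_default();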

Monitoring DLQ:

  1. Open RabbitMQ Management: http://localhost:15672
  2. Go to "Queues" tab
  3. Find "orders.dlq" queue
  4. Check "x-message-ttl" in queue details
  5. Monitor "Message count" to see failed messages

See dlq_ttl_example.rs for complete example.

🔗 MassTransit Integration (v1.2.0)

rust-rabbit seamlessly integrates with C# services using MassTransit. You can both consume and publish MassTransit-compatible messages.

Publishing to MassTransit Services

New in v1.2.0: Publish messages that MassTransit will accept and route correctly. Use the with_masstransit() option:

use rust_rabbit::{Connection, Publisher, PublishOptions};
use serde::Serialize;

#[derive(Serialize)]
struct OrderCreated {
    order_id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
    let publisher = Publisher::new(connection);
    
    let order = OrderCreated { order_id: 123, amount: 99.99 };
    
    // Simple: Just enable MassTransit with message type
    publisher.publish_to_exchange(
        "order-exchange",
        "order.created",
        &order,
        Some(PublishOptions::new().with_masstransit("Contracts:OrderCreated"))
    ).await?;
    
    // Advanced: Full MassTransit options
    use rust_rabbit::MassTransitOptions;
    let mt_options = MassTransitOptions::new("Contracts:OrderCreated")
        .with_correlation_id("correlation-12345");
    
    publisher.publish_to_queue(
        "order-queue",
        &order,
        Some(PublishOptions::new().with_masstransit_options(mt_options))
    ).await?;
    
    Ok(())
}

Key Features:

  • ✅ Message Type Format: Accepts "Namespace:TypeName" or "urn:message:Namespace:TypeName" (auto-converts to URN)
  • ✅ Full Compatibility: Sets messageType array in envelope body and MT-Host-MessageType header
  • ✅ Validation: Messages pass MassTransit's strict validation (won't go to skip queue)
  • ✅ Backward Compatible: Existing code works without changes (MassTransit is optional)

See masstransit_option_example.rs for complete examples.
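
The message type conversion described in the feature list can be pictured as a simple normalization step. The function below is a hypothetical standalone sketch, not the library's code; it only assumes the "urn:message:" prefix shown above.

```rust
/// Hypothetical helper (not part of rust-rabbit): normalize a message type
/// to the URN form, leaving values that are already URNs untouched.
fn to_message_urn(message_type: &str) -> String {
    const PREFIX: &str = "urn:message:";
    if message_type.starts_with(PREFIX) {
        message_type.to_string()
    } else {
        format!("{}{}", PREFIX, message_type)
    }
}
```

Making the conversion idempotent means callers can pass either form without worrying about a doubled prefix.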

Consuming MassTransit Messages

When a C# service publishes messages via MassTransit's IBus, rust-rabbit automatically:

  • Detects MassTransit's camelCase JSON envelope format
  • Extracts the actual message payload from the message field
  • Preserves correlationId for tracking operations
  • Maintains all existing retry logic

use rust_rabbit::{Connection, Consumer, RetryConfig};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
struct OrderMessage {
    order_id: u32,
    customer_name: String,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
    
    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default())
        .build();
    
    // Handler receives just the payload (no wrapper)
    consumer.consume(|msg: OrderMessage| async move {
        println!("Order ID: {}", msg.order_id);
        // Your business logic here
        Ok(())
    }).await?;
    
    Ok(())
}

How It Works

Publishing:

  1. When with_masstransit() is used, your message is wrapped in MassTransit's envelope format
  2. Message type is set as an array of URNs: ["urn:message:Namespace:TypeName"]
  3. All required fields are auto-populated (messageId, sentTime, sourceAddress, etc.)
  4. MassTransit services will accept and route the message correctly

Consuming:

  1. Automatic Format Detection: The consumer tries to deserialize as MassTransit format first, then falls back to rust-rabbit format
  2. Message Extraction: MassTransit's message field is automatically extracted to your Rust type
  3. Correlation Tracking: The correlationId from MassTransit is preserved for retries
  4. Retry Compatibility: Failed messages are retried using rust-rabbit's format (simpler, maintains retry tracking)
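
The "try one format first, then fall back" step can be sketched generically. The example below uses toy string decoders in place of real JSON parsing so that it stays dependency-free; `decode_with_fallback`, `decode_enveloped` and `decode_bare` are all hypothetical names, and the real consumer's detection logic may work differently.

```rust
/// Try `primary` first and fall back to `fallback` if it fails.
/// When neither decoder accepts the body, both error messages are reported.
fn decode_with_fallback<T>(
    body: &str,
    primary: impl Fn(&str) -> Result<T, String>,
    fallback: impl Fn(&str) -> Result<T, String>,
) -> Result<T, String> {
    primary(body).or_else(|first_err| {
        fallback(body).map_err(|second_err| {
            format!("primary failed: {}; fallback failed: {}", first_err, second_err)
        })
    })
}

/// Toy stand-in for an enveloped format: `envelope:<number>`.
fn decode_enveloped(body: &str) -> Result<u32, String> {
    let digits = body
        .strip_prefix("envelope:")
        .ok_or_else(|| "missing envelope prefix".to_string())?;
    digits.parse::<u32>().map_err(|e| e.to_string())
}

/// Toy stand-in for a bare payload: `<number>`.
fn decode_bare(body: &str) -> Result<u32, String> {
    body.parse::<u32>().map_err(|e| e.to_string())
}
```

Calling `decode_with_fallback(body, decode_enveloped, decode_bare)` accepts both `envelope:7` and `7`, which mirrors how a single handler can serve both MassTransit and native messages.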

MassTransit Message Format

MassTransit wraps messages in this structure (camelCase JSON):

{
  "messageId": "guid",
  "correlationId": "guid",
  "sourceAddress": "rabbitmq://...",
  "destinationAddress": "rabbitmq://...",
  "messageType": ["urn:message:Contracts:OrderCreated"],
  "sentTime": "2024-01-01T12:00:00Z",
  "message": {
    // Your actual message payload here
    "orderId": 123,
    "amount": 99.99
  }
}

rust-rabbit handles both publishing and consuming this format automatically.
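
To show how a payload ends up inside the envelope, here is a dependency-free sketch that assembles a subset of the fields shown above. `wrap_in_envelope` is a hypothetical helper; it omits the address and correlation fields and assumes its inputs need no JSON escaping, so real code should use a JSON library instead.

```rust
/// Hypothetical helper: wrap an already-serialized JSON payload in a
/// MassTransit-style envelope. Only a subset of fields is produced, and the
/// string inputs are assumed to contain no characters that need escaping.
fn wrap_in_envelope(
    message_id: &str,
    message_type_urn: &str,
    sent_time: &str,
    payload_json: &str,
) -> String {
    format!(
        r#"{{"messageId":"{}","messageType":["{}"],"sentTime":"{}","message":{}}}"#,
        message_id, message_type_urn, sent_time, payload_json
    )
}
```

Note that messageType is emitted as an array even for a single type, matching the structure shown above.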

⚙️ Advanced Features

MessageEnvelope System

For advanced retry tracking and error handling, use the MessageEnvelope system:

use rust_rabbit::{Connection, Publisher, Consumer, MessageEnvelope, RetryConfig};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Clone)]
struct Order {
    id: u32,
    amount: f64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = Connection::new("amqp://localhost:5672").await?;
    
    // Publisher with envelope
    let publisher = Publisher::new(connection.clone());
    let order = Order { id: 123, amount: 99.99 };
    let envelope = MessageEnvelope::new(order, "order_queue")
        .with_max_retries(3);
    
    publisher.publish_envelope_to_queue("order_queue", &envelope, None).await?;
    
    // Consumer with envelope processing
    let consumer = Consumer::builder(connection, "order_queue")
        .with_retry(RetryConfig::exponential_default())
        .build();
    
    consumer.consume_envelopes(|envelope: MessageEnvelope<Order>| async move {
        println!("Processing order {} (attempt {})", 
                 envelope.payload.id, 
                 envelope.metadata.retry_attempt + 1);
        
        // Access retry metadata
        if !envelope.is_first_attempt() {
            println!("This is a retry. Last error: {:?}", envelope.last_error());
        }
        
        Ok(())
    }).await?;
    
    Ok(())
}
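
To illustrate what retry metadata on an envelope might track, here is a simplified standalone stand-in. `SketchEnvelope` is NOT rust-rabbit's MessageEnvelope: its fields and the `record_failure` method are assumptions made for this sketch, and only `is_first_attempt` and `last_error` mirror method names used in the example above.

```rust
/// Simplified stand-in for an envelope carrying retry metadata.
/// Illustrative only; this is not rust-rabbit's MessageEnvelope.
#[derive(Debug, Clone)]
struct SketchEnvelope<T> {
    payload: T,
    retry_attempt: u32,
    max_retries: u32,
    errors: Vec<String>,
}

impl<T> SketchEnvelope<T> {
    fn new(payload: T, max_retries: u32) -> Self {
        Self { payload, retry_attempt: 0, max_retries, errors: Vec::new() }
    }

    /// True until the first failure has been recorded.
    fn is_first_attempt(&self) -> bool {
        self.retry_attempt == 0
    }

    /// The most recently recorded error, if any.
    fn last_error(&self) -> Option<&str> {
        self.errors.last().map(String::as_str)
    }

    /// Record a failure; returns true if another retry is still allowed.
    fn record_failure(&mut self, error: &str) -> bool {
        self.errors.push(error.to_string());
        if self.retry_attempt < self.max_retries {
            self.retry_attempt += 1;
            true
        } else {
            false
        }
    }
}
```

Carrying the attempt counter and error history inside the message itself means the state survives the round trip through retry queues.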

Connection Options

use rust_rabbit::Connection;

// Simple connection
let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;

// Different connection URLs
let local = Connection::new("amqp://localhost:5672").await?;
let remote = Connection::new("amqp://user:pass@remote-host:5672/vhost").await?;
let secure = Connection::new("amqps://user:pass@secure-host:5671").await?;
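
The URLs above differ in scheme, port and virtual host. The sketch below picks those parts out using only the standard library. `default_port` and `vhost` are hypothetical helpers that skip percent-decoding and validation; 5672 and 5671 are the conventional default ports for amqp and amqps.

```rust
/// Conventional default port for an AMQP URL scheme, or None for other schemes.
fn default_port(url: &str) -> Option<u16> {
    match url.split_once("://")?.0 {
        "amqp" => Some(5672),
        "amqps" => Some(5671),
        _ => None,
    }
}

/// The virtual host named in the URL path, or None when the URL has no path
/// (in which case the broker's default vhost applies). No percent-decoding.
fn vhost(url: &str) -> Option<&str> {
    let (_, rest) = url.split_once("://")?;
    rest.split_once('/')
        .map(|(_, path)| path)
        .filter(|path| !path.is_empty())
}
```

For the remote example above, `vhost` returns "vhost", while the local and secure URLs have no path and therefore use the default virtual host.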

Publisher Options

use rust_rabbit::PublishOptions;

let options = PublishOptions::new()
    .mandatory()                         // Make delivery mandatory
    .priority(5);                        // Message priority (0-255)

publisher.publish_to_queue("orders", &message, Some(options)).await?;

Consumer Options

let consumer = Consumer::builder(connection, "order_queue")
    .with_retry(RetryConfig::exponential_default())
    .bind_to_exchange("order_exchange", "new.order")  // Exchange binding with routing key
    .with_prefetch(10)                     // Allow up to 10 unacknowledged messages in flight
    .build();

📚 Documentation

For detailed guides and advanced topics:

🛠️ Examples

See the examples/ directory for complete working examples.

🧪 Testing

Run the tests:

cargo test

For integration tests with real RabbitMQ:

# Start RabbitMQ with Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# Run integration tests
cargo test --test integration

🔧 Requirements

  • Rust: 1.70+
  • RabbitMQ: 3.8+
  • Tokio: Async runtime

🚦 Migration from v0.x

If you're upgrading from the complex v0.x version:

// OLD (v0.x) - Complex
let consumer = Consumer::new(connection_manager, ConsumerOptions {
    auto_ack: false,
    retry_policy: Some(RetryPolicy::fast()),
    prefetch_count: Some(10),
    // ... many more options
}).await?;

// NEW (v1.0) - Simple  
let consumer = Consumer::builder(connection, "queue")
    .with_retry(RetryConfig::exponential_default())
    .with_prefetch(10)
    .build();

Major Changes:

  • ✅ Simplified API with just Publisher and Consumer
  • ✅ Removed enterprise patterns (saga, event sourcing, request-response)
  • ✅ Removed metrics and health monitoring
  • ✅ Unified retry system with flexible mechanisms
  • ✅ Auto-declare queues and exchanges by default

🎯 Design Philosophy

rust-rabbit v1.0 follows these principles:

  1. Simplicity First: Only essential features, no bloat
  2. Reliability Built-in: Automatic retry and error handling
  3. Easy Configuration: Sensible defaults, minimal setup
  4. Production Ready: Persistent messages, proper ACK handling
  5. Developer Friendly: Clear errors, good documentation

🗺️ Roadmap

See ROADMAP.md for planned features:

  • Connection pooling and load balancing
  • Monitoring and metrics integration
  • Advanced retry patterns and policies
  • Performance optimizations
  • Additional messaging patterns

📄 License

MIT License - see LICENSE for details.

🤝 Contributing

Contributions welcome! Please read our contributing guide and submit pull requests.

💬 Support


Made with ❤️ by the rust-rabbit team