rust-rabbit 1.2.2

A simple, reliable RabbitMQ client library for Rust. Easy to use with flexible retry mechanisms and minimal configuration.
Documentation
//! Example demonstrating RabbitMQ delayed message exchange plugin for retry delays
//!
//! This example shows how to use the DelayedExchange strategy for retrying failed messages.
//!
//! **CRITICAL REQUIREMENT**: This example REQUIRES the `rabbitmq_delayed_message_exchange` plugin.
//! Without it, the application will crash when trying to send messages to the delay exchange.
//!
//! **Setup Steps**:
//! 1. Download: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
//! 2. Install: Place the .ez file in the RabbitMQ plugins directory
//! 3. Enable plugin: `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`
//! 4. Restart RabbitMQ: `systemctl restart rabbitmq-server` (or appropriate OS command)
//!
//! **If the plugin is not installed**, you'll get this error: `NOT_FOUND - operation not permitted on this exchange`
//!
//! **Flow**:
//! 1. Consumer receives message from queue
//! 2. Handler processes message (may fail)
//! 3. On error, message is published to delay exchange with x-delay header
//! 4. Delay exchange automatically routes message back to original queue after delay
//! 5. Consumer retries processing the message
//! 6. After max retries exceeded, message goes to DLQ

use rust_rabbit::{Connection, Consumer, DelayStrategy, Publisher, RetryConfig};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{info, warn};

/// Message payload used by this example: it is published to `task_queue`
/// and handed back to the consumer handler after deserialization.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Task {
    /// Numeric identifier; the handler uses it to decide which tasks fail.
    id: u32,
    /// Human-readable label, included in the log output.
    name: String,
    /// Priority value carried with the task; the handler does not read it.
    priority: u8,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Setup tracing
    rust_rabbit::init_tracing();

    info!("=== RabbitMQ Delayed Message Exchange Retry Example ===");
    info!("This example requires the rabbitmq_delayed_message_exchange plugin");

    // Connection
    let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;
    info!("Connected to RabbitMQ");

    // Setup publisher to publish test messages
    let publisher = Publisher::new(connection.clone());

    // Publish some test messages
    info!("\nPublishing test messages...");
    for i in 1..=3 {
        let task = Task {
            id: i,
            name: format!("Task {}", i),
            priority: (i % 3 + 1) as u8,
        };
        publisher
            .publish_to_queue("task_queue", &task, None)
            .await?;
        info!("  Published: {:?}", task);
    }

    // Consumer setup with DelayedExchange strategy
    // This will use the delayed message exchange plugin for retry delays
    let retry_config = RetryConfig::exponential(3, Duration::from_secs(2), Duration::from_secs(30))
        .with_delay_strategy(DelayStrategy::DelayedExchange); // Use delayed exchange!

    info!("\nConsumer Configuration:");
    info!("  - Strategy: DelayedExchange (rabbitmq_delayed_message_exchange plugin)");
    info!("  - Max retries: {}", retry_config.max_retries);
    info!("  - Backoff: Exponential (2s base, 30s max)");

    let consumer = Consumer::builder(connection.clone(), "task_queue")
        .with_retry(retry_config)
        .build();

    // Track how many times each message is processed
    let attempt_counter = Arc::new(AtomicU32::new(0));

    info!("\nStarting consumer with delayed exchange retry strategy...\n");

    let counter = attempt_counter.clone();
    consumer
        .consume(move |msg: Task| {
            let counter = counter.clone();
            async move {
                let attempt = counter.fetch_add(1, Ordering::SeqCst) + 1;
                info!(
                    "Processing task {} (attempt {}): {}",
                    msg.id, attempt, msg.name
                );

                // Simulate processing with occasional failures
                if msg.id.is_multiple_of(2) && attempt < 3 {
                    // Fail even-numbered tasks on first 2 attempts
                    warn!("  Processing failed for task {}", msg.id);
                    return Err("Processing error".into());
                }

                if msg.id == 3 && attempt == 1 {
                    // Fail task 3 on first attempt
                    warn!("  Processing failed for task 3");
                    return Err("Processing error".into());
                }

                // Success
                info!(
                    "  Task {} processed successfully after {} attempt(s)",
                    msg.id, attempt
                );
                Ok(())
            }
        })
        .await?;

    Ok(())
}

/// Walk-through of the message flow in the scenarios this example covers.
///
/// **Scenario 1: Immediate Success**
/// - A task is delivered from the queue
/// - The handler succeeds on the first try
/// - The message is acknowledged
/// - Nothing else happens (no retries needed)
///
/// **Scenario 2: Success After Retry**
/// - A task is delivered from the queue
/// - The handler fails (attempt 1)
/// - The message is published to the delay exchange with exponential backoff (2s base, 30s max)
/// - After 2s, the message is routed back to task_queue
/// - The handler succeeds (attempt 2)
/// - Finished, with 1 retry used
///
/// **Scenario 3: Exhausted Retries**
/// - A task is delivered from the queue
/// - The handler fails (attempts 1, 2, 3)
/// - After the failure of attempt 3, max retries are exceeded
/// - The message is sent to the Dead Letter Queue (task_queue.dlq)
/// - Manual intervention is needed
///
/// **Advantages of DelayedExchange over TTL:**
/// - More precise timing (RabbitMQ manages delays server-side)
/// - Cleaner architecture (a single delay exchange instead of multiple retry queues)
/// - Better suited to high-volume scenarios
/// - Built-in reliability with the delayed exchange plugin
///
/// **Disadvantages:**
/// - Requires installing an external plugin
/// - The plugin adds complexity to the RabbitMQ setup
/// - Slightly higher memory footprint on RabbitMQ
// Never called: the function exists only so the doc comment above has an item
// to attach to, hence the dead-code allowance.
#[allow(dead_code)]
fn example_scenarios() {
    // Intentionally empty; see the doc comment for the example flow.
}