use rust_rabbit::{Connection, Consumer, DelayStrategy, Publisher, RetryConfig};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tracing::{info, warn};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Task {
id: u32,
name: String,
priority: u8,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
rust_rabbit::init_tracing();
info!("=== RabbitMQ Delayed Message Exchange Retry Example ===");
info!("This example requires the rabbitmq_delayed_message_exchange plugin");
let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;
info!("Connected to RabbitMQ");
let publisher = Publisher::new(connection.clone());
info!("\nPublishing test messages...");
for i in 1..=3 {
let task = Task {
id: i,
name: format!("Task {}", i),
priority: (i % 3 + 1) as u8,
};
publisher
.publish_to_queue("task_queue", &task, None)
.await?;
info!(" Published: {:?}", task);
}
let retry_config = RetryConfig::exponential(3, Duration::from_secs(2), Duration::from_secs(30))
.with_delay_strategy(DelayStrategy::DelayedExchange);
info!("\nConsumer Configuration:");
info!(" - Strategy: DelayedExchange (rabbitmq_delayed_message_exchange plugin)");
info!(" - Max retries: {}", retry_config.max_retries);
info!(" - Backoff: Exponential (2s base, 30s max)");
let consumer = Consumer::builder(connection.clone(), "task_queue")
.with_retry(retry_config)
.build();
let attempt_counter = Arc::new(AtomicU32::new(0));
info!("\nStarting consumer with delayed exchange retry strategy...\n");
let counter = attempt_counter.clone();
consumer
.consume(move |msg: Task| {
let counter = counter.clone();
async move {
let attempt = counter.fetch_add(1, Ordering::SeqCst) + 1;
info!(
"Processing task {} (attempt {}): {}",
msg.id, attempt, msg.name
);
if msg.id.is_multiple_of(2) && attempt < 3 {
warn!(" Processing failed for task {}", msg.id);
return Err("Processing error".into());
}
if msg.id == 3 && attempt == 1 {
warn!(" Processing failed for task 3");
return Err("Processing error".into());
}
info!(
" Task {} processed successfully after {} attempt(s)",
msg.id, attempt
);
Ok(())
}
})
.await?;
Ok(())
}
#[allow(dead_code)]
fn example_scenarios() {
}