use rust_rabbit::{Connection, Consumer, RetryConfig};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{error, info, warn};
/// Order payload exchanged on the `simple_orders`, `retry_orders` and
/// `manual_orders` queues used by this example.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Order {
    /// Order identifier; included in every log line the consumers emit.
    id: u32,
    /// Customer identifier. Set by the test publisher but not read by any
    /// consumer in this file.
    customer_id: u32,
    /// Order amount. The simple consumer rejects values `<= 0.0`; the manual
    /// ACK consumer treats values `> 1000.0` as high value.
    amount: f64,
    /// Free-form status string. The retry consumer special-cases the values
    /// `"invalid"` and `"network_error"`.
    status: String,
}
/// Notification payload consumed from the `notifications` queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Notification {
    /// Destination address; the exchange consumer fails any message whose
    /// recipient contains the substring `"invalid"`.
    recipient: String,
    /// Subject line. Set by the test publisher but not read by the consumer
    /// in this file.
    subject: String,
    /// Priority value; the exchange consumer uses it to pick a simulated
    /// processing delay (9-10 fastest, 6-8 medium, everything else slowest).
    priority: u8,
}
/// Entry point: initialises tracing, connects to the local broker, starts the
/// four example consumers and then waits for Ctrl+C.
///
/// # Errors
///
/// Returns an error if the broker connection cannot be established or if
/// waiting for the Ctrl+C signal fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    rust_rabbit::init_tracing();
    info!("Starting basic consumer examples");

    let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;

    // Every starter spawns its own tokio task. The join handles are only kept
    // alive for the lifetime of `main`; they are never awaited here.
    let _simple = start_simple_consumer(connection.clone());
    let _retry = start_retry_consumer(connection.clone());
    let _exchange = start_exchange_consumer(connection.clone());
    let _manual_ack = start_manual_ack_consumer(connection.clone());

    info!("All consumers started. Press Ctrl+C to stop.");
    tokio::signal::ctrl_c().await?;
    info!("Received shutdown signal, stopping consumers...");

    Ok(())
}
fn start_simple_consumer(
connection: std::sync::Arc<Connection>,
) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move {
let consumer = Consumer::builder(connection, "simple_orders")
.with_prefetch(5)
.build();
consumer
.consume(|msg: Order| async move {
info!("Simple - Processing order {} (${:.2})", msg.id, msg.amount);
if msg.amount <= 0.0 {
error!("Invalid amount for order {}", msg.id);
return Err("Invalid amount".into());
}
tokio::time::sleep(Duration::from_millis(100)).await;
info!("Order {} processed", msg.id);
Ok(())
})
.await
.map_err(|e| e.into())
})
}
/// Spawns a task that consumes [`Order`] messages from the `retry_orders`
/// queue, configured with `RetryConfig::exponential_default()` and a prefetch
/// of 3.
///
/// Handler outcomes by `status`:
/// * `"invalid"` - logs an error and returns `Ok(())` (the log message states
///   that the order is not retried).
/// * `"network_error"` - logs a warning and returns an error.
/// * anything else - sleeps 200 ms to simulate work and returns `Ok(())`.
///
/// The returned handle resolves to the result of `consume`, with its error
/// converted into a boxed `std::error::Error`.
fn start_retry_consumer(
    connection: std::sync::Arc<Connection>,
) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
    tokio::spawn(async move {
        let consumer = Consumer::builder(connection, "retry_orders")
            .with_retry(RetryConfig::exponential_default())
            .with_prefetch(3)
            .build();

        let outcome = consumer
            .consume(|order: Order| async move {
                info!("Retry - Processing order {}", order.id);

                let status = order.status.as_str();
                if status == "invalid" {
                    // Reported as success so the failure path is not taken.
                    error!("Invalid order - not retrying");
                    return Ok(());
                }
                if status == "network_error" {
                    warn!("Network error - will retry");
                    return Err("Network temporarily unavailable".into());
                }

                // Simulated work for every other status.
                tokio::time::sleep(Duration::from_millis(200)).await;
                info!("Order {} processed successfully", order.id);
                Ok(())
            })
            .await;

        outcome.map_err(|err| err.into())
    })
}
/// Spawns a task that consumes [`Notification`] messages from the
/// `notifications` queue.
///
/// The consumer is built with `bind_to_exchange("notifications", "order.*")`,
/// `RetryConfig::linear(2, 5s)` and a prefetch of 10.
///
/// The handler sleeps for a priority-dependent time (50 ms for 9-10, 100 ms
/// for 6-8, 200 ms otherwise) and then fails with `"Invalid recipient"` when
/// the recipient contains `"invalid"`.
///
/// The returned handle resolves to the result of `consume`, with its error
/// converted into a boxed `std::error::Error`.
fn start_exchange_consumer(
    connection: std::sync::Arc<Connection>,
) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
    tokio::spawn(async move {
        let consumer = Consumer::builder(connection, "notifications")
            .bind_to_exchange("notifications", "order.*")
            .with_retry(RetryConfig::linear(2, Duration::from_secs(5)))
            .with_prefetch(10)
            .build();

        let outcome = consumer
            .consume(|note: Notification| async move {
                info!("Exchange - Processing notification for {}", note.recipient);

                // Higher priority means a shorter simulated processing time.
                let delay_ms: u64 = match note.priority {
                    9 | 10 => 50,
                    6 | 7 | 8 => 100,
                    _ => 200,
                };
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;

                if note.recipient.contains("invalid") {
                    warn!("Invalid recipient - will retry");
                    Err("Invalid recipient".into())
                } else {
                    info!("Notification sent to {}", note.recipient);
                    Ok(())
                }
            })
            .await;

        outcome.map_err(|err| err.into())
    })
}
/// Spawns a task that consumes [`Order`] messages from the `manual_orders`
/// queue, built with `manual_ack()` and a prefetch of 2.
///
/// The handler sleeps 300 ms to simulate work, logs differently for orders
/// above 1000.0, and always returns `Ok(())`.
///
/// NOTE(review): the handler never acknowledges a message explicitly; confirm
/// how `rust_rabbit` acknowledges deliveries when `manual_ack()` is set.
///
/// The returned handle resolves to the result of `consume`, with its error
/// converted into a boxed `std::error::Error`.
fn start_manual_ack_consumer(
    connection: std::sync::Arc<Connection>,
) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
    tokio::spawn(async move {
        let consumer = Consumer::builder(connection, "manual_orders")
            .manual_ack()
            .with_prefetch(2)
            .build();

        let outcome = consumer
            .consume(|order: Order| async move {
                info!("Manual ACK - Processing order {}", order.id);
                tokio::time::sleep(Duration::from_millis(300)).await;

                let high_value = order.amount > 1000.0;
                if !high_value {
                    info!("Order {} processed", order.id);
                } else {
                    info!(
                        "High value order {} - manual verification needed",
                        order.id
                    );
                    info!(
                        "High value order {} processed (manual verification in real scenario)",
                        order.id
                    );
                }

                Ok(())
            })
            .await;

        outcome.map_err(|err| err.into())
    })
}
/// Publishes one sample message for each example consumer.
///
/// Three [`Order`]s are sent directly to the `simple_orders`, `retry_orders`
/// and `manual_orders` queues, and one [`Notification`] is sent to the
/// `notifications` exchange with routing key `order.created`.
///
/// # Errors
///
/// Returns an error if the broker connection cannot be established or if any
/// publish call fails; publishing stops at the first failure.
// Not invoked by `main` in this file; kept as a helper for seeding the queues
// by hand, hence the lint allowance.
#[allow(dead_code)]
async fn publish_test_messages() -> Result<(), Box<dyn std::error::Error>> {
    use rust_rabbit::Publisher;

    let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;
    let publisher = Publisher::new(connection);

    // One order per queue-based consumer. The `network_error` status drives
    // the retry consumer's failure branch, and the 1999.99 amount drives the
    // high-value branch of the manual ACK consumer.
    let test_orders = [
        (
            "simple_orders",
            Order {
                id: 1,
                customer_id: 100,
                amount: 99.99,
                status: "pending".to_string(),
            },
        ),
        (
            "retry_orders",
            Order {
                id: 2,
                customer_id: 101,
                amount: 199.99,
                status: "network_error".to_string(),
            },
        ),
        (
            "manual_orders",
            Order {
                id: 3,
                customer_id: 102,
                amount: 1999.99,
                status: "pending".to_string(),
            },
        ),
    ];

    for (queue, order) in test_orders {
        publisher.publish_to_queue(queue, &order, None).await?;
        info!("Published order {} to {}", order.id, queue);
    }

    let notification = Notification {
        recipient: "customer@example.com".to_string(),
        subject: "Order confirmation".to_string(),
        priority: 7,
    };

    // The payload is passed by reference, mirroring `publish_to_queue` above.
    publisher
        .publish_to_exchange("notifications", "order.created", &notification, None)
        .await?;
    info!("Published notification to exchange");

    Ok(())
}