use anyhow::Result;
use rust_rabbit::{Connection, Consumer, PublishOptions, Publisher, RetryConfig};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{info, warn};
#[derive(Serialize, Deserialize, Debug, Clone)]
struct OrderEvent {
order_id: String,
user_id: String,
amount: f64,
event_type: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct NotificationTask {
recipient: String,
message: String,
priority: u8, }
#[derive(Default, Debug)]
struct Stats {
orders_processed: u64,
notifications_sent: u64,
errors: u64,
}
#[tokio::main]
async fn main() -> Result<()> {
rust_rabbit::init_tracing();
info!("Starting production setup");
let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;
let stats = Arc::new(RwLock::new(Stats::default()));
let _order_handle = tokio::spawn(start_order_processor(connection.clone(), stats.clone()));
let _notification_handle = tokio::spawn(start_notification_processor(
connection.clone(),
stats.clone(),
));
let _metrics_handle = tokio::spawn(start_metrics_reporter(stats.clone()));
let _producer_handle = tokio::spawn(start_message_producer(connection.clone()));
info!("All services started. Press Ctrl+C to stop.");
tokio::signal::ctrl_c().await?;
info!("Shutdown signal received");
tokio::time::sleep(Duration::from_secs(2)).await;
info!("Application shutdown complete");
Ok(())
}
async fn start_order_processor(
connection: Arc<Connection>,
stats: Arc<RwLock<Stats>>,
) -> Result<()> {
let consumer = Consumer::builder(connection, "orders")
.with_retry(RetryConfig::exponential_default())
.with_prefetch(10)
.build();
consumer
.consume(move |msg: OrderEvent| {
let stats = Arc::clone(&stats);
async move {
info!("Processing order {}: ${}", msg.order_id, msg.amount);
if msg.amount > 1000.0 {
tokio::time::sleep(Duration::from_millis(500)).await;
}
if msg.order_id.ends_with("999") {
return Err("Processing failed - will retry".into());
}
{
let mut s = stats.write().await;
s.orders_processed += 1;
}
info!("Order {} processed successfully", msg.order_id);
Ok(())
}
})
.await
.map_err(|e| e.into())
}
async fn start_notification_processor(
connection: Arc<Connection>,
stats: Arc<RwLock<Stats>>,
) -> Result<()> {
let consumer = Consumer::builder(connection, "notifications")
.bind_to_exchange("notifications", "notification.*")
.with_retry(RetryConfig::linear(3, Duration::from_secs(5)))
.with_prefetch(15)
.build();
consumer
.consume(move |msg: NotificationTask| {
let stats = Arc::clone(&stats);
async move {
info!("Sending notification to {}", msg.recipient);
let processing_time = match msg.priority {
9..=10 => Duration::from_millis(100), 6..=8 => Duration::from_millis(200), _ => Duration::from_millis(300), };
tokio::time::sleep(processing_time).await;
{
let mut s = stats.write().await;
s.notifications_sent += 1;
}
info!("Notification sent to {}", msg.recipient);
Ok(())
}
})
.await
.map_err(|e| e.into())
}
async fn start_metrics_reporter(stats: Arc<RwLock<Stats>>) -> Result<()> {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
let s = stats.read().await;
info!(
"Stats - Orders: {}, Notifications: {}, Errors: {}",
s.orders_processed, s.notifications_sent, s.errors
);
}
}
async fn start_message_producer(connection: Arc<Connection>) -> Result<()> {
let publisher = Publisher::new(connection);
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut counter = 1;
loop {
interval.tick().await;
let order = OrderEvent {
order_id: format!("ORD-{:06}", counter),
user_id: format!("user-{}", counter % 100),
amount: (counter as f64) * 99.99,
event_type: "created".to_string(),
};
if let Err(e) = publisher.publish_to_queue("orders", &order, None).await {
warn!("Failed to publish order: {}", e);
}
let notification = NotificationTask {
recipient: format!("user{}@example.com", counter % 50),
message: format!("Order {} has been created", order.order_id),
priority: if counter % 10 == 0 { 9 } else { 5 },
};
let options = if notification.priority >= 9 {
Some(PublishOptions::new().priority(9))
} else {
None
};
if let Err(e) = publisher
.publish_to_exchange(
"notifications",
"notification.order",
¬ification,
options,
)
.await
{
warn!("Failed to publish notification: {}", e);
}
counter += 1;
}
}