use std::collections::HashMap;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use shove::rabbitmq::RabbitMqConfig;
use shove::{
Broker, ConsumerOptions, DeadMessageMetadata, JsonCodec, MessageHandler, MessageMetadata,
Outcome, QueueTopology, RabbitMq, Topic, TopologyBuilder, define_topic,
};
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::rabbitmq::RabbitMq as RabbitMqImage;
/// Payload carried by every topic in this example.
///
/// Serialized and deserialized through serde; the topics below pair it with
/// the crate's codec.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct OrderEvent {
    /// Identifier of the order (e.g. `"ORD-001"`).
    order_id: String,
    /// Order amount expressed in cents; handlers print it as dollars.
    amount_cents: u64,
}
// Topic `MinimalOrder`: carries `OrderEvent` messages over the topology named
// "ex-minimal-orders". The topology is built with a single hold queue
// configured with a 30-second duration and without a `.dlq()` call.
// NOTE(review): `define_topic!` presumably generates the marker type and its
// `Topic` impl, equivalent to the hand-written `ScheduledOrder` below —
// confirm against the macro's documentation.
define_topic!(
MinimalOrder,
OrderEvent,
TopologyBuilder::new("ex-minimal-orders")
.hold_queue(Duration::from_secs(30))
.build()
);
// Topic `DlqOrder`: carries `OrderEvent` messages over the topology named
// "ex-dlq-orders". The topology is built with `.dlq()` and no hold queues;
// `RejectHandler` consumes it and rejects every message.
// NOTE(review): `.dlq()` presumably enables a dead-letter queue for rejected
// messages — confirm against the `TopologyBuilder` documentation.
define_topic!(
DlqOrder,
OrderEvent,
TopologyBuilder::new("ex-dlq-orders").dlq().build()
);
// Topic `RetryOrder`: carries `OrderEvent` messages over the topology named
// "ex-retry-orders". The topology is built with three hold queues (5 s, 30 s
// and 120 s, in that order) plus `.dlq()`.
// NOTE(review): the three durations presumably form an escalating retry
// back-off, one hold queue per retry step — confirm how the broker selects a
// hold queue for a given retry count.
define_topic!(
RetryOrder,
OrderEvent,
TopologyBuilder::new("ex-retry-orders")
.hold_queue(Duration::from_secs(5))
.hold_queue(Duration::from_secs(30))
.hold_queue(Duration::from_secs(120))
.dlq()
.build()
);
/// Topic implemented by hand rather than through `define_topic!`.
///
/// Carries [`OrderEvent`] messages encoded with [`JsonCodec`].
struct ScheduledOrder;

impl Topic for ScheduledOrder {
    type Message = OrderEvent;
    type Codec = JsonCodec;

    /// Returns the process-wide topology for this topic.
    ///
    /// The topology is built on the first call and the same instance is
    /// handed out on every later call.
    fn topology() -> &'static QueueTopology {
        use std::sync::OnceLock;

        /// Builds the "ex-scheduled-orders" topology: one 10-second hold
        /// queue plus a DLQ.
        fn build_topology() -> QueueTopology {
            let hold = Duration::from_secs(10);
            TopologyBuilder::new("ex-scheduled-orders")
                .hold_queue(hold)
                .dlq()
                .build()
        }

        static CELL: OnceLock<QueueTopology> = OnceLock::new();
        CELL.get_or_init(build_topology)
    }
}
/// Handler for [`MinimalOrder`] that logs each message and acknowledges it.
struct AckHandler;

impl MessageHandler<MinimalOrder> for AckHandler {
    type Context = ();

    /// Prints the order id, the amount in dollars and the 1-based attempt
    /// number (`retry_count + 1`), then returns [`Outcome::Ack`].
    async fn handle(&self, msg: OrderEvent, metadata: MessageMetadata, _: &()) -> Outcome {
        // Split into whole dollars and remaining cents with integer math.
        // Casting the `u64` to `f64` would round amounts above 2^53 cents and
        // print a wrong value; this prints every `u64` exactly and yields the
        // same text as before for ordinary amounts (e.g. 5000 -> "$50.00").
        let dollars = msg.amount_cents / 100;
        let cents = msg.amount_cents % 100;

        println!(
            "[minimal] order={} amount=${}.{:02} attempt={}",
            msg.order_id,
            dollars,
            cents,
            metadata.retry_count + 1,
        );
        Outcome::Ack
    }
}
/// Handler for [`DlqOrder`] that rejects every message and logs dead letters.
struct RejectHandler;

impl MessageHandler<DlqOrder> for RejectHandler {
    type Context = ();

    /// Logs the order id and returns [`Outcome::Reject`] unconditionally.
    async fn handle(&self, msg: OrderEvent, _metadata: MessageMetadata, _: &()) -> Outcome {
        let order_id = &msg.order_id;
        println!("[dlq] rejecting order={} → DLQ", order_id);
        Outcome::Reject
    }

    /// Logs a dead-lettered message: its order id, the recorded reason
    /// (or `"unknown"` when none is present) and the death count.
    async fn handle_dead(&self, msg: OrderEvent, metadata: DeadMessageMetadata, _: &()) {
        let reason = match metadata.reason.as_deref() {
            Some(text) => text,
            None => "unknown",
        };
        let deaths = &metadata.death_count;

        println!(
            "[dlq] dead-letter: order={} reason={} deaths={}",
            msg.order_id, reason, deaths,
        );
    }
}
/// Handler for [`RetryOrder`] that fails the first delivery and succeeds on
/// any later one.
struct RetryHandler;

impl MessageHandler<RetryOrder> for RetryHandler {
    type Context = ();

    /// Logs the order id and 1-based attempt number, then returns
    /// [`Outcome::Retry`] when `retry_count` is zero and [`Outcome::Ack`]
    /// otherwise.
    async fn handle(&self, msg: OrderEvent, metadata: MessageMetadata, _: &()) -> Outcome {
        let attempt = metadata.retry_count + 1;
        println!("[retry] order={} attempt={}", msg.order_id, attempt);

        // Any delivery after the first is treated as the successful retry.
        if metadata.retry_count != 0 {
            println!("[retry] → success on retry");
            return Outcome::Ack;
        }

        println!("[retry] → transient failure, will Retry");
        Outcome::Retry
    }
}
/// Handler for [`ScheduledOrder`] that defers a message the first time it is
/// seen and processes it on any later delivery.
struct DeferHandler;

impl MessageHandler<ScheduledOrder> for DeferHandler {
    type Context = ();

    /// Logs the order id, 1-based attempt number and redelivery flag.
    ///
    /// Returns [`Outcome::Defer`] only when `retry_count` is zero and the
    /// message is not flagged as redelivered; otherwise returns
    /// [`Outcome::Ack`].
    async fn handle(&self, msg: OrderEvent, metadata: MessageMetadata, _: &()) -> Outcome {
        let attempt = metadata.retry_count + 1;
        println!(
            "[defer] order={} attempt={} redelivered={}",
            msg.order_id, attempt, metadata.redelivered,
        );

        let first_sighting = metadata.retry_count == 0 && !metadata.redelivered;
        if !first_sighting {
            println!("[defer] → processing now");
            return Outcome::Ack;
        }

        println!("[defer] → not ready yet, deferring to hold queue");
        Outcome::Defer
    }
}
/// Runs the end-to-end example against a throwaway RabbitMQ container.
///
/// Steps, in order:
/// 1. start a RabbitMQ container and connect a broker to its mapped port;
/// 2. declare the four example topologies;
/// 3. publish sample orders (single messages, one with headers, one batch);
/// 4. register one handler per topic and run the consumers until the shutdown
///    future resolves (5 seconds elapsed or Ctrl-C);
/// 5. release the container and exit with the supervisor's exit code.
///
/// # Errors
///
/// Propagates any error raised while starting the container, connecting,
/// declaring topologies, publishing, or registering consumers.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // --- Infrastructure -------------------------------------------------
    let container = RabbitMqImage::default().start().await?;
    let port = container.get_host_port_ipv4(5672).await?;
    // `%2f` is the percent-encoded `/` virtual host.
    let uri = format!("amqp://guest:guest@localhost:{port}/%2f");
    let broker = Broker::<RabbitMq>::new(RabbitMqConfig::new(&uri)).await?;

    // --- Topology declaration ------------------------------------------
    let topology = broker.topology();
    topology.declare::<MinimalOrder>().await?;
    topology.declare::<DlqOrder>().await?;
    topology.declare::<RetryOrder>().await?;
    topology.declare::<ScheduledOrder>().await?;
    println!("topologies declared\n");

    // --- Publishing -----------------------------------------------------
    let publisher = broker.publisher().await?;

    // The same order goes to every topic so each handler has one message.
    let order = OrderEvent {
        order_id: "ORD-001".into(),
        amount_cents: 5000,
    };
    publisher.publish::<MinimalOrder>(&order).await?;
    publisher.publish::<DlqOrder>(&order).await?;
    publisher.publish::<RetryOrder>(&order).await?;
    publisher.publish::<ScheduledOrder>(&order).await?;

    // Publish with custom headers attached.
    let mut headers = HashMap::new();
    headers.insert("x-source".into(), "example".into());
    headers.insert("x-priority".into(), "high".into());
    let order2 = OrderEvent {
        order_id: "ORD-002".into(),
        amount_cents: 9900,
    };
    publisher
        .publish_with_headers::<MinimalOrder>(&order2, headers)
        .await?;

    // Publish several messages in one call.
    let batch = vec![
        OrderEvent {
            order_id: "ORD-003".into(),
            amount_cents: 1000,
        },
        OrderEvent {
            order_id: "ORD-004".into(),
            amount_cents: 2500,
        },
        OrderEvent {
            order_id: "ORD-005".into(),
            amount_cents: 7777,
        },
    ];
    publisher.publish_batch::<MinimalOrder>(&batch).await?;
    println!("messages published\n");

    // --- Consumers ------------------------------------------------------
    let mut supervisor = broker.consumer_supervisor();
    supervisor.register::<MinimalOrder, _>(AckHandler, ConsumerOptions::<RabbitMq>::new())?;
    supervisor.register::<DlqOrder, _>(RejectHandler, ConsumerOptions::<RabbitMq>::new())?;
    supervisor.register::<RetryOrder, _>(
        RetryHandler,
        ConsumerOptions::<RabbitMq>::new()
            .with_max_retries(3)
            .with_prefetch_count(20),
    )?;
    supervisor.register::<ScheduledOrder, _>(DeferHandler, ConsumerOptions::<RabbitMq>::new())?;

    // The first argument is the shutdown signal: whichever of the 5-second
    // timer or Ctrl-C fires first. The second argument is presumably the
    // grace period allowed for draining — confirm in the supervisor docs.
    // NOTE(review): `ScheduledOrder` uses a 10-second hold queue, so a
    // deferred message may not come back within this window — confirm this is
    // the intended demo behaviour.
    let outcome = supervisor
        .run_until_timeout(
            async {
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                    _ = tokio::signal::ctrl_c() => {}
                }
            },
            Duration::from_secs(5),
        )
        .await;
    let exit_code = outcome.exit_code();
    println!("done");

    // `std::process::exit` terminates without running destructors, so the
    // container handle would never be dropped and its cleanup would be
    // skipped. Drop it explicitly while the runtime is still alive.
    // NOTE(review): assumes the container handle's `Drop` stops/removes the
    // container — confirm for the testcontainers version in use.
    drop(container);

    std::process::exit(exit_code);
}