use std::collections::HashMap;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use shove::sns::SnsConfig;
use shove::{
Broker, ConsumerOptions, DeadMessageMetadata, MessageHandler, MessageMetadata, Outcome, Sqs,
TopologyBuilder, define_topic,
};
use testcontainers::ImageExt;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::localstack::LocalStack;
/// Message payload shared by every topic in this example.
///
/// Derives `Serialize`/`Deserialize` so it can be published and handed back to the
/// handlers below as a typed value.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OrderEvent {
/// Identifier of the order (the example publishes `"ORD-001"` through `"ORD-005"`).
order_id: String,
/// Order amount in cents; `AckHandler` prints it divided by 100 as a dollar value.
amount_cents: u64,
}
// Topic `MinimalOrder`: carries `OrderEvent` payloads over a topology named
// "sqs-minimal-orders", built with no extra builder options.
// Consumed by `AckHandler`, which acknowledges every message.
define_topic!(
MinimalOrder,
OrderEvent,
TopologyBuilder::new("sqs-minimal-orders").build()
);
// Topic `DlqOrder`: carries `OrderEvent` payloads over a topology named "sqs-dlq-orders"
// with `.dlq()` enabled — presumably this attaches a dead-letter queue (confirm against the
// `TopologyBuilder` docs). Consumed by `RejectHandler`, which rejects every message.
define_topic!(
DlqOrder,
OrderEvent,
TopologyBuilder::new("sqs-dlq-orders").dlq().build()
);
// Topic `RetryOrder`: carries `OrderEvent` payloads over a topology named "sqs-retry-orders"
// configured with three hold queues (5 s, 30 s, 120 s) plus `.dlq()`.
// NOTE(review): the hold queues look like escalating retry delays, with the DLQ as the final
// stop once retries are exhausted — confirm against the `TopologyBuilder` docs.
// Consumed by `RetryHandler`, which asks for one retry before acknowledging.
define_topic!(
RetryOrder,
OrderEvent,
TopologyBuilder::new("sqs-retry-orders")
.hold_queue(Duration::from_secs(5))
.hold_queue(Duration::from_secs(30))
.hold_queue(Duration::from_secs(120))
.dlq()
.build()
);
/// Handler for `MinimalOrder` messages: logs the order and always acknowledges it.
struct AckHandler;

impl MessageHandler<MinimalOrder> for AckHandler {
    type Context = ();

    /// Prints the order id, the amount converted from cents to dollars, and the attempt
    /// number (`retry_count + 1`), then returns `Outcome::Ack`.
    async fn handle(&self, msg: OrderEvent, metadata: MessageMetadata, _: &()) -> Outcome {
        let attempt = metadata.retry_count + 1;
        // Display-only conversion from integer cents to a fractional dollar amount.
        let dollars = msg.amount_cents as f64 / 100.0;

        println!(
            "[minimal] order={} amount=${:.2} attempt={}",
            msg.order_id, dollars, attempt,
        );

        Outcome::Ack
    }
}
/// Handler for `DlqOrder` messages: rejects every delivery and logs dead-lettered messages.
struct RejectHandler;

impl MessageHandler<DlqOrder> for RejectHandler {
    type Context = ();

    /// Logs the order id and unconditionally returns `Outcome::Reject`.
    async fn handle(&self, msg: OrderEvent, _metadata: MessageMetadata, _: &()) -> Outcome {
        let order_id = &msg.order_id;
        println!("[dlq] rejecting order={} → DLQ", order_id);
        Outcome::Reject
    }

    /// Logs a dead-lettered order together with its recorded reason and death count.
    ///
    /// A missing reason is printed as `"unknown"`.
    async fn handle_dead(&self, msg: OrderEvent, metadata: DeadMessageMetadata, _: &()) {
        let reason = metadata.reason.as_deref().unwrap_or("unknown");
        let deaths = &metadata.death_count;

        println!(
            "[dlq] dead-letter: order={} reason={} deaths={}",
            msg.order_id, reason, deaths,
        );
    }
}
/// Handler for `RetryOrder` messages: simulates a transient failure on the first delivery.
struct RetryHandler;

impl MessageHandler<RetryOrder> for RetryHandler {
    type Context = ();

    /// Logs the attempt number (`retry_count + 1`), then returns `Outcome::Retry` when
    /// `retry_count` is zero and `Outcome::Ack` for any later delivery.
    async fn handle(&self, msg: OrderEvent, metadata: MessageMetadata, _: &()) -> Outcome {
        let retries = metadata.retry_count;
        println!("[retry] order={} attempt={}", msg.order_id, retries + 1);

        let is_first_delivery = retries == 0;
        if !is_first_delivery {
            println!("[retry] → success on retry");
            return Outcome::Ack;
        }

        println!("[retry] → transient failure, will Retry");
        Outcome::Retry
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let auth_token = match std::env::var("LOCALSTACK_AUTH_TOKEN") {
Ok(t) => t,
Err(_) => {
eprintln!(
"LOCALSTACK_AUTH_TOKEN is not set. This example requires a LocalStack Pro auth \
token:\n\n export LOCALSTACK_AUTH_TOKEN=...\n"
);
std::process::exit(1);
}
};
unsafe {
std::env::set_var("AWS_ACCESS_KEY_ID", "test");
std::env::set_var("AWS_SECRET_ACCESS_KEY", "test");
std::env::set_var("AWS_REGION", "us-east-1");
}
let container = LocalStack::default()
.with_env_var("LOCALSTACK_AUTH_TOKEN", auth_token)
.start()
.await?;
let port = container.get_host_port_ipv4(4566).await?;
let endpoint = format!("http://localhost:{port}");
let broker = Broker::<Sqs>::new(SnsConfig {
region: Some("us-east-1".into()),
endpoint_url: Some(endpoint),
})
.await?;
let topology = broker.topology();
topology.declare::<MinimalOrder>().await?;
topology.declare::<DlqOrder>().await?;
topology.declare::<RetryOrder>().await?;
println!("topologies declared\n");
let publisher = broker.publisher().await?;
let order = OrderEvent {
order_id: "ORD-001".into(),
amount_cents: 5000,
};
publisher.publish::<MinimalOrder>(&order).await?;
publisher.publish::<DlqOrder>(&order).await?;
publisher.publish::<RetryOrder>(&order).await?;
let mut headers = HashMap::new();
headers.insert("x-source".into(), "example".into());
headers.insert("x-priority".into(), "high".into());
let order2 = OrderEvent {
order_id: "ORD-002".into(),
amount_cents: 9900,
};
publisher
.publish_with_headers::<MinimalOrder>(&order2, headers)
.await?;
let batch = vec![
OrderEvent {
order_id: "ORD-003".into(),
amount_cents: 1000,
},
OrderEvent {
order_id: "ORD-004".into(),
amount_cents: 2500,
},
OrderEvent {
order_id: "ORD-005".into(),
amount_cents: 7777,
},
];
publisher.publish_batch::<MinimalOrder>(&batch).await?;
println!("messages published\n");
let mut supervisor = broker.consumer_supervisor();
supervisor.register::<MinimalOrder, _>(AckHandler, ConsumerOptions::<Sqs>::new())?;
supervisor.register::<DlqOrder, _>(RejectHandler, ConsumerOptions::<Sqs>::new())?;
supervisor.register::<RetryOrder, _>(
RetryHandler,
ConsumerOptions::<Sqs>::new().with_max_retries(3),
)?;
let outcome = supervisor
.run_until_timeout(
async {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {}
_ = tokio::signal::ctrl_c() => {}
}
},
Duration::from_secs(5),
)
.await;
println!("done");
std::process::exit(outcome.exit_code());
}