use std::time::Duration;
/// A hold queue: a queue name paired with the delay it stands for.
#[derive(Debug, Clone)]
pub struct HoldQueue {
    /// Name of the queue.
    pub(crate) name: String,
    /// Delay associated with the queue.
    pub(crate) delay: Duration,
}

impl HoldQueue {
    /// Borrows the queue name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the delay by value (`Duration` is `Copy`).
    pub fn delay(&self) -> Duration {
        let Self { delay, .. } = self;
        *delay
    }
}
/// Failure policy selected for a sequenced topology.
///
/// The value is stored in `SequenceConfig::on_failure`; nothing in this file
/// interprets the variants, so their exact runtime effect is defined by the
/// code that consumes the configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequenceFailure {
    /// NOTE(review): presumably "skip the failed message and continue" — confirm with the consumer.
    Skip,
    /// NOTE(review): presumably "fail the remainder of the sequence" — confirm with the consumer.
    FailAll,
}
/// Settings that apply when a topology is sequenced.
#[derive(Debug, Clone)]
pub struct SequenceConfig {
    /// Failure policy chosen by the caller.
    pub(crate) on_failure: SequenceFailure,
    /// Number of routing shards.
    pub(crate) routing_shards: u16,
    /// Name of the exchange used for sequencing.
    pub(crate) exchange: String,
}

impl SequenceConfig {
    /// Returns the configured failure policy (a `Copy` value).
    pub fn on_failure(&self) -> SequenceFailure {
        let Self { on_failure, .. } = self;
        *on_failure
    }

    /// Returns the configured number of routing shards.
    pub fn routing_shards(&self) -> u16 {
        let Self { routing_shards, .. } = self;
        *routing_shards
    }

    /// Borrows the exchange name.
    pub fn exchange(&self) -> &str {
        self.exchange.as_str()
    }
}
/// Resolved set of queue names and options for one logical queue.
#[derive(Debug, Clone)]
pub struct QueueTopology {
    /// Name of the main queue.
    pub(crate) queue: String,
    /// Name of the dead-letter queue, when one is configured.
    pub(crate) dlq: Option<String>,
    /// Hold queues, in the order they were configured.
    pub(crate) hold_queues: Vec<HoldQueue>,
    /// Sequencing settings, when sequencing is enabled.
    pub(crate) sequencing: Option<SequenceConfig>,
}

impl QueueTopology {
    /// Borrows the main queue name.
    pub fn queue(&self) -> &str {
        self.queue.as_str()
    }

    /// Borrows the dead-letter queue name, or `None` when no DLQ is configured.
    pub fn dlq(&self) -> Option<&str> {
        let name = self.dlq.as_ref()?;
        Some(name.as_str())
    }

    /// Borrows the configured hold queues as a slice.
    pub fn hold_queues(&self) -> &[HoldQueue] {
        self.hold_queues.as_slice()
    }

    /// Borrows the sequencing settings, or `None` when sequencing is disabled.
    pub fn sequencing(&self) -> Option<&SequenceConfig> {
        Option::as_ref(&self.sequencing)
    }

    /// Derives the hold queues for a single shard.
    ///
    /// One entry is produced per configured hold queue, in the same order,
    /// named `{queue}-seq-{shard_index}-hold-{secs}s` and carrying the
    /// original delay. `secs` is the whole-second part of the delay
    /// (`Duration::as_secs`), so any sub-second remainder does not appear in
    /// the name. `shard_index` is not checked against the configured shard
    /// count.
    pub fn shard_hold_queue_names(&self, shard_index: u16) -> Vec<HoldQueue> {
        let mut per_shard = Vec::with_capacity(self.hold_queues.len());
        for template in &self.hold_queues {
            let whole_secs = template.delay.as_secs();
            per_shard.push(HoldQueue {
                name: format!(
                    "{}-seq-{shard_index}-hold-{}s",
                    self.queue, whole_secs
                ),
                delay: template.delay,
            });
        }
        per_shard
    }
}
pub struct TopologyBuilder {
queue: String,
dlq: Option<String>,
hold_queues: Vec<Duration>,
sequencing: Option<SequenceConfig>,
allow_message_loss: bool,
}
impl TopologyBuilder {
pub fn new(queue: impl Into<String>) -> Self {
Self {
queue: queue.into(),
dlq: None,
hold_queues: Vec::new(),
sequencing: None,
allow_message_loss: false,
}
}
pub fn sequenced(mut self, on_failure: SequenceFailure) -> Self {
let exchange = format!("{}-seq-hash", self.queue);
self.sequencing = Some(SequenceConfig {
on_failure,
routing_shards: 8,
exchange,
});
self
}
pub fn routing_shards(mut self, count: u16) -> Self {
let seq = self
.sequencing
.as_mut()
.expect("routing_shards() called before sequenced()");
seq.routing_shards = count;
self
}
pub fn hold_queue(mut self, delay: Duration) -> Self {
self.hold_queues.push(delay);
self
}
pub fn dlq(mut self) -> Self {
self.dlq = Some(format!("{}-dlq", self.queue));
self
}
pub fn dlq_named(mut self, name: impl Into<String>) -> Self {
self.dlq = Some(name.into());
self
}
pub fn allow_message_loss(mut self) -> Self {
self.allow_message_loss = true;
self
}
pub fn build(self) -> QueueTopology {
if let Some(ref seq) = self.sequencing {
assert!(
seq.routing_shards > 0,
"routing_shards must be greater than 0 when sequencing is enabled"
);
if !self.allow_message_loss {
assert!(
self.dlq.is_some(),
"sequenced topics require a DLQ — call .dlq() or .dlq_named() or .allow_message_loss() before .build()"
);
assert!(
!self.hold_queues.is_empty(),
"sequenced topics require at least one hold queue — call .hold_queue() or .allow_message_loss() before .build()"
);
}
}
if self.sequencing.is_none() && !self.allow_message_loss {
if !self.hold_queues.is_empty() && self.dlq.is_none() {
tracing::warn!(
queue = self.queue,
"topic has hold queues but no DLQ — messages exhausting max_retries will be silently discarded"
);
}
if self.dlq.is_some() && self.hold_queues.is_empty() {
tracing::warn!(
queue = self.queue,
"topic has a DLQ but no hold queues — retries will use broker redelivery with no delay"
);
}
}
let dlq = self.dlq;
let hold_queues = self
.hold_queues
.into_iter()
.map(|delay| HoldQueue {
name: format!("{}-hold-{}s", self.queue, delay.as_secs()),
delay,
})
.collect();
QueueTopology {
queue: self.queue,
dlq,
hold_queues,
sequencing: self.sequencing,
}
}
}
/// Unit tests for `TopologyBuilder` and the `QueueTopology` it produces.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // --- Defaults of a bare builder ---------------------------------------

    #[test]
    fn builder_main_queue_name() {
        let topology = TopologyBuilder::new("orders").build();
        assert_eq!(topology.queue(), "orders");
    }

    // Exercises `.dlq()` (the default `{queue}-dlq` name), not `.dlq_named()`.
    #[test]
    fn builder_dlq_default_name() {
        let topology = TopologyBuilder::new("orders").dlq().build();
        assert_eq!(topology.dlq(), Some("orders-dlq"));
    }

    #[test]
    fn builder_no_dlq() {
        let topology = TopologyBuilder::new("orders").build();
        assert_eq!(topology.dlq(), None);
    }

    // --- Hold queues -------------------------------------------------------

    #[test]
    fn builder_hold_queues() {
        let topology = TopologyBuilder::new("orders")
            .hold_queue(Duration::from_secs(30))
            .hold_queue(Duration::from_secs(300))
            .build();
        let hqs = topology.hold_queues();
        assert_eq!(hqs.len(), 2);
        assert_eq!(hqs[0].name(), "orders-hold-30s");
        assert_eq!(hqs[0].delay(), Duration::from_secs(30));
        assert_eq!(hqs[1].name(), "orders-hold-300s");
        assert_eq!(hqs[1].delay(), Duration::from_secs(300));
    }

    #[test]
    fn builder_no_hold_queues() {
        let topology = TopologyBuilder::new("orders").build();
        assert!(topology.hold_queues().is_empty());
    }

    // --- Sequencing --------------------------------------------------------

    #[test]
    fn builder_sequenced_defaults() {
        let topology = TopologyBuilder::new("orders")
            .sequenced(SequenceFailure::Skip)
            .hold_queue(Duration::from_secs(5))
            .dlq()
            .build();
        let seq = topology.sequencing().expect("sequencing should be set");
        assert_eq!(seq.routing_shards(), 8);
        assert_eq!(seq.exchange(), "orders-seq-hash");
        assert_eq!(seq.on_failure(), SequenceFailure::Skip);
    }

    #[test]
    fn builder_sequenced_custom_shards() {
        let topology = TopologyBuilder::new("orders")
            .sequenced(SequenceFailure::FailAll)
            .routing_shards(16)
            .hold_queue(Duration::from_secs(5))
            .dlq()
            .build();
        let seq = topology.sequencing().expect("sequencing should be set");
        assert_eq!(seq.routing_shards(), 16);
        assert_eq!(seq.on_failure(), SequenceFailure::FailAll);
    }

    #[test]
    #[should_panic(expected = "routing_shards() called before sequenced()")]
    fn builder_routing_shards_before_sequenced_panics() {
        let _ = TopologyBuilder::new("orders").routing_shards(4).build();
    }

    #[test]
    #[should_panic(expected = "routing_shards must be greater than 0")]
    fn builder_zero_shards_panics() {
        let _ = TopologyBuilder::new("orders")
            .sequenced(SequenceFailure::Skip)
            .routing_shards(0)
            .hold_queue(Duration::from_secs(5))
            .dlq()
            .build();
    }

    #[test]
    fn builder_no_sequencing() {
        let topology = TopologyBuilder::new("orders").build();
        assert!(topology.sequencing().is_none());
    }

    #[test]
    fn builder_full_topology() {
        let topology = TopologyBuilder::new("payments")
            .dlq()
            .hold_queue(Duration::from_secs(60))
            .hold_queue(Duration::from_secs(600))
            .sequenced(SequenceFailure::FailAll)
            .routing_shards(32)
            .build();
        assert_eq!(topology.queue(), "payments");
        assert_eq!(topology.dlq(), Some("payments-dlq"));
        let hqs = topology.hold_queues();
        assert_eq!(hqs.len(), 2);
        assert_eq!(hqs[0].name(), "payments-hold-60s");
        assert_eq!(hqs[0].delay(), Duration::from_secs(60));
        assert_eq!(hqs[1].name(), "payments-hold-600s");
        assert_eq!(hqs[1].delay(), Duration::from_secs(600));
        let seq = topology.sequencing().expect("sequencing should be set");
        assert_eq!(seq.on_failure(), SequenceFailure::FailAll);
        assert_eq!(seq.routing_shards(), 32);
        assert_eq!(seq.exchange(), "payments-seq-hash");
    }

    // --- Guards enforced by `build()` for sequenced topologies -------------

    #[test]
    #[should_panic(expected = "sequenced topics require a DLQ")]
    fn builder_sequenced_without_dlq_panics() {
        let _ = TopologyBuilder::new("orders")
            .sequenced(SequenceFailure::Skip)
            .hold_queue(Duration::from_secs(5))
            .build();
    }

    #[test]
    #[should_panic(expected = "sequenced topics require at least one hold queue")]
    fn builder_sequenced_without_hold_queue_panics() {
        let _ = TopologyBuilder::new("orders")
            .sequenced(SequenceFailure::FailAll)
            .dlq()
            .build();
    }

    #[test]
    fn builder_allow_message_loss_suppresses_dlq_guard() {
        let topology = TopologyBuilder::new("ephemeral")
            .sequenced(SequenceFailure::Skip)
            .hold_queue(Duration::from_secs(5))
            .allow_message_loss()
            .build();
        assert!(topology.dlq().is_none());
        assert!(topology.sequencing().is_some());
    }

    #[test]
    fn builder_allow_message_loss_suppresses_hold_queue_guard() {
        let topology = TopologyBuilder::new("ephemeral")
            .sequenced(SequenceFailure::FailAll)
            .dlq()
            .allow_message_loss()
            .build();
        assert!(topology.hold_queues().is_empty());
        assert!(topology.sequencing().is_some());
    }

    // --- Per-shard hold queue naming ---------------------------------------

    #[test]
    fn shard_hold_queue_names() {
        let topology = TopologyBuilder::new("payments")
            .sequenced(SequenceFailure::FailAll)
            .routing_shards(4)
            .hold_queue(Duration::from_secs(5))
            .hold_queue(Duration::from_secs(60))
            .dlq()
            .build();
        let names = topology.shard_hold_queue_names(2);
        assert_eq!(names.len(), 2);
        assert_eq!(names[0].name(), "payments-seq-2-hold-5s");
        assert_eq!(names[0].delay(), Duration::from_secs(5));
        assert_eq!(names[1].name(), "payments-seq-2-hold-60s");
        assert_eq!(names[1].delay(), Duration::from_secs(60));
    }

    #[test]
    fn builder_allow_message_loss_suppresses_both_guards() {
        let topology = TopologyBuilder::new("ephemeral")
            .sequenced(SequenceFailure::Skip)
            .allow_message_loss()
            .build();
        assert!(topology.dlq().is_none());
        assert!(topology.hold_queues().is_empty());
        assert!(topology.sequencing().is_some());
    }

    // --- DLQ naming: default vs. explicit, last call wins ------------------

    #[test]
    fn builder_dlq_custom_name() {
        let topology = TopologyBuilder::new("orders")
            .dlq_named("orders-dead-letters")
            .build();
        assert_eq!(topology.dlq(), Some("orders-dead-letters"));
    }

    #[test]
    fn builder_dlq_default_suffix_unchanged() {
        let topology = TopologyBuilder::new("orders").dlq().build();
        assert_eq!(topology.dlq(), Some("orders-dlq"));
    }

    #[test]
    fn builder_dlq_name_overrides_dlq() {
        let topology = TopologyBuilder::new("orders")
            .dlq()
            .dlq_named("custom-dead")
            .build();
        assert_eq!(topology.dlq(), Some("custom-dead"));
    }

    #[test]
    fn builder_dlq_after_dlq_name_uses_default() {
        let topology = TopologyBuilder::new("orders")
            .dlq_named("custom-dead")
            .dlq()
            .build();
        assert_eq!(topology.dlq(), Some("orders-dlq"));
    }

    #[test]
    fn builder_dlq_name_with_sequenced() {
        let topology = TopologyBuilder::new("events")
            .sequenced(SequenceFailure::Skip)
            .routing_shards(4)
            .hold_queue(Duration::from_secs(5))
            .dlq_named("events-failed")
            .build();
        assert_eq!(topology.dlq(), Some("events-failed"));
        assert!(topology.sequencing().is_some());
    }
}