use lapin::Channel;
use lapin::options::{ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions};
use lapin::types::{AMQPValue, FieldTable};
use crate::error::{Result, ShoveError};
use crate::topology::QueueTopology;
/// Queue-argument key naming the exchange that dead-lettered messages are
/// republished to. This file always sets it to `""` (see `with_dlq_routing`).
const X_DEAD_LETTER_EXCHANGE: &str = "x-dead-letter-exchange";
/// Queue-argument key holding the routing key used when a message is
/// dead-lettered; this file sets it to the name of the target queue.
const X_DEAD_LETTER_ROUTING_KEY: &str = "x-dead-letter-routing-key";
/// Queue-argument key for the per-queue message TTL, supplied in milliseconds
/// as a signed 64-bit integer (see `hold_queue_args`).
const X_MESSAGE_TTL: &str = "x-message-ttl";
/// Queue-argument key set to `true` on every sequenced sub-queue.
const X_SINGLE_ACTIVE_CONSUMER: &str = "x-single-active-consumer";
/// Adds dead-letter routing arguments to `args` so that dead-lettered messages
/// are republished through the exchange named `""` with `dlq` as routing key.
///
/// Existing entries under the same two keys are overwritten.
fn with_dlq_routing(args: &mut FieldTable, dlq: &str) {
    // Inserted in the same order as before: exchange first, then routing key.
    let entries = [
        (X_DEAD_LETTER_EXCHANGE, ""),
        (X_DEAD_LETTER_ROUTING_KEY, dlq),
    ];
    for (key, value) in entries {
        args.insert(key.into(), AMQPValue::LongString(value.into()));
    }
}
/// Builds the argument table for a hold queue.
///
/// The table carries a message TTL of `ttl_ms` milliseconds plus dead-letter
/// routing that targets `route_back_to`, so messages leaving the hold queue
/// via dead-lettering are routed to that queue.
fn hold_queue_args(route_back_to: &str, ttl_ms: i64) -> FieldTable {
    let mut table = FieldTable::default();
    let ttl = AMQPValue::LongLongInt(ttl_ms);
    table.insert(X_MESSAGE_TTL.into(), ttl);
    with_dlq_routing(&mut table, route_back_to);
    table
}
/// Declares the queues, exchanges and bindings described by a
/// `QueueTopology` on a RabbitMQ broker through a `lapin` channel.
pub struct RabbitMqTopologyDeclarer {
// Channel on which every declare/bind call in this type is issued.
channel: Channel,
}
impl RabbitMqTopologyDeclarer {
pub fn new(channel: Channel) -> Self {
Self { channel }
}
/// Declares a durable queue called `name` with the given `args`.
///
/// # Errors
///
/// Returns `ShoveError::Topology` carrying the queue name and the underlying
/// error text when the declaration fails.
async fn declare_queue(&self, name: &str, args: FieldTable) -> Result<()> {
    // Only `durable` deviates from the default declare options.
    let options = QueueDeclareOptions {
        durable: true,
        ..Default::default()
    };
    let declared = self.channel.queue_declare(name.into(), options, args).await;
    declared
        .map_err(|e| ShoveError::Topology(format!("failed to declare queue '{name}': {e}")))?;
    Ok(())
}
async fn declare_unsequenced(&self, topology: &QueueTopology) -> Result<()> {
if let Some(dlq) = topology.dlq() {
self.declare_queue(dlq, FieldTable::default()).await?;
}
let mut main_args = FieldTable::default();
if let Some(dlq) = topology.dlq() {
with_dlq_routing(&mut main_args, dlq);
}
self.declare_queue(topology.queue(), main_args).await?;
for hq in topology.hold_queues() {
let args = hold_queue_args(topology.queue(), hq.delay().as_millis() as i64);
self.declare_queue(hq.name(), args).await?;
}
Ok(())
}
async fn declare_sequenced(&self, topology: &QueueTopology) -> Result<()> {
let seq = topology.sequencing().ok_or(ShoveError::Topology(
"declare_sequenced called without sequencing config".into(),
))?;
if let Some(dlq) = topology.dlq() {
self.declare_queue(dlq, FieldTable::default()).await?;
}
self.channel
.exchange_declare(
seq.exchange().into(),
lapin::ExchangeKind::Custom("x-consistent-hash".to_string()),
ExchangeDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
)
.await
.map_err(|e| {
ShoveError::Topology(format!(
"failed to declare exchange '{}': {e}",
seq.exchange()
))
})?;
for i in 0..seq.routing_shards() {
let sub_queue = format!("{}-seq-{i}", topology.queue());
for hq in topology.shard_hold_queue_names(i) {
let args = hold_queue_args(&sub_queue, hq.delay().as_millis() as i64);
self.declare_queue(hq.name(), args).await?;
}
let mut args = FieldTable::default();
args.insert(X_SINGLE_ACTIVE_CONSUMER.into(), AMQPValue::Boolean(true));
if let Some(dlq) = topology.dlq() {
with_dlq_routing(&mut args, dlq);
}
self.declare_queue(&sub_queue, args).await?;
self.channel
.queue_bind(
sub_queue.as_str().into(),
seq.exchange().into(),
"1".into(),
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.map_err(|e| {
ShoveError::Topology(format!(
"failed to bind '{sub_queue}' to '{}': {e}",
seq.exchange()
))
})?;
}
Ok(())
}
}
impl RabbitMqTopologyDeclarer {
/// Declares everything described by `topology` on the broker.
///
/// Topologies with a sequencing config are declared via
/// `declare_sequenced`; all others via `declare_unsequenced`.
///
/// # Errors
///
/// Propagates the error returned by whichever declaration path runs.
pub async fn declare(&self, topology: &QueueTopology) -> Result<()> {
    // Evaluate the check up front so nothing from it is held across an await.
    let sequenced = topology.sequencing().is_some();
    if !sequenced {
        return self.declare_unsequenced(topology).await;
    }
    self.declare_sequenced(topology).await
}
}