shove 0.10.2

Async tasks via pubsub on steroids. Comes with built-in support for complex queue configurations, audit logs, autoscaling consumer groups and more.
Documentation
use lapin::Channel;
use lapin::options::{ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions};
use lapin::types::{AMQPValue, FieldTable};

use crate::error::{Result, ShoveError};
use crate::topology::QueueTopology;

const X_DEAD_LETTER_EXCHANGE: &str = "x-dead-letter-exchange";
const X_DEAD_LETTER_ROUTING_KEY: &str = "x-dead-letter-routing-key";
const X_MESSAGE_TTL: &str = "x-message-ttl";
const X_SINGLE_ACTIVE_CONSUMER: &str = "x-single-active-consumer";

/// Adds dead-letter routing arguments to `args` so that dead-lettered messages are
/// published to the exchange named `""` with `dlq` as the routing key.
///
/// In AMQP 0-9-1 the empty exchange name denotes the default exchange, which routes
/// by queue name — so `dlq` is expected to be the name of the target queue.
fn with_dlq_routing(args: &mut FieldTable, dlq: &str) {
    let entries = [
        (X_DEAD_LETTER_EXCHANGE, ""),
        (X_DEAD_LETTER_ROUTING_KEY, dlq),
    ];
    for (key, value) in entries {
        args.insert(key.into(), AMQPValue::LongString(value.into()));
    }
}

/// Builds the declaration arguments for a hold (delay) queue.
///
/// The returned table sets `x-message-ttl` to `ttl_ms` and dead-letters messages
/// to `route_back_to` via [`with_dlq_routing`].
fn hold_queue_args(route_back_to: &str, ttl_ms: i64) -> FieldTable {
    let mut table = FieldTable::default();
    let ttl = AMQPValue::LongLongInt(ttl_ms);
    table.insert(X_MESSAGE_TTL.into(), ttl);
    with_dlq_routing(&mut table, route_back_to);
    table
}

/// Declares RabbitMQ broker resources for a topic's topology.
///
/// Wraps a single lapin [`Channel`]; every queue, exchange and binding declaration
/// is issued on that channel.
///
/// All declarations are idempotent — safe to call on every startup.
pub struct RabbitMqTopologyDeclarer {
    /// AMQP channel on which all declarations are issued.
    channel: Channel,
}

impl RabbitMqTopologyDeclarer {
    pub fn new(channel: Channel) -> Self {
        Self { channel }
    }

    async fn declare_queue(&self, name: &str, args: FieldTable) -> Result<()> {
        self.channel
            .queue_declare(
                name.into(),
                QueueDeclareOptions {
                    durable: true,
                    ..Default::default()
                },
                args,
            )
            .await
            .map_err(|e| ShoveError::Topology(format!("failed to declare queue '{name}': {e}")))?;
        Ok(())
    }

    async fn declare_unsequenced(&self, topology: &QueueTopology) -> Result<()> {
        if let Some(dlq) = topology.dlq() {
            self.declare_queue(dlq, FieldTable::default()).await?;
        }

        let mut main_args = FieldTable::default();
        if let Some(dlq) = topology.dlq() {
            with_dlq_routing(&mut main_args, dlq);
        }
        self.declare_queue(topology.queue(), main_args).await?;

        for hq in topology.hold_queues() {
            let args = hold_queue_args(topology.queue(), hq.delay().as_millis() as i64);
            self.declare_queue(hq.name(), args).await?;
        }

        Ok(())
    }

    async fn declare_sequenced(&self, topology: &QueueTopology) -> Result<()> {
        let seq = topology
            .sequencing()
            .expect("declare_sequenced called without sequencing config");

        if let Some(dlq) = topology.dlq() {
            self.declare_queue(dlq, FieldTable::default()).await?;
        }

        // 2. Declare consistent-hash exchange
        self.channel
            .exchange_declare(
                seq.exchange().into(),
                lapin::ExchangeKind::Custom("x-consistent-hash".to_string()),
                ExchangeDeclareOptions {
                    durable: true,
                    ..Default::default()
                },
                FieldTable::default(),
            )
            .await
            .map_err(|e| {
                ShoveError::Topology(format!(
                    "failed to declare exchange '{}': {e}",
                    seq.exchange()
                ))
            })?;

        // 3. Declare N sub-queues with single-active-consumer, bind to hash exchange
        for i in 0..seq.routing_shards() {
            let sub_queue = format!("{}-seq-{i}", topology.queue());

            // Per-shard hold queues dead-letter back to this sub-queue
            for hq in topology.shard_hold_queue_names(i) {
                let args = hold_queue_args(&sub_queue, hq.delay().as_millis() as i64);
                self.declare_queue(hq.name(), args).await?;
            }

            let mut args = FieldTable::default();
            args.insert(X_SINGLE_ACTIVE_CONSUMER.into(), AMQPValue::Boolean(true));
            if let Some(dlq) = topology.dlq() {
                with_dlq_routing(&mut args, dlq);
            }
            self.declare_queue(&sub_queue, args).await?;

            // Bind to hash exchange — routing weight "1" for even distribution
            self.channel
                .queue_bind(
                    sub_queue.as_str().into(),
                    seq.exchange().into(),
                    "1".into(),
                    QueueBindOptions::default(),
                    FieldTable::default(),
                )
                .await
                .map_err(|e| {
                    ShoveError::Topology(format!(
                        "failed to bind '{sub_queue}' to '{}': {e}",
                        seq.exchange()
                    ))
                })?;
        }

        Ok(())
    }
}

impl RabbitMqTopologyDeclarer {
    /// Declares every broker resource described by `topology`.
    ///
    /// Uses the sequenced layout when `topology.sequencing()` is set and the
    /// unsequenced layout otherwise.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the selected declaration routine returns.
    pub async fn declare(&self, topology: &QueueTopology) -> Result<()> {
        if topology.sequencing().is_none() {
            return self.declare_unsequenced(topology).await;
        }
        self.declare_sequenced(topology).await
    }
}