zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use deadpool_lapin::lapin;

use deadpool_lapin::lapin::{
    options::{
        BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, BasicQosOptions,
        ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
    },
    types::FieldTable,
    BasicProperties, Consumer, ExchangeKind, Queue,
};

use futures::StreamExt;

#[derive(Clone)]
pub struct RabbitClient {
    pub channel: lapin::Channel,
}

// direct: 如果路由键匹配的话消息就会被投递到对应的队列。
// fanout: 广播消息到多个队列, 即将消息投递到所有附加在此交换器上的所有队列。
// topic: 将多个不同源头的消息投递到同一个队列。
impl RabbitClient {
    // 创建队列
    // queue_declare 如果指定的队列不存在,则创建该队列;
    // 如果您为队列名称指定空字符串,RabbitMQ 将创建一个新队列、生成名称并返回它。
    // 如果存在且与指定的设置匹配,则不执行任何操作。
    // exclusive: 通过指定 Exclusive,队列将对消费者是独占的。
    pub async fn queue_declare(&self, queue_name: &str, exclusive: bool) -> anyhow::Result<Queue> {
        let opt = if exclusive {
            QueueDeclareOptions {
                // 通过指定 Exclusive,队列将对消费者是独占的。(独立队列)
                exclusive: true,
                ..Default::default()
            }
        } else {
            QueueDeclareOptions::default()
        };

        Ok(self
            .channel
            .queue_declare(queue_name, opt, FieldTable::default())
            .await?)
    }

    // direct 实现 使用不同的 routing key 多次绑定同一个 exchange 和 queue。
    // 直接交换类型的工作原理如下:
    // 1. 消息队列使用路由键 K 绑定到交换器。
    // 2. 发布者向交换器发送带有路由键 R 的消息。
    // 3. 如果 K = R,则消息将传递到消息队列。
    //    服务器必须实现直接交换类型,并且必须在每个虚拟主机内预先声明至少两个直接交换:
    //    一个名为 amq.direct,另一个没有服务的公共名称作为 Publish 方法的默认交换
    //
    // fanout 实现所谓的 Pub/Sub 模式(广播消息到多个队列),其中每个消费者都有一个独立的队列,并将一条消息发送到多个队列。
    //
    // Topic 交换器中的路由键可以使用句点 . 例如: aaa.bbb.ccc 等等
    // 可以用 * 匹配一个级别,也可以用 # 匹配任何级别。
    // 例如1: 消费者1 绑定 tracking.# 键。这将订阅任何跟踪域上的事件。
    // 例如2: 消费者2 绑定 *.tenant.* 键 (<domain>.<tenant>.<event>)。无论域或事件类型如何,都以租户 tenant 为目标。
    pub async fn exchange_declare(
        &self,
        exchange_name: &str,
        exchange_kind: ExchangeKind,
    ) -> anyhow::Result<()> {
        self.channel
            .exchange_declare(
                exchange_name,
                exchange_kind,
                ExchangeDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        Ok(())
    }

    pub async fn queue_bind(
        &self,
        queue_name: &str,
        exchange_name: &str,
        routing_key: &str,
    ) -> anyhow::Result<()> {
        // 通过绑定交换器和队列,消息将被发布。
        self.channel
            .queue_bind(
                queue_name,
                exchange_name,
                routing_key,
                QueueBindOptions::default(),
                FieldTable::default(),
            )
            .await?;

        Ok(())
    }

    // exchange: 如果将交换器指定一个空字符串,消息将发送到默认交换器。
    // routing_key: 如果 exchange 为空 且 routing_key 和 队列的名字 一样则消息将发送到同名队列
    // 即: 发送到默认交换器的消息,它将引用发布消息时的 routing_key,并将消息路由到和 routing_key 同名的队列中。
    pub async fn publish(
        &self,
        exchange: &str,
        routing_key: &str,
        message: &str,
    ) -> anyhow::Result<()> {
        self.channel
            .basic_publish(
                // ------------------------------
                // 交换器是虚拟主机内的消息路由代理。
                // ------------------------------
                // 交换器-AMQP 3.1.3: 理。
                // ------------------------------
                // 交换器实例(我们通常称为“交换器”)接受消息和路由信息(主要是路由键)并将消息传递到消息队列或内部服务。
                // 发布消息时,发布者将消息发布到这个交换器而不是队列。
                // 如果您为交换器指定一个空字符串,它将指向默认交换器。
                // 默认交换器是一种称为直接交换器的交换器,它引用发布时的 routing_key,将消息路由到同名的队列中。
                // 所以,最终,如果发布者为 routing_key 指定 queue,而不指定 exchange,则可以将消息发布到目标队列。
                // ------------------------------
                // 交换器-AMQP 3.1.3.1: (直接交换类型的工作原理如下)。
                // ------------------------------
                // 1. 消息队列使用路由键 K 绑定到交换器。
                // 2. 发布者向交换器发送带有路由密钥 R 的消息。
                // 3. 如果 K = R,则消息将传递到消息队列。
                // 服务器必须实现直接交换类型,并且必须在每个虚拟主机内预先声明至少两个直接交换:
                // 一个名为 amq.direct,另一个没有服务的公共名称作为 Publish 方法的默认交换
                exchange,
                routing_key,
                BasicPublishOptions::default(),
                message.as_bytes(),
                BasicProperties::default(),
            )
            .await?;

        Ok(())
    }

    pub async fn send_message(
        &self,
        exchange: &str,
        routing_key: &str,
        message: &str,
        response_queue_name: &str,
        correlation_id: &str,
    ) -> anyhow::Result<()> {
        self.channel
            .basic_publish(
                // ------------------------------
                // 交换器是虚拟主机内的消息路由代理。
                // ------------------------------
                // 交换器-AMQP 3.1.3: 理。
                // ------------------------------
                // 交换器实例(我们通常称为“交换器”)接受消息和路由信息(主要是路由键)并将消息传递到消息队列或内部服务。
                // 发布消息时,发布者将消息发布到这个交换器而不是队列。
                // 如果您为交换器指定一个空字符串,它将指向默认交换器。
                // 默认交换器是一种称为直接交换器的交换器,它引用发布时的 routing_key,将消息路由到同名的队列中。
                // 所以,最终,如果发布者为 routing_key 指定 queue,而不指定 exchange,则可以将消息发布到目标队列。
                // ------------------------------
                // 交换器-AMQP 3.1.3.1: (直接交换类型的工作原理如下)。
                // ------------------------------
                // 1. 消息队列使用路由键 K 绑定到交换器。
                // 2. 发布者向交换器发送带有路由密钥 R 的消息。
                // 3. 如果 K = R,则消息将传递到消息队列。
                // 服务器必须实现直接交换类型,并且必须在每个虚拟主机内预先声明至少两个直接交换:
                // 一个名为 amq.direct,另一个没有服务的公共名称作为 Publish 方法的默认交换
                exchange,
                routing_key,
                BasicPublishOptions::default(),
                message.as_bytes(),
                BasicProperties::default()
                    .with_reply_to(response_queue_name.into())
                    .with_correlation_id(correlation_id.into()),
            )
            .await?;

        Ok(())
    }

    // Spawn consumer task.
    // let consumer_handle = tokio::spawn({
    //     let channel = channel.clone();
    //     async move {
    //         if let Err(err) = consumer(channel).await {
    //             tracing::error!("{err:?}");
    //         }
    //     }
    // });
    //
    // Wait forever
    // consumer_handle.await?;
    pub async fn consumer(&self, queue_name: &str, consumer_tag: &str) -> anyhow::Result<Consumer> {
        // basic_qos 指定消费者一次从队列中检索多少条消息。由于这里指定了1.
        // 队列中的消息会按顺序分发给消费者1和2。
        self.channel
            .basic_qos(1, BasicQosOptions::default())
            .await?;

        let consumer = self
            .channel
            .basic_consume(
                queue_name,
                consumer_tag,
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        Ok(consumer)
    }

    pub async fn consumer_loop(
        &self,
        name: &str,
        queue_name: &str,
        mut consumer: Consumer,
    ) -> anyhow::Result<()> {
        if let Some(Ok(delivery)) = consumer.next().await {
            let data = String::from_utf8_lossy(delivery.data.as_slice());

            log::info!(
                "consumer: name={}, queue_name={}, data={}",
                name,
                queue_name,
                data
            );

            if let Some(correlation_id) = delivery.properties.correlation_id() {
                self.channel
                    .basic_publish(
                        "",
                        delivery.properties.reply_to().clone().unwrap().as_str(),
                        BasicPublishOptions::default(),
                        "Ok".as_bytes(),
                        BasicProperties::default().with_correlation_id(correlation_id.to_owned()),
                    )
                    .await?;
            } else {
                // 消费者通知 RabbitMQ 服务器消息处理成功,并从队列中删除该消息。
                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
                    log::error!("consumer-ack: queue_name={}, error={:?}", queue_name, e);
                }
            }
        }

        Ok(())
    }
}