mitoo 0.3.0

mitoo is a Rust toolkit library that encapsulates methods such as configuration reading, file operations, encryption and decryption, transcoding, regular expressions, threading, collections, trees, sqlite, rabbitMQ, etc., and customizes or integrates various Util tool classes.
Documentation
#[cfg(feature = "mq")]
use amqprs::channel::{BasicAckArguments, BasicConsumeArguments, BasicPublishArguments, Channel, QueueBindArguments, QueueDeclareArguments};
#[cfg(feature = "mq")]
use amqprs::consumer::AsyncConsumer;
#[cfg(feature = "mq")]
use amqprs::{callbacks, connection::{Connection, OpenConnectionArguments}, BasicProperties, Deliver, DELIVERY_MODE_PERSISTENT};
use std::time::Duration;
use async_trait::async_trait;

/// rabbitMQ工具,简化rabbitmq的使用
///
/// # 示例
/// ```rust
/// use mitoo::RabbitMQUtil;
///
/// #[tokio::test]
/// async fn publish() {
///     // 初始化连接信息
///     let util = RabbitMQUtil::new("127.0.0.1", 5672, "guest", "guest").await;
///     // 定义一个队列
///     util.declare_queue("hello").await;
///     // 绑定exchange-queue-routing_key
///     util.queue_bind("amq.topic", "hello", "hello").await;
///     // 循环发送多个消息
///     for _i in 0..10 {
///         let _x = util.publish("amq.topic", "hello", "Hello World, good!").await;
///     }
///     // 测试时,需要保证发布之后,进程存活一定时间,让消息传递到rabbitmq
///     tokio::time::sleep(Duration::from_secs(2)).await;
/// }
///
///
/// #[tokio::test]
/// async fn consume() {
///     // 初始化连接信息
///     let util = RabbitMQUtil::new("127.0.0.1", 5672, "guest", "guest").await;
///     // 消费指定队列,并指定回调消费函数
///     util.consume("hello", |basic_properties, content| async move {
///         println!("===============>{}", String::from_utf8(content).unwrap());
///         println!("===============>{:?}", basic_properties);
///     }).await;
///     // 等待消费结束
///     tokio::time::sleep(Duration::from_secs(5)).await;
/// }
/// ```
#[cfg(feature = "mq")]
pub struct RabbitMQUtil {
    pub connection: Connection,
    /// 暴露的目的:便于进行自定义exchange及queue进行灵活绑定
    pub channel: Channel,
}

#[cfg(feature = "mq")]
impl RabbitMQUtil {

    /// 创建一个新的AMQP连接和通道
    /// 
    /// 该函数异步建立到AMQP服务器的连接,并打开一个通道用于消息操作。
    /// 
    /// # 参数
    /// * `host` - AMQP服务器主机地址
    /// * `port` - AMQP服务器端口号
    /// * `username` - 连接用户名
    /// * `password` - 连接密码
    /// 
    /// # 返回值
    /// 返回包含已建立连接和通道的实例
    pub async fn new(host: &str, port: u16, username: &str, password: &str) -> Self {

        // 构建新连接所需的参数
        let args = OpenConnectionArguments::new(host, port, username, password);

        // 使用给定参数打开AMQP连接
        let connection = Connection::open(&args).await.unwrap();

        // 注册连接级别的回调函数
        // 在生产环境中,用户应该创建自己的类型并实现ConnectionCallback trait
        connection.register_callback(callbacks::DefaultConnectionCallback).await.unwrap();

        // 在此连接上打开AMQP通道
        let channel = connection.open_channel(None).await.unwrap();
        // 注册通道级别的回调函数
        // 在生产环境中,用户应该创建自己的类型并实现ChannelCallback trait
        channel.register_callback(callbacks::DefaultChannelCallback).await.unwrap();

        Self {
            connection,
            channel,
        }
    }

    pub async fn declare_queue(&self, queue_name: &str) {
        let q_args = QueueDeclareArguments::durable_client_named(queue_name);
        let (_queue_name, _, _) = self.channel.queue_declare(q_args).await.unwrap().unwrap();
    }

    pub async fn queue_bind(&self, exchange_name: &str, queue_name: &str, routing_key: &str) {
        self.channel
            .queue_bind(QueueBindArguments::new(
                &queue_name,
                exchange_name,
                routing_key,
            ))
            .await
            .unwrap();
    }

    /// 调用该方法需要注意,调用之后需要保持程序存活一段时候,让数据发送到rabbitmq上
    pub async fn publish(&self, exchange: &str, routing_key: &str, mesage: &str) {
        let payload = String::from(mesage).into_bytes();
        let publish_args = BasicPublishArguments::new(exchange, routing_key);

        // publish messages as persistent
        let props = BasicProperties::default()
            .with_delivery_mode(DELIVERY_MODE_PERSISTENT)
            .finish();
        self.channel.basic_publish(props, payload, publish_args).await.unwrap();

        // Check connection should still open and no network i/o failure after publish
        // match tokio::time::timeout(Duration::from_millis(20), self.connection.listen_network_io_failure())
        //     .await
        // {
        //     Ok(is_failure) => {
        //         panic!("Unexpected network I/O failure: {is_failure}, connection is_open status: {}",
        //             self.connection.is_open()
        //         );
        //     }
        //     Err(_) => {
        //         println!("Network I/O OK after publish");
        //         assert!(self.connection.is_open(), "Connection should be still open");
        //     }
        // }
    }

    /// 消费指定队列的消息
    ///
    /// 该函数用于从指定的队列中消费消息,并通过回调函数处理接收到的消息。
    /// 函数会创建一个消费者并将其绑定到指定队列,当有消息到达时会调用提供的回调函数。
    ///
    /// # 参数
    /// * `queue_name` - 要消费消息的队列名称
    /// * `callback` - 处理消息的回调函数,接收消息属性和消息体作为参数
    pub async fn consume<F, Fut>(&self, queue_name: &str, callback: F)
    where
        F: (Fn(BasicProperties, Vec<u8>) -> Fut) + 'static,
        Fut: Future<Output = ()> + Send + 'static
    {
        let args = BasicConsumeArguments::new(&queue_name, "rabbitmq_util");
        self.channel.basic_consume(CustomConsumer::new(args.no_ack, callback), args).await.unwrap();
        tokio::time::sleep(Duration::from_secs(5)).await;
    }

}

#[cfg(feature = "mq")]
pub struct CustomConsumer<F, Fut>
where
    F: Fn(BasicProperties, Vec<u8>) -> Fut,
    Fut: Future<Output = ()> + Send
{
    no_ack: bool,
    cb: F,
}

#[cfg(feature = "mq")]
impl <F, Fut> CustomConsumer<F, Fut>
where
    F: Fn(BasicProperties, Vec<u8>) -> Fut,
    Fut: Future<Output = ()> + Send
{
    pub fn new(no_ack: bool, cb: F) -> Self {
        Self {
            no_ack,
            cb
        }
    }
}

#[cfg(feature = "mq")]
unsafe impl <F, Fut> Send for CustomConsumer<F, Fut>
where
    F: Fn(BasicProperties, Vec<u8>) -> Fut,
    Fut: Future<Output = ()> + Send
{ }

#[cfg(feature = "mq")]
#[async_trait]
impl <F, Fut> AsyncConsumer for CustomConsumer<F, Fut>
where
    F: Fn(BasicProperties, Vec<u8>) -> Fut,
    Fut: Future<Output = ()> + Send
{
    async fn consume(&mut self, channel: &Channel, deliver: Deliver, basic_properties: BasicProperties, content: Vec<u8>) {

        // 通过回调函数进行处理
        (self.cb)(basic_properties, content).await;
        self.no_ack = true;

        // ack explicitly if manual ack
        if !self.no_ack {
            let args = BasicAckArguments::new(deliver.delivery_tag(), true);
            // should call blocking version of API because we are in blocing context
            channel.basic_ack(args).await.unwrap();
        }
    }
}

#[cfg(feature = "mq")]
#[cfg(test)]
mod tests {
    use crate::third::rabbitmq_util::RabbitMQUtil;
    use std::time::Duration;

    #[tokio::test]
    async fn publish() {
        let util = RabbitMQUtil::new("192.168.1.187", 5672, "guest", "guest").await;
        util.declare_queue("hello").await;
        util.queue_bind("amq.topic", "hello", "hello").await;
        for _i in 0..10 {
            let _x = util.publish("amq.topic", "hello", "Hello World, good!").await;
        }
        // 测试时,需要保证发布之后,进程存活一定时间,让消息传递到rabbitmq
        tokio::time::sleep(Duration::from_secs(2)).await;
    }

    #[tokio::test]
    async fn consume() {
        let util = RabbitMQUtil::new("192.168.1.187", 5672, "guest", "guest").await;
        util.consume("hello", |basic_properties, content| async move {
            println!("===============>{}", String::from_utf8(content).unwrap());
            println!("===============>{:?}", basic_properties);
        }).await;
        tokio::time::sleep(Duration::from_secs(5)).await;
    }


}