#[cfg(feature = "mq")]
use amqprs::channel::{BasicAckArguments, BasicConsumeArguments, BasicPublishArguments, Channel, QueueBindArguments, QueueDeclareArguments};
#[cfg(feature = "mq")]
use amqprs::consumer::AsyncConsumer;
#[cfg(feature = "mq")]
use amqprs::{callbacks, connection::{Connection, OpenConnectionArguments}, BasicProperties, Deliver, DELIVERY_MODE_PERSISTENT};
use std::time::Duration;
use async_trait::async_trait;
#[cfg(feature = "mq")]
pub struct RabbitMQUtil {
pub connection: Connection,
pub channel: Channel,
}
#[cfg(feature = "mq")]
impl RabbitMQUtil {
pub async fn new(host: &str, port: u16, username: &str, password: &str) -> Self {
let args = OpenConnectionArguments::new(host, port, username, password);
let connection = Connection::open(&args).await.unwrap();
connection.register_callback(callbacks::DefaultConnectionCallback).await.unwrap();
let channel = connection.open_channel(None).await.unwrap();
channel.register_callback(callbacks::DefaultChannelCallback).await.unwrap();
Self {
connection,
channel,
}
}
pub async fn declare_queue(&self, queue_name: &str) {
let q_args = QueueDeclareArguments::durable_client_named(queue_name);
let (_queue_name, _, _) = self.channel.queue_declare(q_args).await.unwrap().unwrap();
}
pub async fn queue_bind(&self, exchange_name: &str, queue_name: &str, routing_key: &str) {
self.channel
.queue_bind(QueueBindArguments::new(
&queue_name,
exchange_name,
routing_key,
))
.await
.unwrap();
}
pub async fn publish(&self, exchange: &str, routing_key: &str, mesage: &str) {
let payload = String::from(mesage).into_bytes();
let publish_args = BasicPublishArguments::new(exchange, routing_key);
let props = BasicProperties::default()
.with_delivery_mode(DELIVERY_MODE_PERSISTENT)
.finish();
self.channel.basic_publish(props, payload, publish_args).await.unwrap();
}
pub async fn consume<F, Fut>(&self, queue_name: &str, callback: F)
where
F: (Fn(BasicProperties, Vec<u8>) -> Fut) + 'static,
Fut: Future<Output = ()> + Send + 'static
{
let args = BasicConsumeArguments::new(&queue_name, "rabbitmq_util");
self.channel.basic_consume(CustomConsumer::new(args.no_ack, callback), args).await.unwrap();
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
#[cfg(feature = "mq")]
pub struct CustomConsumer<F, Fut>
where
F: Fn(BasicProperties, Vec<u8>) -> Fut,
Fut: Future<Output = ()> + Send
{
no_ack: bool,
cb: F,
}
#[cfg(feature = "mq")]
impl <F, Fut> CustomConsumer<F, Fut>
where
F: Fn(BasicProperties, Vec<u8>) -> Fut,
Fut: Future<Output = ()> + Send
{
pub fn new(no_ack: bool, cb: F) -> Self {
Self {
no_ack,
cb
}
}
}
#[cfg(feature = "mq")]
unsafe impl <F, Fut> Send for CustomConsumer<F, Fut>
where
F: Fn(BasicProperties, Vec<u8>) -> Fut,
Fut: Future<Output = ()> + Send
{ }
#[cfg(feature = "mq")]
#[async_trait]
impl <F, Fut> AsyncConsumer for CustomConsumer<F, Fut>
where
F: Fn(BasicProperties, Vec<u8>) -> Fut,
Fut: Future<Output = ()> + Send
{
async fn consume(&mut self, channel: &Channel, deliver: Deliver, basic_properties: BasicProperties, content: Vec<u8>) {
(self.cb)(basic_properties, content).await;
self.no_ack = true;
if !self.no_ack {
let args = BasicAckArguments::new(deliver.delivery_tag(), true);
channel.basic_ack(args).await.unwrap();
}
}
}
#[cfg(feature = "mq")]
#[cfg(test)]
mod tests {
use crate::third::rabbitmq_util::RabbitMQUtil;
use std::time::Duration;
#[tokio::test]
async fn publish() {
let util = RabbitMQUtil::new("192.168.1.187", 5672, "guest", "guest").await;
util.declare_queue("hello").await;
util.queue_bind("amq.topic", "hello", "hello").await;
for _i in 0..10 {
let _x = util.publish("amq.topic", "hello", "Hello World, good!").await;
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
#[tokio::test]
async fn consume() {
let util = RabbitMQUtil::new("192.168.1.187", 5672, "guest", "guest").await;
util.consume("hello", |basic_properties, content| async move {
println!("===============>{}", String::from_utf8(content).unwrap());
println!("===============>{:?}", basic_properties);
}).await;
tokio::time::sleep(Duration::from_secs(5)).await;
}
}