use crate::reply::FutureRpcReply;
use futures_util::stream::StreamExt;
use lapin::{
message::Delivery,
options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
types::FieldTable,
BasicProperties, Channel, Connection, ConnectionProperties,
};
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_amqp::*;
use uuid::Uuid;
#[derive(Debug)]
pub struct RmqRpcClient {
conn: Connection,
channel: Channel,
fast_reply_task: tokio::task::JoinHandle<()>,
replies: Arc<Mutex<BTreeMap<String, FutureRpcReply<Delivery>>>>,
}
impl RmqRpcClient {
pub async fn connect(url: &str) -> Result<Self, lapin::Error> {
let conn = Connection::connect(url, ConnectionProperties::default().with_tokio()).await?;
let channel = conn.create_channel().await?;
let replies: Arc<Mutex<BTreeMap<String, FutureRpcReply<Delivery>>>> =
Arc::new(Mutex::new(BTreeMap::new()));
let cloned_replies = replies.clone();
let mut consumer = channel
.basic_consume(
"amq.rabbitmq.reply-to",
"",
BasicConsumeOptions {
no_ack: true,
..BasicConsumeOptions::default()
},
FieldTable::default(),
)
.await
.unwrap();
let fast_reply_task = tokio::spawn(async move {
while let Some(delivery) = consumer.next().await {
let (_, delivery) = delivery.unwrap();
match delivery.properties.correlation_id() {
None => unimplemented!(), Some(v) => match cloned_replies.lock().await.remove(&v.to_string()) {
Some(v) => v.resolve(delivery).await,
None => unimplemented!(), },
};
}
});
Ok(Self {
conn,
channel,
fast_reply_task,
replies,
})
}
pub async fn declare_queue(&self, queue: &str) -> Result<&Self, lapin::Error> {
self.channel
.queue_declare(queue, QueueDeclareOptions::default(), FieldTable::default())
.await?;
Ok(self)
}
pub async fn send_message(
&self,
routing_key: &str,
payload: Vec<u8>,
) -> Result<Delivery, lapin::Error> {
let correlation_id = Uuid::new_v4().to_string();
self.channel
.basic_publish(
"",
routing_key,
BasicPublishOptions::default(),
payload,
BasicProperties::default()
.with_reply_to("amq.rabbitmq.reply-to".into())
.with_correlation_id(correlation_id.clone().into()),
)
.await?;
let reply_fut = FutureRpcReply::new();
self.replies
.lock()
.await
.insert(correlation_id, reply_fut.clone());
Ok(reply_fut.await)
}
pub async fn send_message_without_reply(
&self,
routing_key: &str,
payload: Vec<u8>,
) -> Result<(), lapin::Error> {
self.channel
.basic_publish(
"",
routing_key,
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await?;
Ok(())
}
}
#[cfg(test)]
mod connect {
use super::*;
#[tokio::test]
async fn returns_client_if_connected_successfully() {
let got = RmqRpcClient::connect("amqp://127.0.0.1:5672").await;
assert!(got.is_ok())
}
#[tokio::test]
async fn errored_if_something_went_wrong() {
let got = RmqRpcClient::connect("amqp://127.0.0.1:5673").await;
assert!(got.is_err())
}
}
#[cfg(test)]
mod send_message {
use super::*;
use test_utils::{Queue, PING, PONG, URL};
#[tokio::test]
async fn message_sent() {
let queue = Queue::with_auto_reply().await;
let client = RmqRpcClient::connect(URL).await.unwrap();
client
.send_message(&queue.name, PING.as_bytes().to_vec())
.await
.unwrap();
let delivery = &queue.processed_deliveries().await[0];
assert_eq!(String::from_utf8_lossy(&delivery.data), PING.to_owned())
}
#[tokio::test]
async fn message_has_reply_to_queue() {
let queue = Queue::with_auto_reply().await;
let client = RmqRpcClient::connect(URL).await.unwrap();
client
.send_message(&queue.name, PING.as_bytes().to_vec())
.await
.unwrap();
let delivery = &queue.processed_deliveries().await[0];
assert!(delivery.properties.reply_to().is_some());
assert!(delivery
.properties
.reply_to()
.clone()
.unwrap()
.to_string()
.contains("amq.rabbitmq.reply-to"));
}
#[tokio::test]
async fn message_has_correlation_id() {
let queue = Queue::with_auto_reply().await;
let client = RmqRpcClient::connect(URL).await.unwrap();
client
.send_message(&queue.name, PONG.as_bytes().to_vec())
.await
.unwrap();
let delivery = &queue.processed_deliveries().await[0];
assert!(delivery.properties.correlation_id().is_some());
}
#[tokio::test]
async fn returns_response() {
let queue = Queue::with_auto_reply().await;
let client = RmqRpcClient::connect(URL).await.unwrap();
let got = client
.send_message(&queue.name, PING.as_bytes().to_vec())
.await;
assert!(got.is_ok());
assert_eq!(
String::from_utf8_lossy(&got.unwrap().data).to_string(),
PONG
)
}
}
#[cfg(test)]
mod send_message_without_reply {
use super::*;
use test_utils::{wait_a_moment, Queue, PING, PONG, URL};
#[tokio::test]
async fn message_sent() {
let queue = Queue::spawn_server(false).await;
let client = RmqRpcClient::connect(URL).await.unwrap();
client
.send_message_without_reply(&queue.name, PING.as_bytes().to_vec())
.await
.unwrap();
wait_a_moment().await;
let delivery = &queue.processed_deliveries().await[0];
assert_eq!(String::from_utf8_lossy(&delivery.data), PING.to_owned())
}
#[tokio::test]
async fn message_has_no_reply_to_queue() {
let queue = Queue::spawn_server(false).await;
let client = RmqRpcClient::connect(URL).await.unwrap();
client
.send_message_without_reply(&queue.name, PING.as_bytes().to_vec())
.await
.unwrap();
wait_a_moment().await;
let delivery = &queue.processed_deliveries().await[0];
assert!(delivery.properties.reply_to().is_none());
}
#[tokio::test]
async fn message_has_no_correlation_id() {
let queue = Queue::spawn_server(false).await;
let client = RmqRpcClient::connect(URL).await.unwrap();
client
.send_message_without_reply(&queue.name, PONG.as_bytes().to_vec())
.await
.unwrap();
wait_a_moment().await;
let delivery = &queue.processed_deliveries().await[0];
assert!(delivery.properties.correlation_id().is_none());
}
}