use amqp_serde::types::AmqpMessageCount;
use super::Channel;
use crate::{
api::{error::Error, FieldTable, Result},
frame::{
BindQueue, BindQueueOk, DeclareQueue, DeclareQueueOk, DeleteQueue, DeleteQueueOk, Frame,
PurgeQueue, PurgeQueueOk, UnbindQueue, UnbindQueueOk,
},
};
#[cfg(feature = "compliance_assert")]
use crate::api::compliance_asserts::{assert_exchange_name, assert_queue_name};
/// Arguments for [`Channel::queue_declare`].
///
/// Every field is copied into the outgoing declare frame by `queue_declare`.
/// The protocol-level meaning of each flag is defined by the AMQP 0-9-1
/// `queue.declare` method; this type only carries the values.
#[derive(Debug, Clone, Default)]
pub struct QueueDeclareArguments {
    /// Name of the queue to declare. The empty string is what
    /// [`QueueDeclareArguments::exclusive_server_named`] uses.
    queue: String,
    /// Value for the frame's `passive` flag.
    passive: bool,
    /// Value for the frame's `durable` flag.
    durable: bool,
    /// Value for the frame's `exclusive` flag.
    exclusive: bool,
    /// Value for the frame's `auto_delete` flag.
    auto_delete: bool,
    /// When `true`, `queue_declare` sends the frame and returns `Ok(None)`
    /// without waiting for a reply.
    no_wait: bool,
    /// Extra argument table handed to the declare frame unchanged.
    arguments: FieldTable,
}

impl QueueDeclareArguments {
    /// Shared literal behind the public constructors: `passive` and `no_wait`
    /// are `false`, `arguments` is an empty table, the rest comes from the caller.
    ///
    /// Performs no name validation; each public constructor decides that itself.
    fn with_flags(queue: &str, durable: bool, exclusive: bool, auto_delete: bool) -> Self {
        Self {
            queue: queue.to_owned(),
            passive: false,
            durable,
            exclusive,
            auto_delete,
            no_wait: false,
            arguments: FieldTable::new(),
        }
    }

    /// Creates arguments for `queue` with every flag `false` and an empty
    /// argument table.
    ///
    /// # Panics
    /// With the `compliance_assert` feature enabled, `assert_queue_name` is
    /// run on `queue` first.
    pub fn new(queue: &str) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(queue);
        Self::with_flags(queue, false, false, false)
    }

    /// Creates arguments for a client-named queue with `durable` set to `true`
    /// and all other flags `false`.
    ///
    /// # Panics
    /// With the `compliance_assert` feature enabled, `assert_queue_name` is
    /// run on `queue` first.
    pub fn durable_client_named(queue: &str) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(queue);
        Self::with_flags(queue, true, false, false)
    }

    /// Creates arguments with an empty queue name and `exclusive` set to
    /// `true`; all other flags are `false`.
    pub fn exclusive_server_named() -> Self {
        Self::with_flags("", false, true, false)
    }

    /// Creates arguments for `queue` with `auto_delete` set to `true` and all
    /// other flags (including `durable`) `false`.
    ///
    /// # Panics
    /// With the `compliance_assert` feature enabled, `assert_queue_name` is
    /// run on `queue` first, matching [`QueueDeclareArguments::new`] and
    /// [`QueueDeclareArguments::durable_client_named`].
    pub fn transient_autodelete(queue: &str) -> Self {
        // Previously the only name-taking constructor that skipped this check.
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(queue);
        Self::with_flags(queue, false, false, true)
    }

    // One setter per field, generated by the crate's `impl_chainable_setter!`
    // macro (defined outside this file).
    impl_chainable_setter! { queue, String }
    impl_chainable_setter! { passive, bool }
    impl_chainable_setter! { durable, bool }
    impl_chainable_setter! { exclusive, bool }
    impl_chainable_setter! { auto_delete, bool }
    impl_chainable_setter! { no_wait, bool }
    impl_chainable_setter! { arguments, FieldTable }

    /// Returns an owned clone of the current arguments.
    ///
    /// # Panics
    /// With the `compliance_assert` feature enabled, `assert_queue_name` is
    /// run on the stored queue name before cloning.
    pub fn finish(&mut self) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(&self.queue);
        self.clone()
    }
}
/// Arguments for [`Channel::queue_bind`].
///
/// The values are passed to the bind frame as-is; see the AMQP 0-9-1
/// `queue.bind` method for their protocol meaning.
#[derive(Debug, Clone, Default)]
pub struct QueueBindArguments {
    /// Name of the queue to bind.
    pub queue: String,
    /// Name of the exchange to bind to.
    pub exchange: String,
    /// Routing key for the binding.
    pub routing_key: String,
    /// When `true`, `queue_bind` sends the frame without waiting for a reply.
    pub no_wait: bool,
    /// Extra argument table handed to the bind frame unchanged.
    pub arguments: FieldTable,
}

impl QueueBindArguments {
    /// Creates bind arguments with `no_wait` set to `false` and an empty
    /// argument table.
    ///
    /// With the `compliance_assert` feature enabled, the queue and exchange
    /// names are checked (in that order) before the value is built.
    pub fn new(queue: &str, exchange: &str, routing_key: &str) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(queue);
        #[cfg(feature = "compliance_assert")]
        assert_exchange_name(exchange);

        Self {
            queue: String::from(queue),
            exchange: String::from(exchange),
            routing_key: String::from(routing_key),
            no_wait: false,
            arguments: FieldTable::new(),
        }
    }

    // One setter per field, generated by the crate's `impl_chainable_setter!`
    // macro (defined outside this file).
    impl_chainable_setter! { queue, String }
    impl_chainable_setter! { exchange, String }
    impl_chainable_setter! { routing_key, String }
    impl_chainable_setter! { no_wait, bool }
    impl_chainable_setter! { arguments, FieldTable }

    /// Returns an owned clone of the current arguments.
    ///
    /// With the `compliance_assert` feature enabled, the stored queue and
    /// exchange names are checked (in that order) before cloning.
    pub fn finish(&mut self) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(&self.queue);
        #[cfg(feature = "compliance_assert")]
        assert_exchange_name(&self.exchange);

        self.clone()
    }
}
/// Arguments for [`Channel::queue_purge`].
#[derive(Debug, Clone, Default)]
pub struct QueuePurgeArguments {
    /// Name of the queue to purge.
    pub queue: String,
    /// When `true`, `queue_purge` sends the frame and returns `Ok(None)`
    /// without waiting for a reply.
    pub no_wait: bool,
}

impl QueuePurgeArguments {
    /// Creates purge arguments for `queue` with `no_wait` set to `false`.
    ///
    /// With the `compliance_assert` feature enabled, the queue name is
    /// checked before the value is built.
    pub fn new(queue: &str) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(queue);

        // `no_wait` keeps its derived default, which is `false`.
        Self {
            queue: String::from(queue),
            ..Self::default()
        }
    }
}
/// Arguments for [`Channel::queue_delete`].
///
/// The values are passed to the delete frame as-is; see the AMQP 0-9-1
/// `queue.delete` method for their protocol meaning.
#[derive(Debug, Clone, Default)]
pub struct QueueDeleteArguments {
    /// Name of the queue to delete.
    pub queue: String,
    /// Value for the frame's `if_unused` flag.
    pub if_unused: bool,
    /// Value for the frame's `if_empty` flag.
    pub if_empty: bool,
    /// When `true`, `queue_delete` sends the frame and returns `Ok(None)`
    /// without waiting for a reply.
    pub no_wait: bool,
}

impl QueueDeleteArguments {
    /// Creates delete arguments for `queue` with every flag `false`.
    ///
    /// With the `compliance_assert` feature enabled, the queue name is
    /// checked before the value is built.
    pub fn new(queue: &str) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(queue);

        // All three flags keep their derived default, which is `false`.
        Self {
            queue: String::from(queue),
            ..Self::default()
        }
    }

    // One setter per field, generated by the crate's `impl_chainable_setter!`
    // macro (defined outside this file).
    impl_chainable_setter! { queue, String }
    impl_chainable_setter! { if_unused, bool }
    impl_chainable_setter! { if_empty, bool }
    impl_chainable_setter! { no_wait, bool }

    /// Returns an owned clone of the current arguments.
    ///
    /// With the `compliance_assert` feature enabled, the stored queue name is
    /// checked before cloning.
    pub fn finish(&mut self) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(&self.queue);

        self.clone()
    }
}
/// Arguments for [`Channel::queue_unbind`].
///
/// Unlike the other argument types in this file there is no `no_wait` field;
/// `queue_unbind` always waits for the reply.
#[derive(Debug, Clone, Default)]
pub struct QueueUnbindArguments {
    /// Name of the queue to unbind.
    pub queue: String,
    /// Name of the exchange to unbind from.
    pub exchange: String,
    /// Routing key of the binding to remove.
    pub routing_key: String,
    /// Extra argument table handed to the unbind frame unchanged.
    pub arguments: FieldTable,
}

impl QueueUnbindArguments {
    /// Creates unbind arguments with an empty argument table.
    ///
    /// With the `compliance_assert` feature enabled, the queue and exchange
    /// names are checked (in that order) before the value is built.
    pub fn new(queue: &str, exchange: &str, routing_key: &str) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(queue);
        #[cfg(feature = "compliance_assert")]
        assert_exchange_name(exchange);

        Self {
            queue: String::from(queue),
            exchange: String::from(exchange),
            routing_key: String::from(routing_key),
            arguments: FieldTable::new(),
        }
    }

    // One setter per field, generated by the crate's `impl_chainable_setter!`
    // macro (defined outside this file).
    impl_chainable_setter! { queue, String }
    impl_chainable_setter! { exchange, String }
    impl_chainable_setter! { routing_key, String }
    impl_chainable_setter! { arguments, FieldTable }

    /// Returns an owned clone of the current arguments.
    ///
    /// With the `compliance_assert` feature enabled, the stored queue and
    /// exchange names are checked (in that order) before cloning.
    pub fn finish(&mut self) -> Self {
        #[cfg(feature = "compliance_assert")]
        assert_queue_name(&self.queue);
        #[cfg(feature = "compliance_assert")]
        assert_exchange_name(&self.exchange);

        self.clone()
    }
}
impl Channel {
pub async fn queue_declare(
&self,
args: QueueDeclareArguments,
) -> Result<Option<(String, AmqpMessageCount, u32)>> {
let mut declare = DeclareQueue::new(0, args.queue.try_into().unwrap(), args.arguments);
declare.set_passive(args.passive);
declare.set_durable(args.durable);
declare.set_exclusive(args.exclusive);
declare.set_auto_delete(args.auto_delete);
declare.set_no_wait(args.no_wait);
if args.no_wait {
self.shared
.outgoing_tx
.send((self.channel_id(), declare.into_frame()))
.await?;
Ok(None)
} else {
let responder_rx = self.register_responder(DeclareQueueOk::header()).await?;
let declare_ok = synchronous_request!(
self.shared.outgoing_tx,
(self.channel_id(), declare.into_frame()),
responder_rx,
Frame::DeclareQueueOk,
Error::ChannelUseError
)?;
Ok(Some((
declare_ok.queue.into(),
declare_ok.message_count,
declare_ok.consumer_count,
)))
}
}
pub async fn queue_bind(&self, args: QueueBindArguments) -> Result<()> {
let bind = BindQueue::new(
0,
args.queue.try_into().unwrap(),
args.exchange.try_into().unwrap(),
args.routing_key.try_into().unwrap(),
args.no_wait,
args.arguments,
);
if args.no_wait {
self.shared
.outgoing_tx
.send((self.channel_id(), bind.into_frame()))
.await?;
} else {
let responder_rx = self.register_responder(BindQueueOk::header()).await?;
synchronous_request!(
self.shared.outgoing_tx,
(self.channel_id(), bind.into_frame()),
responder_rx,
Frame::BindQueueOk,
Error::ChannelUseError
)?;
}
Ok(())
}
pub async fn queue_purge(&self, args: QueuePurgeArguments) -> Result<Option<AmqpMessageCount>> {
let purge = PurgeQueue::new(0, args.queue.try_into().unwrap(), args.no_wait);
if args.no_wait {
self.shared
.outgoing_tx
.send((self.channel_id(), purge.into_frame()))
.await?;
Ok(None)
} else {
let responder_rx = self.register_responder(PurgeQueueOk::header()).await?;
let purge_ok = synchronous_request!(
self.shared.outgoing_tx,
(self.channel_id(), purge.into_frame()),
responder_rx,
Frame::PurgeQueueOk,
Error::ChannelUseError
)?;
Ok(Some(purge_ok.message_count))
}
}
pub async fn queue_delete(
&self,
args: QueueDeleteArguments,
) -> Result<Option<AmqpMessageCount>> {
let mut delete = DeleteQueue::new(0, args.queue.try_into().unwrap());
delete.set_if_unused(args.if_unused);
delete.set_if_empty(args.if_empty);
delete.set_no_wait(args.no_wait);
if args.no_wait {
self.shared
.outgoing_tx
.send((self.channel_id(), delete.into_frame()))
.await?;
Ok(None)
} else {
let responder_rx = self.register_responder(DeleteQueueOk::header()).await?;
let delete_ok = synchronous_request!(
self.shared.outgoing_tx,
(self.channel_id(), delete.into_frame()),
responder_rx,
Frame::DeleteQueueOk,
Error::ChannelUseError
)?;
Ok(Some(delete_ok.message_count))
}
}
pub async fn queue_unbind(&self, args: QueueUnbindArguments) -> Result<()> {
let unbind = UnbindQueue::new(
0,
args.queue.try_into().unwrap(),
args.exchange.try_into().unwrap(),
args.routing_key.try_into().unwrap(),
args.arguments,
);
let responder_rx = self.register_responder(UnbindQueueOk::header()).await?;
synchronous_request!(
self.shared.outgoing_tx,
(self.channel_id(), unbind.into_frame()),
responder_rx,
Frame::UnbindQueueOk,
Error::ChannelUseError
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::{
        QueueBindArguments, QueueDeclareArguments, QueueDeleteArguments, QueuePurgeArguments,
        QueueUnbindArguments,
    };
    use crate::{
        callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
        connection::{Connection, OpenConnectionArguments},
    };

    /// Exercises declare -> bind -> purge -> unbind -> delete on one channel.
    ///
    /// Integration test: it opens a real connection to `localhost:5672`
    /// (the two trailing strings are presumably username and password), so a
    /// broker must be reachable there for the test to pass.
    #[tokio::test]
    async fn test_queue_apis() {
        let open_args = OpenConnectionArguments::new("localhost", 5672, "user", "bitnami");
        let connection = Connection::open(&open_args).await.unwrap();
        connection
            .register_callback(DefaultConnectionCallback)
            .await
            .unwrap();

        let channel = connection.open_channel(None).await.unwrap();
        channel
            .register_callback(DefaultChannelCallback)
            .await
            .unwrap();

        // Default arguments carry an empty queue name; the name to use for
        // the remaining steps is taken from the declare reply.
        let declared = channel
            .queue_declare(QueueDeclareArguments::default())
            .await
            .unwrap();
        let (queue_name, ..) = declared.unwrap();

        let exchange = "amq.topic";
        let routing_key = "eiffel.#";

        let bind_args = QueueBindArguments::new(&queue_name, exchange, routing_key);
        channel.queue_bind(bind_args).await.unwrap();

        let purge_args = QueuePurgeArguments::new(&queue_name);
        channel.queue_purge(purge_args).await.unwrap();

        let unbind_args = QueueUnbindArguments::new(&queue_name, exchange, routing_key);
        channel.queue_unbind(unbind_args).await.unwrap();

        let delete_args = QueueDeleteArguments::new(&queue_name);
        channel.queue_delete(delete_args).await.unwrap();

        channel.close().await.unwrap();
        connection.close().await.unwrap();
    }
}