use rabbitmq_http_client::{
api::Client,
commons::BindingDestinationType,
requests::BindingDeletionParams,
requests::{ExchangeParams, QueueParams},
};
use crate::test_helpers::{PASSWORD, USERNAME, endpoint};
/// Declares a durable classic queue, binds it to the `amq.fanout` exchange, and checks
/// that the binding is reported both by `list_bindings` and by `list_bindings_in` for
/// the `/` virtual host.
///
/// Requires a reachable management API at `endpoint()` that accepts
/// `USERNAME`/`PASSWORD`.
#[tokio::test]
async fn test_async_list_all_bindings() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh_name = "/";
    let cq = "rust.cq.durable.1";
    // Not declared by this test, so it is presumably pre-existing on the broker.
    let fanout = "amq.fanout";

    let result1 = rc
        .declare_queue(vh_name, &QueueParams::new_durable_classic_queue(cq, None))
        .await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    let result2 = rc.bind_queue(vh_name, cq, fanout, None, None).await;
    assert!(result2.is_ok(), "bind_queue returned {result2:?}");

    let result3 = rc.list_bindings().await;
    assert!(result3.is_ok(), "list_bindings returned {result3:?}");
    let vec = result3.unwrap();
    assert!(
        vec.iter()
            .any(|b| b.destination == cq && b.source == fanout),
        "list_bindings did not report a binding from {fanout} to {cq}"
    );

    let result4 = rc.list_bindings_in(vh_name).await;
    assert!(result4.is_ok(), "list_bindings_in returned {result4:?}");
    let vec = result4.unwrap();
    // Match on the destination as well: other tests bind their own queues to the same
    // exchange in this virtual host, so vhost + source alone could be satisfied by an
    // unrelated binding.
    assert!(
        vec.iter()
            .any(|b| b.vhost == vh_name && b.destination == cq && b.source == fanout),
        "list_bindings_in did not report a binding from {fanout} to {cq} in {vh_name}"
    );

    // Best-effort cleanup; the result is deliberately ignored.
    let _ = rc.delete_queue(vh_name, cq, false).await;
}
/// Exercises `list_queue_bindings`: once a queue has been declared and bound to
/// `amq.fanout`, the queue's binding list must contain a queue-typed entry whose
/// virtual host, destination and source all match.
#[tokio::test]
async fn test_async_list_only_queue_bindings() {
    let vh_name = "/";
    let queue = "rust.cq.durable.2";
    let exchange = "amq.fanout";

    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);

    // Set-up: a durable classic queue bound to the fanout exchange.
    let queue_params = QueueParams::new_durable_classic_queue(queue, None);
    let result1 = rc.declare_queue(vh_name, &queue_params).await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");
    let result2 = rc.bind_queue(vh_name, queue, exchange, None, None).await;
    assert!(result2.is_ok(), "bind_queue returned {result2:?}");

    // The listing for this queue has to include the binding created above.
    let result3 = rc.list_queue_bindings(vh_name, queue).await;
    assert!(result3.is_ok(), "list_queue_bindings returned {result3:?}");
    let bindings = result3.unwrap();
    let found = bindings.iter().any(|b| {
        b.source == exchange
            && b.destination == queue
            && b.vhost == vh_name
            && b.destination_type == BindingDestinationType::Queue
    });
    assert!(found);

    // Best-effort cleanup; the result is deliberately ignored.
    let _ = rc.delete_queue(vh_name, queue, false).await;
}
/// Exercises the exchange-to-exchange binding listings.
///
/// Set-up creates two bindings that both target `amq.fanout`: one from the freshly
/// declared `rust.x.fanout` exchange and one involving a queue. Both
/// `list_exchange_bindings_with_source` and `list_exchange_bindings_with_destination`
/// must then report the exchange binding and must not report any queue-typed entry.
#[tokio::test]
async fn test_async_list_only_exchange_bindings() {
    let vh_name = "/";
    let queue = "rust.cq.durable.3";
    let destination_x = "amq.fanout";
    let source_x = "rust.x.fanout";

    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);

    // Exchange-to-exchange binding: `source_x` -> `destination_x`.
    let exchange_params = ExchangeParams::fanout(source_x, false, false, None);
    let result1 = rc.declare_exchange(vh_name, &exchange_params).await;
    assert!(result1.is_ok(), "declare_exchange returned {result1:?}");
    let result2 = rc
        .bind_exchange(vh_name, destination_x, source_x, None, None)
        .await;
    assert!(result2.is_ok(), "bind_exchange returned {result2:?}");

    // A queue binding on `destination_x`, which the exchange listings must leave out.
    let queue_params = QueueParams::new_durable_classic_queue(queue, None);
    let result3 = rc.declare_queue(vh_name, &queue_params).await;
    assert!(result3.is_ok(), "declare_queue returned {result3:?}");
    let result4 = rc.bind_queue(vh_name, queue, destination_x, None, None).await;
    assert!(result4.is_ok(), "bind_queue returned {result4:?}");

    // Listing keyed by the source exchange.
    let result5 = rc
        .list_exchange_bindings_with_source(vh_name, source_x)
        .await;
    assert!(
        result5.is_ok(),
        "list_exchange_bindings_with_source returned {result5:?}"
    );
    let by_source = result5.unwrap();
    let has_queue_entry = by_source
        .iter()
        .any(|b| b.destination_type == BindingDestinationType::Queue);
    assert!(!has_queue_entry);
    let has_exchange_entry = by_source.iter().any(|b| {
        b.source == source_x
            && b.destination == destination_x
            && b.vhost == vh_name
            && b.destination_type == BindingDestinationType::Exchange
    });
    assert!(has_exchange_entry);

    // Listing keyed by the destination exchange.
    let result6 = rc
        .list_exchange_bindings_with_destination(vh_name, destination_x)
        .await;
    assert!(
        result6.is_ok(),
        "list_exchange_bindings_with_destination returned {result6:?}"
    );
    let by_destination = result6.unwrap();
    let has_queue_entry = by_destination
        .iter()
        .any(|b| b.destination_type == BindingDestinationType::Queue);
    assert!(!has_queue_entry);
    let has_exchange_entry = by_destination.iter().any(|b| {
        b.source == source_x
            && b.destination == destination_x
            && b.vhost == vh_name
            && b.destination_type == BindingDestinationType::Exchange
    });
    assert!(has_exchange_entry);

    // Best-effort cleanup; the results are deliberately ignored.
    let _ = rc.delete_queue(vh_name, queue, false).await;
    let _ = rc.delete_exchange(vh_name, source_x, false).await;
}
/// Verifies deletion of a queue binding via `delete_binding`.
///
/// Flow: declare a queue, bind it to `amq.fanout` with routing key `"foo"`, confirm the
/// binding is listed, then delete it three times:
/// 1. with the flag set to `false` — expected to succeed (the binding exists);
/// 2. with the flag set to `true` — expected to succeed although the binding is gone;
/// 3. with the flag set to `false` — expected to fail because the binding is gone.
///
/// Finally the binding must no longer appear in the queue's binding list.
#[tokio::test]
async fn test_async_delete_queue_bindings() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh_name = "/";
    let cq = "rust.cq.delete_queue_binding";
    let fanout = "amq.fanout";

    let result1 = rc
        .declare_queue(vh_name, &QueueParams::new_durable_classic_queue(cq, None))
        .await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    let result2 = rc.bind_queue(vh_name, cq, fanout, Some("foo"), None).await;
    assert!(result2.is_ok(), "bind_queue returned {result2:?}");

    let result3 = rc.list_queue_bindings(vh_name, cq).await;
    assert!(result3.is_ok(), "list_queue_bindings returned {result3:?}");
    let vec = result3.unwrap();
    assert!(
        vec.iter()
            .any(|b| b.destination_type == BindingDestinationType::Queue
                && b.vhost == vh_name
                && b.destination == cq
                && b.source == fanout),
        "list_queue_bindings did not report a binding from {fanout} to {cq}"
    );

    // The binding was created without arguments; an empty map is passed for deletion.
    // The map is moved into the params directly, no clone is needed.
    let bd_params = BindingDeletionParams {
        virtual_host: vh_name,
        source: fanout,
        destination: cq,
        destination_type: BindingDestinationType::Queue,
        routing_key: "foo",
        arguments: Some(serde_json::Map::new()),
    };

    // 1. The binding exists, so a plain deletion succeeds.
    let result4 = rc.delete_binding(&bd_params, false).await;
    assert!(result4.is_ok(), "delete_binding returned {result4:?}");

    // 2. With the flag set to `true`, deleting the now-missing binding still succeeds.
    let result = rc.delete_binding(&bd_params, true).await;
    assert!(
        result.is_ok(),
        "repeated delete_binding with flag `true` returned {result:?}"
    );

    // 3. With the flag set to `false`, deleting the missing binding is an error.
    let result = rc.delete_binding(&bd_params, false).await;
    assert!(
        result.is_err(),
        "repeated delete_binding with flag `false` returned {result:?}"
    );

    let result5 = rc.list_queue_bindings(vh_name, cq).await;
    assert!(result5.is_ok(), "list_queue_bindings returned {result5:?}");
    let vec = result5.unwrap();
    assert!(
        !vec.iter()
            .any(|b| b.destination_type == BindingDestinationType::Queue
                && b.vhost == vh_name
                && b.destination == cq
                && b.source == fanout),
        "binding from {fanout} to {cq} is still listed after deletion"
    );

    // Best-effort cleanup; the result is deliberately ignored.
    let _ = rc.delete_queue(vh_name, cq, false).await;
}
/// Verifies deletion of an exchange-to-exchange binding via `delete_binding`.
///
/// Flow: bind `amq.direct` (destination) to `amq.fanout` (source) with routing key
/// `"foo"`, confirm the binding is listed, then delete it three times:
/// 1. with the flag set to `false` — expected to succeed (the binding exists);
/// 2. with the flag set to `true` — expected to succeed although the binding is gone;
/// 3. with the flag set to `false` — expected to fail because the binding is gone.
///
/// Finally the binding must no longer appear among the bindings whose destination is
/// `amq.direct`. Neither exchange is declared or deleted by this test.
#[tokio::test]
async fn test_async_delete_exchange_bindings() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh_name = "/";
    let fanout = "amq.fanout";
    let direct = "amq.direct";

    let result2 = rc
        .bind_exchange(vh_name, direct, fanout, Some("foo"), None)
        .await;
    // The message names the call actually made (it previously said `bind_queue`).
    assert!(result2.is_ok(), "bind_exchange returned {result2:?}");

    let result3 = rc
        .list_exchange_bindings_with_destination(vh_name, direct)
        .await;
    assert!(
        result3.is_ok(),
        "list_exchange_bindings_with_destination returned {result3:?}"
    );
    let vec = result3.unwrap();
    assert!(
        vec.iter()
            .any(|b| b.destination_type == BindingDestinationType::Exchange
                && b.vhost == vh_name
                && b.destination == direct
                && b.source == fanout),
        "list_exchange_bindings_with_destination did not report a binding from {fanout} to {direct}"
    );

    // The binding was created without arguments; an empty map is passed for deletion.
    // The map is moved into the params directly, no clone is needed.
    let bd_params = BindingDeletionParams {
        virtual_host: vh_name,
        source: fanout,
        destination: direct,
        destination_type: BindingDestinationType::Exchange,
        routing_key: "foo",
        arguments: Some(serde_json::Map::new()),
    };

    // 1. The binding exists, so a plain deletion succeeds.
    let result4 = rc.delete_binding(&bd_params, false).await;
    assert!(result4.is_ok(), "delete_binding returned {result4:?}");

    // 2. With the flag set to `true`, deleting the now-missing binding still succeeds.
    let result = rc.delete_binding(&bd_params, true).await;
    assert!(
        result.is_ok(),
        "repeated delete_binding with flag `true` returned {result:?}"
    );

    // 3. With the flag set to `false`, deleting the missing binding is an error.
    let result = rc.delete_binding(&bd_params, false).await;
    assert!(
        result.is_err(),
        "repeated delete_binding with flag `false` returned {result:?}"
    );

    let result5 = rc
        .list_exchange_bindings_with_destination(vh_name, direct)
        .await;
    assert!(
        result5.is_ok(),
        "list_exchange_bindings_with_destination returned {result5:?}"
    );
    let vec = result5.unwrap();
    assert!(
        !vec.iter()
            .any(|b| b.destination_type == BindingDestinationType::Exchange
                && b.vhost == vh_name
                && b.destination == direct
                && b.source == fanout),
        "binding from {fanout} to {direct} is still listed after deletion"
    );
}