use rabbitmq_http_client::{api::Client, commons::QueueType, requests::QueueParams};
use serde_json::{Map, Value, json};
use crate::test_helpers::{PASSWORD, USERNAME, async_rabbitmq_version_is_at_least, endpoint};
/// Declares a durable classic queue carrying an `x-max-length` argument, then declares
/// the same queue name again through the general-purpose `QueueParams::new` constructor.
/// Both declarations are expected to succeed.
#[tokio::test]
async fn test_async_declare_and_redeclare_a_classic_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.cq.69373293479827";

    // Best-effort removal of leftovers from an earlier run; the outcome is deliberately ignored.
    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(
        result1.is_err(),
        "queue {name} should not exist before the test declares it"
    );

    let mut map = Map::<String, Value>::new();
    map.insert("x-max-length".to_owned(), json!(10_000));
    let optional_args = Some(map);

    // The arguments are needed again for the redeclaration below, hence the clone here.
    let params = QueueParams::new_durable_classic_queue(name, optional_args.clone());
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // NOTE(review): the two booleans are presumably `durable = true, auto_delete = false`,
    // which would make this declaration equivalent to the first one; confirm against the
    // `QueueParams::new` signature.
    let params2 = QueueParams::new(name, QueueType::Classic, true, false, optional_args);
    let result3 = rc.declare_queue(vhost, &params2).await;
    assert!(result3.is_ok(), "declare_queue returned {result3:?}");

    // Best-effort cleanup.
    let _ = rc.delete_queue(vhost, name, false).await;
}
/// Declares a quorum queue carrying an `x-max-length` argument and expects the
/// declaration to succeed.
#[tokio::test]
async fn test_async_declare_a_quorum_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.qq.182374982374";

    // Best-effort removal of leftovers from an earlier run; the outcome is deliberately ignored.
    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(
        result1.is_err(),
        "queue {name} should not exist before the test declares it"
    );

    let mut map = Map::<String, Value>::new();
    map.insert("x-max-length".to_owned(), json!(10_000));
    let optional_args = Some(map);

    let params = QueueParams::new_quorum_queue(name, optional_args);
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // Best-effort cleanup.
    let _ = rc.delete_queue(vhost, name, false).await;
}
/// Declares a stream through the generic `declare_queue` call, using
/// `QueueParams::new_stream` with an `x-max-length-bytes` argument, and expects the
/// declaration to succeed.
#[tokio::test]
async fn test_async_declare_a_stream_with_declare_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.qq.927348926347988623";

    // Best-effort removal of leftovers from an earlier run; the outcome is deliberately ignored.
    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(
        result1.is_err(),
        "queue {name} should not exist before the test declares it"
    );

    let mut map = Map::<String, Value>::new();
    map.insert("x-max-length-bytes".to_owned(), json!(10_000_000));
    let optional_args = Some(map);

    let params = QueueParams::new_stream(name, optional_args);
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // Best-effort cleanup.
    let _ = rc.delete_queue(vhost, name, false).await;
}
/// Declares a durable classic queue and then exercises `delete_queue` three times:
/// once while the queue exists, and twice after it is gone, with the last argument set
/// to `true` and then to `false`.
#[tokio::test]
async fn test_async_delete_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.cq.982734982364982364896";

    // Best-effort removal of leftovers from an earlier run; the outcome is deliberately ignored.
    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(
        result1.is_err(),
        "queue {name} should not exist before the test declares it"
    );

    let params = QueueParams::new_durable_classic_queue(name, None);
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // The queue exists, so this deletion must succeed.
    rc.delete_queue(vhost, name, false)
        .await
        .expect("deleting an existing queue should succeed");

    // The queue is already gone. With `true` as the last argument the call is still
    // expected to succeed; presumably the flag makes deletion tolerate a missing queue.
    // NOTE(review): confirm against the `delete_queue` signature.
    rc.delete_queue(vhost, name, true)
        .await
        .expect("deleting a missing queue with the flag set to `true` should succeed");

    // With `false`, deleting the missing queue must be reported as an error.
    let result3 = rc.delete_queue(vhost, name, false).await;
    assert!(
        result3.is_err(),
        "deleting a missing queue with the flag set to `false` should fail"
    );

    let result4 = rc.get_queue_info(vhost, name).await;
    assert!(
        result4.is_err(),
        "queue {name} should no longer exist after deletion"
    );
}
/// Declares a durable classic queue, waits for queue metrics to be emitted, and expects
/// `list_queues` to succeed.
#[tokio::test]
async fn test_async_list_all_queues() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh_name = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.23487866", None);
    let result1 = rc.declare_queue(vh_name, &params).await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    // Give the node time to emit queue metrics before listing.
    crate::test_helpers::async_await_queue_metric_emission().await;

    let result2 = rc.list_queues().await;
    assert!(result2.is_ok(), "list_queues returned {result2:?}");

    rc.delete_queue(vh_name, params.name, false).await.unwrap();
}
/// Declares a durable classic queue in the `/` virtual host, waits for queue metrics to
/// be emitted, and expects `list_queues_in` for that virtual host to succeed.
#[tokio::test]
async fn test_async_list_queues_in_a_virtual_host() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh_name = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.64692734867", None);
    let result1 = rc.declare_queue(vh_name, &params).await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    // Give the node time to emit queue metrics before listing.
    crate::test_helpers::async_await_queue_metric_emission().await;

    let result2 = rc.list_queues_in(vh_name).await;
    assert!(result2.is_ok(), "list_queues_in returned {result2:?}");

    rc.delete_queue(vh_name, params.name, false).await.unwrap();
}
/// Declares a durable classic queue, waits for queue metrics to be emitted, and verifies
/// that `list_queues_with_details` returns an entry for it with the expected virtual
/// host and durability. Returns early when the version helper reports a server older
/// than 3.13.0.
#[tokio::test]
async fn test_async_list_queues_with_details() {
    // Skip when the target node does not satisfy the minimum version.
    if !async_rabbitmq_version_is_at_least(3, 13, 0).await {
        return;
    }

    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh_name = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.detailed.92734827364", None);
    let result1 = rc.declare_queue(vh_name, &params).await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    // Give the node time to emit queue metrics before listing.
    crate::test_helpers::async_await_queue_metric_emission().await;

    let result2 = rc.list_queues_with_details().await;
    assert!(
        result2.is_ok(),
        "list_queues_with_details returned {result2:?}"
    );
    let detailed_queues = result2.unwrap();
    assert!(
        !detailed_queues.is_empty(),
        "Expected at least one queue in detailed list"
    );

    // Locate the queue declared above; `find` matching on the name already guarantees
    // that the returned entry carries `params.name`.
    let queue = detailed_queues
        .iter()
        .find(|q| q.name == params.name)
        .expect("Expected to find our test queue in detailed results");
    assert_eq!(queue.vhost, vh_name);
    assert!(queue.durable, "the test queue was declared as durable");

    // Garbage collection details are optional; they are only checked when present.
    if let Some(gc) = &queue.garbage_collection {
        assert!(gc.fullsweep_after > 1000);
    }

    rc.delete_queue(vh_name, params.name, false).await.unwrap();
}