use rabbitmq_http_client::{
blocking_api::Client,
commons::{PaginationParams, QueueType},
requests::QueueParams,
};
use serde_json::{Map, Value, json};
use crate::test_helpers::{PASSWORD, USERNAME, endpoint, rabbitmq_version_is_at_least};
#[test]
fn test_blocking_declare_and_redeclare_a_classic_queue() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.cq.69373293479827";
let _ = rc.delete_queue(vhost, name, false);
let result1 = rc.get_queue_info(vhost, name);
assert!(result1.is_err());
let mut map = Map::<String, Value>::new();
map.insert("x-max-length".to_owned(), json!(10_000));
let optional_args = Some(map);
let params = QueueParams::new_durable_classic_queue(name, optional_args.clone());
let result2 = rc.declare_queue(vhost, ¶ms);
assert!(result2.is_ok(), "declare_queue returned {result2:?}");
let params2 = QueueParams::new(name, QueueType::Classic, true, false, optional_args.clone());
let result3 = rc.declare_queue(vhost, ¶ms2);
assert!(result3.is_ok(), "declare_queue returned {result3:?}");
let _ = rc.delete_queue(vhost, name, false);
}
#[test]
fn test_blocking_declare_a_quorum_queue() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.qq.182374982374";
let _ = rc.delete_queue(vhost, name, false);
let result1 = rc.get_queue_info(vhost, name);
assert!(result1.is_err());
let mut map = Map::<String, Value>::new();
map.insert("x-max-length".to_owned(), json!(10_000));
let optional_args = Some(map);
let params = QueueParams::new_quorum_queue(name, optional_args);
let result2 = rc.declare_queue(vhost, ¶ms);
assert!(result2.is_ok(), "declare_queue returned {result2:?}");
let _ = rc.delete_queue(vhost, name, false);
}
#[test]
fn test_blocking_declare_a_stream_with_declare_queue() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.qq.927348926347988623";
let _ = rc.delete_queue(vhost, name, false);
let result1 = rc.get_queue_info(vhost, name);
assert!(result1.is_err());
let mut map = Map::<String, Value>::new();
map.insert("x-max-length-bytes".to_owned(), json!(10_000_000));
let optional_args = Some(map);
let params = QueueParams::new_stream(name, optional_args);
let result2 = rc.declare_queue(vhost, ¶ms);
assert!(result2.is_ok(), "declare_queue returned {result2:?}");
let _ = rc.delete_queue(vhost, name, false);
}
#[test]
fn test_blocking_delete_queue() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.cq.982734982364982364896";
let _ = rc.delete_queue(vhost, name, false);
let result1 = rc.get_queue_info(vhost, name);
assert!(result1.is_err());
let params = QueueParams::new_durable_classic_queue(name, None);
let result2 = rc.declare_queue(vhost, ¶ms);
assert!(result2.is_ok(), "declare_queue returned {result2:?}");
rc.delete_queue(vhost, name, false).unwrap();
rc.delete_queue(vhost, name, true).unwrap();
assert!(rc.delete_queue(vhost, name, false).is_err());
let result3 = rc.get_queue_info(vhost, name);
assert!(result3.is_err());
}
#[test]
fn test_blocking_list_all_queues() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vh_name = "/";
let params = QueueParams::new_durable_classic_queue("rust.tests.cq.23487866", None);
let result1 = rc.declare_queue(vh_name, ¶ms);
assert!(result1.is_ok(), "declare_queue returned {result1:?}");
crate::test_helpers::await_queue_metric_emission();
let result2 = rc.list_queues();
assert!(result2.is_ok(), "list_queues returned {result2:?}");
rc.delete_queue(vh_name, params.name, false).unwrap();
}
#[test]
fn test_blocking_list_queues_in_a_virtual_host() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vh_name = "/";
let params = QueueParams::new_durable_classic_queue("rust.tests.cq.64692734867", None);
let result1 = rc.declare_queue(vh_name, ¶ms);
assert!(result1.is_ok(), "declare_queue returned {result1:?}");
crate::test_helpers::await_queue_metric_emission();
let result2 = rc.list_queues_in(vh_name);
assert!(result2.is_ok(), "list_queues_in returned {result2:?}");
rc.delete_queue(vh_name, params.name, false).unwrap();
}
#[test]
pub fn test_blocking_list_queues_with_details() {
if !rabbitmq_version_is_at_least(3, 13, 0) {
return;
}
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vh_name = "/";
let params =
QueueParams::new_durable_classic_queue("rust.tests.cq.detailed.blocking.18273486", None);
let result1 = rc.declare_queue(vh_name, ¶ms);
assert!(result1.is_ok(), "declare_queue returned {result1:?}");
crate::test_helpers::await_queue_metric_emission();
let result2 = rc.list_queues_with_details();
assert!(
result2.is_ok(),
"list_queues_with_details returned {result2:?}"
);
let detailed_queues = result2.unwrap();
assert!(
!detailed_queues.is_empty(),
"Expected at least one queue in detailed list"
);
let test_queue = detailed_queues.iter().find(|q| q.name == params.name);
assert!(
test_queue.is_some(),
"Expected to find our test queue in detailed results"
);
let queue = test_queue.unwrap();
assert_eq!(queue.name, params.name);
assert_eq!(queue.vhost, vh_name);
assert_eq!(queue.durable, true);
if let Some(gc) = &queue.garbage_collection {
assert!(gc.fullsweep_after > 1000);
}
rc.delete_queue(vh_name, params.name, false).unwrap();
}
#[test]
fn test_blocking_list_queues_paged() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let params = PaginationParams::first_page(10);
let result = rc.list_queues_paged(¶ms);
assert!(result.is_ok(), "list_queues_paged returned {result:?}");
let result_in = rc.list_queues_in_paged(vhost, ¶ms);
assert!(
result_in.is_ok(),
"list_queues_in_paged returned {result_in:?}"
);
}
#[test]
fn test_blocking_list_quorum_queues() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.qq.list_quorum_queues.blocking";
let _ = rc.delete_queue(vhost, name, true);
let params = QueueParams::new_quorum_queue(name, None);
rc.declare_queue(vhost, ¶ms).unwrap();
crate::test_helpers::await_queue_metric_emission();
let result = rc.list_quorum_queues();
assert!(result.is_ok(), "list_quorum_queues returned {result:?}");
let queues = result.unwrap();
assert!(queues.iter().any(|q| q.name == name));
let result_in = rc.list_quorum_queues_in(vhost);
assert!(
result_in.is_ok(),
"list_quorum_queues_in returned {result_in:?}"
);
let queues_in = result_in.unwrap();
assert!(queues_in.iter().any(|q| q.name == name));
rc.delete_queue(vhost, name, false).unwrap();
}
#[test]
fn test_blocking_list_classic_queues() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.cq.list_classic_queues.blocking";
let _ = rc.delete_queue(vhost, name, true);
let params = QueueParams::new_durable_classic_queue(name, None);
rc.declare_queue(vhost, ¶ms).unwrap();
crate::test_helpers::await_queue_metric_emission();
let result = rc.list_classic_queues();
assert!(result.is_ok(), "list_classic_queues returned {result:?}");
let queues = result.unwrap();
assert!(queues.iter().any(|q| q.name == name));
let result_in = rc.list_classic_queues_in(vhost);
assert!(
result_in.is_ok(),
"list_classic_queues_in returned {result_in:?}"
);
let queues_in = result_in.unwrap();
assert!(queues_in.iter().any(|q| q.name == name));
rc.delete_queue(vhost, name, false).unwrap();
}
#[test]
fn test_blocking_list_streams() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.stream.list_streams.blocking";
let _ = rc.delete_queue(vhost, name, true);
let params = QueueParams::new_stream(name, None);
rc.declare_queue(vhost, ¶ms).unwrap();
crate::test_helpers::await_queue_metric_emission();
let result = rc.list_streams();
assert!(result.is_ok(), "list_streams returned {result:?}");
let streams = result.unwrap();
assert!(streams.iter().any(|q| q.name == name));
let result_in = rc.list_streams_in(vhost);
assert!(result_in.is_ok(), "list_streams_in returned {result_in:?}");
let streams_in = result_in.unwrap();
assert!(streams_in.iter().any(|q| q.name == name));
rc.delete_queue(vhost, name, false).unwrap();
}
#[test]
fn test_blocking_delete_queues_bulk() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let names = [
"rust.tests.cq.bulk.1",
"rust.tests.cq.bulk.2",
"rust.tests.cq.bulk.3",
];
for name in &names {
let _ = rc.delete_queue(vhost, name, true);
let params = QueueParams::new_durable_classic_queue(name, None);
rc.declare_queue(vhost, ¶ms).unwrap();
}
let result = rc.delete_queues(vhost, &names, false);
assert!(result.is_ok(), "delete_queues returned {result:?}");
for name in &names {
let info = rc.get_queue_info(vhost, name);
assert!(info.is_err(), "Queue {} should have been deleted", name);
}
let result_idempotent = rc.delete_queues(vhost, &names, true);
assert!(
result_idempotent.is_ok(),
"Idempotent delete_queues should succeed"
);
}