use rabbitmq_http_client::blocking_api::Client;
use crate::test_helpers::{
PASSWORD, USERNAME, await_metric_emission, await_queue_metric_emission, endpoint,
};
use rabbitmq_http_client::commons::PolicyTarget;
use rabbitmq_http_client::requests::{
ExchangeParams, PolicyParams, QueueParams, VirtualHostParams,
};
use serde_json::{Map, Value, json};
#[test]
fn test_blocking_export_definitions_as_string() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let result = rc.export_cluster_wide_definitions_as_string();
assert!(
result.is_ok(),
"export_definitions_as_string returned {result:?}"
);
}
#[test]
fn test_blocking_export_cluster_wide_definitions_as_data() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vh = "rust/http/api/blocking/definitions";
rc.delete_vhost(vh, true).unwrap();
let vh_params = VirtualHostParams::named(vh);
rc.create_vhost(&vh_params).unwrap();
let x_name = "definitions_test.x.fanout";
let mut x_args_m = Map::<String, Value>::new();
x_args_m.insert("x-alternate-exchange".to_owned(), json!("amq.fanout"));
let x_args = Some(x_args_m);
let xp = ExchangeParams::durable_fanout(x_name, x_args);
rc.declare_exchange(vh_params.name, &xp).unwrap();
let qq_pol_name = "definitions_test.policies.qq.length";
let mut qq_pol_def_m = Map::<String, Value>::new();
qq_pol_def_m.insert("max-length".to_string(), json!(99));
let pol_result = rc.declare_policy(&PolicyParams {
vhost: vh_params.name,
name: qq_pol_name,
pattern: "definitions.qq.limited",
apply_to: PolicyTarget::QuorumQueues,
priority: 1,
definition: qq_pol_def_m,
});
assert!(pol_result.is_ok());
let q_name = "definitions_test.qq.test_export_definitions_as_data";
let q_result = rc.declare_queue(
vh_params.name,
&QueueParams::new_durable_classic_queue(q_name, None),
);
assert!(q_result.is_ok(), "failed to declare queue {q_name}");
let _ = rc.bind_queue(vh_params.name, q_name, x_name, None, None);
await_metric_emission(1000);
let result = rc.export_cluster_wide_definitions_as_data();
assert!(
result.is_ok(),
"export_definitions_as_data returned {result:?}"
);
let defs = result.unwrap();
assert!(
!defs.virtual_hosts.is_empty(),
"expected more than zero virtual hosts in definitions"
);
assert!(
!defs.users.is_empty(),
"expected more than zero users in definitions"
);
assert!(
!defs.exchanges.is_empty(),
"expected more than zero exchanges in definitions"
);
let x_found = defs.exchanges.iter().any(|x| x.name == x_name);
assert!(x_found, "expected to find exchange {x_name} in definitions");
let qq_pol_found = defs.policies.iter().any(|p| p.name == qq_pol_name);
assert!(
qq_pol_found,
"expected to find policy {qq_pol_name} in definitions"
);
let b_found = defs
.bindings
.iter()
.any(|b| b.destination_type == "queue".into() && b.destination == q_name);
assert!(
b_found,
"expected to find a binding for queue {q_name} in definitions"
);
rc.delete_exchange(vh, x_name, false).unwrap();
rc.delete_policy(vh, qq_pol_name, false).unwrap();
rc.delete_policy(vh, qq_pol_name, true).unwrap();
assert!(rc.delete_policy(vh, qq_pol_name, false).is_err());
rc.delete_vhost(vh, true).unwrap();
}
#[test]
fn test_blocking_export_vhost_definitions_as_data() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vh = "rust/http/api/blocking/vhost.definitions";
rc.delete_vhost(vh, true).unwrap();
let vh_params = VirtualHostParams::named(vh);
rc.create_vhost(&vh_params).unwrap();
let x_name = "vhost.definitions_test.x.fanout";
let mut x_args_m = Map::<String, Value>::new();
x_args_m.insert("x-alternate-exchange".to_owned(), json!("amq.fanout"));
let x_args = Some(x_args_m);
let xp = ExchangeParams::durable_fanout(x_name, x_args);
rc.declare_exchange(vh_params.name, &xp).unwrap();
let qq_pol_name = "vhost.definitions_test.policies.qq.length";
let mut qq_pol_def_m = Map::<String, Value>::new();
qq_pol_def_m.insert("max-length".to_string(), json!(99));
let pol_result = rc.declare_policy(&PolicyParams {
vhost: vh_params.name,
name: qq_pol_name,
pattern: "vhost.definitions.qq.limited",
apply_to: PolicyTarget::QuorumQueues,
priority: 1,
definition: qq_pol_def_m,
});
assert!(pol_result.is_ok());
let q_name = "vhost.definitions_test.qq.test_export_definitions_as_data";
let q_result = rc.declare_queue(
vh_params.name,
&QueueParams::new_durable_classic_queue(q_name, None),
);
assert!(q_result.is_ok(), "failed to declare queue {q_name}");
let _ = rc.bind_queue(vh_params.name, q_name, x_name, None, None);
await_metric_emission(1000);
let result = rc.export_vhost_definitions_as_data(vh);
assert!(
result.is_ok(),
"export_definitions_as_data returned {result:?}"
);
let defs = result.unwrap();
assert!(
!defs.exchanges.is_empty(),
"expected more than zero exchanges in definitions"
);
let x_found = defs.exchanges.iter().any(|x| x.name == x_name);
assert!(x_found, "expected to find exchange {x_name} in definitions");
let qq_pol_found = defs.policies.iter().any(|p| p.name == qq_pol_name);
assert!(
qq_pol_found,
"expected to find policy {qq_pol_name} in definitions"
);
let b_found = defs
.bindings
.iter()
.any(|b| b.destination_type == "queue".into() && b.destination == q_name);
assert!(
b_found,
"expected to find a binding for queue {q_name} in definitions"
);
rc.delete_exchange(vh, x_name, false).unwrap();
rc.delete_policy(vh, qq_pol_name, false).unwrap();
rc.delete_policy(vh, qq_pol_name, true).unwrap();
assert!(rc.delete_policy(vh, qq_pol_name, false).is_err());
rc.delete_vhost(vh, true).unwrap();
}
#[test]
fn test_blocking_import_cluster_definitions() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vh = "/";
let queue_name = "test_blocking_import_cluster_definitions";
let _ = rc.delete_queue(vh, queue_name, false);
let defs = json!({ "queues": [
{
"auto_delete": false,
"durable": true,
"name": queue_name,
"vhost": vh
}
]});
let result = rc.import_cluster_wide_definitions(defs);
assert!(
result.is_ok(),
"import_cluster_wide_definitions returned {result:?}"
);
let result1 = rc.get_queue_info(vh, queue_name);
assert!(
result1.is_ok(),
"can't get the imported import_cluster_wide_definitions: {result1:?}"
);
rc.delete_queue(vh, queue_name, true).unwrap();
}
#[test]
fn test_blocking_import_vhost_definitions() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vh = "rust/http/api/blocking/vhost.definitions.import";
rc.delete_vhost(vh, true).unwrap();
let vh_params = VirtualHostParams::named(vh);
rc.create_vhost(&vh_params).unwrap();
let q = "imported_queue";
let defs = json!({ "queues": [
{
"auto_delete": false,
"durable": true,
"name": q,
}
]});
let result = rc.import_vhost_definitions(vh, defs);
assert!(
result.is_ok(),
"import_vhost_definitions returned {result:?}"
);
await_queue_metric_emission();
let result1 = rc.get_queue_info(vh, q);
assert!(result1.is_ok(), "can't get the imported queue: {result1:?}");
rc.delete_vhost(vh, true).unwrap();
}