use rabbitmq_http_client::commons::{MessageTransferAcknowledgementMode, QueueType};
use rabbitmq_http_client::requests::{
ExchangeFederationParams, FederationUpstreamParams, OwnedFederationUpstreamParams,
QueueFederationParams,
};
use rabbitmq_http_client::{blocking_api::Client, requests::VirtualHostParams};
use crate::test_helpers::{
PASSWORD, USERNAME, amqp_endpoint_with_vhost, endpoint, rabbitmq_version_is_at_least,
};
/// Declares a federation upstream built from queue federation parameters in a
/// dedicated virtual host and checks that the declaration call succeeds.
#[test]
fn test_blocking_declare_a_federation_upstream_with_queue_federation_parameters() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);

    let vhost = "rust.http.api.blocking.test_blocking_declare_a_federation_upstream_with_queue_federation_parameters";
    let upstream_name = "upstream.1";
    let queue_name = "test_blocking_declare_a_federation_upstream";

    // The test runs inside its own virtual host, created up front.
    let vhost_params = VirtualHostParams::named(vhost);
    assert!(client.create_vhost(&vhost_params).is_ok());

    // The upstream URI comes from the shared test helper for this virtual host.
    let upstream_uri = amqp_endpoint_with_vhost(vhost);
    let upstream_params = FederationUpstreamParams::new_queue_federation_upstream(
        vhost,
        upstream_name,
        &upstream_uri,
        QueueFederationParams::new(queue_name),
    );
    assert!(client.declare_federation_upstream(upstream_params).is_ok());

    // Cleanup: the result of the virtual host deletion is deliberately ignored.
    let _ = client.delete_vhost(vhost_params.name, false);
}
/// Exercises a fetch-modify-redeclare round trip for a queue federation upstream:
///
/// 1. declares an upstream in a dedicated virtual host,
/// 2. lists upstreams and locates the one just declared,
/// 3. converts it into `OwnedFederationUpstreamParams` and checks the converted values,
/// 4. modifies several settings, re-declares the upstream under the same name,
/// 5. lists upstreams again and verifies the modified values are reported.
#[test]
fn test_blocking_federation_upstream_fetch_and_update_workflow() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh = "rust.http.api.blocking.test_federation_upstream_fetch_and_update_workflow";
    let upstream_name = "test-fetch-update-upstream";

    let vh_params = VirtualHostParams::named(vh);
    let result1 = rc.create_vhost(&vh_params);
    assert!(result1.is_ok());

    // Step 1: declare the original upstream.
    let amqp_endpoint = amqp_endpoint_with_vhost(vh);
    let queue_params = QueueFederationParams::new("test-queue");
    let original_upstream_params = FederationUpstreamParams::new_queue_federation_upstream(
        vh,
        upstream_name,
        &amqp_endpoint,
        queue_params,
    );
    let result2 = rc.declare_federation_upstream(original_upstream_params);
    assert!(result2.is_ok());

    // Step 2: fetch it back. The listing is not scoped to this virtual host,
    // so the entry is matched on both name and virtual host.
    let upstreams_list = rc
        .list_federation_upstreams()
        .expect("listing federation upstreams should succeed");
    let fetched_upstream = upstreams_list
        .iter()
        .find(|u| u.name == upstream_name && u.vhost == vh)
        .expect("Recently created upstream was not found");

    // Step 3: convert the fetched upstream into owned, modifiable parameters.
    let owned_params = OwnedFederationUpstreamParams::from(fetched_upstream.clone());
    assert_eq!(owned_params.name, upstream_name);
    assert_eq!(owned_params.vhost, vh);
    assert_eq!(owned_params.uri, amqp_endpoint);
    assert_eq!(
        owned_params.ack_mode,
        MessageTransferAcknowledgementMode::WhenConfirmed
    );

    // Step 4: modify the parameters and re-declare the upstream.
    let mut modified_params = owned_params;
    modified_params.ack_mode = MessageTransferAcknowledgementMode::WhenPublished;
    modified_params.trust_user_id = true;
    modified_params.reconnect_delay = 15;
    // NOTE(review): prefetch_count is changed here but is not asserted on after
    // the update; consider adding a check once the response field is confirmed.
    modified_params.prefetch_count = 2000;
    if let Some(ref mut queue_fed) = modified_params.queue_federation {
        queue_fed.queue = Some("updated-queue".to_string());
        queue_fed.consumer_tag = Some("updated-consumer-tag".to_string());
    }
    let updated_upstream_params = FederationUpstreamParams::from(&modified_params);
    let result3 = rc.declare_federation_upstream(updated_upstream_params);
    assert!(result3.is_ok());

    // Step 5: fetch again and verify the modified values.
    let updated_upstreams_list = rc
        .list_federation_upstreams()
        .expect("listing federation upstreams after the update should succeed");
    let updated_upstream = updated_upstreams_list
        .iter()
        .find(|u| u.name == upstream_name && u.vhost == vh)
        .expect("Should find the updated upstream");
    assert_eq!(
        updated_upstream.ack_mode,
        MessageTransferAcknowledgementMode::WhenPublished
    );
    assert_eq!(updated_upstream.trust_user_id, Some(true));
    assert_eq!(updated_upstream.reconnect_delay, Some(15));
    assert_eq!(updated_upstream.queue.as_ref().unwrap(), "updated-queue");
    assert_eq!(
        updated_upstream.consumer_tag.as_ref().unwrap(),
        "updated-consumer-tag"
    );

    // Cleanup: the result of the virtual host deletion is deliberately ignored.
    let _ = rc.delete_vhost(vh_params.name, false);
}
/// Exercises a fetch-modify-redeclare round trip for an exchange federation upstream:
/// declares an upstream, fetches it via the listing call, converts it into
/// `OwnedFederationUpstreamParams`, modifies general and exchange-specific settings,
/// re-declares it and verifies the reported values.
///
/// The test body is skipped (returns early) unless
/// `rabbitmq_version_is_at_least(4, 0, 0)` returns `true`.
#[test]
fn test_blocking_exchange_federation_upstream_fetch_and_update_workflow() {
    if !rabbitmq_version_is_at_least(4, 0, 0) {
        return;
    }

    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vh = "rust.http.api.blocking.test_exchange_federation_upstream_fetch_and_update_workflow";
    let upstream_name = "test-exchange-fetch-update-upstream";

    let vh_params = VirtualHostParams::named(vh);
    let result1 = rc.create_vhost(&vh_params);
    assert!(result1.is_ok());

    // Declare the original upstream with exchange federation parameters.
    let amqp_endpoint = amqp_endpoint_with_vhost(vh);
    let exchange_params = ExchangeFederationParams::new(QueueType::Quorum);
    let original_upstream_params = FederationUpstreamParams::new_exchange_federation_upstream(
        vh,
        upstream_name,
        &amqp_endpoint,
        exchange_params,
    );
    let result2 = rc.declare_federation_upstream(original_upstream_params);
    assert!(result2.is_ok());

    // Fetch it back. The listing is not scoped to this virtual host, so the
    // entry is matched on both name and virtual host.
    let upstreams_list = rc
        .list_federation_upstreams()
        .expect("listing federation upstreams should succeed");
    let fetched_upstream = upstreams_list
        .iter()
        .find(|u| u.name == upstream_name && u.vhost == vh)
        .expect("Recently created upstream was not found");

    // Convert the fetched upstream into owned, modifiable parameters.
    let owned_params = OwnedFederationUpstreamParams::from(fetched_upstream.clone());
    assert_eq!(owned_params.name, upstream_name);
    assert_eq!(owned_params.vhost, vh);
    assert_eq!(owned_params.uri, amqp_endpoint);

    // Modify general and exchange-specific settings, then re-declare.
    let mut modified_params = owned_params;
    modified_params.ack_mode = MessageTransferAcknowledgementMode::Immediate;
    modified_params.trust_user_id = false;
    modified_params.reconnect_delay = 20;
    if let Some(ref mut exchange_fed) = modified_params.exchange_federation {
        exchange_fed.exchange = Some("updated-exchange".to_string());
        exchange_fed.max_hops = Some(3);
        exchange_fed.queue_type = Some(QueueType::Classic);
    }
    let updated_upstream_params = FederationUpstreamParams::from(&modified_params);
    let result3 = rc.declare_federation_upstream(updated_upstream_params);
    assert!(result3.is_ok());

    // Fetch again and verify the modified values.
    let updated_upstreams_list = rc
        .list_federation_upstreams()
        .expect("listing federation upstreams after the update should succeed");
    let updated_upstream = updated_upstreams_list
        .iter()
        .find(|u| u.name == upstream_name && u.vhost == vh)
        .expect("Should find the updated upstream");
    assert_eq!(
        updated_upstream.ack_mode,
        MessageTransferAcknowledgementMode::Immediate
    );
    assert_eq!(updated_upstream.trust_user_id, Some(false));
    assert_eq!(updated_upstream.reconnect_delay, Some(20));
    assert_eq!(
        updated_upstream.exchange.as_ref().unwrap(),
        "updated-exchange"
    );
    assert_eq!(updated_upstream.max_hops.unwrap(), 3);
    assert_eq!(
        updated_upstream.queue_type.clone().unwrap(),
        QueueType::Classic
    );

    // Cleanup: the result of the virtual host deletion is deliberately ignored.
    let _ = rc.delete_vhost(vh_params.name, false);
}
/// Declares a queue federation upstream and then deletes it three times,
/// checking the outcome expected for each combination of upstream state and
/// the third (boolean) argument of `delete_federation_upstream`.
#[test]
fn test_blocking_delete_federation_upstream() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);

    let vhost = "rust.http.api.blocking.test_delete_federation_upstream";
    let name = "test-delete-upstream";

    // The test runs inside its own virtual host, created up front.
    let vhost_params = VirtualHostParams::named(vhost);
    assert!(client.create_vhost(&vhost_params).is_ok());

    let upstream_uri = amqp_endpoint_with_vhost(vhost);
    let declared = client.declare_federation_upstream(
        FederationUpstreamParams::new_queue_federation_upstream(
            vhost,
            name,
            &upstream_uri,
            QueueFederationParams::new("test-queue"),
        ),
    );
    assert!(declared.is_ok());

    // The upstream exists: deleting it with the flag set to `false` must succeed.
    let first_delete = client.delete_federation_upstream(vhost, name, false);
    assert!(first_delete.is_ok());

    // The upstream is gone: with the flag set to `true` the call must still succeed.
    let second_delete = client.delete_federation_upstream(vhost, name, true);
    assert!(second_delete.is_ok());

    // The upstream is gone: with the flag set to `false` the call must fail.
    let third_delete = client.delete_federation_upstream(vhost, name, false);
    assert!(third_delete.is_err());

    // Cleanup: the result of the virtual host deletion is deliberately ignored.
    let _ = client.delete_vhost(vhost_params.name, false);
}