#![allow(dead_code)]
use rabbitmq_http_client::blocking_api::Client as BlockingClient;
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use amqprs::BasicProperties;
use amqprs::channel::BasicPublishArguments;
use amqprs::connection::{Connection, OpenConnectionArguments};
use rabbitmq_http_client::api::Client as AsyncClient;
use regex::Regex;
use serde_json::{Map, Value, json};
use tokio::time;
/// Base URL of the HTTP API on the local node (plain HTTP, port 15672).
pub const ENDPOINT: &str = "http://localhost:15672/api";
/// Username passed to the HTTP API clients and AMQP connections built in this module.
pub const USERNAME: &str = "guest";
/// Password that accompanies [`USERNAME`].
pub const PASSWORD: &str = "guest";
/// Base AMQP URI of the local node (port 5672), without a virtual host segment.
pub const AMQP_ENDPOINT: &str = "amqp://localhost:5672";
/// Blocking HTTP API client whose three type parameters are all `&'a str`.
///
/// NOTE(review): the parameters presumably stand for the endpoint, username and
/// password types, mirroring the argument order of `BlockingClient::new` as it is
/// used in this file. Confirm against the client crate.
pub type APIClient<'a> = BlockingClient<&'a str, &'a str, &'a str>;
/// Returns an owned copy of [`ENDPOINT`], the HTTP API base URL.
pub fn endpoint() -> String {
    String::from(ENDPOINT)
}
/// Returns the host name used by these helpers, always `"localhost"`.
pub fn hostname() -> String {
    String::from("localhost")
}
/// Returns an owned copy of [`AMQP_ENDPOINT`], the base AMQP URI.
pub fn amqp_endpoint() -> String {
    String::from(AMQP_ENDPOINT)
}
/// Builds an AMQP URI that targets the virtual host `name`.
///
/// The result is [`AMQP_ENDPOINT`] followed by `/` and `name`. The name is
/// inserted verbatim; no percent-encoding is applied here.
pub fn amqp_endpoint_with_vhost(name: &str) -> String {
    // `format!` already yields an owned `String`, so no further copy is needed.
    format!("{AMQP_ENDPOINT}/{name}")
}
/// Builds an AMQP URI that selects the virtual host `name` through a
/// `hostname='vhost:<name>'` query parameter appended to [`AMQP_ENDPOINT`].
///
/// The name is inserted verbatim; no escaping is applied here.
pub fn amqp10_endpoint_with_vhost(name: &str) -> String {
    // `format!` already yields an owned `String`, so no further copy is needed.
    format!("{AMQP_ENDPOINT}?hostname='vhost:{name}'")
}
/// Parses a dotted version string into a `(major, minor, patch)` tuple.
///
/// Everything from the first `+` onwards is discarded. The remainder is split
/// on `.`, and for each of the first three segments only the text before the
/// first `-` is parsed as a `u32`. A missing or unparsable segment counts as `0`.
fn parse_version(version_str: &str) -> (u32, u32, u32) {
    // Strip build metadata such as "+build.5".
    let core = match version_str.split_once('+') {
        Some((head, _)) => head,
        None => version_str,
    };

    // Lazily convert each dot-separated segment into a number.
    let mut numbers = core.split('.').map(|segment| {
        let leading = segment.split('-').next().unwrap_or(segment);
        leading.parse::<u32>().unwrap_or(0)
    });

    let major = numbers.next().unwrap_or(0);
    let minor = numbers.next().unwrap_or(0);
    let patch = numbers.next().unwrap_or(0);
    (major, minor, patch)
}
/// Queries the server version over the blocking HTTP API client and returns
/// it as a `(major, minor, patch)` tuple.
///
/// # Panics
///
/// Panics if the version request fails.
pub fn rabbitmq_version() -> (u32, u32, u32) {
    let base_url = endpoint();
    let client = BlockingClient::new(&base_url, USERNAME, PASSWORD);
    let reported = client.server_version().unwrap();
    parse_version(&reported)
}
/// Returns `true` when the server version is greater than or equal to
/// `min_major.min_minor.min_patch`.
///
/// The comparison is a lexicographic tuple comparison: major first, then
/// minor, then patch.
pub fn rabbitmq_version_is_at_least(min_major: u32, min_minor: u32, min_patch: u32) -> bool {
    let required = (min_major, min_minor, min_patch);
    let actual = rabbitmq_version();
    actual >= required
}
/// Returns `true` when the server reports a version in the 3.12 series.
///
/// The dot is escaped and `\b` ends the minor component, so strings such as
/// `3.120.0` or `3x12` are not mistaken for this series.
pub fn testing_against_3_12_x() -> bool {
    testing_against_series(r"^3\.12\b")
}
/// Returns `true` when the server reports a version in the 3.13 series.
///
/// The dot is escaped and `\b` ends the minor component, so strings such as
/// `3.130.0` or `3x13` are not mistaken for this series.
pub fn testing_against_3_13_x() -> bool {
    testing_against_series(r"^3\.13\b")
}
/// Returns `true` when the server reports a version in the 4.0 series.
///
/// The dot is escaped and `\b` ends the minor component, so strings such as
/// `4.01` or `4x0` are not mistaken for this series.
pub fn testing_against_4_0_x() -> bool {
    testing_against_series(r"^4\.0\b")
}
/// Returns `true` when the server reports a version in the 4.1 series.
///
/// The dot is escaped and `\b` ends the minor component, so a later series
/// such as `4.10.0` is not mistaken for 4.1.
pub fn testing_against_4_1_x() -> bool {
    testing_against_series(r"^4\.1\b")
}
/// Returns `true` when the server reports a version in the 4.2 series.
///
/// The dot is escaped and `\b` ends the minor component, so a later series
/// such as `4.20.0` is not mistaken for 4.2.
pub fn testing_against_4_2_x() -> bool {
    testing_against_series(r"^4\.2\b")
}
/// Returns `true` when the server reports a version in the 4.3 series.
///
/// The dot is escaped and `\b` ends the minor component, so a later series
/// such as `4.30.0` is not mistaken for 4.3.
pub fn testing_against_4_3_x() -> bool {
    testing_against_series(r"^4\.3\b")
}
pub fn expected_stable_version_feature_flag() -> Option<&'static str> {
let endpoint = endpoint();
let rc = BlockingClient::new(&endpoint, USERNAME, PASSWORD);
let version = rc.server_version().unwrap();
if version.starts_with("4.3") {
Some("rabbitmq_4.3.0")
} else if version.starts_with("4.2") {
Some("rabbitmq_4.2.0")
} else if version.starts_with("4.1") {
Some("rabbitmq_4.1.0")
} else if version.starts_with("4.0") {
Some("rabbitmq_4.0.0")
} else {
None
}
}
/// Returns `true` when the server's reported version matches the regular
/// expression `series`.
///
/// # Panics
///
/// Panics if `series` is not a valid regular expression or if the version
/// request fails.
pub fn testing_against_series(series: &str) -> bool {
    let base_url = endpoint();
    let client = BlockingClient::new(&base_url, USERNAME, PASSWORD);
    let pattern = Regex::new(series).unwrap();
    let reported = client.server_version().unwrap();
    pattern.is_match(&reported)
}
/// Returns `true` when the server's reported version string is exactly equal
/// to `series`.
///
/// # Panics
///
/// Panics if the version request fails.
pub fn testing_against_version(series: &str) -> bool {
    let base_url = endpoint();
    let client = BlockingClient::new(&base_url, USERNAME, PASSWORD);
    let reported = client.server_version().unwrap();
    &reported == series
}
/// Blocks the current thread for `ms` milliseconds.
pub fn await_metric_emission(ms: u64) {
    let pause = Duration::from_millis(ms);
    std::thread::sleep(pause);
}
/// Blocks the current thread for the number of milliseconds given by the
/// `TEST_STATS_DELAY` environment variable, or 500 when it cannot be read.
///
/// # Panics
///
/// Panics if the value is not a valid `u64`.
pub fn await_queue_metric_emission() {
    let raw = match env::var("TEST_STATS_DELAY") {
        Ok(value) => value,
        Err(_) => "500".to_owned(),
    };
    let millis: u64 = raw.parse().unwrap();
    await_metric_emission(millis);
}
/// Queries the server version over the async HTTP API client and returns it
/// as a `(major, minor, patch)` tuple.
///
/// # Panics
///
/// Panics if the version request fails.
pub async fn async_rabbitmq_version() -> (u32, u32, u32) {
    let base_url = endpoint();
    let client = AsyncClient::new(&base_url, USERNAME, PASSWORD);
    let reported = client.server_version().await.unwrap();
    parse_version(&reported)
}
/// Async counterpart of the version floor check: returns `true` when the
/// server version is greater than or equal to `min_major.min_minor.min_patch`.
///
/// The comparison is a lexicographic tuple comparison: major first, then
/// minor, then patch.
pub async fn async_rabbitmq_version_is_at_least(
    min_major: u32,
    min_minor: u32,
    min_patch: u32,
) -> bool {
    let required = (min_major, min_minor, min_patch);
    let actual = async_rabbitmq_version().await;
    actual >= required
}
/// Returns `true` when the server reports a version in the 3.12 series.
///
/// The dot is escaped and `\b` ends the minor component, so strings such as
/// `3.120.0` or `3x12` are not mistaken for this series.
pub async fn async_testing_against_3_12_x() -> bool {
    async_testing_against_series(r"^3\.12\b").await
}
/// Returns `true` when the server reports a version in the 3.13 series.
///
/// The dot is escaped and `\b` ends the minor component, so strings such as
/// `3.130.0` or `3x13` are not mistaken for this series.
pub async fn async_testing_against_3_13_x() -> bool {
    async_testing_against_series(r"^3\.13\b").await
}
/// Returns `true` when the server reports a version in the 4.0 series.
///
/// The dot is escaped and `\b` ends the minor component, so strings such as
/// `4.01` or `4x0` are not mistaken for this series.
pub async fn async_testing_against_4_0_x() -> bool {
    async_testing_against_series(r"^4\.0\b").await
}
/// Returns `true` when the server reports a version in the 4.1 series.
///
/// The dot is escaped and `\b` ends the minor component, so a later series
/// such as `4.10.0` is not mistaken for 4.1.
pub async fn async_testing_against_4_1_x() -> bool {
    async_testing_against_series(r"^4\.1\b").await
}
/// Returns `true` when the server reports a version in the 4.2 series.
///
/// The dot is escaped and `\b` ends the minor component, so a later series
/// such as `4.20.0` is not mistaken for 4.2.
pub async fn async_testing_against_4_2_x() -> bool {
    async_testing_against_series(r"^4\.2\b").await
}
/// Returns `true` when the server reports a version in the 4.3 series.
///
/// The dot is escaped and `\b` ends the minor component, so a later series
/// such as `4.30.0` is not mistaken for 4.3.
pub async fn async_testing_against_4_3_x() -> bool {
    async_testing_against_series(r"^4\.3\b").await
}
pub async fn async_expected_stable_version_feature_flag() -> Option<&'static str> {
let endpoint = endpoint();
let rc = AsyncClient::new(&endpoint, USERNAME, PASSWORD);
let version = rc.server_version().await.unwrap();
if version.starts_with("4.3") {
Some("rabbitmq_4.3.0")
} else if version.starts_with("4.2") {
Some("rabbitmq_4.2.0")
} else if version.starts_with("4.1") {
Some("rabbitmq_4.1.0")
} else if version.starts_with("4.0") {
Some("rabbitmq_4.0.0")
} else {
None
}
}
/// Returns `true` when the server's reported version, fetched with the async
/// client, matches the regular expression `series`.
///
/// # Panics
///
/// Panics if `series` is not a valid regular expression or if the version
/// request fails.
pub async fn async_testing_against_series(series: &str) -> bool {
    let base_url = endpoint();
    let client = AsyncClient::new(&base_url, USERNAME, PASSWORD);
    let pattern = Regex::new(series).unwrap();
    let reported = client.server_version().await.unwrap();
    pattern.is_match(&reported)
}
/// Returns `true` when the server's reported version string, fetched with the
/// async client, is exactly equal to `series`.
///
/// # Panics
///
/// Panics if the version request fails.
pub async fn async_testing_against_version(series: &str) -> bool {
    let base_url = endpoint();
    let client = AsyncClient::new(&base_url, USERNAME, PASSWORD);
    let reported = client.server_version().await.unwrap();
    &reported == series
}
/// Suspends the current task for `ms` milliseconds using the Tokio timer.
pub async fn async_await_metric_emission(ms: u64) {
    let pause = Duration::from_millis(ms);
    time::sleep(pause).await;
}
pub async fn async_await_queue_metric_emission() {
let delay = env::var("TEST_STATS_DELAY").unwrap_or("500".to_owned());
await_metric_emission(delay.parse::<u64>().unwrap());
}
/// Publishes 1000 copies of a small message to the `amq.fanout` exchange over
/// a fresh AMQP connection, waits for the queue metric delay, and then closes
/// the connection.
///
/// # Panics
///
/// Panics if the connection or channel cannot be opened, if either is not
/// open afterwards, if any publish fails, or if closing the connection fails.
pub async fn generate_activity() {
    let host = hostname();
    let connection_args = OpenConnectionArguments::new(&host, 5672, USERNAME, PASSWORD);
    let connection = Connection::open(&connection_args).await.unwrap();
    assert!(connection.is_open());

    let channel = connection.open_channel(None).await.unwrap();
    assert!(channel.is_open());

    let body = b"a dummy message".to_vec();
    // Empty routing key: the message goes to the "amq.fanout" exchange as-is.
    let publish_args = BasicPublishArguments::new("amq.fanout", "");
    for _ in 0..1000 {
        let properties = BasicProperties::default();
        channel
            .basic_publish(properties, body.clone(), publish_args.clone())
            .await
            .unwrap();
    }

    async_await_queue_metric_emission().await;
    connection.close().await.unwrap()
}
pub fn cluster_tags(tags: Map<String, Value>) -> Map<String, Value> {
let mut val = Map::<String, Value>::new();
val.insert(String::from("cluster_tags"), json!(tags));
val
}
/// Base URL of the HTTP API on the local node over HTTPS (port 15671).
pub const TLS_ENDPOINT: &str = "https://localhost:15671/api";
/// Resolves the directory that holds the TLS certificates.
///
/// Uses the `TLS_CERTS_DIR` environment variable when it can be read.
/// Otherwise falls back to `tests/tls/certs` under `CARGO_MANIFEST_DIR`, or
/// under `.` when that variable cannot be read either.
pub fn tls_certs_dir() -> PathBuf {
    match env::var("TLS_CERTS_DIR") {
        Ok(explicit) => PathBuf::from(explicit),
        Err(_) => {
            let root = match env::var("CARGO_MANIFEST_DIR") {
                Ok(dir) => dir,
                Err(_) => ".".to_string(),
            };
            PathBuf::from(root).join("tests/tls/certs")
        }
    }
}
/// Returns the path of `ca_certificate.pem` inside the TLS certificates
/// directory.
pub fn ca_cert_path() -> PathBuf {
    let mut path = tls_certs_dir();
    path.push("ca_certificate.pem");
    path
}
/// Returns the path of `client_identity.p12` inside the TLS certificates
/// directory.
pub fn client_identity_p12_path() -> PathBuf {
    let mut path = tls_certs_dir();
    path.push("client_identity.p12");
    path
}