#![allow(dead_code)]
use std::env;
use std::error::Error;
use std::ffi::OsStr;
use std::process::Command;
use std::thread;
use std::time::{Duration, Instant};
use assert_cmd::assert::Assert;
use assert_cmd::prelude::*;
use predicates::prelude::predicate;
use rabbitmq_http_client::blocking_api::Client as GenericAPIClient;
use rabbitmqadmin::pre_flight::InteractivityMode;
/// The blocking HTTP API client with all three of its type parameters fixed to borrowed
/// string slices that share the lifetime `'a`.
type APIClient<'a> = GenericAPIClient<&'a str, &'a str, &'a str>;
/// Result type returned by the helpers in this file that run a `rabbitmqadmin` command:
/// `Ok(())` on completion, or a boxed error.
type CommandRunResult = Result<(), Box<dyn Error>>;
/// Endpoint handed to the HTTP API client built by `api_client`.
pub const ENDPOINT: &str = "http://localhost:15672/api";
/// Username handed to the HTTP API client built by `api_client`.
pub const USERNAME: &str = "guest";
/// Password handed to the HTTP API client built by `api_client`.
pub const PASSWORD: &str = "guest";
/// Base AMQP URI returned by `amqp_endpoint` and extended by `amqp_endpoint_with_vhost`.
pub const AMQP_ENDPOINT: &str = "amqp://localhost:5672";
/// Returns an owned copy of [`ENDPOINT`].
pub fn endpoint() -> String {
    String::from(ENDPOINT)
}
/// Builds an HTTP API client from the [`ENDPOINT`], [`USERNAME`] and [`PASSWORD`] constants.
pub fn api_client() -> APIClient<'static> {
    let (endpoint, username, password) = (ENDPOINT, USERNAME, PASSWORD);
    APIClient::new(endpoint, username, password)
}
/// Returns an owned copy of [`AMQP_ENDPOINT`].
pub fn amqp_endpoint() -> String {
    String::from(AMQP_ENDPOINT)
}
/// Returns [`AMQP_ENDPOINT`] followed by a `/` and `name`.
///
/// `name` is appended verbatim: no escaping or encoding is applied to it.
pub fn amqp_endpoint_with_vhost(name: &str) -> String {
    [AMQP_ENDPOINT, name].join("/")
}
/// Puts the current thread to sleep for `ms` milliseconds.
pub fn await_ms(ms: u64) {
    let pause = Duration::from_millis(ms);
    thread::sleep(pause);
}
/// Puts the current thread to sleep for `ms` milliseconds.
///
/// Used by `await_queue_metric_emission` to pause for a configurable delay.
pub fn await_metric_emission(ms: u64) {
    let wait = Duration::from_millis(ms);
    thread::sleep(wait);
}
/// Sleeps for the delay configured through the `TEST_STATS_DELAY` environment variable.
///
/// The variable is interpreted as a number of milliseconds. Surrounding whitespace is
/// ignored. When the variable cannot be read (it is unset or is not valid Unicode), a
/// default of 500 ms is used.
///
/// # Panics
///
/// Panics with a message naming the variable and its value when `TEST_STATS_DELAY` is set
/// but cannot be parsed as a `u64`.
pub fn await_queue_metric_emission() {
    const DEFAULT_DELAY_MS: u64 = 500;

    let delay_ms = match env::var("TEST_STATS_DELAY") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or_else(|err| {
            // Name the variable and the offending value instead of a bare `unwrap()` panic.
            panic!(
                "TEST_STATS_DELAY must be a delay in milliseconds, got {:?}: {}",
                raw, err
            )
        }),
        Err(_) => DEFAULT_DELAY_MS,
    };
    await_metric_emission(delay_ms);
}
/// Runs the `rabbitmqadmin` binary with `args` and asserts that it exits successfully.
///
/// Returns the resulting [`Assert`] so callers can chain further checks onto it.
pub fn run_succeeds<I, S>(args: I) -> Assert
where
    I: IntoIterator<Item = S>,
    S: AsRef<OsStr>,
{
    Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"))
        .args(args)
        .assert()
        .success()
}
/// Runs the `rabbitmqadmin` binary with `args` and asserts that it exits with a failure.
///
/// Returns the resulting [`Assert`] so callers can chain further checks onto it.
pub fn run_fails<I, S>(args: I) -> Assert
where
    I: IntoIterator<Item = S>,
    S: AsRef<OsStr>,
{
    let mut rabbitmqadmin = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"));
    let assertion = rabbitmqadmin.args(args).assert();
    assertion.failure()
}
/// Runs the `rabbitmqadmin` binary with `args` in the given interactivity `mode` and asserts
/// that it exits successfully.
///
/// For [`InteractivityMode::NonInteractive`] the command is started with the
/// `RABBITMQADMIN_NON_INTERACTIVE_MODE` environment variable set to `"true"`. For
/// [`InteractivityMode::Interactive`] this helper does not touch the command's environment.
pub fn run_succeeds_with_interactivity_mode<I, S>(args: I, mode: InteractivityMode) -> Assert
where
    I: IntoIterator<Item = S>,
    S: AsRef<OsStr>,
{
    let mut rabbitmqadmin = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"));
    // The two modes differ only in whether the environment variable is set.
    match mode {
        InteractivityMode::Interactive => {}
        InteractivityMode::NonInteractive => {
            rabbitmqadmin.env("RABBITMQADMIN_NON_INTERACTIVE_MODE", "true");
        }
    }
    rabbitmqadmin.args(args).assert().success()
}
/// Runs the `rabbitmqadmin` binary with `args` in the given interactivity `mode` and asserts
/// that it exits with a failure.
///
/// For [`InteractivityMode::NonInteractive`] the command is started with the
/// `RABBITMQADMIN_NON_INTERACTIVE_MODE` environment variable set to `"true"`. For
/// [`InteractivityMode::Interactive`] this helper does not touch the command's environment.
pub fn run_fails_with_interactivity_mode<I, S>(args: I, mode: InteractivityMode) -> Assert
where
    I: IntoIterator<Item = S>,
    S: AsRef<OsStr>,
{
    let mut rabbitmqadmin = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"));
    // The two modes differ only in whether the environment variable is set.
    match mode {
        InteractivityMode::Interactive => {}
        InteractivityMode::NonInteractive => {
            rabbitmqadmin.env("RABBITMQADMIN_NON_INTERACTIVE_MODE", "true");
        }
    }
    rabbitmqadmin.args(args).assert().failure()
}
pub fn create_vhost(vhost: &str) -> CommandRunResult {
let mut cmd = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"));
cmd.args(["vhosts", "declare", "--name", vhost]);
cmd.assert().success();
Ok(())
}
/// Runs `rabbitmqadmin vhosts delete --name <vhost> --idempotently` as a best-effort cleanup.
///
/// The outcome of the command is discarded: neither a failure to run it nor its exit status
/// is inspected, and the function always returns `Ok(())`.
pub fn delete_vhost(vhost: &str) -> CommandRunResult {
    let argv = ["vhosts", "delete", "--name", vhost, "--idempotently"];
    // Deliberately ignore the result.
    let _ = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"))
        .args(argv)
        .output();
    Ok(())
}
pub fn delete_user(username: &str) -> CommandRunResult {
let mut cmd = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"));
cmd.args(["delete", "user", "--name", username, "--idempotently"]);
cmd.assert().success();
Ok(())
}
pub fn delete_all_test_vhosts() -> CommandRunResult {
let client = api_client();
match client.list_vhosts() {
Ok(vhosts) => {
for vhost in vhosts {
if vhost.name.starts_with("rabbitmqadmin.") {
let mut cmd = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"));
cmd.args(["vhosts", "delete", "--name", &vhost.name, "--idempotently"]);
let _ = cmd.assert().success();
}
}
}
Err(_) => {
}
}
Ok(())
}
pub fn delete_vhosts_with_prefix(prefix: &str) -> CommandRunResult {
let client = api_client();
match client.list_vhosts() {
Ok(vhosts) => {
for vhost in vhosts {
if vhost.name.starts_with(prefix) {
let mut cmd = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"));
cmd.args(["vhosts", "delete", "--name", &vhost.name, "--idempotently"]);
let _ = cmd.assert().success();
}
}
}
Err(_) => {
}
}
Ok(())
}
/// Builds a string predicate that checks whether its input contains `content`.
pub fn output_includes(content: &str) -> predicates::str::ContainsPredicate {
    predicates::str::contains(content)
}
/// Fetches the overview from the HTTP API and returns the `rabbitmq_version` it reports,
/// parsed into a `(major, minor, patch)` triple.
///
/// # Panics
///
/// Panics with "failed to get RabbitMQ overview" when the overview request fails.
pub fn rabbitmq_version() -> (u32, u32, u32) {
    let overview = api_client()
        .overview()
        .expect("failed to get RabbitMQ overview");
    let reported: &str = &overview.rabbitmq_version;
    parse_version(reported)
}
/// Returns `true` when the version reported by `rabbitmq_version` is greater than or equal to
/// `min_major.min_minor.min_patch`.
///
/// The two `(major, minor, patch)` triples are compared lexicographically.
pub fn rabbitmq_version_is_at_least(min_major: u32, min_minor: u32, min_patch: u32) -> bool {
    let required = (min_major, min_minor, min_patch);
    rabbitmq_version() >= required
}
/// Parses a version string into a `(major, minor, patch)` triple.
///
/// Everything from the first `+` onwards is discarded. The remainder is split on `.`, and the
/// first three segments become major, minor and patch. Within a segment only the text before
/// the first `-` is parsed, so `"0-rc"` yields `0`. A segment that is missing or does not
/// parse as a `u32` counts as `0`.
fn parse_version(version_str: &str) -> (u32, u32, u32) {
    // Drop the part that follows the first '+'.
    let core = match version_str.split_once('+') {
        Some((head, _)) => head,
        None => version_str,
    };

    let mut components = core.split('.').map(|segment| {
        let digits = segment.split('-').next().unwrap_or(segment);
        digits.parse::<u32>().unwrap_or(0)
    });

    let major = components.next().unwrap_or(0);
    let minor = components.next().unwrap_or(0);
    let patch = components.next().unwrap_or(0);
    (major, minor, patch)
}
/// Polls `rabbitmqadmin federation list_all_links` until its standard output contains
/// `content`.
///
/// Only runs that exit successfully are inspected. The elapsed time is checked after each
/// attempt, so at least one attempt is always made. The helper sleeps 500 ms between attempts.
///
/// # Panics
///
/// Panics when more than `timeout_ms` milliseconds have elapsed without a match, or when the
/// command cannot be run.
pub fn await_federation_link_with(content: &str, timeout_ms: u64) {
    let started_at = Instant::now();
    let deadline = Duration::from_millis(timeout_ms);
    let poll_interval = Duration::from_millis(500);

    loop {
        let output = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"))
            .args(["federation", "list_all_links"])
            .output()
            .unwrap();

        let found = output.status.success()
            && String::from_utf8_lossy(&output.stdout).contains(content);
        if found {
            return;
        }

        if started_at.elapsed() > deadline {
            panic!(
                "Timed out after {}ms waiting for federation link output to contain '{}'",
                timeout_ms, content
            );
        }
        thread::sleep(poll_interval);
    }
}
/// Polls `rabbitmqadmin federation list_all_links` until its standard output no longer
/// contains `content`.
///
/// Only runs that exit successfully are inspected. The elapsed time is checked after each
/// attempt, so at least one attempt is always made. The helper sleeps 500 ms between attempts.
///
/// # Panics
///
/// Panics when more than `timeout_ms` milliseconds have elapsed while `content` is still
/// present, or when the command cannot be run.
pub fn await_no_federation_link_with(content: &str, timeout_ms: u64) {
    let started_at = Instant::now();
    let deadline = Duration::from_millis(timeout_ms);
    let poll_interval = Duration::from_millis(500);

    loop {
        let output = Command::new(assert_cmd::cargo::cargo_bin!("rabbitmqadmin"))
            .args(["federation", "list_all_links"])
            .output()
            .unwrap();

        let gone = output.status.success()
            && !String::from_utf8_lossy(&output.stdout).contains(content);
        if gone {
            return;
        }

        if started_at.elapsed() > deadline {
            panic!(
                "Timed out after {}ms waiting for federation link output to NOT contain '{}'",
                timeout_ms, content
            );
        }
        thread::sleep(poll_interval);
    }
}
/// Returns `Ok(())` early from the enclosing function when the RabbitMQ version reported by
/// `test_helpers::rabbitmq_version` is lower than the given `major, minor, patch` minimum.
///
/// A `SKIPPED: ...` line naming the required and the found version is printed before
/// returning. The enclosing function must therefore have a return type for which `Ok(())` is
/// a valid value.
///
/// Each argument is evaluated exactly once and must be a `u32` expression.
#[macro_export]
macro_rules! skip_if_rabbitmq_version_below {
    ($min_major:expr, $min_minor:expr, $min_patch:expr) => {{
        let (major, minor, patch) = $crate::test_helpers::rabbitmq_version();
        // Bind the arguments once so that an argument expression is not expanded, and thus
        // not evaluated, a second time in the message below.
        let (required_major, required_minor, required_patch): (u32, u32, u32) =
            ($min_major, $min_minor, $min_patch);
        if (major, minor, patch) < (required_major, required_minor, required_patch) {
            println!(
                "SKIPPED: test requires RabbitMQ >= {}.{}.{}, found {}.{}.{}",
                required_major, required_minor, required_patch, major, minor, patch
            );
            return Ok(());
        }
    }};
}