use crate::test_helpers::{
PASSWORD, USERNAME, await_metric_emission, endpoint, rabbitmq_version_is_at_least,
};
use proptest::prelude::*;
use proptest::test_runner::Config as ProptestConfig;
use rabbitmq_http_client::{blocking_api::Client, commons::QueueType, requests::QueueParams};
use serde_json::{Map, Value, json};
fn arb_queue_name() -> impl Strategy<Value = String> {
prop::string::string_regex(r"rust\.tests\.blocking\.proptest\.[a-zA-Z0-9_-]{8,20}").unwrap()
}
fn arb_classic_queue_params()
-> impl Strategy<Value = (String, bool, bool, Option<Map<String, Value>>)> {
(
arb_queue_name(),
any::<bool>(), any::<bool>(), arb_optional_args(),
)
}
fn arb_quorum_queue_params() -> impl Strategy<Value = (String, Option<Map<String, Value>>)> {
(arb_queue_name(), arb_optional_args())
}
fn arb_stream_params() -> impl Strategy<Value = (String, u64)> {
(arb_queue_name(), arb_max_length_bytes())
}
fn arb_message_ttl() -> impl Strategy<Value = u64> {
1000u64..3600000u64
}
fn arb_max_length() -> impl Strategy<Value = u64> {
100u64..1000000u64
}
fn arb_max_length_bytes() -> impl Strategy<Value = u64> {
1024u64..100_000_000u64
}
fn arb_optional_args() -> impl Strategy<Value = Option<Map<String, Value>>> {
prop_oneof![
Just(None),
arb_message_ttl().prop_map(|ttl| {
let mut map = Map::new();
map.insert("x-message-ttl".to_string(), json!(ttl));
Some(map)
}),
arb_max_length().prop_map(|len| {
let mut map = Map::new();
map.insert("x-max-length".to_string(), json!(len));
Some(map)
}),
arb_max_length_bytes().prop_map(|bytes| {
let mut map = Map::new();
map.insert("x-max-length-bytes".to_string(), json!(bytes));
Some(map)
}),
(arb_message_ttl(), arb_max_length()).prop_map(|(ttl, len)| {
let mut map = Map::new();
map.insert("x-message-ttl".to_string(), json!(ttl));
map.insert("x-max-length".to_string(), json!(len));
Some(map)
}),
]
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]
#[test]
fn prop_blocking_durable_client_named_classic_queue(
(name, durable, auto_delete, optional_args) in arb_classic_queue_params()
) {
if !durable && rabbitmq_version_is_at_least(4, 3, 0) {
return Ok(());
}
let endpoint = endpoint();
let client = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let _ = client.delete_queue(vhost, &name, true);
let params = QueueParams::new(&name, QueueType::Classic, durable, auto_delete, optional_args);
let result1 = client.declare_queue(vhost, ¶ms);
prop_assert!(result1.is_ok(), "Failed to declare classic queue: {result1:?}");
await_metric_emission(20);
let result2 = client.list_queues();
prop_assert!(result2.is_ok(), "Failed to list queues: {result2:?}");
let queues = result2.unwrap();
let found_queue = queues.iter().find(|q| q.name == name);
prop_assert!(found_queue.is_some(), "list_queues did not include the declared queue: {}", name);
let queue = found_queue.unwrap();
prop_assert_eq!(&queue.queue_type, "classic");
prop_assert_eq!(queue.durable, durable);
prop_assert_eq!(queue.auto_delete, auto_delete);
let _ = client.delete_queue(vhost, &name, true);
}
#[test]
fn prop_blocking_durable_client_named_quorum_queue(
(name, optional_args) in arb_quorum_queue_params()
) {
let endpoint = endpoint();
let client = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let _ = client.delete_queue(vhost, &name, true);
let params = QueueParams::new_quorum_queue(&name, optional_args);
let result1 = client.declare_queue(vhost, ¶ms);
prop_assert!(result1.is_ok(), "Failed to declare quorum queue: {result1:?}");
await_metric_emission(20);
let result2 = client.list_queues_in(vhost);
prop_assert!(result2.is_ok(), "Failed to list queues in vhost: {result2:?}");
let queues = result2.unwrap();
let found_queue = queues.iter().find(|q| q.name == name);
prop_assert!(found_queue.is_some(), "list_queues did not include the declared queue: {}", name);
let queue = found_queue.unwrap();
prop_assert_eq!(&queue.queue_type, "quorum");
prop_assert_eq!(queue.durable, true);
prop_assert_eq!(queue.auto_delete, false);
let _ = client.delete_queue(vhost, &name, true);
}
#[test]
fn prop_blocking_stream_essential_ops(
(name, max_length_bytes) in arb_stream_params()
) {
if !rabbitmq_version_is_at_least(3, 13, 0) {
return Ok(());
}
let endpoint = endpoint();
let client = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let _ = client.delete_queue(vhost, &name, true);
let mut map = Map::new();
map.insert("x-max-length-bytes".to_string(), json!(max_length_bytes));
let optional_args = Some(map);
let params = QueueParams::new_stream(&name, optional_args);
let result1 = client.declare_queue(vhost, ¶ms);
prop_assert!(result1.is_ok(), "Failed to declare stream: {result1:?}");
await_metric_emission(20);
let result2 = client.list_queues_with_details();
prop_assert!(result2.is_ok(), "list_queues_with_details did not include the declared queue: {result2:?}");
let queues = result2.unwrap();
let found_queue = queues.iter().find(|q| q.name == name);
prop_assert!(found_queue.is_some(), "list_queues_with_details did not include the declared stream: {}", name);
let queue = found_queue.unwrap();
prop_assert_eq!(&queue.queue_type, "stream");
prop_assert_eq!(queue.durable, true);
prop_assert_eq!(queue.auto_delete, false);
let _ = client.delete_queue(vhost, &name, true);
}
#[test]
fn prop_blocking_transient_autodelete_classic_queue(
name in arb_queue_name(),
optional_args in arb_optional_args()
) {
if rabbitmq_version_is_at_least(4, 3, 0) {
return Ok(());
}
let endpoint = endpoint();
let client = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let _ = client.delete_queue(vhost, &name, true);
let params = QueueParams::new_transient_autodelete(&name, optional_args);
let result1 = client.declare_queue(vhost, ¶ms);
prop_assert!(result1.is_ok(), "Failed to declare transient auto-delete queue: {result1:?}");
await_metric_emission(20);
let result2 = client.get_queue_info(vhost, &name);
prop_assert!(result2.is_ok(), "Failed to get queue info: {result2:?}");
let queue = result2.unwrap();
prop_assert_eq!(queue.name, name.clone());
prop_assert_eq!(queue.vhost, vhost);
prop_assert_eq!(&queue.queue_type, "classic");
prop_assert_eq!(queue.durable, false);
prop_assert_eq!(queue.auto_delete, true);
let _ = client.delete_queue(vhost, &name, true);
}
#[test]
fn prop_blocking_list_queues_consistency(
names in prop::collection::vec(arb_queue_name(), 1..5)
) {
let endpoint = endpoint();
let client = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
for name in &names {
let _ = client.delete_queue(vhost, name, true);
}
for name in &names {
let params = QueueParams::new_durable_classic_queue(name, None);
let result1 = client.declare_queue(vhost, ¶ms);
prop_assert!(result1.is_ok(), "Failed to declare queue {}: {result1:?}", name);
}
await_metric_emission(20);
let result2 = client.list_queues();
prop_assert!(result2.is_ok(), "Failed to list all queues: {result2:?}");
let result3 = client.list_queues_in(vhost);
prop_assert!(result3.is_ok(), "Failed to list queues in vhost: {result3:?}");
let all_queues = result2.unwrap();
let vhost_queues = result3.unwrap();
for name in &names {
let found_in_all = all_queues.iter().any(|q| q.name == *name);
let found_in_vhost = vhost_queues.iter().any(|q| q.name == *name);
prop_assert!(found_in_all, "list_queues did not include the declared queue {}", name);
prop_assert!(found_in_vhost, "list_queues_in did not include the declared queue {}", name);
}
for name in &names {
let _ = client.delete_queue(vhost, name, true);
}
}
}