use super::ack::{RabbitMQAckHandle, RabbitMQAckSubscriber};
use crate::ack::{AckHandle, AckSubscriber};
use crate::Message;
use std::time::SystemTime;
use tokio::time::Duration;
#[cfg(test)]
mod rabbitmq_ack_handle_tests {
use super::*;
#[test]
fn test_rabbitmq_ack_handle_creation() {
let message_id = "test-message-123".to_string();
let topic = "test-queue".to_string();
let delivery_tag = 42;
let delivery_count = 1;
let timestamp = SystemTime::now();
let handle = RabbitMQAckHandle::new(
message_id.clone(),
topic.clone(),
delivery_tag,
delivery_count,
timestamp,
);
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), topic);
assert_eq!(handle.delivery_tag(), delivery_tag);
assert_eq!(handle.delivery_count(), delivery_count);
assert_eq!(handle.timestamp(), timestamp);
assert!(!handle.handle_id().is_empty());
}
#[test]
fn test_rabbitmq_ack_handle_retry_detection() {
let message_id = "test-message-retry".to_string();
let topic = "test-queue".to_string();
let delivery_tag = 1;
let timestamp = SystemTime::now();
let handle_first = RabbitMQAckHandle::new(
message_id.clone(),
topic.clone(),
delivery_tag,
1, timestamp,
);
assert!(!handle_first.is_retry());
let handle_retry = RabbitMQAckHandle::new(
message_id.clone(),
topic.clone(),
delivery_tag,
2, timestamp,
);
assert!(handle_retry.is_retry());
let handle_multiple_retry = RabbitMQAckHandle::new(
message_id,
topic,
delivery_tag,
5, timestamp,
);
assert!(handle_multiple_retry.is_retry());
}
#[test]
fn test_rabbitmq_ack_handle_uniqueness() {
let message_id = "test-message-unique".to_string();
let topic = "test-queue".to_string();
let delivery_tag = 100;
let delivery_count = 1;
let timestamp = SystemTime::now();
let handle1 = RabbitMQAckHandle::new(
message_id.clone(),
topic.clone(),
delivery_tag,
delivery_count,
timestamp,
);
let handle2 = RabbitMQAckHandle::new(
message_id,
topic,
delivery_tag,
delivery_count,
timestamp,
);
assert_ne!(handle1.handle_id(), handle2.handle_id());
}
#[test]
fn test_rabbitmq_ack_handle_properties() {
let message_id = "test-message-props".to_string();
let topic = "test-queue-props".to_string();
let delivery_tag = 999;
let delivery_count = 3;
let timestamp = SystemTime::now();
let handle = RabbitMQAckHandle::new(
message_id.clone(),
topic.clone(),
delivery_tag,
delivery_count,
timestamp,
);
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), topic);
assert_eq!(handle.delivery_count(), delivery_count);
assert!(handle.is_retry()); assert_eq!(handle.timestamp(), timestamp);
assert_eq!(handle.delivery_tag(), delivery_tag);
assert!(!handle.handle_id().is_empty());
}
#[test]
fn test_rabbitmq_ack_handle_edge_cases() {
let message_id = "".to_string(); let topic = "test-queue".to_string();
let delivery_tag = 0; let delivery_count = 0; let timestamp = SystemTime::now();
let handle = RabbitMQAckHandle::new(
message_id.clone(),
topic.clone(),
delivery_tag,
delivery_count,
timestamp,
);
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), topic);
assert_eq!(handle.delivery_tag(), delivery_tag);
assert_eq!(handle.delivery_count(), delivery_count);
assert!(!handle.is_retry()); }
}
#[cfg(test)]
mod rabbitmq_ack_subscriber_tests {
use super::*;
#[tokio::test]
async fn test_rabbitmq_ack_subscriber_creation() {
let connection_string = "amqp://localhost:5672";
let result = RabbitMQAckSubscriber::new(connection_string).await;
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.to_string().contains("connection") ||
error.to_string().contains("Connection") ||
error.to_string().contains("refused") ||
error.to_string().contains("timeout"));
}
#[tokio::test]
async fn test_rabbitmq_ack_subscriber_invalid_connection_string() {
let invalid_connection_strings = vec![
"",
"invalid://connection",
"amqp://",
"not-a-url",
"amqp://invalid-host:99999",
];
for connection_string in invalid_connection_strings {
let result = RabbitMQAckSubscriber::new(connection_string).await;
assert!(result.is_err(), "Expected error for connection string: {}", connection_string);
}
}
#[test]
fn test_rabbitmq_connection_string_validation() {
let valid_connection_strings = vec![
"amqp://localhost:5672",
"amqp://user:pass@localhost:5672",
"amqp://user:pass@localhost:5672/vhost",
"amqps://secure.rabbitmq.com:5671",
"amqp://guest:guest@127.0.0.1:5672/%2F",
];
for connection_string in valid_connection_strings {
assert!(!connection_string.is_empty());
assert!(connection_string.starts_with("amqp"));
}
}
}
#[cfg(test)]
mod rabbitmq_error_handling_tests {
use super::*;
#[tokio::test]
async fn test_rabbitmq_connection_timeout() {
let connection_string = "amqp://192.0.2.1:5672";
let start_time = std::time::Instant::now();
let result = RabbitMQAckSubscriber::new(connection_string).await;
let elapsed = start_time.elapsed();
assert!(result.is_err());
assert!(elapsed < Duration::from_secs(30));
}
#[tokio::test]
async fn test_rabbitmq_malformed_url() {
let malformed_urls = vec![
"not-a-url",
"http://localhost:5672", "amqp://", "amqp:///", ];
for url in malformed_urls {
let result = RabbitMQAckSubscriber::new(url).await;
assert!(result.is_err(), "Expected error for malformed URL: {}", url);
}
}
}
#[cfg(test)]
mod rabbitmq_integration_unit_tests {
use super::*;
#[test]
fn test_message_to_rabbitmq_handle_conversion() {
let message = Message::new(b"test payload".to_vec())
.with_metadata("delivery_tag", "123")
.with_metadata("delivery_count", "2")
.with_metadata("queue_name", "test-queue");
let delivery_tag: u64 = message.metadata
.get("delivery_tag")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let delivery_count: u32 = message.metadata
.get("delivery_count")
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let queue_name = message.metadata
.get("queue_name")
.unwrap_or(&"default".to_string())
.clone();
assert_eq!(delivery_tag, 123);
assert_eq!(delivery_count, 2);
assert_eq!(queue_name, "test-queue");
let handle = RabbitMQAckHandle::new(message.uuid.clone(), queue_name, SystemTime::now(, delivery_count, delivery_tag),
);
assert_eq!(handle.message_id(), message.uuid);
assert_eq!(handle.delivery_tag(), delivery_tag);
assert_eq!(handle.delivery_count(), delivery_count);
assert!(handle.is_retry()); }
#[test]
fn test_batch_acknowledgment_logic() {
let handles = vec![
RabbitMQAckHandle::new("msg1".to_string(), "queue".to_string(), SystemTime::now(), 1, 100),
RabbitMQAckHandle::new("msg2".to_string(), "queue".to_string(), SystemTime::now(), 1, 101),
RabbitMQAckHandle::new("msg3".to_string(), "queue".to_string(), SystemTime::now(), 1, 102),
];
let max_delivery_tag = handles.iter()
.map(|h| h.delivery_tag())
.max()
.unwrap_or(0);
assert_eq!(max_delivery_tag, 102);
let all_same_queue = handles.iter()
.all(|h| h.topic() == "queue");
assert!(all_same_queue);
}
#[test]
fn test_delivery_count_tracking() {
let message_id = "retry-test".to_string();
let queue = "test-queue".to_string();
let delivery_tag = 50;
let deliveries = vec![
(1, false), (2, true), (3, true), (5, true), ];
for (count, expected_retry) in deliveries {
let handle = RabbitMQAckHandle::new(message_id.clone(), queue.clone(), SystemTime::now(, count, delivery_tag),
);
assert_eq!(handle.is_retry(), expected_retry,
"Delivery count {} should have retry={}", count, expected_retry);
assert_eq!(handle.delivery_count(), count);
}
}
#[test]
fn test_handle_metadata_consistency() {
let message_id = "consistency-test".to_string();
let queue = "test-queue".to_string();
let delivery_tag = 777;
let delivery_count = 3;
let timestamp = SystemTime::now();
let handle = RabbitMQAckHandle::new(
message_id.clone(),
queue.clone(),
delivery_tag,
delivery_count,
timestamp,
);
for _ in 0..5 {
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), queue);
assert_eq!(handle.delivery_tag(), delivery_tag);
assert_eq!(handle.delivery_count(), delivery_count);
assert_eq!(handle.timestamp(), timestamp);
assert!(handle.is_retry());
}
let handle_id = handle.handle_id();
assert_eq!(handle.handle_id(), handle_id);
assert!(!handle_id.is_empty());
}
}
#[cfg(test)]
mod rabbitmq_performance_unit_tests {
use super::*;
#[test]
fn test_handle_creation_performance() {
let start = std::time::Instant::now();
let count = 1000;
for i in 0..count {
let _handle = RabbitMQAckHandle::new(
format!("message-{}", i),
"test-queue".to_string(),
i as u64,
1,
SystemTime::now(),
);
}
let elapsed = start.elapsed();
let per_handle = elapsed / count;
assert!(per_handle < Duration::from_millis(1),
"Handle creation took too long: {:?} per handle", per_handle);
}
#[test]
fn test_handle_method_call_performance() {
let handle = RabbitMQAckHandle::new("perf-test".to_string(), "test-queue".to_string(), SystemTime::now(), 2, 12345,
);
let start = std::time::Instant::now();
let iterations = 10000;
for _ in 0..iterations {
let _ = handle.message_id();
let _ = handle.topic();
let _ = handle.delivery_tag();
let _ = handle.delivery_count();
let _ = handle.is_retry();
let _ = handle.timestamp();
let _ = handle.handle_id();
}
let elapsed = start.elapsed();
let per_call = elapsed / (iterations * 7);
assert!(per_call < Duration::from_micros(1),
"Method calls took too long: {:?} per call", per_call);
}
#[test]
fn test_memory_usage_estimation() {
let handles: Vec<RabbitMQAckHandle> = (0..100)
.map(|i| RabbitMQAckHandle::new(
format!("message-{}", i),
"test-queue".to_string(),
i as u64,
1,
SystemTime::now(),
))
.collect();
assert_eq!(handles.len(), 100);
for (i, handle) in handles.iter().enumerate() {
assert_eq!(handle.message_id(), format!("message-{}", i));
assert_eq!(handle.delivery_tag(), i as u64);
}
}
}