use super::*;
use crate::error::QueueError;
use crate::message::{Message, QueueName, ReceiptHandle};
use crate::provider::{InMemoryConfig, ProviderConfig, ProviderType, QueueConfig, SessionSupport};
use chrono::Duration;
/// Contract check: sending one message to `queue` succeeds and the returned
/// message ID is a non-empty string.
async fn test_queue_client_send_message_success<C: QueueClient>(client: &C, queue: &QueueName) {
    let outcome = client
        .send_message(queue, Message::new("test message".into()))
        .await;

    let message_id = match outcome {
        Ok(id) => id,
        Err(_) => panic!("Send message should succeed"),
    };

    assert!(
        !message_id.as_str().is_empty(),
        "Message ID should not be empty"
    );
}
/// Contract check: sending to the hard-coded queue name
/// `nonexistent-queue-12345` must fail with `QueueError::QueueNotFound`, and
/// the error must carry that same queue name.
///
/// Panics if the send succeeds or fails with any other error variant.
async fn test_queue_client_send_to_nonexistent_queue<C: QueueClient>(client: &C) {
    let missing_queue = QueueName::new("nonexistent-queue-12345".to_string()).unwrap();
    let outcome = client
        .send_message(&missing_queue, Message::new("test".into()))
        .await;

    match outcome {
        Ok(_) => panic!("Should fail for non-existent queue"),
        Err(QueueError::QueueNotFound { queue_name }) => {
            assert_eq!(queue_name, missing_queue.as_str());
        }
        Err(other) => panic!("Expected QueueNotFound error, got: {:?}", other),
    }
}
/// Contract check: after sending a message to `queue`, a receive with a
/// five-second timeout returns a message whose body equals the sent body and
/// whose receipt handle is non-empty.
async fn test_queue_client_receive_message_success<C: QueueClient>(client: &C, queue: &QueueName) {
    let sent = Message::new("test receive".into());
    // A clone is sent so the original stays available for the body comparison.
    let _sent_id = client
        .send_message(queue, sent.clone())
        .await
        .expect("Setup: send should succeed");

    let outcome = client.receive_message(queue, Duration::seconds(5)).await;
    let maybe_delivered = match outcome {
        Ok(value) => value,
        Err(_) => panic!("Receive should succeed"),
    };
    let delivered = match maybe_delivered {
        Some(msg) => msg,
        None => panic!("Should receive the message"),
    };

    assert_eq!(delivered.body, sent.body);
    assert!(!delivered.receipt_handle.handle().is_empty());
}
/// Contract check: receiving with a 100 ms timeout must return `Ok(None)`
/// rather than an error.
///
/// The caller is expected to pass a `queue` with no deliverable messages.
async fn test_queue_client_receive_from_empty_queue<C: QueueClient>(
    client: &C,
    queue: &QueueName,
) {
    let outcome = client
        .receive_message(queue, Duration::milliseconds(100))
        .await;

    match outcome {
        Err(_) => panic!("Should not error on empty queue"),
        Ok(maybe_msg) => assert!(maybe_msg.is_none(), "Should return None for empty queue"),
    }
}
/// Contract check: completing a received message succeeds, and a follow-up
/// receive (100 ms timeout) on the same queue yields nothing.
async fn test_queue_client_complete_message<C: QueueClient>(client: &C, queue: &QueueName) {
    // Setup: put one message on the queue and take delivery of it.
    client
        .send_message(queue, Message::new("test complete".into()))
        .await
        .expect("Setup: send should succeed");
    let delivered = client
        .receive_message(queue, Duration::seconds(5))
        .await
        .expect("Setup: receive should succeed")
        .expect("Setup: should have message");

    // Act: acknowledge the delivery through its receipt handle.
    let completion = client
        .complete_message(delivered.receipt_handle.clone())
        .await;
    assert!(completion.is_ok(), "Complete should succeed");

    // Verify: nothing is handed out by a second receive.
    let leftover = client
        .receive_message(queue, Duration::milliseconds(100))
        .await
        .expect("Recheck should not error");
    assert!(
        leftover.is_none(),
        "Completed message should not be re-received"
    );
}
/// Contract check: abandoning a received message succeeds, and a follow-up
/// receive (five-second timeout) on the same queue returns a message again.
async fn test_queue_client_abandon_message<C: QueueClient>(client: &C, queue: &QueueName) {
    // Setup: put one message on the queue and take delivery of it.
    client
        .send_message(queue, Message::new("test abandon".into()))
        .await
        .expect("Setup: send should succeed");
    let delivered = client
        .receive_message(queue, Duration::seconds(5))
        .await
        .expect("Setup: receive should succeed")
        .expect("Setup: should have message");

    // Act: give the delivery back through its receipt handle.
    let abandonment = client
        .abandon_message(delivered.receipt_handle.clone())
        .await;
    assert!(abandonment.is_ok(), "Abandon should succeed");

    // Verify: a second receive hands out a message.
    let redelivered = client
        .receive_message(queue, Duration::seconds(5))
        .await
        .expect("Recheck should not error");
    assert!(
        redelivered.is_some(),
        "Abandoned message should be re-available"
    );
}
/// Contract check: dead-lettering a received message with a reason string
/// returns `Ok`.
///
/// Only the call's result is checked; where the message ends up afterwards is
/// not inspected here.
async fn test_queue_client_dead_letter_message<C: QueueClient>(client: &C, queue: &QueueName) {
    // Setup: put one message on the queue and take delivery of it.
    client
        .send_message(queue, Message::new("test dead letter".into()))
        .await
        .expect("Setup: send should succeed");
    let delivered = client
        .receive_message(queue, Duration::seconds(5))
        .await
        .expect("Setup: receive should succeed")
        .expect("Setup: should have message");

    let handle = delivered.receipt_handle.clone();
    let reason = "Test failure reason".to_string();
    let outcome = client.dead_letter_message(handle, reason).await;
    assert!(outcome.is_ok(), "Dead letter should succeed");
}
/// Contract check: sending three messages in one `send_messages` call
/// succeeds.
///
/// When the client reports batching support, exactly three message IDs must
/// come back. Otherwise the call only has to return `Ok`.
async fn test_queue_client_send_batch<C: QueueClient>(client: &C, queue: &QueueName) {
    let batch = vec![
        Message::new("batch 1".into()),
        Message::new("batch 2".into()),
        Message::new("batch 3".into()),
    ];
    let outcome = client.send_messages(queue, batch).await;

    if !client.supports_batching() {
        assert!(outcome.is_ok(), "Should handle batch gracefully");
        return;
    }

    assert!(outcome.is_ok(), "Batch send should succeed");
    let message_ids = outcome.unwrap();
    assert_eq!(message_ids.len(), 3, "Should return 3 message IDs");
}
/// Contract check: after five messages are sent, a batch receive asking for
/// at most three (five-second timeout) returns between one and three
/// messages.
async fn test_queue_client_receive_batch<C: QueueClient>(client: &C, queue: &QueueName) {
    // Setup: enqueue more messages than the batch receive will ask for.
    for index in 0..5 {
        client
            .send_message(
                queue,
                Message::new(format!("batch receive {}", index).into()),
            )
            .await
            .expect("Setup: send should succeed");
    }

    let outcome = client
        .receive_messages(queue, 3, Duration::seconds(5))
        .await;
    let batch = match outcome {
        Ok(received) => received,
        Err(_) => panic!("Batch receive should succeed"),
    };

    assert!(
        batch.len() <= 3,
        "Should not exceed requested max messages"
    );
    assert!(!batch.is_empty(), "Should receive at least one message");
}
/// Contract check: the client's reported provider type is one of the five
/// variants listed below.
async fn test_queue_client_provider_type<C: QueueClient>(client: &C) {
    let is_listed_variant = matches!(
        client.provider_type(),
        ProviderType::InMemory
            | ProviderType::AzureServiceBus
            | ProviderType::AwsSqs
            | ProviderType::RabbitMq
            | ProviderType::Nats
    );
    assert!(is_listed_variant, "Should return valid provider type");
}
/// Smoke check: `supports_sessions` can be called on the client.
///
/// The returned value is deliberately discarded; no particular answer is
/// asserted.
async fn test_queue_client_supports_sessions<C: QueueClient>(client: &C) {
    let _ = client.supports_sessions();
}
/// Contract check: a session receive with a five-second timeout returns `Ok`.
/// Whether a message was actually delivered is not checked.
// No caller is visible in this file, hence the dead-code allowance.
#[allow(dead_code)]
async fn test_session_client_receive<S: SessionClient>(session: &S) {
    let timeout = Duration::seconds(5);
    let outcome = session.receive_message(timeout).await;
    if outcome.is_err() {
        panic!("Session receive should not error (may return None)");
    }
}
/// Contract check: completing the message identified by `receipt` through the
/// session client returns `Ok`.
// No caller is visible in this file, hence the dead-code allowance.
#[allow(dead_code)]
async fn test_session_client_complete<S: SessionClient>(session: &S, receipt: ReceiptHandle) {
    let outcome = session.complete_message(receipt).await;
    if outcome.is_err() {
        panic!("Session complete should succeed");
    }
}
/// Contract check: renewing the session lock returns `Ok`.
// No caller is visible in this file, hence the dead-code allowance.
#[allow(dead_code)]
async fn test_session_client_renew_lock<S: SessionClient>(session: &S) {
    let outcome = session.renew_session_lock().await;
    if outcome.is_err() {
        panic!("Session lock renewal should succeed");
    }
}
/// Contract check: closing the session returns `Ok`.
///
/// Takes the session by mutable reference because `close_session` is invoked
/// on it.
// No caller is visible in this file, hence the dead-code allowance.
#[allow(dead_code)]
async fn test_session_client_close<S: SessionClient>(session: &mut S) {
    let outcome = session.close_session().await;
    if outcome.is_err() {
        panic!("Session close should succeed");
    }
}
/// Contract check: the session client exposes a session ID whose string form
/// is non-empty.
// No caller is visible in this file, hence the dead-code allowance.
#[allow(dead_code)]
async fn test_session_client_session_id<S: SessionClient>(session: &S) {
    let id_is_empty = session.session_id().as_str().is_empty();
    assert!(!id_is_empty, "Session ID should not be empty");
}
/// Contract check: sending a message (passed by reference) to `queue` through
/// the provider returns `Ok`.
async fn test_queue_provider_send_message<P: QueueProvider>(provider: &P, queue: &QueueName) {
    let payload = Message::new("provider test".into());
    let outcome = provider.send_message(queue, &payload).await;
    if outcome.is_err() {
        panic!("Provider send should succeed");
    }
}
/// Contract check: the provider reports its session support as either
/// `SessionSupport::Native` or `SessionSupport::Emulated`.
async fn test_queue_provider_session_support<P: QueueProvider>(provider: &P) {
    let level_is_accepted = matches!(
        provider.supports_sessions(),
        SessionSupport::Native | SessionSupport::Emulated
    );
    assert!(
        level_is_accepted,
        "Should return valid session support level"
    );
}
/// Contract check: the provider's maximum batch size agrees with its batching
/// support flag.
///
/// A batching provider must report a size in `1..=100`. A non-batching
/// provider must report exactly `1`.
async fn test_queue_provider_batch_limits<P: QueueProvider>(provider: &P) {
    let limit = provider.max_batch_size();

    if !provider.supports_batching() {
        assert_eq!(limit, 1, "Non-batching provider should have max 1");
        return;
    }

    assert!(limit > 0, "Batch size should be positive if supported");
    assert!(limit <= 100, "Batch size should be reasonable (≤100)");
}
/// The factory's test client must report the in-memory provider type.
#[tokio::test]
async fn test_factory_create_test_client() {
    let client = QueueClientFactory::create_test_client();
    let reported = client.provider_type();
    assert_eq!(
        reported,
        ProviderType::InMemory,
        "Test client should use InMemory provider"
    );
}
/// Building a client from a default in-memory configuration must succeed and
/// yield a client that reports the in-memory provider type.
#[tokio::test]
async fn test_factory_create_from_in_memory_config() {
    let in_memory = QueueConfig {
        provider: ProviderConfig::InMemory(InMemoryConfig::default()),
        ..Default::default()
    };

    let client = match QueueClientFactory::create_client(in_memory).await {
        Ok(created) => created,
        Err(_) => panic!("Should create client from InMemory config"),
    };

    assert_eq!(client.provider_type(), ProviderType::InMemory);
}
/// Smoke test: asks the factory for a client from an Azure Service Bus
/// configuration with placeholder credentials.
///
/// The outcome is deliberately ignored, so both `Ok` and `Err` are accepted.
/// The test only fails if construction panics.
#[tokio::test]
async fn test_factory_create_from_azure_config() {
    let azure = crate::provider::AzureServiceBusConfig {
        connection_string: Some("Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=test;SharedAccessKey=test".to_string()),
        namespace: Some("test".to_string()),
        auth_method: crate::providers::AzureAuthMethod::ConnectionString,
        use_sessions: true,
        session_timeout: Duration::minutes(5),
    };
    let config = QueueConfig {
        provider: ProviderConfig::AzureServiceBus(azure),
        ..Default::default()
    };

    // Held only so the result lives until the end of the test; never inspected.
    let _outcome = QueueClientFactory::create_client(config).await;
}
/// Smoke test: asks the factory for a client from an AWS SQS configuration
/// with no explicit credentials.
///
/// The outcome is deliberately ignored, so both `Ok` and `Err` are accepted.
/// The test only fails if construction panics.
#[tokio::test]
async fn test_factory_create_from_aws_config() {
    let sqs = crate::provider::AwsSqsConfig {
        region: "us-east-1".to_string(),
        access_key_id: None,
        secret_access_key: None,
        use_fifo_queues: true,
    };
    let config = QueueConfig {
        provider: ProviderConfig::AwsSqs(sqs),
        ..Default::default()
    };

    // Held only so the result lives until the end of the test; never inspected.
    let _outcome = QueueClientFactory::create_client(config).await;
}
/// Creating a RabbitMQ-backed client must return an error when the configured
/// URL cannot be connected to.
///
/// The URL targets `127.0.0.1:15799`; the test assumes nothing is listening
/// there.
#[tokio::test]
async fn test_factory_create_from_rabbitmq_config_errors_without_broker() {
    let unreachable_broker = crate::providers::rabbitmq::RabbitMqConfig {
        url: "amqp://guest:guest@127.0.0.1:15799".to_string(),
        ..crate::providers::rabbitmq::RabbitMqConfig::default()
    };
    let config = QueueConfig {
        provider: ProviderConfig::RabbitMq(unreachable_broker),
        ..Default::default()
    };

    let outcome = QueueClientFactory::create_client(config).await;
    assert!(
        outcome.is_err(),
        "Should fail with connection error when no broker is available"
    );
}
/// Creating a NATS-backed client must return an error when the configured URL
/// cannot be connected to.
///
/// The URL targets `127.0.0.1:15800`; the test assumes nothing is listening
/// there.
#[tokio::test]
async fn test_factory_create_from_nats_config_errors_without_server() {
    let unreachable_server = crate::providers::nats::NatsConfig {
        url: "nats://127.0.0.1:15800".to_string(),
        ..crate::providers::nats::NatsConfig::default()
    };
    let config = QueueConfig {
        provider: ProviderConfig::Nats(unreachable_server),
        ..Default::default()
    };

    let outcome = QueueClientFactory::create_client(config).await;
    assert!(
        outcome.is_err(),
        "Should fail with connection error when no NATS server is available"
    );
}
/// Smoke test: a `StandardQueueClient` can be constructed from a boxed default
/// `InMemoryProvider` and a default `QueueConfig`.
///
/// NOTE(review): despite the test's name, no client method is called, so
/// delegation itself is not exercised here.
#[tokio::test]
async fn test_standard_client_delegates_to_provider() {
    let _client = StandardQueueClient::new(
        Box::new(InMemoryProvider::default()),
        QueueConfig::default(),
    );
}
/// An `InMemoryProvider` built from the default config must report the
/// in-memory provider type, native session support, and batching support.
#[tokio::test]
async fn test_in_memory_provider_creation() {
    let provider = InMemoryProvider::new(InMemoryConfig::default());

    assert_eq!(provider.provider_type(), ProviderType::InMemory);
    assert_eq!(provider.supports_sessions(), SessionSupport::Native);
    assert!(provider.supports_batching());
}
/// A default `InMemoryProvider` must report a maximum batch size of 100.
#[tokio::test]
async fn test_in_memory_provider_batch_limits() {
    let provider = InMemoryProvider::default();
    assert_eq!(
        provider.max_batch_size(),
        100,
        "InMemory should support batch of 100"
    );
}
/// Runs every `QueueClient` contract check in sequence against one client.
///
/// All queue-based checks are given the same `test_queue`; the checks that
/// take no queue argument are called with the client only.
///
/// NOTE(review): the checks share queue state and run in a fixed order. Some
/// earlier checks leave messages behind (for example, the send-success check
/// never receives what it sends). Whether later checks such as the
/// receive-body comparison or the empty-queue check still hold depends on the
/// provider's delivery semantics — confirm before wiring this runner into a
/// real test.
// No caller is visible in this file, hence the dead-code allowance.
#[allow(dead_code)]
async fn run_queue_client_contract_tests<C: QueueClient>(client: &C, test_queue: &QueueName) {
// Single-message send paths.
test_queue_client_send_message_success(client, test_queue).await;
test_queue_client_send_to_nonexistent_queue(client).await;
// Single-message receive paths.
test_queue_client_receive_message_success(client, test_queue).await;
test_queue_client_receive_from_empty_queue(client, test_queue).await;
// Settlement operations on a received message.
test_queue_client_complete_message(client, test_queue).await;
test_queue_client_abandon_message(client, test_queue).await;
test_queue_client_dead_letter_message(client, test_queue).await;
// Batch send and receive.
test_queue_client_send_batch(client, test_queue).await;
test_queue_client_receive_batch(client, test_queue).await;
// Capability and metadata queries.
test_queue_client_provider_type(client).await;
test_queue_client_supports_sessions(client).await;
}
/// Runs every `QueueProvider` contract check in sequence against one provider.
///
/// Only the send check uses `test_queue`; the session-support and batch-limit
/// checks are called with the provider only.
// No caller is visible in this file, hence the dead-code allowance.
#[allow(dead_code)]
async fn run_queue_provider_contract_tests<P: QueueProvider>(provider: &P, test_queue: &QueueName) {
test_queue_provider_send_message(provider, test_queue).await;
test_queue_provider_session_support(provider).await;
test_queue_provider_batch_limits(provider).await;
}