use crate::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber, InMemoryAckSubscriber, InMemoryConfig};
use crate::rabbitmq::{RabbitMQPublisher, RabbitMQSubscriber};
use crate::kafka::{KafkaPublisher, KafkaSubscriber};
use crate::mqtt::{MqttPublisher, MqttSubscriber};
use crate::{Message, Publisher, Subscriber, AckSubscriber};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::time::timeout;
use uuid::Uuid;
fn create_test_messages(count: usize, prefix: &str) -> Vec<Message> {
(0..count)
.map(|i| {
let payload = format!("{} message {}", prefix, i).into_bytes();
let mut msg = Message::new(payload);
msg = msg.with_metadata("test_id", &i.to_string());
msg = msg.with_metadata("prefix", prefix);
msg = msg.with_metadata("created_at", &SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs().to_string());
msg
})
.collect()
}
fn create_single_test_message(content: &str) -> Message {
let mut msg = Message::new(content.as_bytes().to_vec());
msg = msg.with_metadata("content", content);
msg = msg.with_metadata("timestamp", &SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs().to_string());
msg
}
#[cfg(test)]
mod in_memory_backend_tests {
use super::*;
#[tokio::test]
async fn test_in_memory_publisher_creation() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
assert!(Arc::strong_count(&broker) >= 2); }
#[tokio::test]
async fn test_in_memory_subscriber_creation() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let subscriber = InMemorySubscriber::new(broker.clone());
assert!(Arc::strong_count(&broker) >= 2); }
#[tokio::test]
async fn test_in_memory_basic_publish_subscribe() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemorySubscriber::new(broker.clone());
let topic = "test_topic";
let test_message = create_single_test_message("Hello, World!");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, vec![test_message.clone()]).await.expect("Failed to publish");
let received = timeout(Duration::from_secs(1), subscriber.receive()).await
.expect("Timeout waiting for message")
.expect("Failed to receive message");
assert_eq!(received.payload, test_message.payload);
assert_eq!(received.metadata.get("content"), test_message.metadata.get("content"));
}
#[tokio::test]
async fn test_in_memory_multiple_messages() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemorySubscriber::new(broker.clone());
let topic = "multi_message_topic";
let test_messages = create_test_messages(5, "MultiTest");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, test_messages.clone()).await.expect("Failed to publish");
for i in 0..5 {
let received = timeout(Duration::from_secs(1), subscriber.receive()).await
.expect("Timeout waiting for message")
.expect("Failed to receive message");
assert!(String::from_utf8_lossy(&received.payload).contains("MultiTest"));
assert_eq!(received.metadata.get("prefix"), Some(&"MultiTest".to_string()));
}
}
#[tokio::test]
async fn test_in_memory_multiple_topics() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber1 = InMemorySubscriber::new(broker.clone());
let mut subscriber2 = InMemorySubscriber::new(broker.clone());
let topic1 = "topic_one";
let topic2 = "topic_two";
subscriber1.subscribe(topic1).await.expect("Failed to subscribe to topic1");
subscriber2.subscribe(topic2).await.expect("Failed to subscribe to topic2");
let msg1 = create_single_test_message("Message for topic 1");
let msg2 = create_single_test_message("Message for topic 2");
publisher.publish(topic1, vec![msg1.clone()]).await.expect("Failed to publish to topic1");
publisher.publish(topic2, vec![msg2.clone()]).await.expect("Failed to publish to topic2");
let received1 = timeout(Duration::from_secs(1), subscriber1.receive()).await
.expect("Timeout waiting for message from topic1")
.expect("Failed to receive from topic1");
let received2 = timeout(Duration::from_secs(1), subscriber2.receive()).await
.expect("Timeout waiting for message from topic2")
.expect("Failed to receive from topic2");
assert_eq!(received1.payload, msg1.payload);
assert_eq!(received2.payload, msg2.payload);
}
#[tokio::test]
async fn test_in_memory_concurrent_publishers() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let mut subscriber = InMemorySubscriber::new(broker.clone());
let topic = "concurrent_topic";
subscriber.subscribe(topic).await.expect("Failed to subscribe");
let mut handles = Vec::new();
for i in 0..3 {
let broker_clone = broker.clone();
let handle = tokio::spawn(async move {
let publisher = InMemoryPublisher::new(broker_clone);
let message = create_single_test_message(&format!("Concurrent message {}", i));
publisher.publish(topic, vec![message]).await.expect("Failed to publish");
});
handles.push(handle);
}
for handle in handles {
handle.await.expect("Publisher task failed");
}
let mut received_count = 0;
for _ in 0..3 {
match timeout(Duration::from_secs(1), subscriber.receive()).await {
Ok(Ok(_)) => received_count += 1,
_ => break,
}
}
assert_eq!(received_count, 3, "Should receive all concurrent messages");
}
#[tokio::test]
async fn test_in_memory_error_handling() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let mut subscriber = InMemorySubscriber::new(broker.clone());
match timeout(Duration::from_millis(100), subscriber.receive()).await {
Err(_) => {}, Ok(Err(_)) => {}, Ok(Ok(_)) => panic!("Should not receive message without subscription"),
}
let publisher = InMemoryPublisher::new(broker.clone());
let message = create_single_test_message("Test");
match publisher.publish("", vec![message.clone()]).await {
Err(_) => {}, Ok(_) => panic!("Should not allow empty topic name"),
}
}
#[tokio::test]
async fn test_in_memory_broker_shutdown() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemorySubscriber::new(broker.clone());
let topic = "shutdown_topic";
subscriber.subscribe(topic).await.expect("Failed to subscribe");
broker.shutdown().await.expect("Failed to shutdown broker");
let message = create_single_test_message("After shutdown");
match publisher.publish(topic, vec![message]).await {
Err(_) => {}, Ok(_) => panic!("Should not allow publishing after shutdown"),
}
match timeout(Duration::from_millis(100), subscriber.receive()).await {
Err(_) => {}, Ok(Err(_)) => {}, Ok(Ok(_)) => panic!("Should not receive after shutdown"),
}
}
}
#[cfg(test)]
mod in_memory_ack_backend_tests {
use super::*;
#[tokio::test]
async fn test_in_memory_ack_subscriber_creation() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let _subscriber = InMemoryAckSubscriber::new(broker.clone());
assert!(Arc::strong_count(&broker) >= 2);
}
#[tokio::test]
async fn test_in_memory_basic_acknowledgment_fixed() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
let topic = "ack_test_topic";
let test_message = create_single_test_message("Ack test message");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, vec![test_message.clone()]).await.expect("Failed to publish");
let (received, handle) = timeout(Duration::from_secs(2), subscriber.receive_with_ack()).await
.expect("Timeout waiting for message")
.expect("Failed to receive message with ack");
assert_eq!(received.payload, test_message.payload);
assert_eq!(handle.topic(), topic);
assert!(!handle.is_retry());
subscriber.ack(handle).await.expect("Failed to acknowledge message");
}
#[tokio::test]
async fn test_in_memory_negative_acknowledgment_fixed() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
let topic = "nack_test_topic";
let test_message = create_single_test_message("Nack test message");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, vec![test_message.clone()]).await.expect("Failed to publish");
let (received, handle) = timeout(Duration::from_secs(2), subscriber.receive_with_ack()).await
.expect("Timeout waiting for message")
.expect("Failed to receive message with ack");
assert_eq!(received.payload, test_message.payload);
subscriber.nack(handle, true).await.expect("Failed to nack message");
let (received_again, handle2) = timeout(Duration::from_secs(2), subscriber.receive_with_ack()).await
.expect("Timeout waiting for requeued message")
.expect("Failed to receive requeued message");
assert_eq!(received_again.payload, test_message.payload);
assert!(handle2.is_retry());
subscriber.ack(handle2).await.expect("Failed to acknowledge retry");
}
#[tokio::test]
async fn test_in_memory_batch_acknowledgment_fixed() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
let topic = "batch_ack_topic";
let test_messages = create_test_messages(3, "BatchTest");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, test_messages.clone()).await.expect("Failed to publish");
let mut handles = Vec::new();
for i in 0..3 {
let (received, handle) = timeout(Duration::from_secs(2), subscriber.receive_with_ack()).await
.expect(&format!("Timeout waiting for message {}", i + 1))
.expect(&format!("Failed to receive message {}", i + 1));
assert!(String::from_utf8_lossy(&received.payload).contains("BatchTest"));
handles.push(handle);
}
subscriber.ack_batch(handles).await.expect("Failed to batch acknowledge");
}
#[tokio::test]
async fn test_in_memory_ack_handle_properties() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
let topic = "handle_props_topic";
let test_message = create_single_test_message("Handle properties test");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, vec![test_message]).await.expect("Failed to publish");
let (_, handle) = timeout(Duration::from_secs(2), subscriber.receive_with_ack()).await
.expect("Timeout waiting for message")
.expect("Failed to receive message");
assert_eq!(handle.topic(), topic);
assert_eq!(handle.delivery_count(), 1);
assert!(!handle.is_retry());
assert!(!handle.message_id().is_empty());
subscriber.ack(handle).await.expect("Failed to acknowledge");
}
}
#[cfg(test)]
mod rabbitmq_backend_tests {
use super::*;
#[tokio::test]
async fn test_rabbitmq_publisher_creation() {
let uri = "amqp://localhost:5672";
match RabbitMQPublisher::new(uri).await {
Ok(_publisher) => {
assert!(true);
}
Err(_) => {
println!("RabbitMQ not available for testing - this is expected in unit tests");
assert!(true);
}
}
}
#[tokio::test]
async fn test_rabbitmq_subscriber_creation() {
let uri = "amqp://localhost:5672";
match RabbitMQSubscriber::new(uri).await {
Ok(_subscriber) => {
assert!(true);
}
Err(_) => {
println!("RabbitMQ not available for testing - this is expected in unit tests");
assert!(true);
}
}
}
#[tokio::test]
async fn test_rabbitmq_invalid_uri() {
let invalid_uri = "invalid://uri";
match RabbitMQPublisher::new(invalid_uri).await {
Ok(_) => panic!("Should not create publisher with invalid URI"),
Err(_) => assert!(true), }
}
}
#[cfg(test)]
mod kafka_backend_tests {
use super::*;
#[test]
fn test_kafka_publisher_creation() {
let brokers = vec!["localhost:9092".to_string()];
match KafkaPublisher::new(brokers) {
Ok(_publisher) => {
assert!(true);
}
Err(_) => {
println!("Kafka publisher creation failed - this may be expected");
assert!(true);
}
}
}
#[test]
fn test_kafka_publisher_empty_brokers() {
let empty_brokers = vec![];
match KafkaPublisher::new(empty_brokers) {
Ok(_) => panic!("Should not create publisher with empty brokers"),
Err(_) => assert!(true), }
}
#[test]
fn test_kafka_publisher_invalid_broker() {
let invalid_brokers = vec!["invalid:broker:format".to_string()];
match KafkaPublisher::new(invalid_brokers) {
Ok(_) => {
assert!(true);
}
Err(_) => assert!(true), }
}
}
#[cfg(test)]
mod mqtt_backend_tests {
use super::*;
#[tokio::test]
async fn test_mqtt_publisher_creation() {
let broker_url = "127.0.0.1";
let qos = crate::mqtt::MqttQoS::AtMostOnce;
match MqttPublisher::new(broker_url, qos) {
Ok(_publisher) => assert!(true),
Err(_) => {
println!("MQTT publisher creation failed - this may be expected");
assert!(true);
}
}
}
#[tokio::test]
async fn test_mqtt_subscriber_creation() {
let broker_url = "127.0.0.1";
let qos = crate::mqtt::MqttQoS::AtMostOnce;
match MqttSubscriber::new(broker_url, qos) {
Ok(_subscriber) => assert!(true),
Err(_) => {
println!("MQTT subscriber creation failed - this may be expected");
assert!(true);
}
}
}
#[test]
fn test_mqtt_qos_levels() {
use crate::mqtt::MqttQoS;
let qos0 = MqttQoS::AtMostOnce;
let qos1 = MqttQoS::AtLeastOnce;
let qos2 = MqttQoS::ExactlyOnce;
assert_ne!(qos0, qos1);
assert_ne!(qos1, qos2);
assert_ne!(qos0, qos2);
}
}
#[cfg(test)]
mod cross_backend_tests {
use super::*;
#[tokio::test]
async fn test_message_creation_consistency() {
let content = "Test message content";
let message = create_single_test_message(content);
assert_eq!(message.payload, content.as_bytes());
assert!(message.metadata.contains_key("content"));
assert!(message.metadata.contains_key("timestamp"));
assert!(!message.uuid.is_empty());
}
#[tokio::test]
async fn test_message_metadata_handling() {
let mut message = create_single_test_message("Metadata test");
message = message.with_metadata("key1", "value1");
message = message.with_metadata("key2", "value2");
assert_eq!(message.metadata.get("key1"), Some(&"value1".to_string()));
assert_eq!(message.metadata.get("key2"), Some(&"value2".to_string()));
message = message.with_metadata("key1", "new_value1");
assert_eq!(message.metadata.get("key1"), Some(&"new_value1".to_string()));
}
#[tokio::test]
async fn test_message_uuid_uniqueness() {
let msg1 = create_single_test_message("Message 1");
let msg2 = create_single_test_message("Message 2");
assert_ne!(msg1.uuid, msg2.uuid);
assert!(!msg1.uuid.is_empty());
assert!(!msg2.uuid.is_empty());
}
#[test]
fn test_backend_error_types() {
use crate::memory::InMemoryError;
use crate::rabbitmq::RabbitMQError;
use crate::kafka::KafkaError;
use crate::mqtt::MqttError;
let mem_error = InMemoryError::BrokerShutdown;
let mem_error2 = InMemoryError::BrokerShutdown;
assert_eq!(mem_error, mem_error2);
let error_string = format!("{}", mem_error);
assert!(!error_string.is_empty());
}
}
#[cfg(test)]
mod performance_unit_tests {
use super::*;
use std::time::Instant;
#[tokio::test]
async fn test_in_memory_publish_performance() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let topic = "perf_topic";
let message_count = 100;
let messages = create_test_messages(message_count, "PerfTest");
let start = Instant::now();
publisher.publish(topic, messages).await.expect("Failed to publish");
let duration = start.elapsed();
assert!(duration.as_millis() < 100, "Publishing should be fast: {:?}", duration);
println!("Published {} messages in {:?}", message_count, duration);
}
#[tokio::test]
async fn test_in_memory_subscribe_performance() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemorySubscriber::new(broker.clone());
let topic = "perf_sub_topic";
let message_count = 50;
let messages = create_test_messages(message_count, "SubPerfTest");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, messages).await.expect("Failed to publish");
let start = Instant::now();
for _ in 0..message_count {
timeout(Duration::from_secs(1), subscriber.receive()).await
.expect("Timeout receiving message")
.expect("Failed to receive message");
}
let duration = start.elapsed();
println!("Received {} messages in {:?}", message_count, duration);
let throughput = message_count as f64 / duration.as_secs_f64();
assert!(throughput > 100.0, "Throughput should be > 100 msg/sec, got {:.2}", throughput);
}
#[tokio::test]
async fn test_in_memory_ack_performance() {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
let topic = "ack_perf_topic";
let message_count = 20; let messages = create_test_messages(message_count, "AckPerfTest");
subscriber.subscribe(topic).await.expect("Failed to subscribe");
publisher.publish(topic, messages).await.expect("Failed to publish");
let start = Instant::now();
for _ in 0..message_count {
let (_, handle) = timeout(Duration::from_secs(2), subscriber.receive_with_ack()).await
.expect("Timeout receiving message")
.expect("Failed to receive message");
subscriber.ack(handle).await.expect("Failed to acknowledge");
}
let duration = start.elapsed();
println!("Acknowledged {} messages in {:?}", message_count, duration);
let avg_ack_time = duration / message_count as u32;
assert!(avg_ack_time.as_millis() < 50, "Average ack time should be < 50ms, got {:?}", avg_ack_time);
}
}