#[cfg(test)]
mod integration_tests {
use super::super::*;
use crate::{Publisher, Subscriber, Message};
use std::sync::Arc;
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn test_complete_message_flow() {
    // End-to-end flow: publish a batch of orders, receive them back in order,
    // and verify payloads, user metadata, broker-added metadata, and stats.
    let config = InMemoryConfig::new()
        .with_max_queue_size(Some(100))
        .with_stats(true);
    let broker = Arc::new(InMemoryBroker::new(config));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());
    subscriber.subscribe("orders").await.unwrap();

    let orders = vec![
        Message::new(b"Order #1: 2x Coffee".to_vec())
            .with_metadata("customer", "Alice")
            .with_metadata("priority", "high"),
        Message::new(b"Order #2: 1x Tea".to_vec())
            .with_metadata("customer", "Bob")
            .with_metadata("priority", "normal"),
        Message::new(b"Order #3: 3x Sandwich".to_vec())
            .with_metadata("customer", "Charlie")
            .with_metadata("priority", "low"),
    ];
    publisher.publish("orders", orders.clone()).await.unwrap();

    for expected_order in &orders {
        let received_order = subscriber.receive().await.unwrap();
        // Payload and every user-supplied metadata entry must survive transit.
        assert_eq!(received_order.payload, expected_order.payload);
        for (key, expected_value) in &expected_order.metadata {
            assert_eq!(
                received_order.metadata.get(key),
                Some(expected_value),
                "Metadata key '{}' mismatch", key
            );
        }
        // The broker stamps delivered messages with these bookkeeping keys.
        assert!(received_order.metadata.contains_key("_sequence"), "Missing _sequence metadata");
        assert!(received_order.metadata.contains_key("_enqueued_at"), "Missing _enqueued_at metadata");
        // `.map(String::as_str).unwrap_or("Unknown")` avoids allocating a
        // throwaway String for the fallback — the previous
        // `unwrap_or(&"Unknown".to_string())` allocated on every iteration,
        // even when the key was present (clippy: or_fun_call).
        println!("Processed order: {} for customer: {}",
            String::from_utf8_lossy(&received_order.payload),
            received_order.metadata.get("customer").map(String::as_str).unwrap_or("Unknown")
        );
    }

    // Stats are tracked broker-wide, so the publisher handle can report them.
    let stats = publisher.stats().unwrap();
    assert_eq!(stats.messages_published, 3);
    assert_eq!(stats.messages_consumed, 3);
    assert_eq!(stats.active_topics, 1);
    assert_eq!(stats.active_subscribers, 1);
}
#[tokio::test]
async fn test_multiple_topics_and_subscribers() {
    // Verifies topic isolation and fan-out: the "orders" topic is delivered
    // to both of its subscribers, while "notifications" reaches only its own.
    let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));

    let orders_tx = InMemoryPublisher::new(broker.clone());
    let alerts_tx = InMemoryPublisher::new(broker.clone());
    let mut orders_rx = InMemorySubscriber::new(broker.clone());
    let mut alerts_rx = InMemorySubscriber::new(broker.clone());
    let mut audit_rx = InMemorySubscriber::new(broker.clone());

    orders_rx.subscribe("orders").await.unwrap();
    alerts_rx.subscribe("notifications").await.unwrap();
    audit_rx.subscribe("orders").await.unwrap();

    let order_batch = vec![Message::new(b"New order received".to_vec())];
    let alert_batch = vec![Message::new(b"System maintenance scheduled".to_vec())];
    orders_tx.publish("orders", order_batch.clone()).await.unwrap();
    alerts_tx.publish("notifications", alert_batch.clone()).await.unwrap();

    // Both "orders" subscribers see the order; the notification goes only
    // to the "notifications" subscriber.
    let got_order = orders_rx.receive().await.unwrap();
    let got_audit = audit_rx.receive().await.unwrap();
    let got_alert = alerts_rx.receive().await.unwrap();
    assert_eq!(got_order.payload, order_batch[0].payload);
    assert_eq!(got_audit.payload, order_batch[0].payload);
    assert_eq!(got_alert.payload, alert_batch[0].payload);

    assert_eq!(broker.topic_count(), 2);
}
#[tokio::test]
async fn test_high_throughput_scenario() {
let config = InMemoryConfig::new()
.with_max_queue_size(Some(10000))
.with_stats(true);
let broker = Arc::new(InMemoryBroker::new(config));
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemorySubscriber::new(broker.clone());
subscriber.subscribe("high-volume").await.unwrap();
let message_count = 1000;
let mut messages = Vec::with_capacity(message_count);
for i in 0..message_count {
messages.push(
Message::new(format!("Message #{}", i).into_bytes())
.with_metadata("sequence", &i.to_string())
);
}
let batch_size = 100;
for chunk in messages.chunks(batch_size) {
publisher.publish("high-volume", chunk.to_vec()).await.unwrap();
}
let mut received_count = 0;
while received_count < message_count {
let batch = subscriber.try_receive_batch(50).unwrap();
if batch.is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}
received_count += batch.len();
}
assert_eq!(received_count, message_count);
let stats = publisher.stats().unwrap();
assert_eq!(stats.messages_published as usize, message_count);
assert_eq!(stats.messages_consumed as usize, message_count);
}
#[tokio::test]
async fn test_error_handling_scenarios() {
    // Each failure mode must surface as its dedicated error variant.
    let broker = Arc::new(InMemoryBroker::new(
        InMemoryConfig::new()
            .with_max_topics(Some(1))
            .with_max_queue_size(Some(2)),
    ));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());

    // Topic limit: first subscription succeeds, a second topic is rejected.
    subscriber.subscribe("topic1").await.unwrap();
    assert!(matches!(
        subscriber.subscribe("topic2").await,
        Err(InMemoryError::MaxTopicsReached { .. })
    ));

    // Queue capacity: three messages cannot fit in a queue bounded at two.
    let overflow: Vec<_> = (1..=3)
        .map(|i| Message::new(format!("Message {}", i).into_bytes()))
        .collect();
    assert!(matches!(
        publisher.publish("topic1", overflow).await,
        Err(InMemoryError::QueueFull { .. })
    ));

    // Invalid topic names: empty, and one with an embedded NUL byte.
    assert!(matches!(
        publisher.publish("", vec![Message::new(b"test".to_vec())]).await,
        Err(InMemoryError::InvalidTopicName { .. })
    ));
    assert!(matches!(
        subscriber.subscribe("test\0topic").await,
        Err(InMemoryError::InvalidTopicName { .. })
    ));
}
#[tokio::test]
async fn test_broker_lifecycle() {
    // Messages flow while the broker is up; after shutdown both clients
    // report disconnected and every operation yields BrokerShutdown.
    let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());
    subscriber.subscribe("lifecycle-test").await.unwrap();

    // Phase 1: healthy broker.
    assert!(publisher.is_connected());
    assert!(subscriber.is_connected());
    publisher
        .publish("lifecycle-test", vec![Message::new(b"Before shutdown".to_vec())])
        .await
        .unwrap();
    let delivered = subscriber.receive().await.unwrap();
    assert_eq!(delivered.payload, b"Before shutdown");

    // Phase 2: shut down and verify everything is rejected.
    broker.shutdown().unwrap();
    assert!(!publisher.is_connected());
    assert!(!subscriber.is_connected());
    let publish_after = publisher
        .publish("lifecycle-test", vec![Message::new(b"After shutdown".to_vec())])
        .await;
    assert!(matches!(publish_after, Err(InMemoryError::BrokerShutdown)));
    let receive_after = subscriber.receive().await;
    assert!(matches!(receive_after, Err(InMemoryError::BrokerShutdown)));
}
#[tokio::test]
async fn test_concurrent_operations() {
    // Five publishers write one message each from separate spawned tasks;
    // each of the three subscribers must observe all five (fan-out delivery).
    let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));

    let mut subscribers = Vec::new();
    for _ in 0..3 {
        let mut sub = InMemorySubscriber::new(broker.clone());
        sub.subscribe("concurrent-test").await.unwrap();
        subscribers.push(sub);
    }

    let mut handles = Vec::new();
    for i in 0..5 {
        let publisher = InMemoryPublisher::new(broker.clone());
        handles.push(tokio::spawn(async move {
            let batch = vec![Message::new(
                format!("Message from publisher {}", i).into_bytes(),
            )];
            publisher.publish("concurrent-test", batch).await.unwrap();
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }

    // Bound each receive so a delivery bug panics instead of hanging.
    for sub in &mut subscribers {
        for _ in 0..5 {
            let msg = timeout(Duration::from_secs(1), sub.receive())
                .await
                .unwrap()
                .unwrap();
            assert!(String::from_utf8_lossy(&msg.payload).contains("Message from publisher"));
        }
    }
    assert_eq!(broker.topic_count(), 1);
}
#[tokio::test]
async fn test_message_metadata_preservation() {
    // A delivered message must keep its payload, UUID, and every
    // user-supplied metadata entry, plus gain the broker bookkeeping keys.
    let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());
    subscriber.subscribe("metadata-test").await.unwrap();

    let sent = Message::new(b"Test payload".to_vec())
        .with_metadata("content-type", "application/json")
        .with_metadata("timestamp", "2024-01-15T10:30:00Z")
        .with_metadata("source", "order-service")
        .with_metadata("version", "1.0")
        .with_metadata("priority", "high");
    publisher.publish("metadata-test", vec![sent.clone()]).await.unwrap();

    let got = subscriber.receive().await.unwrap();
    assert_eq!(got.payload, sent.payload);
    assert_eq!(got.uuid, sent.uuid);

    // Every user-supplied entry must come through verbatim.
    for (key, want) in &sent.metadata {
        assert_eq!(
            got.metadata.get(key),
            Some(want),
            "Metadata key '{}' mismatch", key
        );
    }
    // The broker annotates each delivered message with these keys.
    assert!(got.metadata.contains_key("_sequence"), "Missing _sequence metadata");
    assert!(got.metadata.contains_key("_enqueued_at"), "Missing _enqueued_at metadata");

    // Spot-check a few entries by concrete value.
    assert_eq!(got.metadata.get("content-type"), Some(&"application/json".to_string()));
    assert_eq!(got.metadata.get("priority"), Some(&"high".to_string()));
    assert_eq!(got.metadata.get("source"), Some(&"order-service".to_string()));
}
}