#[cfg(test)]
mod tests {
use super::super::*;
use crate::{Publisher, Subscriber, Message};
use std::sync::Arc;
use tokio::time::{Duration, sleep};
#[tokio::test]
async fn test_message_ordering() {
let config = InMemoryConfig::new()
.with_maintain_order(true)
.with_stats(true);
let broker = Arc::new(InMemoryBroker::new(config));
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemorySubscriber::new(broker.clone());
subscriber.subscribe("ordered-topic").await.unwrap();
let messages = vec![
Message::new(b"Message 1".to_vec()),
Message::new(b"Message 2".to_vec()),
Message::new(b"Message 3".to_vec()),
Message::new(b"Message 4".to_vec()),
Message::new(b"Message 5".to_vec()),
];
publisher.publish("ordered-topic", messages.clone()).await.unwrap();
for (i, expected_msg) in messages.iter().enumerate() {
let received = subscriber.receive().await.unwrap();
assert_eq!(received.payload, expected_msg.payload);
if let Some(sequence) = received.metadata.get("_sequence") {
assert_eq!(sequence, &(i + 1).to_string());
}
assert!(received.metadata.contains_key("_enqueued_at"));
}
}
/// Verifies that queued messages disappear from the topic once the configured
/// TTL (50 ms) has passed and the cleanup interval (25 ms) has had time to run.
#[tokio::test]
async fn test_message_ttl_cleanup() {
    const TOPIC: &str = "ttl-topic";

    let config = InMemoryConfig::new()
        .with_message_ttl(Some(Duration::from_millis(50)))
        .with_cleanup_interval(Duration::from_millis(25));
    let broker = Arc::new(InMemoryBroker::new(config));
    let publisher = InMemoryPublisher::new(broker.clone());

    let batch = vec![
        Message::new(b"Message 1".to_vec()),
        Message::new(b"Message 2".to_vec()),
    ];
    publisher.publish(TOPIC, batch).await.unwrap();

    // Right after publishing, both messages are still queued.
    let before_expiry = broker.topic_info(TOPIC).unwrap();
    assert_eq!(before_expiry.queue_size, 2);

    // Wait for twice the TTL before looking at the queue again.
    sleep(Duration::from_millis(100)).await;

    let after_expiry = broker.topic_info(TOPIC).unwrap();
    assert_eq!(after_expiry.queue_size, 0);
}
/// Checks the broker health report before and after activity.
///
/// A fresh broker must report zero topics, queued messages and subscribers.
/// After one subscription and two published messages on `health-topic`, the
/// counters must reflect that activity and `is_overloaded` must react to the
/// thresholds passed to it.
#[tokio::test]
async fn test_broker_health_check() {
    let config = InMemoryConfig::for_testing();
    let broker = Arc::new(InMemoryBroker::new(config));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());

    // Baseline: nothing has been published or subscribed yet.
    let health = broker.health_check();
    assert!(health.is_healthy);
    assert_eq!(health.topic_count, 0);
    assert_eq!(health.total_queued_messages, 0);
    assert_eq!(health.total_subscribers, 0);

    subscriber.subscribe("health-topic").await.unwrap();
    let messages = vec![
        Message::new(b"Health check message 1".to_vec()),
        Message::new(b"Health check message 2".to_vec()),
    ];
    publisher.publish("health-topic", messages).await.unwrap();

    let health = broker.health_check();
    assert!(health.is_healthy);
    assert_eq!(health.topic_count, 1);
    assert_eq!(health.total_queued_messages, 2);
    assert_eq!(health.total_subscribers, 1);
    assert!(health.memory_usage_estimate > 0);

    // `Duration::as_millis()` returns an unsigned integer, so the former
    // `>= 0` comparison could never fail (and trips the `unused_comparisons`
    // lint). Only exercise the accessor here.
    let _ = health.uptime.as_millis();

    // Generous thresholds are not exceeded; the (0, 1) thresholds are.
    assert!(!health.is_overloaded(10, 10));
    assert!(health.is_overloaded(0, 1));
}
/// Checks the extended fields and helper methods reported by `topic_info`.
///
/// One subscriber is attached and two messages are published to `info-topic`;
/// the counters are asserted right after publishing and again after a single
/// `receive()` call.
#[tokio::test]
async fn test_topic_info_enhanced() {
    let config = InMemoryConfig::for_testing();
    let broker = Arc::new(InMemoryBroker::new(config));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());

    subscriber.subscribe("info-topic").await.unwrap();
    let messages = vec![
        Message::new(b"Info message 1".to_vec()),
        Message::new(b"Info message 2".to_vec()),
    ];
    publisher.publish("info-topic", messages).await.unwrap();

    let topic_info = broker.topic_info("info-topic").unwrap();
    assert_eq!(topic_info.name, "info-topic");
    assert_eq!(topic_info.queue_size, 2);
    assert_eq!(topic_info.subscriber_count, 1);
    assert_eq!(topic_info.total_published, 2);
    assert_eq!(topic_info.total_consumed, 0);

    // `Duration::as_millis()` returns an unsigned integer, so the former
    // `>= 0` comparisons could never fail (and trip the `unused_comparisons`
    // lint). Only exercise the accessors here.
    let _ = topic_info.age().as_millis();
    let _ = topic_info.idle_time().as_millis();

    assert!(topic_info.is_active());
    assert!(topic_info.throughput_rate() >= 0.0);
    assert!(topic_info.next_sequence > 0);

    subscriber.receive().await.unwrap();

    // The assertions below pin that a subscriber `receive()` leaves both
    // `total_consumed` and `queue_size` unchanged.
    // NOTE(review): presumably subscriber delivery does not drain the broker
    // queue; confirm against the broker implementation.
    let topic_info = broker.topic_info("info-topic").unwrap();
    assert_eq!(topic_info.total_consumed, 0);
    assert_eq!(topic_info.queue_size, 2);
}
/// Creates two topics, consumes from one of them, waits 10 ms and then asks
/// the broker to drop topics idle for longer than 5 ms.
///
/// The assertions are intentionally loose: at least one topic must be removed
/// and at most one may remain.
#[tokio::test]
async fn test_idle_topic_cleanup() {
    let config = InMemoryConfig::for_testing();
    let broker = Arc::new(InMemoryBroker::new(config));
    let publisher = InMemoryPublisher::new(broker.clone());

    // One single-message batch per topic, published in this order.
    let seeds = [("active-topic", &b"active"[..]), ("idle-topic", &b"idle"[..])];
    for (topic, payload) in seeds {
        publisher
            .publish(topic, vec![Message::new(payload.to_vec())])
            .await
            .unwrap();
    }
    assert_eq!(broker.topic_count(), 2);

    let consumed = broker.consume("idle-topic").unwrap();
    assert!(consumed.is_some());

    // Let the topics sit for longer than the idle threshold used below.
    sleep(Duration::from_millis(10)).await;

    let idle_threshold = Duration::from_millis(5);
    let removed = broker.cleanup_idle_topics(idle_threshold).unwrap();
    assert!(
        removed.len() >= 1,
        "Expected at least 1 topic to be removed, got {}",
        removed.len()
    );
    assert!(
        broker.topic_count() <= 1,
        "Expected at most 1 topic to remain, got {}",
        broker.topic_count()
    );

    let remaining_topics = broker.list_topics();
    assert!(
        remaining_topics.len() <= 2,
        "Too many topics remaining: {}",
        remaining_topics.len()
    );
}
/// Verifies broker state after shutdown: the publisher reports disconnected,
/// all topics are gone, and further publishing fails with
/// `InMemoryError::BrokerShutdown`.
///
/// NOTE(review): despite its name, this test drives `force_shutdown()`;
/// whether a separate graceful path exists is not visible from this file.
#[tokio::test]
async fn test_graceful_shutdown() {
    let config = InMemoryConfig::for_testing();
    let broker = Arc::new(InMemoryBroker::new(config));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());

    subscriber.subscribe("shutdown-topic").await.unwrap();
    let messages = vec![Message::new(b"Message before shutdown".to_vec())];
    publisher.publish("shutdown-topic", messages).await.unwrap();

    assert!(publisher.is_connected());
    assert_eq!(broker.topic_count(), 1);

    // The shutdown result used to be silently discarded; assert it the same
    // way `test_force_shutdown` does so a failing shutdown is reported here.
    assert!(broker.force_shutdown().is_ok());

    assert!(!publisher.is_connected());
    assert_eq!(broker.topic_count(), 0);

    // Publishing after shutdown must be rejected with the dedicated error.
    let result = publisher
        .publish("test", vec![Message::new(b"test".to_vec())])
        .await;
    assert!(matches!(result, Err(InMemoryError::BrokerShutdown)));
}
/// Verifies that `force_shutdown` succeeds on a broker holding one topic,
/// disconnects the publisher and leaves no topics behind.
#[tokio::test]
async fn test_force_shutdown() {
    let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));
    let publisher = InMemoryPublisher::new(broker.clone());

    // Seed a single topic so there is something to tear down.
    let seed = vec![Message::new(b"test".to_vec())];
    publisher.publish("force-topic", seed).await.unwrap();
    assert_eq!(broker.topic_count(), 1);

    let outcome = broker.force_shutdown();
    assert!(outcome.is_ok());

    assert!(!publisher.is_connected());
    assert_eq!(broker.topic_count(), 0);
}
/// Verifies that `list_topic_info` returns one entry per topic with the
/// expected queue size, subscriber count and published counter.
///
/// `topic1` gets one message and `topic2` gets two; each has one subscriber.
#[tokio::test]
async fn test_list_topic_info() {
    let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber1 = InMemorySubscriber::new(broker.clone());
    let mut subscriber2 = InMemorySubscriber::new(broker.clone());

    subscriber1.subscribe("topic1").await.unwrap();
    subscriber2.subscribe("topic2").await.unwrap();

    let first_batch = vec![Message::new(b"msg1".to_vec())];
    publisher.publish("topic1", first_batch).await.unwrap();

    let second_batch = vec![
        Message::new(b"msg2a".to_vec()),
        Message::new(b"msg2b".to_vec()),
    ];
    publisher.publish("topic2", second_batch).await.unwrap();

    let topic_infos = broker.list_topic_info();
    assert_eq!(topic_infos.len(), 2);

    // Look entries up by name instead of relying on the order of the list.
    let first = topic_infos
        .iter()
        .find(|info| info.name == "topic1")
        .unwrap();
    let second = topic_infos
        .iter()
        .find(|info| info.name == "topic2")
        .unwrap();

    assert_eq!(first.queue_size, 1);
    assert_eq!(first.subscriber_count, 1);
    assert_eq!(first.total_published, 1);

    assert_eq!(second.queue_size, 2);
    assert_eq!(second.subscriber_count, 1);
    assert_eq!(second.total_published, 2);

    assert!(first.is_active());
    assert!(second.is_active());
}
/// Verifies that the broker's memory estimate grows by at least the combined
/// payload size (1000 + 2000 bytes) after two large messages are published.
#[tokio::test]
async fn test_memory_usage_estimation() {
    let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));
    let publisher = InMemoryPublisher::new(broker.clone());

    // Snapshot the estimate before anything is queued.
    let baseline_health = broker.health_check();
    let baseline_bytes = baseline_health.memory_usage_estimate;

    let first = Message::new(vec![b'A'; 1000]).with_metadata("large", "metadata_value");
    let second =
        Message::new(vec![b'B'; 2000]).with_metadata("even_larger", "another_metadata_value");
    publisher
        .publish("memory-test", vec![first, second])
        .await
        .unwrap();

    let loaded_health = broker.health_check();
    let loaded_bytes = loaded_health.memory_usage_estimate;
    assert!(loaded_bytes > baseline_bytes);
    assert!(loaded_health.memory_usage_mb() > 0.0);

    // The growth check above guarantees this subtraction cannot underflow.
    let growth = loaded_bytes - baseline_bytes;
    assert!(growth >= 3000);
}
/// Runs health checks, idle-topic cleanup and topic-info listing in spawned
/// tasks while the test body publishes five messages, then verifies that the
/// broker still reports healthy once every task has finished.
#[tokio::test]
async fn test_concurrent_advanced_operations() {
    let config = InMemoryConfig::new()
        .with_stats(true)
        .with_maintain_order(true);
    let broker = Arc::new(InMemoryBroker::new(config));

    // Task 1: three health checks, 5 ms apart, each asserting a healthy broker.
    let health_broker = broker.clone();
    let health_task = tokio::spawn(async move {
        for _ in 0..3 {
            let report = health_broker.health_check();
            assert!(report.is_healthy);
            sleep(Duration::from_millis(5)).await;
        }
    });

    // Task 2: two idle-topic cleanups with a 1 ms idle threshold.
    let cleanup_broker = broker.clone();
    let cleanup_task = tokio::spawn(async move {
        for _ in 0..2 {
            let _removed = cleanup_broker
                .cleanup_idle_topics(Duration::from_millis(1))
                .unwrap();
            sleep(Duration::from_millis(5)).await;
        }
    });

    // Task 3: three topic-info listings; the results are discarded.
    let info_broker = broker.clone();
    let info_task = tokio::spawn(async move {
        for _ in 0..3 {
            let _infos = info_broker.list_topic_info();
            sleep(Duration::from_millis(5)).await;
        }
    });

    // The test body publishes while the spawned tasks are in flight.
    let publisher = InMemoryPublisher::new(broker.clone());
    for i in 0..5 {
        let payload = format!("concurrent-{}", i).into_bytes();
        publisher
            .publish("concurrent-topic", vec![Message::new(payload)])
            .await
            .unwrap();
    }

    // Await the tasks in spawn order; a panic inside a task surfaces here.
    for task in [health_task, cleanup_task, info_task] {
        task.await.unwrap();
    }

    let final_health = broker.health_check();
    assert!(final_health.is_healthy);
}
}