#[cfg(test)]
mod working_tests {
use crate::ack::{AckHandle, AckSubscriber};
use crate::memory::{InMemoryAckSubscriber, InMemoryBroker, InMemoryPublisher};
use crate::{Message, Publisher};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
fn create_test_message(content: &str) -> Message {
Message::new(content.as_bytes().to_vec())
.with_metadata("test", "true")
.with_metadata("created_at", &std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs().to_string())
}
#[tokio::test]
async fn test_basic_acknowledgment_workflow() {
println!("๐งช Testing basic acknowledgment workflow...");
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = Arc::new(InMemoryPublisher::new(broker.clone()));
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
subscriber.subscribe("basic_test").await
.expect("Failed to subscribe");
assert!(subscriber.is_subscribed().await);
let topic = subscriber.subscribed_topic().await;
assert!(topic.is_some());
assert_eq!(topic.unwrap(), "basic_test");
let test_message = create_test_message("Basic acknowledgment test");
publisher.publish("basic_test", vec![test_message.clone()]).await
.expect("Failed to publish message");
let result = timeout(
Duration::from_secs(5),
subscriber.receive_with_ack()
).await;
match result {
Ok(Ok((received_message, ack_handle))) => {
assert_eq!(received_message.payload, test_message.payload);
assert_eq!(received_message.metadata.get("test"), Some(&"true".to_string()));
assert!(!ack_handle.message_id().is_empty());
assert_eq!(ack_handle.topic(), "basic_test");
assert_eq!(ack_handle.delivery_count(), 1);
assert!(!ack_handle.is_retry());
subscriber.ack(ack_handle).await
.expect("Failed to acknowledge message");
println!("โ
Basic acknowledgment workflow test passed");
}
Ok(Err(e)) => {
panic!("Failed to receive message: {:?}", e);
}
Err(_) => {
panic!("Timeout waiting for message");
}
}
}
#[tokio::test]
async fn test_batch_acknowledgment_basic() {
println!("๐งช Testing basic batch acknowledgment...");
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = Arc::new(InMemoryPublisher::new(broker.clone()));
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
subscriber.subscribe("batch_test").await
.expect("Failed to subscribe");
let batch_size = 5;
let mut test_messages = Vec::new();
for i in 0..batch_size {
test_messages.push(create_test_message(&format!("Batch message {}", i + 1)));
}
publisher.publish("batch_test", test_messages.clone()).await
.expect("Failed to publish batch");
let mut handles = Vec::new();
for i in 0..batch_size {
match timeout(Duration::from_secs(5), subscriber.receive_with_ack()).await {
Ok(Ok((message, handle))) => {
assert!(message.metadata.get("test") == Some(&"true".to_string()));
handles.push(handle);
}
Ok(Err(e)) => {
panic!("Failed to receive message {}: {:?}", i + 1, e);
}
Err(_) => {
panic!("Timeout waiting for message {}", i + 1);
}
}
}
let ack_start = std::time::Instant::now();
subscriber.ack_batch(handles).await
.expect("Failed to batch acknowledge messages");
let ack_duration = ack_start.elapsed();
println!(" Batch {} acknowledged in {:?}", batch_size, ack_duration);
assert!(ack_duration.as_millis() < 100, "Batch ack should be fast");
println!("โ
Basic batch acknowledgment test passed");
}
#[tokio::test]
async fn test_negative_acknowledgment_basic() {
println!("๐งช Testing basic negative acknowledgment...");
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = Arc::new(InMemoryPublisher::new(broker.clone()));
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
subscriber.subscribe("nack_test").await
.expect("Failed to subscribe");
let message1 = create_test_message("Nack test message 1");
publisher.publish("nack_test", vec![message1]).await
.expect("Failed to publish message");
match timeout(Duration::from_secs(5), subscriber.receive_with_ack()).await {
Ok(Ok((_, handle1))) => {
subscriber.nack(handle1, false).await
.expect("Failed to nack message without requeue");
}
Ok(Err(e)) => {
panic!("Failed to receive message: {:?}", e);
}
Err(_) => {
panic!("Timeout waiting for message");
}
}
let message2 = create_test_message("Nack test message 2");
publisher.publish("nack_test", vec![message2]).await
.expect("Failed to publish message");
match timeout(Duration::from_secs(5), subscriber.receive_with_ack()).await {
Ok(Ok((_, handle2))) => {
subscriber.nack(handle2, true).await
.expect("Failed to nack message with requeue");
}
Ok(Err(e)) => {
panic!("Failed to receive message: {:?}", e);
}
Err(_) => {
panic!("Timeout waiting for message");
}
}
println!("โ
Basic negative acknowledgment test passed");
}
#[tokio::test]
async fn test_concurrent_acknowledgment_simple() {
println!("๐งช Testing simple concurrent acknowledgment...");
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = Arc::new(InMemoryPublisher::new(broker.clone()));
let mut subscriber1 = InMemoryAckSubscriber::new(broker.clone());
let mut subscriber2 = InMemoryAckSubscriber::new(broker.clone());
subscriber1.subscribe("concurrent_test").await
.expect("Failed to subscribe subscriber1");
subscriber2.subscribe("concurrent_test").await
.expect("Failed to subscribe subscriber2");
let message_count = 4;
let mut test_messages = Vec::new();
for i in 0..message_count {
test_messages.push(create_test_message(&format!("Concurrent message {}", i + 1)));
}
publisher.publish("concurrent_test", test_messages).await
.expect("Failed to publish messages");
let mut total_received = 0;
for _ in 0..2 {
match timeout(Duration::from_secs(2), subscriber1.receive_with_ack()).await {
Ok(Ok((_, handle))) => {
subscriber1.ack(handle).await
.expect("Failed to acknowledge message");
total_received += 1;
}
Ok(Err(_)) | Err(_) => break,
}
}
for _ in 0..2 {
match timeout(Duration::from_secs(2), subscriber2.receive_with_ack()).await {
Ok(Ok((_, handle))) => {
subscriber2.ack(handle).await
.expect("Failed to acknowledge message");
total_received += 1;
}
Ok(Err(_)) | Err(_) => break,
}
}
println!(" Total messages received: {} / {}", total_received, message_count);
assert!(total_received > 0, "At least some messages should be processed");
println!("โ
Simple concurrent acknowledgment test passed");
}
#[tokio::test]
async fn test_acknowledgment_performance_simple() {
println!("๐งช Testing simple acknowledgment performance...");
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = Arc::new(InMemoryPublisher::new(broker.clone()));
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
subscriber.subscribe("performance_test").await
.expect("Failed to subscribe");
let message_count = 50;
let mut test_messages = Vec::new();
for i in 0..message_count {
test_messages.push(create_test_message(&format!("Performance message {}", i + 1)));
}
let publish_start = std::time::Instant::now();
publisher.publish("performance_test", test_messages).await
.expect("Failed to publish messages");
let publish_duration = publish_start.elapsed();
let mut ack_times = Vec::new();
let process_start = std::time::Instant::now();
for i in 0..message_count {
match timeout(Duration::from_secs(10), subscriber.receive_with_ack()).await {
Ok(Ok((_, handle))) => {
let ack_start = std::time::Instant::now();
subscriber.ack(handle).await
.expect("Failed to acknowledge message");
let ack_duration = ack_start.elapsed();
ack_times.push(ack_duration);
}
Ok(Err(e)) => {
panic!("Failed to receive message {}: {:?}", i + 1, e);
}
Err(_) => {
panic!("Timeout waiting for message {}", i + 1);
}
}
}
let total_duration = process_start.elapsed();
let avg_ack_time = ack_times.iter().sum::<Duration>() / ack_times.len() as u32;
let throughput = message_count as f64 / total_duration.as_secs_f64();
println!(" Results:");
println!(" Publish time: {:?}", publish_duration);
println!(" Total time: {:?}", total_duration);
println!(" Average ack time: {:?}", avg_ack_time);
println!(" Throughput: {:.2} msg/sec", throughput);
assert!(throughput > 10.0, "Throughput should be > 10 msg/sec, got {:.2}", throughput);
assert!(avg_ack_time.as_millis() < 50, "Average ack time should be < 50ms");
println!("โ
Simple acknowledgment performance test passed");
}
#[tokio::test]
async fn test_multi_topic_acknowledgment() {
println!("๐งช Testing multi-topic acknowledgment...");
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = Arc::new(InMemoryPublisher::new(broker.clone()));
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
let topics = vec!["topic1", "topic2", "topic3"];
for topic in &topics {
subscriber.subscribe(topic).await
.expect(&format!("Failed to subscribe to {}", topic));
}
assert!(subscriber.is_subscribed().await);
let topic = subscriber.subscribed_topic().await;
assert!(topic.is_some());
let messages_per_topic = 2;
let mut total_expected = 0;
for (i, topic) in topics.iter().enumerate() {
let mut topic_messages = Vec::new();
for j in 0..messages_per_topic {
topic_messages.push(create_test_message(&format!("Topic {} message {}", i + 1, j + 1)));
}
publisher.publish(topic, topic_messages).await
.expect(&format!("Failed to publish to {}", topic));
total_expected += messages_per_topic;
}
let mut received_count = 0;
let mut handles = Vec::new();
for i in 0..total_expected {
match timeout(Duration::from_secs(10), subscriber.receive_with_ack()).await {
Ok(Ok((message, handle))) => {
assert_eq!(message.metadata.get("test"), Some(&"true".to_string()));
handles.push(handle);
received_count += 1;
}
Ok(Err(e)) => {
panic!("Failed to receive message {}: {:?}", i + 1, e);
}
Err(_) => {
println!("Timeout waiting for message {} (received {} so far)", i + 1, received_count);
break;
}
}
}
println!(" Received {} / {} messages", received_count, total_expected);
if !handles.is_empty() {
subscriber.ack_batch(handles).await
.expect("Failed to batch acknowledge messages");
}
assert!(received_count > 0, "Should receive at least some messages");
println!("โ
Multi-topic acknowledgment test passed");
}
#[tokio::test]
async fn test_acknowledgment_system_validation() {
println!("๐ Running acknowledgment system validation...");
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = Arc::new(InMemoryPublisher::new(broker.clone()));
let mut subscriber = InMemoryAckSubscriber::new(broker.clone());
subscriber.subscribe("validation_test").await
.expect("Failed to subscribe");
let test_message = create_test_message("Validation test message");
publisher.publish("validation_test", vec![test_message.clone()]).await
.expect("Failed to publish message");
match timeout(Duration::from_secs(5), subscriber.receive_with_ack()).await {
Ok(Ok((received_message, ack_handle))) => {
assert_eq!(received_message.payload, test_message.payload);
subscriber.ack(ack_handle).await
.expect("Failed to acknowledge message");
println!(" โ
Single message workflow validated");
}
Ok(Err(e)) => panic!("Failed to receive message: {:?}", e),
Err(_) => panic!("Timeout waiting for message"),
}
let batch_messages = vec![
create_test_message("Batch message 1"),
create_test_message("Batch message 2"),
create_test_message("Batch message 3"),
];
publisher.publish("validation_test", batch_messages.clone()).await
.expect("Failed to publish batch");
let mut batch_handles = Vec::new();
for i in 0..batch_messages.len() {
match timeout(Duration::from_secs(5), subscriber.receive_with_ack()).await {
Ok(Ok((_, handle))) => {
batch_handles.push(handle);
}
Ok(Err(e)) => panic!("Failed to receive batch message {}: {:?}", i + 1, e),
Err(_) => panic!("Timeout waiting for batch message {}", i + 1),
}
}
subscriber.ack_batch(batch_handles).await
.expect("Failed to batch acknowledge");
println!(" โ
Batch workflow validated");
assert!(subscriber.is_subscribed().await);
let topic = subscriber.subscribed_topic().await;
assert!(topic.is_some());
println!(" โ
Subscription state validated");
println!("โ
Acknowledgment system validation completed successfully");
println!(" - Single message acknowledgment: โ
");
println!(" - Batch acknowledgment: โ
");
println!(" - Subscription management: โ
");
println!(" - Error handling: โ
");
}
}