use super::ack::{KafkaAckHandle, KafkaAckSubscriber};
use crate::ack::{AckHandle, AckSubscriber};
use crate::Message;
use std::time::SystemTime;
use tokio::time::Duration;
#[cfg(test)]
mod kafka_ack_handle_tests {
use super::*;
#[test]
fn test_kafka_ack_handle_creation() {
let message_id = "test-message-123".to_string();
let topic = "test-topic".to_string();
let partition = 2;
let offset = 12345;
let delivery_count = 1;
let timestamp = SystemTime::now();
let handle = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
delivery_count,
timestamp,
);
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), topic);
assert_eq!(handle.partition(), partition);
assert_eq!(handle.offset(), offset);
assert_eq!(handle.delivery_count(), delivery_count);
assert_eq!(handle.timestamp(), timestamp);
assert!(!handle.handle_id().is_empty());
}
#[test]
fn test_kafka_ack_handle_retry_detection() {
let message_id = "test-message-retry".to_string();
let topic = "test-topic".to_string();
let partition = 0;
let offset = 100;
let timestamp = SystemTime::now();
let handle_first = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
1, timestamp,
);
assert!(!handle_first.is_retry());
let handle_retry = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
2, timestamp,
);
assert!(handle_retry.is_retry());
let handle_multiple_retry = KafkaAckHandle::new(
message_id,
topic,
partition,
offset,
5, timestamp,
);
assert!(handle_multiple_retry.is_retry());
}
#[test]
fn test_kafka_ack_handle_uniqueness() {
let message_id = "test-message-unique".to_string();
let topic = "test-topic".to_string();
let partition = 1;
let offset = 999;
let delivery_count = 1;
let timestamp = SystemTime::now();
let handle1 = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
delivery_count,
timestamp,
);
let handle2 = KafkaAckHandle::new(
message_id,
topic,
partition,
offset,
delivery_count,
timestamp,
);
assert_ne!(handle1.handle_id(), handle2.handle_id());
}
#[test]
fn test_kafka_ack_handle_partition_offset_combinations() {
let message_id = "test-message".to_string();
let topic = "test-topic".to_string();
let timestamp = SystemTime::now();
let test_cases = vec![
(0, 0), (0, 1000), (5, 0), (10, 5000), (255, u64::MAX), ];
for (partition, offset) in test_cases {
let handle = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
1,
timestamp,
);
assert_eq!(handle.partition(), partition);
assert_eq!(handle.offset(), offset);
assert_eq!(handle.topic(), topic);
}
}
#[test]
fn test_kafka_ack_handle_properties() {
let message_id = "test-message-props".to_string();
let topic = "test-topic-props".to_string();
let partition = 3;
let offset = 7777;
let delivery_count = 4;
let timestamp = SystemTime::now();
let handle = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
delivery_count,
timestamp,
);
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), topic);
assert_eq!(handle.delivery_count(), delivery_count);
assert!(handle.is_retry()); assert_eq!(handle.timestamp(), timestamp);
assert_eq!(handle.partition(), partition);
assert_eq!(handle.offset(), offset);
assert!(!handle.handle_id().is_empty());
}
#[test]
fn test_kafka_ack_handle_edge_cases() {
let message_id = "".to_string(); let topic = "".to_string(); let partition = 0;
let offset = 0;
let delivery_count = 0; let timestamp = SystemTime::now();
let handle = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
delivery_count,
timestamp,
);
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), topic);
assert_eq!(handle.partition(), partition);
assert_eq!(handle.offset(), offset);
assert_eq!(handle.delivery_count(), delivery_count);
assert!(!handle.is_retry()); }
}
#[cfg(test)]
mod kafka_ack_subscriber_tests {
use super::*;
#[tokio::test]
async fn test_kafka_ack_subscriber_creation() {
let brokers = vec!["localhost:9092".to_string()];
let group_id = "test-group".to_string();
let result = KafkaAckSubscriber::new(brokers, group_id).await;
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.to_string().contains("connection") ||
error.to_string().contains("Connection") ||
error.to_string().contains("broker") ||
error.to_string().contains("timeout") ||
error.to_string().contains("resolve"));
}
#[tokio::test]
async fn test_kafka_ack_subscriber_invalid_brokers() {
let invalid_broker_configs = vec![
(vec![], "empty-group".to_string()), (vec!["".to_string()], "test-group".to_string()), (vec!["invalid-broker".to_string()], "test-group".to_string()), (vec!["localhost:99999".to_string()], "test-group".to_string()), ];
for (brokers, group_id) in invalid_broker_configs {
let result = KafkaAckSubscriber::new(brokers.clone(), group_id.clone()).await;
assert!(result.is_err(), "Expected error for brokers: {:?}, group: {}", brokers, group_id);
}
}
#[tokio::test]
async fn test_kafka_ack_subscriber_invalid_group_id() {
let brokers = vec!["localhost:9092".to_string()];
let invalid_group_ids = vec![
"".to_string(), ];
for group_id in invalid_group_ids {
let result = KafkaAckSubscriber::new(brokers.clone(), group_id.clone()).await;
assert!(result.is_err(), "Expected error for group ID: {}", group_id);
}
}
#[test]
fn test_kafka_broker_string_validation() {
let valid_broker_strings = vec![
"localhost:9092",
"127.0.0.1:9092",
"kafka.example.com:9092",
"kafka1.cluster.local:9092",
"10.0.0.1:9092",
];
for broker_string in valid_broker_strings {
assert!(!broker_string.is_empty());
assert!(broker_string.contains(':'));
let parts: Vec<&str> = broker_string.split(':').collect();
assert_eq!(parts.len(), 2);
assert!(!parts[0].is_empty()); assert!(parts[1].parse::<u16>().is_ok()); }
}
}
#[cfg(test)]
mod kafka_error_handling_tests {
use super::*;
#[tokio::test]
async fn test_kafka_connection_timeout() {
let brokers = vec!["192.0.2.1:9092".to_string()]; let group_id = "timeout-test-group".to_string();
let start_time = std::time::Instant::now();
let result = KafkaAckSubscriber::new(brokers, group_id).await;
let elapsed = start_time.elapsed();
assert!(result.is_err());
assert!(elapsed < Duration::from_secs(30));
}
#[tokio::test]
async fn test_kafka_malformed_broker_addresses() {
let malformed_brokers = vec![
vec!["not-a-broker".to_string()],
vec!["localhost".to_string()], vec![":9092".to_string()], vec!["localhost:".to_string()], vec!["localhost:abc".to_string()], ];
for brokers in malformed_brokers {
let result = KafkaAckSubscriber::new(brokers.clone(), "test-group".to_string()).await;
assert!(result.is_err(), "Expected error for malformed brokers: {:?}", brokers);
}
}
}
#[cfg(test)]
mod kafka_integration_unit_tests {
use super::*;
#[test]
fn test_message_to_kafka_handle_conversion() {
let message = Message::new(b"test payload".to_vec())
.with_metadata("partition", "2")
.with_metadata("offset", "12345")
.with_metadata("delivery_count", "1")
.with_metadata("topic", "test-topic");
let partition: i32 = message.metadata
.get("partition")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let offset: u64 = message.metadata
.get("offset")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let delivery_count: u32 = message.metadata
.get("delivery_count")
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let topic = message.metadata
.get("topic")
.unwrap_or(&"default".to_string())
.clone();
assert_eq!(partition, 2);
assert_eq!(offset, 12345);
assert_eq!(delivery_count, 1);
assert_eq!(topic, "test-topic");
let handle = KafkaAckHandle::new(
message.uuid.clone(),
topic,
partition,
offset,
delivery_count,
SystemTime::now(),
);
assert_eq!(handle.message_id(), message.uuid);
assert_eq!(handle.partition(), partition);
assert_eq!(handle.offset(), offset);
assert_eq!(handle.delivery_count(), delivery_count);
assert!(!handle.is_retry()); }
#[test]
fn test_batch_acknowledgment_logic() {
let handles = vec![
KafkaAckHandle::new("msg1".to_string(), "topic".to_string(), 0, 100, 1, SystemTime::now()),
KafkaAckHandle::new("msg2".to_string(), "topic".to_string(), 0, 101, 1, SystemTime::now()),
KafkaAckHandle::new("msg3".to_string(), "topic".to_string(), 0, 102, 1, SystemTime::now()),
KafkaAckHandle::new("msg4".to_string(), "topic".to_string(), 1, 200, 1, SystemTime::now()),
KafkaAckHandle::new("msg5".to_string(), "topic".to_string(), 1, 201, 1, SystemTime::now()),
];
let mut partition_offsets = std::collections::HashMap::new();
for handle in &handles {
let current_max = partition_offsets.get(&handle.partition()).unwrap_or(&0);
if handle.offset() > *current_max {
partition_offsets.insert(handle.partition(), handle.offset());
}
}
assert_eq!(partition_offsets.get(&0), Some(&102));
assert_eq!(partition_offsets.get(&1), Some(&201));
let all_same_topic = handles.iter()
.all(|h| h.topic() == "topic");
assert!(all_same_topic);
}
#[test]
fn test_offset_management_logic() {
let topic = "test-topic".to_string();
let partition = 0;
let timestamp = SystemTime::now();
let test_cases = vec![
(0, 1, 1), (100, 1, 101), (999, 2, 1000), ];
for (offset, delivery_count, expected_commit) in test_cases {
let handle = KafkaAckHandle::new(
format!("msg-{}", offset),
topic.clone(),
partition,
offset,
delivery_count,
timestamp,
);
let commit_offset = handle.offset() + 1;
assert_eq!(commit_offset, expected_commit);
}
}
#[test]
fn test_consumer_group_logic() {
let group_ids = vec![
"test-group-1".to_string(),
"test-group-2".to_string(),
"production-consumers".to_string(),
"analytics-pipeline".to_string(),
];
for group_id in group_ids {
assert!(!group_id.is_empty());
assert!(!group_id.contains('\0'));
assert!(group_id.len() < 256);
}
}
#[test]
fn test_partition_assignment_logic() {
let topic = "multi-partition-topic".to_string();
let partitions = vec![0, 1, 2, 3, 4]; let timestamp = SystemTime::now();
for partition in partitions {
let handle = KafkaAckHandle::new(
format!("msg-p{}", partition),
topic.clone(),
partition,
1000 + partition as u64, 1,
timestamp,
);
assert_eq!(handle.partition(), partition);
assert_eq!(handle.offset(), 1000 + partition as u64);
assert_eq!(handle.topic(), topic);
}
}
#[test]
fn test_delivery_count_tracking() {
let message_id = "retry-test".to_string();
let topic = "test-topic".to_string();
let partition = 0;
let offset = 500;
let deliveries = vec![
(1, false), (2, true), (3, true), (10, true), ];
for (count, expected_retry) in deliveries {
let handle = KafkaAckHandle::new(
message_id.clone(),
topic.clone(),
partition,
offset,
count,
SystemTime::now(),
);
assert_eq!(handle.is_retry(), expected_retry,
"Delivery count {} should have retry={}", count, expected_retry);
assert_eq!(handle.delivery_count(), count);
}
}
}
#[cfg(test)]
mod kafka_performance_unit_tests {
use super::*;
#[test]
fn test_handle_creation_performance() {
let start = std::time::Instant::now();
let count = 1000;
for i in 0..count {
let _handle = KafkaAckHandle::new(
format!("message-{}", i),
"test-topic".to_string(),
(i % 5) as i32, i as u64,
1,
SystemTime::now(),
);
}
let elapsed = start.elapsed();
let per_handle = elapsed / count;
assert!(per_handle < Duration::from_millis(1),
"Handle creation took too long: {:?} per handle", per_handle);
}
#[test]
fn test_handle_method_call_performance() {
let handle = KafkaAckHandle::new(
"perf-test".to_string(),
"test-topic".to_string(),
2,
12345,
3,
SystemTime::now(),
);
let start = std::time::Instant::now();
let iterations = 10000;
for _ in 0..iterations {
let _ = handle.message_id();
let _ = handle.topic();
let _ = handle.partition();
let _ = handle.offset();
let _ = handle.delivery_count();
let _ = handle.is_retry();
let _ = handle.timestamp();
let _ = handle.handle_id();
}
let elapsed = start.elapsed();
let per_call = elapsed / (iterations * 8);
assert!(per_call < Duration::from_micros(1),
"Method calls took too long: {:?} per call", per_call);
}
#[test]
fn test_offset_calculation_performance() {
let handles: Vec<KafkaAckHandle> = (0..1000)
.map(|i| KafkaAckHandle::new(
format!("message-{}", i),
"test-topic".to_string(),
(i % 10) as i32, i as u64,
1,
SystemTime::now(),
))
.collect();
let start = std::time::Instant::now();
let mut partition_offsets = std::collections::HashMap::new();
for handle in &handles {
let current_max = partition_offsets.get(&handle.partition()).unwrap_or(&0);
if handle.offset() > *current_max {
partition_offsets.insert(handle.partition(), handle.offset());
}
}
let elapsed = start.elapsed();
assert!(elapsed < Duration::from_millis(10),
"Offset calculation took too long: {:?}", elapsed);
assert_eq!(partition_offsets.len(), 10); for i in 0..10 {
let expected_max = ((999 / 10) * 10 + i) as u64; assert!(partition_offsets.get(&(i as i32)).unwrap() >= &expected_max);
}
}
}