use super::ack::{MQTTAckHandle, MQTTAckSubscriber};
use crate::ack::{AckHandle, AckSubscriber};
use crate::Message;
use rumqttc::QoS;
use std::time::SystemTime;
use tokio::time::Duration;
#[cfg(test)]
mod mqtt_ack_handle_tests {
use super::*;
#[test]
fn test_mqtt_ack_handle_creation() {
let message_id = "test-message-123".to_string();
let topic = "sensors/temperature".to_string();
let qos = QoS::AtLeastOnce;
let packet_id = Some(42);
let delivery_count = 1;
let timestamp = SystemTime::now();
let handle = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
timestamp,
delivery_count,
qos,
packet_id,
);
assert_eq!(handle.message_id(), message_id);
assert_eq!(handle.topic(), topic);
assert_eq!(handle.qos(), qos);
assert_eq!(handle.packet_id(), packet_id);
assert_eq!(handle.delivery_count(), delivery_count);
assert_eq!(handle.timestamp(), timestamp);
assert!(!handle.handle_id().is_empty());
}
#[test]
fn test_mqtt_ack_handle_qos_levels() {
let message_id = "qos-test".to_string();
let topic = "test/topic".to_string();
let delivery_count = 1;
let timestamp = SystemTime::now();
let qos_test_cases = vec![
(QoS::AtMostOnce, None, false), (QoS::AtLeastOnce, Some(100), true), (QoS::ExactlyOnce, Some(200), true), ];
for (qos, packet_id, requires_ack) in qos_test_cases {
let handle = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
qos,
packet_id,
delivery_count,
timestamp,
);
assert_eq!(handle.qos(), qos);
assert_eq!(handle.packet_id(), packet_id);
assert_eq!(handle.requires_ack(), requires_ack);
}
}
#[test]
fn test_mqtt_ack_handle_retry_detection() {
let message_id = "retry-test".to_string();
let topic = "test/retry".to_string();
let qos = QoS::AtLeastOnce;
let packet_id = Some(50);
let timestamp = SystemTime::now();
let handle_first = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
timestamp,
1, qos,
packet_id,
);
assert!(!handle_first.is_retry());
let handle_retry = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
qos,
packet_id,
2, timestamp,
);
assert!(handle_retry.is_retry());
let handle_multiple_retry = MQTTAckHandle::new(
message_id,
topic,
qos,
packet_id,
5, timestamp,
);
assert!(handle_multiple_retry.is_retry());
}
#[test]
fn test_mqtt_ack_handle_uniqueness() {
let message_id = "unique-test".to_string();
let topic = "test/unique".to_string();
let qos = QoS::ExactlyOnce;
let packet_id = Some(999);
let delivery_count = 1;
let timestamp = SystemTime::now();
let handle1 = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
qos,
packet_id,
delivery_count,
timestamp,
);
let handle2 = MQTTAckHandle::new(
message_id,
topic,
qos,
packet_id,
delivery_count,
timestamp,
);
assert_ne!(handle1.handle_id(), handle2.handle_id());
}
#[test]
fn test_mqtt_ack_handle_requires_ack_logic() {
let message_id = "ack-logic-test".to_string();
let topic = "test/ack".to_string();
let delivery_count = 1;
let timestamp = SystemTime::now();
let handle_qos0 = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
timestamp,
delivery_count,
QoS::AtMostOnce,
None, );
assert!(!handle_qos0.requires_ack());
let handle_qos1 = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
timestamp,
delivery_count,
QoS::AtLeastOnce,
Some(123),
);
assert!(handle_qos1.requires_ack());
let handle_qos2 = MQTTAckHandle::new(
message_id,
topic,
timestamp,
delivery_count,
QoS::ExactlyOnce,
Some(456),
);
assert!(handle_qos2.requires_ack());
}
#[test]
fn test_mqtt_ack_handle_packet_id_consistency() {
let message_id = "packet-id-test".to_string();
let topic = "test/packet".to_string();
let delivery_count = 1;
let timestamp = SystemTime::now();
let packet_id_cases = vec![
(QoS::AtMostOnce, None), (QoS::AtLeastOnce, Some(1)), (QoS::AtLeastOnce, Some(65535)), (QoS::ExactlyOnce, Some(32768)), ];
for (qos, expected_packet_id) in packet_id_cases {
let handle = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
timestamp,
delivery_count,
qos,
expected_packet_id,
);
assert_eq!(handle.packet_id(), expected_packet_id);
assert_eq!(handle.qos(), qos);
}
}
#[test]
fn test_mqtt_ack_handle_edge_cases() {
let timestamp = SystemTime::now();
let handle_empty = MQTTAckHandle::new(
"".to_string(),
"".to_string(),
QoS::AtMostOnce,
None,
0, timestamp,
);
assert_eq!(handle_empty.message_id(), "");
assert_eq!(handle_empty.topic(), "");
assert_eq!(handle_empty.delivery_count(), 0);
assert!(!handle_empty.is_retry());
let long_topic = "a".repeat(1000);
let handle_long = MQTTAckHandle::new(
"long-topic-test".to_string(),
long_topic.clone(),
QoS::AtLeastOnce,
Some(1),
1,
timestamp,
);
assert_eq!(handle_long.topic(), long_topic);
}
}
#[cfg(test)]
mod mqtt_ack_subscriber_tests {
use super::*;
#[tokio::test]
async fn test_mqtt_ack_subscriber_creation() {
let broker_url = "127.0.0.1";
let client_id = Some("test-client".to_string());
let result = MQTTAckSubscriber::new(broker_url, client_id).await;
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.to_string().contains("connection") ||
error.to_string().contains("Connection") ||
error.to_string().contains("refused") ||
error.to_string().contains("timeout") ||
error.to_string().contains("resolve"));
}
#[tokio::test]
async fn test_mqtt_ack_subscriber_invalid_broker_url() {
let invalid_broker_urls = vec![
"",
"invalid-host",
"256.256.256.256", "localhost:99999", ];
for broker_url in invalid_broker_urls {
let result = MQTTAckSubscriber::new(broker_url, Some("test-client".to_string())).await;
assert!(result.is_err(), "Expected error for broker URL: {}", broker_url);
}
}
#[tokio::test]
async fn test_mqtt_ack_subscriber_client_id_handling() {
let broker_url = "127.0.0.1";
let client_id_cases = vec![
Some("valid-client-id".to_string()),
Some("".to_string()), None, ];
for client_id in client_id_cases {
let result = MQTTAckSubscriber::new(broker_url, client_id.clone()).await;
assert!(result.is_err(), "Expected connection error for client ID: {:?}", client_id);
}
}
#[test]
fn test_mqtt_broker_url_validation() {
let valid_broker_urls = vec![
"localhost",
"127.0.0.1",
"mqtt.example.com",
"192.168.1.100",
"broker.hivemq.com",
];
for broker_url in valid_broker_urls {
assert!(!broker_url.is_empty());
assert!(!broker_url.contains('\0'));
assert!(!broker_url.contains('\n'));
}
}
#[test]
fn test_mqtt_client_id_generation() {
let client_ids = vec![
Some("custom-client-123".to_string()),
Some("device-sensor-01".to_string()),
Some("analytics-consumer".to_string()),
];
for client_id in client_ids {
if let Some(id) = &client_id {
assert!(!id.is_empty() || id.is_empty()); assert!(id.len() < 256); assert!(!id.contains('\0')); }
}
}
}
#[cfg(test)]
mod mqtt_error_handling_tests {
use super::*;
#[tokio::test]
async fn test_mqtt_connection_timeout() {
let broker_url = "192.0.2.1"; let client_id = Some("timeout-test".to_string());
let start_time = std::time::Instant::now();
let result = MQTTAckSubscriber::new(broker_url, client_id).await;
let elapsed = start_time.elapsed();
assert!(result.is_err());
assert!(elapsed < Duration::from_secs(30));
}
#[tokio::test]
async fn test_mqtt_invalid_configurations() {
let invalid_configs = vec![
("", Some("client".to_string())), ("invalid\0host", Some("client".to_string())), ];
for (broker_url, client_id) in invalid_configs {
let result = MQTTAckSubscriber::new(broker_url, client_id.clone()).await;
assert!(result.is_err(), "Expected error for broker: {}, client: {:?}", broker_url, client_id);
}
}
}
#[cfg(test)]
mod mqtt_integration_unit_tests {
use super::*;
#[test]
fn test_message_to_mqtt_handle_conversion() {
let message = Message::new(b"sensor data".to_vec())
.with_metadata("qos", "1")
.with_metadata("packet_id", "123")
.with_metadata("delivery_count", "1")
.with_metadata("topic", "sensors/temperature");
let qos_level: u8 = message.metadata
.get("qos")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let qos = match qos_level {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => QoS::AtMostOnce,
};
let packet_id: Option<u16> = message.metadata
.get("packet_id")
.and_then(|s| s.parse().ok());
let delivery_count: u32 = message.metadata
.get("delivery_count")
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let topic = message.metadata
.get("topic")
.unwrap_or(&"default".to_string())
.clone();
assert_eq!(qos, QoS::AtLeastOnce);
assert_eq!(packet_id, Some(123));
assert_eq!(delivery_count, 1);
assert_eq!(topic, "sensors/temperature");
let handle = MQTTAckHandle::new(
message.uuid.clone(),
topic,
qos,
packet_id,
delivery_count,
SystemTime::now(),
);
assert_eq!(handle.message_id(), message.uuid);
assert_eq!(handle.qos(), qos);
assert_eq!(handle.packet_id(), packet_id);
assert_eq!(handle.delivery_count(), delivery_count);
assert!(!handle.is_retry()); assert!(handle.requires_ack()); }
#[test]
fn test_qos_acknowledgment_requirements() {
let message_id = "qos-ack-test".to_string();
let topic = "test/qos".to_string();
let delivery_count = 1;
let timestamp = SystemTime::now();
let qos_ack_cases = vec![
(QoS::AtMostOnce, None, false), (QoS::AtLeastOnce, Some(100), true), (QoS::ExactlyOnce, Some(200), true), ];
for (qos, packet_id, should_ack) in qos_ack_cases {
let handle = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
qos,
packet_id,
delivery_count,
timestamp,
);
assert_eq!(handle.requires_ack(), should_ack);
if should_ack {
assert!(packet_id.is_some(), "QoS {} should have packet ID", qos as u8);
} else {
assert!(packet_id.is_none(), "QoS {} should not have packet ID", qos as u8);
}
}
}
#[test]
fn test_topic_pattern_handling() {
let topic_patterns = vec![
"sensors/temperature",
"home/+/temperature", "sensors/#", "building/floor1/room2/sensor3",
"$SYS/broker/uptime", ];
for topic in topic_patterns {
let handle = MQTTAckHandle::new("topic-test".to_string(), topic.to_string(), SystemTime::now(), 1, QoS::AtLeastOnce, Some(1),
);
assert_eq!(handle.topic(), topic);
assert!(!handle.topic().is_empty());
}
}
#[test]
fn test_packet_id_range_handling() {
let message_id = "packet-range-test".to_string();
let topic = "test/packet".to_string();
let delivery_count = 1;
let timestamp = SystemTime::now();
let packet_id_cases = vec![
1, 100, 32768, 65535, ];
for packet_id in packet_id_cases {
let handle = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
QoS::AtLeastOnce,
Some(packet_id),
delivery_count,
timestamp,
);
assert_eq!(handle.packet_id(), Some(packet_id));
assert!(handle.requires_ack());
}
}
#[test]
fn test_delivery_count_tracking() {
let message_id = "delivery-count-test".to_string();
let topic = "test/delivery".to_string();
let qos = QoS::AtLeastOnce;
let packet_id = Some(500);
let deliveries = vec![
(1, false), (2, true), (3, true), (10, true), ];
for (count, expected_retry) in deliveries {
let handle = MQTTAckHandle::new(
message_id.clone(),
topic.clone(),
qos,
packet_id,
count,
SystemTime::now(),
);
assert_eq!(handle.is_retry(), expected_retry,
"Delivery count {} should have retry={}", count, expected_retry);
assert_eq!(handle.delivery_count(), count);
}
}
#[test]
fn test_persistent_session_logic() {
let client_ids = vec![
Some("persistent-client-1".to_string()),
Some("temp-client-2".to_string()),
None, ];
for client_id in client_ids {
if let Some(id) = &client_id {
assert!(!id.is_empty() || id.is_empty()); assert!(id.len() < 256); }
}
}
}
#[cfg(test)]
mod mqtt_performance_unit_tests {
use super::*;
#[test]
fn test_handle_creation_performance() {
let start = std::time::Instant::now();
let count = 1000;
for i in 0..count {
let qos = match i % 3 {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => QoS::AtMostOnce,
};
let packet_id = if qos != QoS::AtMostOnce {
Some((i % 65535 + 1) as u16)
} else {
None
};
let _handle = MQTTAckHandle::new(
format!("message-{}", i),
format!("topic/{}", i % 10),
qos,
packet_id,
1,
SystemTime::now(),
);
}
let elapsed = start.elapsed();
let per_handle = elapsed / count;
assert!(per_handle < Duration::from_millis(1),
"Handle creation took too long: {:?} per handle", per_handle);
}
#[test]
fn test_handle_method_call_performance() {
let handle = MQTTAckHandle::new("perf-test".to_string(), "test/performance".to_string(), SystemTime::now(), 3, QoS::ExactlyOnce, Some(12345),
);
let start = std::time::Instant::now();
let iterations = 10000;
for _ in 0..iterations {
let _ = handle.message_id();
let _ = handle.topic();
let _ = handle.qos();
let _ = handle.packet_id();
let _ = handle.delivery_count();
let _ = handle.is_retry();
let _ = handle.timestamp();
let _ = handle.handle_id();
let _ = handle.requires_ack();
}
let elapsed = start.elapsed();
let per_call = elapsed / (iterations * 9);
assert!(per_call < Duration::from_micros(1),
"Method calls took too long: {:?} per call", per_call);
}
#[test]
fn test_qos_determination_performance() {
let qos_levels = vec![
QoS::AtMostOnce,
QoS::AtLeastOnce,
QoS::ExactlyOnce,
];
let start = std::time::Instant::now();
let iterations = 10000;
for i in 0..iterations {
let qos = &qos_levels[i % 3];
let requires_ack = match qos {
QoS::AtMostOnce => false,
QoS::AtLeastOnce | QoS::ExactlyOnce => true,
};
let packet_id = if requires_ack {
Some((i % 65535 + 1) as u16)
} else {
None
};
assert!(packet_id.is_some() == requires_ack);
}
let elapsed = start.elapsed();
let per_iteration = elapsed / iterations as u32;
assert!(per_iteration < Duration::from_nanos(100),
"QoS determination took too long: {:?} per iteration", per_iteration);
}
}