use crate::packet::connect::ConnectPacket;
use crate::packet::publish::PublishPacket;
use crate::packet::subscribe::{SubscribePacket, TopicFilter};
use crate::packet::MqttPacket;
use crate::protocol::v5::properties::Properties;
use crate::QoS;
use proptest::prelude::*;
/// Strategy producing non-empty strings of 1 to 100 characters, none of
/// which is NUL (U+0000).
///
/// The regex already guarantees at least one character; the filter restates
/// that requirement as an explicit guard.
fn mqtt_string_strategy() -> impl Strategy<Value = String> {
    let non_nul_text = prop::string::string_regex("[^\x00]{1,100}").unwrap();
    non_nul_text.prop_filter("Non-empty string", |candidate| !candidate.is_empty())
}
/// Strategy producing topic names of 1 to 50 characters that contain no NUL
/// and neither of the characters `+` and `#`.
///
/// The filter re-checks the same constraints the regex encodes, rejecting
/// any candidate that is empty or contains `+` or `#`.
fn topic_name_strategy() -> impl Strategy<Value = String> {
    let candidates = prop::string::string_regex("[^\x00+#]{1,50}").unwrap();
    candidates.prop_filter("Valid topic", |name| {
        !(name.is_empty() || name.contains('+') || name.contains('#'))
    })
}
/// Strategy producing client identifiers of 0 to 23 characters, none of
/// which is NUL (U+0000). The empty string is a possible output.
fn client_id_strategy() -> impl Strategy<Value = String> {
    let client_ids = prop::string::string_regex("[^\x00]{0,23}");
    client_ids.unwrap()
}
/// Strategy choosing one of the three `QoS` levels, each with equal weight.
fn qos_strategy() -> impl Strategy<Value = QoS> {
    // Explicit unit weights: every level is equally likely to be picked.
    prop_oneof![
        1 => Just(QoS::AtMostOnce),
        1 => Just(QoS::AtLeastOnce),
        1 => Just(QoS::ExactlyOnce),
    ]
}
/// Strategy producing arbitrary byte payloads whose length is in `0..1000`
/// (empty payloads included, 1000 bytes excluded).
fn payload_strategy() -> impl Strategy<Value = Vec<u8>> {
    // Exclusive upper bound on the generated payload length.
    const PAYLOAD_LEN_LIMIT: usize = 1000;
    let any_byte = any::<u8>();
    prop::collection::vec(any_byte, 0..PAYLOAD_LEN_LIMIT)
}
proptest! {
// Property: a CONNECT packet that is encoded and then decoded keeps its
// client id, keep-alive, clean-start flag, credentials and protocol version.
//
// The keep-alive is drawn from the full `u16` domain. The previous half-open
// range `0u16..65535` never produced the maximum value 65535.
#[test]
fn test_connect_packet_roundtrip(
    client_id in client_id_strategy(),
    keep_alive in any::<u16>(),
    clean_start in any::<bool>(),
    credentials in prop::option::of((mqtt_string_strategy(), prop::option::of(prop::collection::vec(any::<u8>(), 0..100)))),
) {
    // A password is only ever generated together with a username.
    let (username, password) = match credentials {
        Some((user, pass)) => (Some(user), pass),
        None => (None, None),
    };

    let connect = ConnectPacket {
        client_id,
        keep_alive,
        clean_start,
        will: None,
        username,
        password,
        properties: Properties::new(),
        protocol_version: 5,
        will_properties: Properties::new(),
    };

    // Encode the full packet (fixed header + body).
    let mut buffer = Vec::new();
    let encode_result = connect.encode(&mut buffer);
    prop_assert!(encode_result.is_ok(), "Encoding should succeed: {:?}", encode_result);

    // Decode: strip the fixed header first, then parse the body.
    let mut buffer_bytes = bytes::Bytes::from(buffer);
    let fixed_header = crate::packet::FixedHeader::decode(&mut buffer_bytes).unwrap();
    let decode_result = ConnectPacket::decode_body(&mut buffer_bytes, &fixed_header);
    prop_assert!(decode_result.is_ok(), "Decoding should succeed: {:?}", decode_result);

    let decoded = decode_result.unwrap();
    prop_assert_eq!(decoded.client_id, connect.client_id);
    prop_assert_eq!(decoded.keep_alive, connect.keep_alive);
    prop_assert_eq!(decoded.clean_start, connect.clean_start);
    prop_assert_eq!(decoded.username, connect.username);
    prop_assert_eq!(decoded.password, connect.password);
    prop_assert_eq!(decoded.protocol_version, connect.protocol_version);
}
// Property: a PUBLISH packet that is encoded and then decoded keeps its
// topic, payload, QoS, RETAIN/DUP flags and packet identifier.
//
// The packet identifier is generated over the whole non-zero `u16` range
// instead of being pinned to a single constant, so identifier encoding is
// exercised across its domain (including 65535).
#[test]
fn test_publish_packet_roundtrip(
    topic in topic_name_strategy(),
    payload in payload_strategy(),
    qos in qos_strategy(),
    retain in any::<bool>(),
    dup in any::<bool>(),
    packet_id in 1u16..=u16::MAX,
) {
    // QoS 0 publishes are built without a packet identifier and with DUP
    // cleared (the MQTT specification requires DUP = 0 for QoS 0).
    let (packet_id, dup) = if matches!(qos, QoS::AtMostOnce) {
        (None, false)
    } else {
        (Some(packet_id), dup)
    };

    let publish = PublishPacket {
        topic_name: topic,
        payload: payload.into(),
        qos,
        retain,
        dup,
        packet_id,
        properties: Properties::new(),
        protocol_version: 5,
        stream_id: None,
    };

    // Encode the full packet (fixed header + body).
    let mut buffer = Vec::new();
    let encode_result = publish.encode(&mut buffer);
    prop_assert!(encode_result.is_ok(), "Encoding should succeed: {:?}", encode_result);

    // Decode: strip the fixed header first, then parse the body.
    let mut buffer_bytes = bytes::Bytes::from(buffer);
    let fixed_header = crate::packet::FixedHeader::decode(&mut buffer_bytes).unwrap();
    let decode_result = PublishPacket::decode_body(&mut buffer_bytes, &fixed_header);
    prop_assert!(decode_result.is_ok(), "Decoding should succeed: {:?}", decode_result);

    let decoded = decode_result.unwrap();
    prop_assert_eq!(decoded.topic_name, publish.topic_name);
    prop_assert_eq!(decoded.payload, publish.payload);
    prop_assert_eq!(decoded.qos, publish.qos);
    prop_assert_eq!(decoded.retain, publish.retain);
    prop_assert_eq!(decoded.dup, publish.dup);
    prop_assert_eq!(decoded.packet_id, publish.packet_id);
}
// Property: a SUBSCRIBE packet that is encoded and then decoded keeps its
// packet identifier and, in order, every topic filter with its QoS.
//
// The packet identifier range is inclusive. The previous half-open range
// `1u16..65535` never produced the maximum identifier 65535.
#[test]
fn test_subscribe_packet_roundtrip(
    packet_id in 1u16..=u16::MAX,
    topics in prop::collection::vec(
        (topic_name_strategy(), qos_strategy()),
        1..10
    ),
) {
    // The filters are moved straight into the packet; the comparison below
    // reads them back from `subscribe`, so no clone is needed.
    let subscribe = SubscribePacket {
        packet_id,
        filters: topics
            .into_iter()
            .map(|(topic, qos)| TopicFilter::new(topic, qos))
            .collect(),
        properties: Properties::new(),
        protocol_version: 5,
    };

    // Encode the full packet (fixed header + body).
    let mut buffer = Vec::new();
    let encode_result = subscribe.encode(&mut buffer);
    prop_assert!(encode_result.is_ok(), "Encoding should succeed: {:?}", encode_result);

    // Decode: strip the fixed header first, then parse the body.
    let mut buffer_bytes = bytes::Bytes::from(buffer);
    let fixed_header = crate::packet::FixedHeader::decode(&mut buffer_bytes).unwrap();
    let decode_result = SubscribePacket::decode_body(&mut buffer_bytes, &fixed_header);
    prop_assert!(decode_result.is_ok(), "Decoding should succeed: {:?}", decode_result);

    let decoded = decode_result.unwrap();
    prop_assert_eq!(decoded.packet_id, subscribe.packet_id);
    prop_assert_eq!(decoded.filters.len(), subscribe.filters.len());
    // Lengths are equal at this point, so `zip` visits every filter pair.
    for (original, decoded_filter) in subscribe.filters.iter().zip(decoded.filters.iter()) {
        prop_assert_eq!(&decoded_filter.filter, &original.filter);
        prop_assert_eq!(decoded_filter.options.qos, original.options.qos);
    }
}
// Property: every value representable as an MQTT Variable Byte Integer
// survives an encode/decode round trip.
//
// The range is inclusive so that 268,435,455 (the largest value the
// four-byte encoding can represent) is also exercised. The previous
// half-open range `0u32..268_435_455` stopped one short of it.
#[test]
fn test_variable_length_integer_roundtrip(value in 0u32..=268_435_455) {
    let mut buffer = Vec::new();
    let encode_result = crate::encoding::encode_variable_int(&mut buffer, value);
    prop_assert!(encode_result.is_ok(), "Encoding should succeed for value {}: {:?}", value, encode_result);

    let mut buffer_bytes = bytes::Bytes::from(buffer);
    let decode_result = crate::encoding::decode_variable_int(&mut buffer_bytes);
    prop_assert!(decode_result.is_ok(), "Decoding should succeed for value {}: {:?}", value, decode_result);

    let decoded_value = decode_result.unwrap();
    prop_assert_eq!(decoded_value, value, "Roundtrip should preserve value");
}
// Property: a string of 0 to 200 non-NUL characters written with
// `encode_string` is read back unchanged by `decode_string`.
#[test]
fn test_string_encoding_roundtrip(
    input in prop::string::string_regex("[^\x00]{0,200}").unwrap()
) {
    use crate::encoding::{decode_string, encode_string};

    // Write the string into a growable byte buffer.
    let mut wire = Vec::new();
    let written = encode_string(&mut wire, &input);
    prop_assert!(written.is_ok(), "String encoding should succeed: {:?}", written);

    // Read it back from an immutable view of the same bytes.
    let mut remaining = bytes::Bytes::from(wire);
    let read_back = decode_string(&mut remaining);
    prop_assert!(read_back.is_ok(), "String decoding should succeed: {:?}", read_back);

    prop_assert_eq!(read_back.unwrap(), input, "String roundtrip should preserve content");
}
// Property: a byte vector of length 0 to 999 written with `encode_binary`
// is read back unchanged by `decode_binary`.
#[test]
fn test_binary_encoding_roundtrip(
    input in prop::collection::vec(any::<u8>(), 0..1000)
) {
    use crate::encoding::{decode_binary, encode_binary};

    // Write the blob into a growable byte buffer.
    let mut wire = Vec::new();
    let written = encode_binary(&mut wire, &input);
    prop_assert!(written.is_ok(), "Binary encoding should succeed: {:?}", written);

    // Read it back from an immutable view of the same bytes.
    let mut remaining = bytes::Bytes::from(wire);
    let read_back = decode_binary(&mut remaining);
    prop_assert!(read_back.is_ok(), "Binary decoding should succeed: {:?}", read_back);

    prop_assert_eq!(read_back.unwrap(), input, "Binary roundtrip should preserve content");
}
}
/// Property tests for encoder invariants: encoded-size bounds and
/// determinism of repeated encoding.
///
/// Both tests assert that encoding succeeds before checking the invariant.
/// Previously the checks were wrapped in `if ….is_ok()`, so an encoder that
/// returned `Err` made the tests pass without verifying anything.
#[cfg(test)]
mod property_invariant_tests {
    use super::*;

    proptest! {
        // Property: encoded CONNECT and QoS 0 PUBLISH packets stay within
        // loose size bounds derived from their inputs.
        #[test]
        fn test_packet_size_bounds(
            client_id in client_id_strategy(),
            topic in topic_name_strategy(),
            payload in prop::collection::vec(any::<u8>(), 0..100),
        ) {
            let connect = ConnectPacket {
                client_id,
                keep_alive: 60,
                clean_start: true,
                will: None,
                username: None,
                password: None,
                properties: Properties::new(),
                protocol_version: 5,
                will_properties: Properties::new(),
            };
            let mut buffer = Vec::new();
            let connect_result = connect.encode(&mut buffer);
            prop_assert!(connect_result.is_ok(), "CONNECT encoding should succeed: {:?}", connect_result);
            prop_assert!(buffer.len() < 1000, "CONNECT packet too large: {} bytes", buffer.len());
            prop_assert!(buffer.len() >= 10, "CONNECT packet too small: {} bytes", buffer.len());

            // Capture the input sizes before the values are moved into the
            // packet; the encoded packet must contain at least the topic and
            // payload bytes, plus at most 100 bytes of framing overhead.
            let expected_min = topic.len() + payload.len();
            let expected_max = expected_min + 100;
            let publish = PublishPacket {
                topic_name: topic,
                payload: payload.into(),
                qos: QoS::AtMostOnce,
                retain: false,
                dup: false,
                packet_id: None,
                properties: Properties::new(),
                protocol_version: 5,
                stream_id: None,
            };
            let mut pub_buffer = Vec::new();
            let publish_result = publish.encode(&mut pub_buffer);
            prop_assert!(publish_result.is_ok(), "PUBLISH encoding should succeed: {:?}", publish_result);
            prop_assert!(pub_buffer.len() >= expected_min,
                "PUBLISH packet too small: {} bytes, expected at least {}",
                pub_buffer.len(), expected_min);
            prop_assert!(pub_buffer.len() <= expected_max,
                "PUBLISH packet too large: {} bytes, expected at most {}",
                pub_buffer.len(), expected_max);
        }

        // Property: encoding the same CONNECT packet twice yields identical
        // bytes. The keep-alive covers the full `u16` domain; the previous
        // half-open range `0u16..65535` never produced 65535.
        #[test]
        fn test_packet_encoding_deterministic(
            client_id in client_id_strategy(),
            keep_alive in any::<u16>(),
        ) {
            let connect = ConnectPacket {
                client_id,
                keep_alive,
                clean_start: true,
                will: None,
                username: None,
                password: None,
                properties: Properties::new(),
                protocol_version: 5,
                will_properties: Properties::new(),
            };
            let mut buffer1 = Vec::new();
            let mut buffer2 = Vec::new();
            let result1 = connect.encode(&mut buffer1);
            let result2 = connect.encode(&mut buffer2);
            prop_assert!(result1.is_ok(), "First encoding should succeed: {:?}", result1);
            prop_assert!(result2.is_ok(), "Second encoding should succeed: {:?}", result2);
            prop_assert_eq!(buffer1, buffer2, "Packet encoding should be deterministic");
        }
    }
}