#[cfg(test)]
mod tests {
use crate::codec::MqttCodec;
use crate::packet::{MqttPacket, Property, ProtocolLevel};
use bytes::BytesMut;
use tokio_util::codec::Decoder;
#[test]
fn test_decode_connect_v311() {
let mut codec = MqttCodec::new();
let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x10, 0x10]);
buf.extend_from_slice(&[0x00, 0x04, b'M', b'Q', b'T', b'T']);
buf.extend_from_slice(&[0x04]);
buf.extend_from_slice(&[0x02]);
buf.extend_from_slice(&[0x00, 0x3C]);
buf.extend_from_slice(&[0x00, 0x04, b't', b'e', b's', b't']);
let result = codec.decode(&mut buf).unwrap();
match result {
Some(MqttPacket::Connect(connect)) => {
assert_eq!(connect.protocol_level, ProtocolLevel::V311);
assert_eq!(connect.client_id, "test");
assert_eq!(connect.clean_session, true);
assert_eq!(connect.keep_alive, 60);
}
_ => panic!("Failed to decode CONNECT packet"),
}
}
#[test]
fn test_decode_publish_qos1_zero_copy() {
let mut codec = MqttCodec::new();
let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x32, 0x0D]);
buf.extend_from_slice(&[0x00, 0x04, b't', b'e', b's', b't']);
buf.extend_from_slice(&[0x00, 0x0A]);
buf.extend_from_slice(b"hello");
let result = codec.decode(&mut buf).unwrap();
match result {
Some(MqttPacket::Publish(publish)) => {
assert_eq!(publish.topic, "test");
assert_eq!(publish.qos, 1);
assert_eq!(publish.packet_id, Some(10));
assert!(publish.properties.is_empty());
assert_eq!(publish.payload.as_ref(), b"hello");
}
_ => panic!("Failed to decode PUBLISH packet"),
}
}
#[test]
fn test_decode_publish_v5_with_properties() {
let mut codec = MqttCodec::new();
codec.protocol_level = ProtocolLevel::V5; let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x30, 21]);
buf.extend_from_slice(&[0x00, 0x04, b't', b'e', b's', b't']);
buf.extend_from_slice(&[9]);
buf.extend_from_slice(&[0x01, 0x01]);
buf.extend_from_slice(&[0x03, 0x00, 0x04, b'J', b'S', b'O', b'N']);
buf.extend_from_slice(b"hello");
let result = codec.decode(&mut buf).unwrap();
match result {
Some(MqttPacket::Publish(publish)) => {
assert_eq!(publish.topic, "test");
assert_eq!(publish.properties.len(), 2);
assert_eq!(publish.properties[0], Property::PayloadFormatIndicator(1));
assert_eq!(
publish.properties[1],
Property::ContentType("JSON".to_string())
);
assert_eq!(publish.payload.as_ref(), b"hello");
}
_ => panic!("Failed to decode V5 PUBLISH packet"),
}
}
#[test]
fn test_max_packet_size_rejects_oversized() {
let mut codec = MqttCodec::with_max_packet_size(10);
let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x32, 0x0D]); buf.extend_from_slice(&[0x00, 0x04, b't', b'e', b's', b't']); buf.extend_from_slice(&[0x00, 0x0A]); buf.extend_from_slice(b"hello");
let result = codec.decode(&mut buf);
match result {
Err(crate::MqttError::PayloadTooLarge { size, limit }) => {
assert_eq!(size, 13);
assert_eq!(limit, 10);
}
other => panic!("Expected PayloadTooLarge, got: {:?}", other),
}
assert!(
buf.is_empty(),
"oversized packet should be drained from buffer"
);
}
#[test]
fn test_max_packet_size_accepts_within_limit() {
let mut codec = MqttCodec::with_max_packet_size(20);
let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x32, 0x0D]); buf.extend_from_slice(&[0x00, 0x04, b't', b'e', b's', b't']);
buf.extend_from_slice(&[0x00, 0x0A]);
buf.extend_from_slice(b"hello");
let result = codec.decode(&mut buf).unwrap();
match result {
Some(MqttPacket::Publish(publish)) => {
assert_eq!(publish.topic, "test");
assert_eq!(publish.payload.as_ref(), b"hello");
}
other => panic!("Expected Publish, got: {:?}", other),
}
}
#[test]
fn test_no_max_packet_size_accepts_any() {
let mut codec = MqttCodec::new();
assert!(codec.max_packet_size.is_none());
let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x32, 0x0D]);
buf.extend_from_slice(&[0x00, 0x04, b't', b'e', b's', b't']);
buf.extend_from_slice(&[0x00, 0x0A]);
buf.extend_from_slice(b"hello");
let result = codec.decode(&mut buf).unwrap();
assert!(matches!(result, Some(MqttPacket::Publish(_))));
}
}