use alloc::string::String;
use alloc::vec::Vec;
use core::fmt;
use crate::codec::CodecError;
use crate::data_types::{
decode_two_byte_int, decode_utf8_string, encode_two_byte_int, encode_utf8_string,
};
use crate::vbi::{decode_vbi, encode_vbi};
/// Body of an acknowledgement packet: packet identifier, reason code and the
/// raw property bytes.
///
/// NOTE(review): presumably shared by the PUBACK-style acknowledgement packets —
/// confirm against the callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AckBody {
/// Two-byte packet identifier.
pub packet_id: u16,
/// Reason code; `decode_ack_body` reports `0` when the wire form omits it.
pub reason_code: u8,
/// Property bytes without their length prefix; they are not parsed in this module.
pub properties: Vec<u8>,
}
/// Serializes an [`AckBody`] as packet identifier, reason code, property length
/// (variable byte integer) and the raw property bytes.
///
/// The long form is always written, even when the reason code is `0` and there
/// are no properties.
///
/// # Errors
///
/// Returns `CodecError::Vbi(VbiError::Malformed)` when `encode_vbi` yields
/// `None` for the property length.
pub fn encode_ack_body(a: &AckBody) -> Result<Vec<u8>, CodecError> {
    // A length that does not fit in `u32` is clamped to `u32::MAX` and handed to
    // `encode_vbi`, which decides whether the value is representable.
    let declared_len = u32::try_from(a.properties.len()).unwrap_or(u32::MAX);
    let length_prefix =
        encode_vbi(declared_len).ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?;

    let mut encoded = Vec::with_capacity(4 + a.properties.len());
    encoded.extend_from_slice(&encode_two_byte_int(a.packet_id));
    encoded.push(a.reason_code);
    encoded.extend_from_slice(&length_prefix);
    encoded.extend_from_slice(&a.properties);
    Ok(encoded)
}
/// Parses an acknowledgement body.
///
/// Three wire shapes are accepted:
/// * packet identifier only — reason code is reported as `0`, no properties;
/// * packet identifier + reason code — no properties;
/// * packet identifier + reason code + property length + properties.
///
/// Bytes following the property block are ignored.
///
/// # Errors
///
/// * `CodecError::HeaderTooShort` when fewer than two bytes are supplied.
/// * `CodecError::RemainingLengthMismatch` when the declared property length
///   exceeds the bytes that remain.
/// * Any error propagated from `decode_two_byte_int` or `decode_vbi`.
pub fn decode_ack_body(bytes: &[u8]) -> Result<AckBody, CodecError> {
    if bytes.len() < 2 {
        return Err(CodecError::HeaderTooShort);
    }
    let (packet_id, id_len) = decode_two_byte_int(bytes)?;

    match &bytes[id_len..] {
        // Short form: nothing follows the packet identifier.
        [] => Ok(AckBody {
            packet_id,
            reason_code: 0,
            properties: Vec::new(),
        }),
        [reason_code, tail @ ..] => {
            let properties = if tail.is_empty() {
                // Reason code present but the property length was left out.
                Vec::new()
            } else {
                let (prop_len, vbi_len) = decode_vbi(tail)?;
                let end = vbi_len + prop_len as usize;
                tail.get(vbi_len..end)
                    .ok_or(CodecError::RemainingLengthMismatch)?
                    .to_vec()
            };
            Ok(AckBody {
                packet_id,
                reason_code: *reason_code,
                properties,
            })
        }
    }
}
/// Variable header and payload of a CONNECT packet, as produced by
/// `decode_connect_body` and consumed by `encode_connect_body`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectBody {
/// Protocol name string (the tests in this file use `"MQTT"`).
pub protocol_name: String,
/// Protocol version byte.
pub protocol_version: u8,
/// Raw flags byte; see the `connect_flags` module for the bit masks.
pub connect_flags: u8,
/// Keep-alive value, stored as a two-byte integer on the wire.
pub keep_alive: u16,
/// Property bytes without their length prefix; not parsed in this module.
pub properties: Vec<u8>,
/// Client identifier string.
pub client_id: String,
/// Will property bytes; only read/written when `connect_flags::WILL` is set.
pub will_properties: Vec<u8>,
/// Will topic; the decoder sets it to `Some` only when `connect_flags::WILL` is set.
pub will_topic: Option<String>,
/// Will payload bytes; only read/written when `connect_flags::WILL` is set.
pub will_payload: Vec<u8>,
/// User name; the decoder sets it to `Some` only when `connect_flags::USER_NAME` is set.
pub user_name: Option<String>,
/// Password bytes; only read/written when `connect_flags::PASSWORD` is set.
pub password: Vec<u8>,
}
/// Bit masks for the flags byte stored in `ConnectBody::connect_flags`.
pub mod connect_flags {
    /// Bit 0.
    pub const RESERVED: u8 = 1 << 0;
    /// Bit 1.
    pub const CLEAN_START: u8 = 1 << 1;
    /// Bit 2 — gates the will section in the CONNECT encoder/decoder.
    pub const WILL: u8 = 1 << 2;
    /// Bits 3 and 4 — two-bit field.
    pub const WILL_QOS_MASK: u8 = 0b11 << 3;
    /// Bit 5.
    pub const WILL_RETAIN: u8 = 1 << 5;
    /// Bit 6 — gates the password field in the CONNECT encoder/decoder.
    pub const PASSWORD: u8 = 1 << 6;
    /// Bit 7 — gates the user name field in the CONNECT encoder/decoder.
    pub const USER_NAME: u8 = 1 << 7;
}
/// Serializes a [`ConnectBody`] into its wire form.
///
/// Layout, in order: protocol name, protocol version, connect flags, keep
/// alive, property length + properties, client identifier, then — gated by the
/// bits in `c.connect_flags` — the will section (will properties, will topic,
/// will payload), the user name and the password.
///
/// # Errors
///
/// * Any error propagated from `encode_utf8_string`.
/// * `CodecError::Vbi(VbiError::Malformed)` when `encode_vbi` rejects a
///   property block length.
/// * `CodecError::RemainingLengthMismatch` when the will payload or the
///   password is longer than `u16::MAX` bytes and so cannot be described by its
///   two-byte length prefix.
pub fn encode_connect_body(c: &ConnectBody) -> Result<Vec<u8>, CodecError> {
    let mut out = Vec::with_capacity(64 + c.properties.len() + c.client_id.len());

    out.extend_from_slice(&encode_utf8_string(&c.protocol_name)?);
    out.push(c.protocol_version);
    out.push(c.connect_flags);
    out.extend_from_slice(&encode_two_byte_int(c.keep_alive));
    out.extend_from_slice(
        &encode_vbi(u32::try_from(c.properties.len()).unwrap_or(u32::MAX))
            .ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?,
    );
    out.extend_from_slice(&c.properties);
    out.extend_from_slice(&encode_utf8_string(&c.client_id)?);

    if c.connect_flags & connect_flags::WILL != 0 {
        out.extend_from_slice(
            &encode_vbi(u32::try_from(c.will_properties.len()).unwrap_or(u32::MAX))
                .ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?,
        );
        out.extend_from_slice(&c.will_properties);
        // NOTE(review): when the WILL bit is set but `will_topic` is `None`, the
        // topic is omitted from the output; `decode_connect_body` always expects
        // one in that case. Confirm whether callers guarantee flag/field agreement.
        if let Some(t) = &c.will_topic {
            out.extend_from_slice(&encode_utf8_string(t)?);
        }
        // Clamping the prefix to `u16::MAX` while writing the whole payload would
        // emit a length that disagrees with the data, so reject oversize input.
        let will_len = u16::try_from(c.will_payload.len())
            .map_err(|_| CodecError::RemainingLengthMismatch)?;
        out.extend_from_slice(&encode_two_byte_int(will_len));
        out.extend_from_slice(&c.will_payload);
    }

    if c.connect_flags & connect_flags::USER_NAME != 0 {
        // NOTE(review): same flag/field caveat as the will topic above — a set
        // USER_NAME bit with `user_name == None` writes nothing.
        if let Some(u) = &c.user_name {
            out.extend_from_slice(&encode_utf8_string(u)?);
        }
    }

    if c.connect_flags & connect_flags::PASSWORD != 0 {
        // Same reasoning as for the will payload: never write a truncated prefix.
        let password_len =
            u16::try_from(c.password.len()).map_err(|_| CodecError::RemainingLengthMismatch)?;
        out.extend_from_slice(&encode_two_byte_int(password_len));
        out.extend_from_slice(&c.password);
    }

    Ok(out)
}
/// Parses a CONNECT body.
///
/// Reads, in order: protocol name, protocol version, connect flags, keep
/// alive, properties and client identifier. The will section, user name and
/// password are read only when the matching bit of the flags byte is set;
/// otherwise those fields are left empty / `None`. Bytes after the last
/// expected field are ignored.
///
/// # Errors
///
/// * `CodecError::HeaderTooShort` when the fixed four bytes after the protocol
///   name, or a two-byte length prefix, are missing.
/// * `CodecError::RemainingLengthMismatch` when a declared will payload or
///   password length exceeds the remaining bytes.
/// * Any error propagated from `decode_utf8_string`, `decode_two_byte_int` or
///   `consume_properties`.
pub fn decode_connect_body(bytes: &[u8]) -> Result<ConnectBody, CodecError> {
    let (protocol_name, name_len) = decode_utf8_string(bytes)?;
    let mut pos = name_len;

    // Version (1) + flags (1) + keep alive (2) must all be present.
    if bytes.len() < pos + 4 {
        return Err(CodecError::HeaderTooShort);
    }
    let protocol_version = bytes[pos];
    let flags = bytes[pos + 1];
    pos += 2;

    let (keep_alive, used) = decode_two_byte_int(&bytes[pos..])?;
    pos += used;
    let (properties, used) = consume_properties(&bytes[pos..])?;
    pos += used;
    let (client_id, used) = decode_utf8_string(&bytes[pos..])?;
    pos += used;

    let (will_properties, will_topic, will_payload) = if flags & connect_flags::WILL != 0 {
        let (props, used) = consume_properties(&bytes[pos..])?;
        pos += used;
        let (topic, used) = decode_utf8_string(&bytes[pos..])?;
        pos += used;

        if bytes.len() < pos + 2 {
            return Err(CodecError::HeaderTooShort);
        }
        let (declared, used) = decode_two_byte_int(&bytes[pos..])?;
        pos += used;
        let end = pos + usize::from(declared);
        if bytes.len() < end {
            return Err(CodecError::RemainingLengthMismatch);
        }
        let payload = bytes[pos..end].to_vec();
        pos = end;
        (props, Some(topic), payload)
    } else {
        (Vec::new(), None, Vec::new())
    };

    let user_name = if flags & connect_flags::USER_NAME != 0 {
        let (name, used) = decode_utf8_string(&bytes[pos..])?;
        pos += used;
        Some(name)
    } else {
        None
    };

    let password = if flags & connect_flags::PASSWORD != 0 {
        if bytes.len() < pos + 2 {
            return Err(CodecError::HeaderTooShort);
        }
        let (declared, used) = decode_two_byte_int(&bytes[pos..])?;
        let start = pos + used;
        let end = start + usize::from(declared);
        if bytes.len() < end {
            return Err(CodecError::RemainingLengthMismatch);
        }
        bytes[start..end].to_vec()
    } else {
        Vec::new()
    };

    Ok(ConnectBody {
        protocol_name,
        protocol_version,
        connect_flags: flags,
        keep_alive,
        properties,
        client_id,
        will_properties,
        will_topic,
        will_payload,
        user_name,
        password,
    })
}
/// Body of a CONNACK packet: acknowledge flags, reason code and raw properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnackBody {
/// Bit 0 of the first body byte.
pub session_present: bool,
/// Reason code byte.
pub reason_code: u8,
/// Property bytes without their length prefix; not parsed in this module.
pub properties: Vec<u8>,
}
/// Serializes a [`ConnackBody`] as flags byte, reason code, property length
/// (variable byte integer) and the raw property bytes.
///
/// # Errors
///
/// Returns `CodecError::Vbi(VbiError::Malformed)` when `encode_vbi` yields
/// `None` for the property length.
pub fn encode_connack_body(c: &ConnackBody) -> Result<Vec<u8>, CodecError> {
    let declared_len = u32::try_from(c.properties.len()).unwrap_or(u32::MAX);
    let length_prefix =
        encode_vbi(declared_len).ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?;

    let mut encoded = Vec::with_capacity(4 + c.properties.len());
    // `true` maps to 0x01 and `false` to 0x00.
    encoded.push(u8::from(c.session_present));
    encoded.push(c.reason_code);
    encoded.extend_from_slice(&length_prefix);
    encoded.extend_from_slice(&c.properties);
    Ok(encoded)
}
/// Parses a CONNACK body: flags byte, reason code, then an optional property
/// block handled by `consume_properties`.
///
/// Only bit 0 of the flags byte is inspected; bytes after the property block
/// are ignored.
///
/// # Errors
///
/// * `CodecError::HeaderTooShort` when fewer than two bytes are supplied.
/// * Any error propagated from `consume_properties`.
pub fn decode_connack_body(bytes: &[u8]) -> Result<ConnackBody, CodecError> {
    match bytes {
        [ack_flags, reason_code, rest @ ..] => {
            let (properties, _consumed) = consume_properties(rest)?;
            Ok(ConnackBody {
                session_present: *ack_flags & 0x01 != 0,
                reason_code: *reason_code,
                properties,
            })
        }
        _ => Err(CodecError::HeaderTooShort),
    }
}
/// One entry of a SUBSCRIBE payload: a topic filter followed by one options byte.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscription {
/// Topic filter string.
pub topic_filter: String,
/// Raw subscription options byte; not interpreted in this module.
pub options: u8,
}
/// Body of a SUBSCRIBE packet: packet identifier, raw properties and the list
/// of requested subscriptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeBody {
/// Two-byte packet identifier.
pub packet_id: u16,
/// Property bytes without their length prefix; not parsed in this module.
pub properties: Vec<u8>,
/// Subscriptions in wire order.
pub subscriptions: Vec<Subscription>,
}
pub fn encode_subscribe_body(s: &SubscribeBody) -> Result<Vec<u8>, CodecError> {
let mut out = Vec::with_capacity(8 + s.properties.len() + s.subscriptions.len() * 16);
out.extend_from_slice(&encode_two_byte_int(s.packet_id));
out.extend_from_slice(
&encode_vbi(u32::try_from(s.properties.len()).unwrap_or(u32::MAX))
.ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?,
);
out.extend_from_slice(&s.properties);
for sub in &s.subscriptions {
out.extend_from_slice(&encode_utf8_string(&sub.topic_filter)?);
out.push(sub.options);
}
Ok(out)
}
/// Parses a SUBSCRIBE body: packet identifier, property block, then
/// (topic filter, options byte) pairs until the input is exhausted.
///
/// An input with no subscriptions at all is accepted and yields an empty list.
///
/// # Errors
///
/// * `CodecError::HeaderTooShort` when fewer than two bytes are supplied, or
///   when a topic filter is not followed by its options byte.
/// * Any error propagated from `decode_two_byte_int`, `consume_properties` or
///   `decode_utf8_string`.
pub fn decode_subscribe_body(bytes: &[u8]) -> Result<SubscribeBody, CodecError> {
    if bytes.len() < 2 {
        return Err(CodecError::HeaderTooShort);
    }
    let (packet_id, id_len) = decode_two_byte_int(bytes)?;
    let (properties, props_len) = consume_properties(&bytes[id_len..])?;

    // Walk the payload by shrinking `rest` from the front after each entry.
    let mut rest = &bytes[id_len + props_len..];
    let mut subscriptions = Vec::new();
    while !rest.is_empty() {
        let (topic_filter, filter_len) = decode_utf8_string(rest)?;
        let options = *rest.get(filter_len).ok_or(CodecError::HeaderTooShort)?;
        subscriptions.push(Subscription {
            topic_filter,
            options,
        });
        rest = &rest[filter_len + 1..];
    }

    Ok(SubscribeBody {
        packet_id,
        properties,
        subscriptions,
    })
}
/// Body of a SUBACK packet (also reused for UNSUBACK through the
/// `UnsubackBody` alias): packet identifier, raw properties and reason codes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubackBody {
/// Two-byte packet identifier.
pub packet_id: u16,
/// Property bytes without their length prefix; not parsed in this module.
pub properties: Vec<u8>,
/// Every byte that follows the property block, one reason code per byte.
pub reason_codes: Vec<u8>,
}
/// Serializes a [`SubackBody`] as packet identifier, property length +
/// properties, then the reason code bytes.
///
/// # Errors
///
/// Returns `CodecError::Vbi(VbiError::Malformed)` when `encode_vbi` yields
/// `None` for the property length.
pub fn encode_suback_body(s: &SubackBody) -> Result<Vec<u8>, CodecError> {
    let declared_len = u32::try_from(s.properties.len()).unwrap_or(u32::MAX);
    let length_prefix =
        encode_vbi(declared_len).ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?;

    let mut encoded = Vec::with_capacity(4 + s.properties.len() + s.reason_codes.len());
    encoded.extend_from_slice(&encode_two_byte_int(s.packet_id));
    encoded.extend_from_slice(&length_prefix);
    encoded.extend_from_slice(&s.properties);
    encoded.extend_from_slice(&s.reason_codes);
    Ok(encoded)
}
/// Parses a SUBACK body: packet identifier, property block, and then every
/// remaining byte as a reason code.
///
/// # Errors
///
/// * `CodecError::HeaderTooShort` when fewer than two bytes are supplied.
/// * Any error propagated from `decode_two_byte_int` or `consume_properties`.
pub fn decode_suback_body(bytes: &[u8]) -> Result<SubackBody, CodecError> {
    if bytes.len() < 2 {
        return Err(CodecError::HeaderTooShort);
    }
    let (packet_id, id_len) = decode_two_byte_int(bytes)?;
    let after_id = &bytes[id_len..];
    let (properties, props_len) = consume_properties(after_id)?;

    Ok(SubackBody {
        packet_id,
        properties,
        // Whatever is left after the property block is the reason code list.
        reason_codes: after_id[props_len..].to_vec(),
    })
}
/// Body of an UNSUBSCRIBE packet: packet identifier, raw properties and the
/// topic filters to remove.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeBody {
/// Two-byte packet identifier.
pub packet_id: u16,
/// Property bytes without their length prefix; not parsed in this module.
pub properties: Vec<u8>,
/// Topic filter strings in wire order.
pub topic_filters: Vec<String>,
}
/// Serializes an [`UnsubscribeBody`] as packet identifier, property length +
/// properties, then each topic filter as a length-prefixed string.
///
/// # Errors
///
/// * `CodecError::Vbi(VbiError::Malformed)` when `encode_vbi` yields `None`
///   for the property length.
/// * Any error propagated from `encode_utf8_string`.
pub fn encode_unsubscribe_body(u: &UnsubscribeBody) -> Result<Vec<u8>, CodecError> {
    let declared_len = u32::try_from(u.properties.len()).unwrap_or(u32::MAX);
    let length_prefix =
        encode_vbi(declared_len).ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?;

    // 16 bytes per filter is only a sizing guess for the initial allocation.
    let mut encoded = Vec::with_capacity(4 + u.properties.len() + u.topic_filters.len() * 16);
    encoded.extend_from_slice(&encode_two_byte_int(u.packet_id));
    encoded.extend_from_slice(&length_prefix);
    encoded.extend_from_slice(&u.properties);

    for filter in &u.topic_filters {
        let filter_bytes = encode_utf8_string(filter)?;
        encoded.extend_from_slice(&filter_bytes);
    }
    Ok(encoded)
}
/// Parses an UNSUBSCRIBE body: packet identifier, property block, then topic
/// filter strings until the input is exhausted.
///
/// An input with no topic filters is accepted and yields an empty list.
///
/// # Errors
///
/// * `CodecError::HeaderTooShort` when fewer than two bytes are supplied.
/// * Any error propagated from `decode_two_byte_int`, `consume_properties` or
///   `decode_utf8_string`.
pub fn decode_unsubscribe_body(bytes: &[u8]) -> Result<UnsubscribeBody, CodecError> {
    if bytes.len() < 2 {
        return Err(CodecError::HeaderTooShort);
    }
    let (packet_id, id_len) = decode_two_byte_int(bytes)?;
    let (properties, props_len) = consume_properties(&bytes[id_len..])?;

    // Walk the payload by shrinking `rest` from the front after each filter.
    let mut rest = &bytes[id_len + props_len..];
    let mut topic_filters = Vec::new();
    while !rest.is_empty() {
        let (filter, filter_len) = decode_utf8_string(rest)?;
        topic_filters.push(filter);
        rest = &rest[filter_len..];
    }

    Ok(UnsubscribeBody {
        packet_id,
        properties,
        topic_filters,
    })
}
/// UNSUBACK body; it reuses the [`SubackBody`] layout unchanged.
pub type UnsubackBody = SubackBody;
/// Serializes an UNSUBACK body by delegating to `encode_suback_body`.
///
/// # Errors
///
/// Returns whatever `encode_suback_body` returns.
pub fn encode_unsuback_body(u: &UnsubackBody) -> Result<Vec<u8>, CodecError> {
encode_suback_body(u)
}
/// Parses an UNSUBACK body by delegating to `decode_suback_body`.
///
/// # Errors
///
/// Returns whatever `decode_suback_body` returns.
pub fn decode_unsuback_body(bytes: &[u8]) -> Result<UnsubackBody, CodecError> {
decode_suback_body(bytes)
}
/// Body of a DISCONNECT packet (also reused for AUTH through the `AuthBody`
/// alias): reason code and raw properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DisconnectBody {
/// Reason code; `decode_disconnect_body` reports `0` for an empty body.
pub reason_code: u8,
/// Property bytes without their length prefix; not parsed in this module.
pub properties: Vec<u8>,
}
/// Serializes a [`DisconnectBody`] as reason code, property length (variable
/// byte integer) and the raw property bytes.
///
/// The long form is always written, even for reason code `0` with no
/// properties.
///
/// # Errors
///
/// Returns `CodecError::Vbi(VbiError::Malformed)` when `encode_vbi` yields
/// `None` for the property length.
pub fn encode_disconnect_body(d: &DisconnectBody) -> Result<Vec<u8>, CodecError> {
    let declared_len = u32::try_from(d.properties.len()).unwrap_or(u32::MAX);
    let length_prefix =
        encode_vbi(declared_len).ok_or(CodecError::Vbi(crate::vbi::VbiError::Malformed))?;

    let mut encoded = Vec::with_capacity(2 + d.properties.len());
    encoded.push(d.reason_code);
    encoded.extend_from_slice(&length_prefix);
    encoded.extend_from_slice(&d.properties);
    Ok(encoded)
}
/// Parses a DISCONNECT body.
///
/// An empty body decodes to reason code `0` with no properties. Otherwise the
/// first byte is the reason code and the remainder is handed to
/// `consume_properties`; bytes after the property block are ignored.
///
/// # Errors
///
/// Any error propagated from `consume_properties`.
pub fn decode_disconnect_body(bytes: &[u8]) -> Result<DisconnectBody, CodecError> {
    match bytes.split_first() {
        // Short form: the whole body was omitted.
        None => Ok(DisconnectBody {
            reason_code: 0,
            properties: Vec::new(),
        }),
        Some((&reason_code, rest)) => {
            let (properties, _consumed) = consume_properties(rest)?;
            Ok(DisconnectBody {
                reason_code,
                properties,
            })
        }
    }
}
/// AUTH body; it reuses the [`DisconnectBody`] layout unchanged.
pub type AuthBody = DisconnectBody;
/// Serializes an AUTH body by delegating to `encode_disconnect_body`.
///
/// # Errors
///
/// Returns whatever `encode_disconnect_body` returns.
pub fn encode_auth_body(a: &AuthBody) -> Result<Vec<u8>, CodecError> {
encode_disconnect_body(a)
}
/// Parses an AUTH body by delegating to `decode_disconnect_body`, so an empty
/// body decodes to reason code `0` with no properties.
///
/// # Errors
///
/// Returns whatever `decode_disconnect_body` returns.
pub fn decode_auth_body(bytes: &[u8]) -> Result<AuthBody, CodecError> {
decode_disconnect_body(bytes)
}
/// Wire representation used by the value of a property.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PropertyDataType {
    /// Single byte.
    Byte,
    /// Two-byte integer.
    TwoByteInt,
    /// Four-byte integer.
    FourByteInt,
    /// Variable byte integer.
    VariableByteInt,
    /// Length-prefixed UTF-8 string.
    Utf8String,
    /// Length-prefixed binary data.
    BinaryData,
    /// Pair of UTF-8 strings.
    Utf8StringPair,
}

/// Looks up the value type of the property with identifier `id`.
///
/// Returns `None` for identifiers that are not in the table.
#[must_use]
pub fn property_data_type(id: u8) -> Option<PropertyDataType> {
    // Identifiers are grouped by value type rather than listed numerically.
    match id {
        0x01 | 0x17 | 0x19 | 0x24 | 0x25 | 0x28 | 0x29 | 0x2A => Some(PropertyDataType::Byte),
        0x13 | 0x21 | 0x22 | 0x23 => Some(PropertyDataType::TwoByteInt),
        0x02 | 0x11 | 0x18 | 0x27 => Some(PropertyDataType::FourByteInt),
        0x0B => Some(PropertyDataType::VariableByteInt),
        0x03 | 0x08 | 0x12 | 0x15 | 0x1A | 0x1C | 0x1F => Some(PropertyDataType::Utf8String),
        0x09 | 0x16 => Some(PropertyDataType::BinaryData),
        0x26 => Some(PropertyDataType::Utf8StringPair),
        _ => None,
    }
}

/// Formats the variant as its bare name (e.g. `"TwoByteInt"`).
impl fmt::Display for PropertyDataType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Byte => "Byte",
            Self::TwoByteInt => "TwoByteInt",
            Self::FourByteInt => "FourByteInt",
            Self::VariableByteInt => "VariableByteInt",
            Self::Utf8String => "Utf8String",
            Self::BinaryData => "BinaryData",
            Self::Utf8StringPair => "Utf8StringPair",
        };
        f.write_str(label)
    }
}
/// Reads a length-prefixed property block from the front of `bytes`.
///
/// Returns the property bytes (without the length prefix) together with the
/// total number of input bytes consumed (prefix + properties). Empty input is
/// treated as "no properties" and consumes nothing.
///
/// # Errors
///
/// * `CodecError::RemainingLengthMismatch` when the declared length exceeds
///   the bytes available after the prefix.
/// * Any error propagated from `decode_vbi`.
fn consume_properties(bytes: &[u8]) -> Result<(Vec<u8>, usize), CodecError> {
    if bytes.is_empty() {
        return Ok((Vec::new(), 0));
    }
    let (declared, prefix_len) = decode_vbi(bytes)?;
    let end = prefix_len + declared as usize;
    bytes
        .get(prefix_len..end)
        .map(|props| (props.to_vec(), end))
        .ok_or(CodecError::RemainingLengthMismatch)
}
// Round-trip and short-form tests for the packet body codecs, plus checks of
// the property identifier table.
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use super::*;
// Encodes `value`, decodes the resulting bytes and asserts that the decoded
// value equals the input. Panics (via `expect`) if either step fails.
fn round<T, E, D>(value: T, encode: E, decode: D)
where
T: PartialEq + core::fmt::Debug + Clone,
E: Fn(&T) -> Result<Vec<u8>, CodecError>,
D: Fn(&[u8]) -> Result<T, CodecError>,
{
let bytes = encode(&value).expect("encode");
let parsed = decode(&bytes).expect("decode");
assert_eq!(parsed, value);
}
#[test]
fn ack_body_round_trip_with_properties() {
let ack = AckBody {
packet_id: 0x1234,
// Properties: identifier 0x1F followed by the 5-byte string "hello".
reason_code: 0x10, properties: alloc::vec![0x1F, 0x00, 0x05, b'h', b'e', b'l', b'l', b'o'],
};
round(ack, encode_ack_body, decode_ack_body);
}
#[test]
fn ack_body_short_form_no_reason_code_no_properties() {
// Only the two packet identifier bytes are present.
let bytes = alloc::vec![0xAB, 0xCD];
let parsed = decode_ack_body(&bytes).expect("decode");
assert_eq!(parsed.packet_id, 0xABCD);
assert_eq!(parsed.reason_code, 0);
assert!(parsed.properties.is_empty());
}
#[test]
fn connect_body_round_trip_minimal() {
let c = ConnectBody {
protocol_name: "MQTT".to_string(),
protocol_version: 5,
// 0x02 is `connect_flags::CLEAN_START`; no will, user name or password.
connect_flags: 0x02, keep_alive: 60,
properties: Vec::new(),
client_id: "test-client".to_string(),
will_properties: Vec::new(),
will_topic: None,
will_payload: Vec::new(),
user_name: None,
password: Vec::new(),
};
round(c, encode_connect_body, decode_connect_body);
}
#[test]
fn connect_body_round_trip_with_will_and_credentials() {
// Exercises every optional section: will, user name and password.
let c = ConnectBody {
protocol_name: "MQTT".to_string(),
protocol_version: 5,
connect_flags: connect_flags::CLEAN_START
| connect_flags::WILL
| connect_flags::USER_NAME
| connect_flags::PASSWORD,
keep_alive: 30,
properties: Vec::new(),
client_id: "edge-1".to_string(),
will_properties: Vec::new(),
will_topic: Some("status".to_string()),
will_payload: alloc::vec![0xDE, 0xAD],
user_name: Some("alice".to_string()),
password: alloc::vec![1, 2, 3, 4],
};
round(c, encode_connect_body, decode_connect_body);
}
#[test]
fn connack_body_round_trip() {
let c = ConnackBody {
session_present: true,
reason_code: 0x00,
// Properties: identifier 0x21 followed by the two-byte value 0x0010.
properties: alloc::vec![0x21, 0x00, 0x10], };
round(c, encode_connack_body, decode_connack_body);
}
#[test]
fn subscribe_body_round_trip_with_two_filters() {
let s = SubscribeBody {
packet_id: 1,
properties: Vec::new(),
subscriptions: alloc::vec![
Subscription {
topic_filter: "sensors/+".to_string(),
options: 0x01, },
Subscription {
topic_filter: "alerts/#".to_string(),
options: 0x02, }
],
};
round(s, encode_subscribe_body, decode_subscribe_body);
}
#[test]
fn suback_body_round_trip_with_reason_codes() {
let s = SubackBody {
packet_id: 1,
properties: Vec::new(),
reason_codes: alloc::vec![0x00, 0x01, 0x80], };
round(s, encode_suback_body, decode_suback_body);
}
#[test]
fn unsubscribe_body_round_trip() {
let u = UnsubscribeBody {
packet_id: 5,
properties: Vec::new(),
topic_filters: alloc::vec!["a/b".to_string(), "c/d".to_string()],
};
round(u, encode_unsubscribe_body, decode_unsubscribe_body);
}
#[test]
fn unsuback_body_round_trip_via_alias() {
// `UnsubackBody` is a type alias for `SubackBody`.
let u = UnsubackBody {
packet_id: 5,
properties: Vec::new(),
reason_codes: alloc::vec![0x00, 0x11], };
round(u, encode_unsuback_body, decode_unsuback_body);
}
#[test]
fn disconnect_body_round_trip_with_reason_string_property() {
let d = DisconnectBody {
// Properties: identifier 0x1F followed by the 3-byte string "bad".
reason_code: 0x82, properties: alloc::vec![0x1F, 0x00, 0x03, b'b', b'a', b'd'],
};
round(d, encode_disconnect_body, decode_disconnect_body);
}
#[test]
fn disconnect_body_short_form_implicit_normal_disconnection() {
// An empty body must decode to reason code 0 with no properties.
let parsed = decode_disconnect_body(&[]).expect("decode");
assert_eq!(parsed.reason_code, 0);
assert!(parsed.properties.is_empty());
}
#[test]
fn auth_body_round_trip_via_alias() {
// `AuthBody` is a type alias for `DisconnectBody`.
let a = AuthBody {
// Properties: identifier 0x15 followed by the 6-byte string "SCRAM!".
reason_code: 0x18, properties: alloc::vec![0x15, 0x00, 0x06, b'S', b'C', b'R', b'A', b'M', b'!'],
};
round(a, encode_auth_body, decode_auth_body);
}
#[test]
fn property_data_type_table_covers_all_spec_identifiers() {
// Every identifier listed here must have an entry in `property_data_type`.
for id in [
0x01_u8, 0x02, 0x03, 0x08, 0x09, 0x0B, 0x11, 0x12, 0x13, 0x15, 0x16, 0x17, 0x18, 0x19,
0x1A, 0x1C, 0x1F, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A,
] {
assert!(
property_data_type(id).is_some(),
"missing data type for property id 0x{id:02X}"
);
}
}
#[test]
fn property_data_type_returns_none_for_unknown_id() {
assert!(property_data_type(0x00).is_none());
assert!(property_data_type(0xFF).is_none());
}
#[test]
fn property_data_type_user_property_is_utf8_pair() {
assert_eq!(
property_data_type(0x26),
Some(PropertyDataType::Utf8StringPair)
);
}
}