use crate::mqtt::packet::escape_binary_json_string;
use crate::mqtt::packet::mqtt_binary::MqttBinary;
use crate::mqtt::packet::mqtt_string::MqttString;
use crate::mqtt::packet::DecodeResult;
use crate::mqtt::packet::VariableByteInteger;
use crate::mqtt::result_code::MqttError;
use alloc::{string::String, vec::Vec};
use core::convert::{TryFrom, TryInto};
use core::fmt;
use num_enum::TryFromPrimitive;
use serde::ser::SerializeStruct;
use serde::ser::Serializer;
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use std::io::IoSlice;
/// Identifier of an MQTT property, stored as a single byte (`#[repr(u8)]`).
///
/// The discriminant of each variant is the numeric identifier returned by
/// `as_u8`. The numbering is not contiguous, so converting an arbitrary byte
/// through the derived `TryFromPrimitive` implementation can fail.
// NOTE(review): the names and numbers appear to follow the MQTT v5.0 property
// table — confirm against the specification.
#[derive(Deserialize, PartialEq, Eq, Copy, Clone, TryFromPrimitive)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[repr(u8)]
pub enum PropertyId {
PayloadFormatIndicator = 1,
MessageExpiryInterval = 2,
ContentType = 3,
ResponseTopic = 8,
CorrelationData = 9,
SubscriptionIdentifier = 11,
SessionExpiryInterval = 17,
AssignedClientIdentifier = 18,
ServerKeepAlive = 19,
AuthenticationMethod = 21,
AuthenticationData = 22,
RequestProblemInformation = 23,
WillDelayInterval = 24,
RequestResponseInformation = 25,
ResponseInformation = 26,
ServerReference = 28,
ReasonString = 31,
ReceiveMaximum = 33,
TopicAliasMaximum = 34,
TopicAlias = 35,
MaximumQos = 36,
RetainAvailable = 37,
UserProperty = 38,
MaximumPacketSize = 39,
WildcardSubscriptionAvailable = 40,
SubscriptionIdentifierAvailable = 41,
SharedSubscriptionAvailable = 42,
}
impl PropertyId {
pub fn as_u8(self) -> u8 {
self as u8
}
pub fn as_str(&self) -> &'static str {
match self {
PropertyId::PayloadFormatIndicator => "payload_format_indicator",
PropertyId::MessageExpiryInterval => "message_expiry_interval",
PropertyId::ContentType => "content_type",
PropertyId::ResponseTopic => "response_topic",
PropertyId::CorrelationData => "correlation_data",
PropertyId::SubscriptionIdentifier => "subscription_identifier",
PropertyId::SessionExpiryInterval => "session_expiry_interval",
PropertyId::AssignedClientIdentifier => "assigned_client_identifier",
PropertyId::ServerKeepAlive => "server_keep_alive",
PropertyId::AuthenticationMethod => "authentication_method",
PropertyId::AuthenticationData => "authentication_data",
PropertyId::RequestProblemInformation => "request_problem_information",
PropertyId::WillDelayInterval => "will_delay_interval",
PropertyId::RequestResponseInformation => "request_response_information",
PropertyId::ResponseInformation => "response_information",
PropertyId::ServerReference => "server_reference",
PropertyId::ReasonString => "reason_string",
PropertyId::ReceiveMaximum => "receive_maximum",
PropertyId::TopicAliasMaximum => "topic_alias_maximum",
PropertyId::TopicAlias => "topic_alias",
PropertyId::MaximumQos => "maximum_qos",
PropertyId::RetainAvailable => "retain_available",
PropertyId::UserProperty => "user_property",
PropertyId::MaximumPacketSize => "maximum_packet_size",
PropertyId::WildcardSubscriptionAvailable => "wildcard_subscription_available",
PropertyId::SubscriptionIdentifierAvailable => "subscription_identifier_available",
PropertyId::SharedSubscriptionAvailable => "shared_subscription_available",
}
}
}
impl Serialize for PropertyId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.as_str())
}
}
impl fmt::Display for PropertyId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match serde_json::to_string(self) {
Ok(json) => write!(f, "{json}"),
Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
}
}
}
impl fmt::Debug for PropertyId {
    /// Debug output is deliberately identical to the `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}
/// Value carried by the `PayloadFormatIndicator` property.
///
/// The discriminants (0 and 1) are the byte values stored by
/// `PayloadFormatIndicator::new`.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TryFromPrimitive)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[repr(u8)]
pub enum PayloadFormat {
Binary = 0,
String = 1,
}
impl fmt::Display for PayloadFormat {
    /// Writes the lowercase name of the format: `binary` or `string`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            PayloadFormat::Binary => "binary",
            PayloadFormat::String => "string",
        })
    }
}
/// Number of bytes a value occupies when it is written as a property value.
pub trait PropertySize {
    /// Returns the encoded size of `self` in bytes.
    fn size(&self) -> usize;
}

impl PropertySize for u8 {
    /// A `u8` is always one byte.
    fn size(&self) -> usize {
        core::mem::size_of::<u8>()
    }
}

impl PropertySize for u16 {
    /// A `u16` is always two bytes.
    fn size(&self) -> usize {
        core::mem::size_of::<u16>()
    }
}

impl PropertySize for u32 {
    /// A `u32` is always four bytes.
    fn size(&self) -> usize {
        core::mem::size_of::<u32>()
    }
}

impl PropertySize for String {
    /// Byte length of the string plus two additional bytes.
    fn size(&self) -> usize {
        const PREFIX_BYTES: usize = 2;
        PREFIX_BYTES + self.len()
    }
}

impl PropertySize for Vec<u8> {
    /// Number of bytes in the vector plus two additional bytes.
    fn size(&self) -> usize {
        const PREFIX_BYTES: usize = 2;
        PREFIX_BYTES + self.len()
    }
}
impl PropertySize for VariableByteInteger {
    /// Returns 1, 2, 3 or 4 depending on which value range `to_u32()` falls in.
    fn size(&self) -> usize {
        let raw = self.to_u32();
        if raw <= 0x7F {
            1
        } else if raw <= 0x3FFF {
            2
        } else if raw <= 0x1F_FFFF {
            3
        } else {
            4
        }
    }
}
// Shared scaffolding for every property type: declares the struct, its `id()`
// accessor, and the conversion into the `Property` enum.
//
// Arguments: the struct name (which must also be the name of a `Property`
// variant), the `PropertyId` expression returned by `id()`, and the type of
// the stored value.
macro_rules! mqtt_property_common {
($name:ident, $id:expr, $ty:ty) => {
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct $name {
// Identifier byte, kept as an array so it can be borrowed as a slice
// when the property is encoded.
id_bytes: [u8; 1],
// Property value in the representation chosen by the calling macro.
value: $ty,
}
impl $name {
// Returns the identifier expression this type was generated with.
pub fn id(&self) -> PropertyId {
$id
}
}
// Wraps the concrete property struct in the `Property` variant of the same name.
impl From<$name> for Property {
fn from(v: $name) -> Self {
Property::$name(v)
}
}
};
}
// Defines a property type whose value is an `MqttBinary`, together with its
// serde serialization, constructors, encoders, accessors and `Display`.
macro_rules! mqtt_property_binary {
    ($name:ident, $id:expr) => {
        mqtt_property_common!($name, $id, MqttBinary);

        impl serde::Serialize for $name {
            /// Serializes as a struct with the numeric `id` and the escaped `val`.
            fn serialize<S: serde::Serializer>(
                &self,
                serializer: S,
            ) -> Result<S::Ok, S::Error> {
                let rendered = escape_binary_json_string(self.val());
                let mut fields = serializer.serialize_struct(stringify!($name), 2)?;
                fields.serialize_field("id", &($id as u8))?;
                fields.serialize_field("val", &rendered)?;
                fields.end()
            }
        }

        impl $name {
            /// Builds the property from anything convertible into `MqttBinary`.
            ///
            /// The conversion error is returned unchanged.
            pub fn new<T>(v: T) -> Result<Self, MqttError>
            where
                T: TryInto<MqttBinary, Error = MqttError>,
            {
                let value: MqttBinary = v.try_into()?;
                let id_bytes = [$id as u8];
                Ok(Self { id_bytes, value })
            }

            /// Decodes the value from `bytes` (identifier byte already removed)
            /// and returns it with the number of bytes `MqttBinary::decode` consumed.
            pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
                let (value, used) = MqttBinary::decode(bytes)?;
                let parsed = Self {
                    id_bytes: [$id as u8],
                    value,
                };
                Ok((parsed, used))
            }

            /// Returns the identifier byte followed by the value's buffers.
            #[cfg(feature = "std")]
            pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
                core::iter::once(IoSlice::new(&self.id_bytes))
                    .chain(self.value.to_buffers())
                    .collect()
            }

            /// Returns the identifier byte followed by the encoded value in one vector.
            pub fn to_continuous_buffer(&self) -> Vec<u8> {
                let mut encoded = self.id_bytes.to_vec();
                encoded.extend(self.value.to_continuous_buffer());
                encoded
            }

            /// Returns the binary value as a byte slice.
            pub fn val(&self) -> &[u8] {
                self.value.as_slice()
            }

            /// Returns the size of the value plus one byte for the identifier.
            pub fn size(&self) -> usize {
                self.id_bytes.len() + self.value.size()
            }
        }

        impl fmt::Display for $name {
            /// Writes `{"id": "...", "value": "..."}`, using the escaped text when
            /// `escape_binary_json_string` yields one and the `Debug` form of the
            /// bytes otherwise.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                if let Some(text) = escape_binary_json_string(self.val()) {
                    write!(
                        f,
                        "{{\"id\": \"{}\", \"value\": \"{}\"}}",
                        self.id(),
                        text
                    )
                } else {
                    write!(
                        f,
                        "{{\"id\": \"{}\", \"value\": \"{:?}\"}}",
                        self.id(),
                        self.val()
                    )
                }
            }
        }
    };
}
// Defines a property type whose value is a single `MqttString`, together with
// its serde serialization, constructors, encoders, accessors and `Display`.
macro_rules! mqtt_property_string {
    ($name:ident, $id:expr) => {
        mqtt_property_common!($name, $id, MqttString);

        impl serde::Serialize for $name {
            /// Serializes as a struct with the numeric `id` and the string `val`.
            fn serialize<S: serde::Serializer>(
                &self,
                serializer: S,
            ) -> Result<S::Ok, S::Error> {
                let mut fields = serializer.serialize_struct(stringify!($name), 2)?;
                fields.serialize_field("id", &($id as u8))?;
                fields.serialize_field("val", self.val())?;
                fields.end()
            }
        }

        impl $name {
            /// Builds the property from anything convertible into `MqttString`.
            ///
            /// The conversion error is returned unchanged.
            pub fn new<T>(s: T) -> Result<Self, MqttError>
            where
                T: TryInto<MqttString, Error = MqttError>,
            {
                let value: MqttString = s.try_into()?;
                let id_bytes = [$id as u8];
                Ok(Self { id_bytes, value })
            }

            /// Decodes the value from `bytes` (identifier byte already removed)
            /// and returns it with the number of bytes `MqttString::decode` consumed.
            pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
                let (value, used) = MqttString::decode(bytes)?;
                let parsed = Self {
                    id_bytes: [$id as u8],
                    value,
                };
                Ok((parsed, used))
            }

            /// Returns the identifier byte followed by the value's buffers.
            #[cfg(feature = "std")]
            pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
                core::iter::once(IoSlice::new(&self.id_bytes))
                    .chain(self.value.to_buffers())
                    .collect()
            }

            /// Returns the identifier byte followed by the encoded value in one vector.
            pub fn to_continuous_buffer(&self) -> Vec<u8> {
                let mut encoded = self.id_bytes.to_vec();
                encoded.extend(self.value.to_continuous_buffer());
                encoded
            }

            /// Returns the string value.
            pub fn val(&self) -> &str {
                self.value.as_str()
            }

            /// Returns the size of the value plus one byte for the identifier.
            pub fn size(&self) -> usize {
                self.id_bytes.len() + self.value.size()
            }
        }

        impl fmt::Display for $name {
            /// Writes `{"id": "...", "value": "..."}`.
            // NOTE(review): the string value is written without JSON escaping.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let (id, text) = (self.id(), self.val());
                write!(f, "{{\"id\": \"{}\", \"value\": \"{}\"}}", id, text)
            }
        }
    };
}
// Defines a property type whose value is a key/value pair of `MqttString`s,
// together with its serde serialization, constructors, encoders, accessors and
// `Display`.
macro_rules! mqtt_property_string_pair {
    ($name:ident, $id:expr) => {
        mqtt_property_common!($name, $id, (MqttString, MqttString));

        impl serde::Serialize for $name {
            /// Serializes as a struct with the numeric `id`, the `key` and the `val`.
            fn serialize<S: serde::Serializer>(
                &self,
                serializer: S,
            ) -> Result<S::Ok, S::Error> {
                let mut fields = serializer.serialize_struct(stringify!($name), 3)?;
                fields.serialize_field("id", &($id as u8))?;
                fields.serialize_field("key", self.key())?;
                fields.serialize_field("val", self.val())?;
                fields.end()
            }
        }

        impl $name {
            /// Builds the property from a key and a value, each convertible into
            /// `MqttString`. The key is converted first; the first conversion
            /// error is returned unchanged.
            pub fn new<K, V>(key: K, val: V) -> Result<Self, MqttError>
            where
                K: TryInto<MqttString, Error = MqttError>,
                V: TryInto<MqttString, Error = MqttError>,
            {
                let name: MqttString = key.try_into()?;
                let content: MqttString = val.try_into()?;
                Ok(Self {
                    id_bytes: [$id as u8],
                    value: (name, content),
                })
            }

            /// Decodes the key and then the value from `bytes` (identifier byte
            /// already removed) and returns the total number of bytes consumed.
            pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
                let (name, name_len) = MqttString::decode(bytes)?;
                // The value starts right after the bytes consumed by the key.
                let remainder = &bytes[name_len..];
                let (content, content_len) = MqttString::decode(remainder)?;
                let parsed = Self {
                    id_bytes: [$id as u8],
                    value: (name, content),
                };
                Ok((parsed, name_len + content_len))
            }

            /// Returns the identifier byte, then the key's buffers, then the value's.
            #[cfg(feature = "std")]
            pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
                let (name, content) = (&self.value.0, &self.value.1);
                core::iter::once(IoSlice::new(&self.id_bytes))
                    .chain(name.to_buffers())
                    .chain(content.to_buffers())
                    .collect()
            }

            /// Returns the identifier byte, encoded key and encoded value in one vector.
            pub fn to_continuous_buffer(&self) -> Vec<u8> {
                let mut encoded = self.id_bytes.to_vec();
                encoded.extend(self.value.0.to_continuous_buffer());
                encoded.extend(self.value.1.to_continuous_buffer());
                encoded
            }

            /// Returns the key string.
            pub fn key(&self) -> &str {
                self.value.0.as_str()
            }

            /// Returns the value string.
            pub fn val(&self) -> &str {
                self.value.1.as_str()
            }

            /// Returns the sizes of key and value plus one byte for the identifier.
            pub fn size(&self) -> usize {
                self.id_bytes.len() + self.value.0.size() + self.value.1.size()
            }
        }

        impl fmt::Display for $name {
            /// Writes `{"id": "...", "key": "...", "val": "..."}`.
            // NOTE(review): key and value are written without JSON escaping.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let (id, name, content) = (self.id(), self.key(), self.val());
                write!(
                    f,
                    "{{\"id\": \"{}\", \"key\": \"{}\", \"val\": \"{}\"}}",
                    id, name, content
                )
            }
        }
    };
}
// Defines a one-byte property type without a `new` constructor, so the caller
// can supply its own. The third argument is an `Option` of a validator callable
// that is run on the value while parsing.
macro_rules! mqtt_property_u8_custom_new {
    ($name:ident, $id:expr, $validator:expr) => {
        mqtt_property_common!($name, $id, [u8; 1]);

        impl serde::Serialize for $name {
            /// Serializes as a struct with the numeric `id` and the numeric `val`.
            fn serialize<S: serde::Serializer>(
                &self,
                serializer: S,
            ) -> Result<S::Ok, S::Error> {
                let mut fields = serializer.serialize_struct(stringify!($name), 2)?;
                fields.serialize_field("id", &($id as u8))?;
                fields.serialize_field("val", &self.val())?;
                fields.end()
            }
        }

        impl $name {
            /// Reads the single value byte from `bytes` (identifier byte already
            /// removed).
            ///
            /// Returns `MalformedPacket` when `bytes` is empty, or the validator's
            /// error when the value is rejected. On success one byte is consumed.
            pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
                let byte = *bytes.first().ok_or(MqttError::MalformedPacket)?;
                if let Some(check) = $validator {
                    check(byte)?;
                }
                let parsed = Self {
                    id_bytes: [$id as u8],
                    value: [byte],
                };
                Ok((parsed, 1))
            }

            /// Returns the identifier byte and the value byte as two slices.
            #[cfg(feature = "std")]
            pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
                let id = IoSlice::new(&self.id_bytes);
                let payload = IoSlice::new(&self.value);
                vec![id, payload]
            }

            /// Returns the identifier byte followed by the value byte.
            pub fn to_continuous_buffer(&self) -> Vec<u8> {
                let mut encoded = self.id_bytes.to_vec();
                encoded.extend_from_slice(&self.value);
                encoded
            }

            /// Returns the stored value.
            pub fn val(&self) -> u8 {
                let [byte] = self.value;
                byte
            }

            /// Returns the encoded size: identifier byte plus value byte.
            pub fn size(&self) -> usize {
                self.id_bytes.len() + self.value.len()
            }
        }

        impl fmt::Display for $name {
            /// Writes `{"id": "...", "value": N}` with the value as a bare number.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let (id, number) = (self.id(), self.val());
                write!(f, "{{\"id\": \"{}\", \"value\": {}}}", id, number)
            }
        }
    };
}
// Defines a one-byte property type via `mqtt_property_u8_custom_new!` and adds
// the standard `new(u8)` constructor that runs the same validator.
macro_rules! mqtt_property_u8 {
    ($name:ident, $id:expr, $validator:expr) => {
        mqtt_property_u8_custom_new!($name, $id, $validator);

        impl $name {
            /// Builds the property from `v`, returning the validator's error when
            /// a validator is present and rejects the value.
            pub fn new(v: u8) -> Result<Self, MqttError> {
                if let Some(check) = $validator {
                    check(v)?;
                }
                let id_bytes = [$id as u8];
                let value = [v];
                Ok(Self { id_bytes, value })
            }
        }
    };
}
// Defines a property type holding a two-byte big-endian integer. The third
// argument is an `Option` of a validator callable that is run on the decoded
// `u16` both when constructing and when parsing.
macro_rules! mqtt_property_u16 {
    ($name:ident, $id:expr, $validator:expr) => {
        mqtt_property_common!($name, $id, [u8; 2]);

        impl serde::Serialize for $name {
            /// Serializes as a struct with the numeric `id` and the numeric `val`.
            fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                let mut s = serializer.serialize_struct(stringify!($name), 2)?;
                s.serialize_field("id", &($id as u8))?;
                s.serialize_field("val", &self.val())?;
                s.end()
            }
        }

        impl $name {
            /// Builds the property from `v`, returning the validator's error when
            /// a validator is present and rejects the value.
            pub fn new(v: u16) -> Result<Self, MqttError> {
                if let Some(validator) = $validator {
                    validator(v)?;
                }
                Ok(Self {
                    id_bytes: [$id as u8],
                    value: v.to_be_bytes(),
                })
            }

            /// Reads two big-endian bytes from `bytes` (identifier byte already
            /// removed).
            ///
            /// Returns `MalformedPacket` when fewer than two bytes are available,
            /// or the validator's error when the value is rejected. On success two
            /// bytes are consumed.
            pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
                // A slice pattern performs the length check and the copy in one
                // step, so no indexing or `unwrap()` is needed.
                let raw = match bytes {
                    [hi, lo, ..] => [*hi, *lo],
                    _ => return Err(MqttError::MalformedPacket),
                };
                let v = u16::from_be_bytes(raw);
                if let Some(validator) = $validator {
                    validator(v)?;
                }
                Ok((
                    Self {
                        id_bytes: [$id as u8],
                        value: raw,
                    },
                    2,
                ))
            }

            /// Returns the identifier byte and the value bytes as two slices.
            #[cfg(feature = "std")]
            pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
                vec![IoSlice::new(&self.id_bytes), IoSlice::new(&self.value)]
            }

            /// Returns the identifier byte followed by the big-endian value bytes.
            pub fn to_continuous_buffer(&self) -> Vec<u8> {
                // The final length is known up front, so allocate exactly once.
                let mut buf = Vec::with_capacity(self.size());
                buf.extend_from_slice(&self.id_bytes);
                buf.extend_from_slice(&self.value);
                buf
            }

            /// Returns the stored value decoded from its big-endian bytes.
            pub fn val(&self) -> u16 {
                u16::from_be_bytes(self.value)
            }

            /// Returns the encoded size: identifier byte plus two value bytes.
            pub fn size(&self) -> usize {
                1 + self.value.len()
            }
        }

        impl fmt::Display for $name {
            /// Writes `{"id": "...", "value": N}` with the value as a bare number.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(
                    f,
                    "{{\"id\": \"{}\", \"value\": {}}}",
                    self.id(),
                    self.val()
                )
            }
        }
    };
}
// Defines a property type holding a four-byte big-endian integer. The third
// argument is an `Option` of a validator callable that is run on the decoded
// `u32` both when constructing and when parsing.
macro_rules! mqtt_property_u32 {
    ($name:ident, $id:expr, $validator:expr) => {
        mqtt_property_common!($name, $id, [u8; 4]);

        impl serde::Serialize for $name {
            /// Serializes as a struct with the numeric `id` and the numeric `value`.
            // NOTE(review): this macro emits the key "value" while the sibling
            // property macros emit "val"; kept as is because changing it would
            // alter the serialized output — confirm which one is intended.
            fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                let mut s = serializer.serialize_struct(stringify!($name), 2)?;
                s.serialize_field("id", &($id as u8))?;
                s.serialize_field("value", &self.val())?;
                s.end()
            }
        }

        impl $name {
            /// Builds the property from `v`, returning the validator's error when
            /// a validator is present and rejects the value.
            pub fn new(v: u32) -> Result<Self, MqttError> {
                if let Some(validator) = $validator {
                    validator(v)?;
                }
                Ok(Self {
                    id_bytes: [$id as u8],
                    value: v.to_be_bytes(),
                })
            }

            /// Reads four big-endian bytes from `bytes` (identifier byte already
            /// removed).
            ///
            /// Returns `MalformedPacket` when fewer than four bytes are available,
            /// or the validator's error when the value is rejected. On success four
            /// bytes are consumed.
            pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
                // A slice pattern performs the length check and the copy in one
                // step, so no indexing or `unwrap()` is needed.
                let raw = match bytes {
                    [b0, b1, b2, b3, ..] => [*b0, *b1, *b2, *b3],
                    _ => return Err(MqttError::MalformedPacket),
                };
                let v = u32::from_be_bytes(raw);
                if let Some(validator) = $validator {
                    validator(v)?;
                }
                Ok((
                    Self {
                        id_bytes: [$id as u8],
                        value: raw,
                    },
                    4,
                ))
            }

            /// Returns the identifier byte and the value bytes as two slices.
            #[cfg(feature = "std")]
            pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
                vec![IoSlice::new(&self.id_bytes), IoSlice::new(&self.value)]
            }

            /// Returns the identifier byte followed by the big-endian value bytes.
            pub fn to_continuous_buffer(&self) -> Vec<u8> {
                // The final length is known up front, so allocate exactly once.
                let mut buf = Vec::with_capacity(self.size());
                buf.extend_from_slice(&self.id_bytes);
                buf.extend_from_slice(&self.value);
                buf
            }

            /// Returns the stored value decoded from its big-endian bytes.
            pub fn val(&self) -> u32 {
                u32::from_be_bytes(self.value)
            }

            /// Returns the encoded size: identifier byte plus four value bytes.
            pub fn size(&self) -> usize {
                1 + self.value.len()
            }
        }

        impl fmt::Display for $name {
            /// Writes `{"id": "...", "value": N}` with the value as a bare number.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(
                    f,
                    "{{\"id\": \"{}\", \"value\": {}}}",
                    self.id(),
                    self.val()
                )
            }
        }
    };
}
// Defines a property type whose value is a `VariableByteInteger`. The third
// argument is an `Option` of a validator callable that is run on the `u32`
// value both when constructing and when parsing.
macro_rules! mqtt_property_variable_integer {
    ($name:ident, $id:expr, $validator:expr) => {
        mqtt_property_common!($name, $id, VariableByteInteger);

        impl serde::Serialize for $name {
            /// Serializes as a struct with the numeric `id` and the numeric `val`.
            fn serialize<S: serde::Serializer>(
                &self,
                serializer: S,
            ) -> Result<S::Ok, S::Error> {
                let mut fields = serializer.serialize_struct(stringify!($name), 2)?;
                fields.serialize_field("id", &($id as u8))?;
                fields.serialize_field("val", &self.val())?;
                fields.end()
            }
        }

        impl $name {
            /// Builds the property from `v`.
            ///
            /// Returns `ValueOutOfRange` when `VariableByteInteger::from_u32`
            /// yields `None`; otherwise the validator (if any) is consulted and
            /// its error is returned on rejection.
            pub fn new(v: u32) -> Result<Self, MqttError> {
                let value = VariableByteInteger::from_u32(v).ok_or(MqttError::ValueOutOfRange)?;
                if let Some(check) = $validator {
                    check(v)?;
                }
                let id_bytes = [$id as u8];
                Ok(Self { id_bytes, value })
            }

            /// Decodes the value from `bytes` (identifier byte already removed)
            /// and returns it with the length reported by the decoder.
            ///
            /// Returns `InsufficientBytes` when the decoder reports either an
            /// incomplete input or an error, and the validator's error when the
            /// value is rejected.
            // NOTE(review): decoder errors are reported with the same code as
            // incomplete input — confirm this is intended.
            pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
                let (value, used) = match VariableByteInteger::decode_stream(bytes) {
                    DecodeResult::Ok(vbi, len) => (vbi, len),
                    DecodeResult::Incomplete | DecodeResult::Err(_) => {
                        return Err(MqttError::InsufficientBytes)
                    }
                };
                if let Some(check) = $validator {
                    check(value.to_u32())?;
                }
                let parsed = Self {
                    id_bytes: [$id as u8],
                    value,
                };
                Ok((parsed, used))
            }

            /// Returns the identifier byte and the value bytes as two slices.
            #[cfg(feature = "std")]
            pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
                let id = IoSlice::new(&self.id_bytes);
                let payload = IoSlice::new(&self.value.as_bytes());
                vec![id, payload]
            }

            /// Returns the identifier byte followed by the encoded value in one vector.
            pub fn to_continuous_buffer(&self) -> Vec<u8> {
                let mut encoded = self.id_bytes.to_vec();
                encoded.extend(self.value.to_continuous_buffer());
                encoded
            }

            /// Returns the stored value as a `u32`.
            pub fn val(&self) -> u32 {
                self.value.to_u32()
            }

            /// Returns the size of the value plus one byte for the identifier.
            pub fn size(&self) -> usize {
                self.id_bytes.len() + self.value.size()
            }
        }

        impl fmt::Display for $name {
            /// Writes `{"id": "...", "value": N}` with the value as a bare number.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let (id, number) = (self.id(), self.val());
                write!(f, "{{\"id\": \"{}\", \"value\": {}}}", id, number)
            }
        }
    };
}
// Function-pointer types used to give `None` a concrete type when a u16/u32
// property is declared without a validator (`None::<U16Validator>`).
type U16Validator = fn(u16) -> Result<(), MqttError>;
type U32Validator = fn(u32) -> Result<(), MqttError>;
// One-byte property; parsing rejects any value greater than 1 with
// `ProtocolError`. Declared through the `_custom_new` macro because its
// constructor takes a `PayloadFormat` instead of a raw `u8`.
mqtt_property_u8_custom_new!(
PayloadFormatIndicator,
PropertyId::PayloadFormatIndicator,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
impl PayloadFormatIndicator {
    /// Builds the property from a `PayloadFormat`, storing its discriminant as
    /// the value byte.
    ///
    /// This constructor never returns `Err`; the `Result` matches the signature
    /// of the other property constructors.
    pub fn new(v: PayloadFormat) -> Result<Self, MqttError> {
        let id_bytes = [PropertyId::PayloadFormatIndicator as u8];
        let value = [v as u8];
        Ok(Self { id_bytes, value })
    }
}
// Four-byte value, no validator.
mqtt_property_u32!(
MessageExpiryInterval,
PropertyId::MessageExpiryInterval,
None::<U32Validator>
);
// String-valued properties.
mqtt_property_string!(ContentType, PropertyId::ContentType);
mqtt_property_string!(ResponseTopic, PropertyId::ResponseTopic);
// Binary-valued property.
mqtt_property_binary!(CorrelationData, PropertyId::CorrelationData);
// Variable byte integer; a value of 0 is rejected with `ProtocolError`.
mqtt_property_variable_integer!(
SubscriptionIdentifier,
PropertyId::SubscriptionIdentifier,
Some(|v| {
if v == 0 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// Four-byte value, no validator.
mqtt_property_u32!(
SessionExpiryInterval,
PropertyId::SessionExpiryInterval,
None::<U32Validator>
);
// String-valued property.
mqtt_property_string!(
AssignedClientIdentifier,
PropertyId::AssignedClientIdentifier
);
// Two-byte value, no validator.
mqtt_property_u16!(
ServerKeepAlive,
PropertyId::ServerKeepAlive,
None::<U16Validator>
);
// String-valued and binary-valued authentication properties.
mqtt_property_string!(AuthenticationMethod, PropertyId::AuthenticationMethod);
mqtt_property_binary!(AuthenticationData, PropertyId::AuthenticationData);
// One-byte value; anything greater than 1 is rejected with `ProtocolError`.
mqtt_property_u8!(
RequestProblemInformation,
PropertyId::RequestProblemInformation,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// Four-byte value, no validator.
mqtt_property_u32!(
WillDelayInterval,
PropertyId::WillDelayInterval,
None::<U32Validator>
);
// One-byte value; anything greater than 1 is rejected with `ProtocolError`.
mqtt_property_u8!(
RequestResponseInformation,
PropertyId::RequestResponseInformation,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// String-valued properties.
mqtt_property_string!(ResponseInformation, PropertyId::ResponseInformation);
mqtt_property_string!(ServerReference, PropertyId::ServerReference);
mqtt_property_string!(ReasonString, PropertyId::ReasonString);
// Two-byte value; 0 is rejected with `ProtocolError`.
mqtt_property_u16!(
ReceiveMaximum,
PropertyId::ReceiveMaximum,
Some(|v| {
if v == 0 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// Two-byte value, no validator.
mqtt_property_u16!(
TopicAliasMaximum,
PropertyId::TopicAliasMaximum,
None::<U16Validator>
);
// Two-byte value; 0 is rejected with `ProtocolError`.
mqtt_property_u16!(
TopicAlias,
PropertyId::TopicAlias,
Some(|v| {
if v == 0 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// One-byte value; anything greater than 1 is rejected with `ProtocolError`.
mqtt_property_u8!(
MaximumQos,
PropertyId::MaximumQos,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// One-byte value; anything greater than 1 is rejected with `ProtocolError`.
mqtt_property_u8!(
RetainAvailable,
PropertyId::RetainAvailable,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// Key/value string pair.
mqtt_property_string_pair!(UserProperty, PropertyId::UserProperty);
// Four-byte value; 0 is rejected with `ProtocolError`.
mqtt_property_u32!(
MaximumPacketSize,
PropertyId::MaximumPacketSize,
Some(|v| {
if v == 0 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
// The three "available" flags below are one-byte values; anything greater
// than 1 is rejected with `ProtocolError`.
mqtt_property_u8!(
WildcardSubscriptionAvailable,
PropertyId::WildcardSubscriptionAvailable,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
mqtt_property_u8!(
SubscriptionIdentifierAvailable,
PropertyId::SubscriptionIdentifierAvailable,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
mqtt_property_u8!(
SharedSubscriptionAvailable,
PropertyId::SharedSubscriptionAvailable,
Some(|v| {
if v > 1 {
Err(MqttError::ProtocolError)
} else {
Ok(())
}
})
);
/// A single property of any supported kind.
///
/// Each variant wraps the property struct of the same name generated by the
/// `mqtt_property_*` macros; every such struct converts into this enum through
/// its generated `From` implementation.
#[derive(Debug, Serialize, PartialEq, Eq, Clone)]
// The variants differ in size; the lint is silenced rather than boxing them.
#[allow(clippy::large_enum_variant)]
pub enum Property {
PayloadFormatIndicator(PayloadFormatIndicator),
MessageExpiryInterval(MessageExpiryInterval),
ContentType(ContentType),
ResponseTopic(ResponseTopic),
CorrelationData(CorrelationData),
SubscriptionIdentifier(SubscriptionIdentifier),
SessionExpiryInterval(SessionExpiryInterval),
AssignedClientIdentifier(AssignedClientIdentifier),
ServerKeepAlive(ServerKeepAlive),
AuthenticationMethod(AuthenticationMethod),
AuthenticationData(AuthenticationData),
RequestProblemInformation(RequestProblemInformation),
WillDelayInterval(WillDelayInterval),
RequestResponseInformation(RequestResponseInformation),
ResponseInformation(ResponseInformation),
ServerReference(ServerReference),
ReasonString(ReasonString),
ReceiveMaximum(ReceiveMaximum),
TopicAliasMaximum(TopicAliasMaximum),
TopicAlias(TopicAlias),
MaximumQos(MaximumQos),
RetainAvailable(RetainAvailable),
UserProperty(UserProperty),
MaximumPacketSize(MaximumPacketSize),
WildcardSubscriptionAvailable(WildcardSubscriptionAvailable),
SubscriptionIdentifierAvailable(SubscriptionIdentifierAvailable),
SharedSubscriptionAvailable(SharedSubscriptionAvailable),
}
impl fmt::Display for Property {
    /// Delegates to the `Display` implementation of the wrapped property.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Select the wrapped value once, then format it through a single call.
        let inner: &dyn fmt::Display = match self {
            Property::PayloadFormatIndicator(p) => p,
            Property::MessageExpiryInterval(p) => p,
            Property::ContentType(p) => p,
            Property::ResponseTopic(p) => p,
            Property::CorrelationData(p) => p,
            Property::SubscriptionIdentifier(p) => p,
            Property::SessionExpiryInterval(p) => p,
            Property::AssignedClientIdentifier(p) => p,
            Property::ServerKeepAlive(p) => p,
            Property::AuthenticationMethod(p) => p,
            Property::AuthenticationData(p) => p,
            Property::RequestProblemInformation(p) => p,
            Property::WillDelayInterval(p) => p,
            Property::RequestResponseInformation(p) => p,
            Property::ResponseInformation(p) => p,
            Property::ServerReference(p) => p,
            Property::ReasonString(p) => p,
            Property::ReceiveMaximum(p) => p,
            Property::TopicAliasMaximum(p) => p,
            Property::TopicAlias(p) => p,
            Property::MaximumQos(p) => p,
            Property::RetainAvailable(p) => p,
            Property::UserProperty(p) => p,
            Property::MaximumPacketSize(p) => p,
            Property::WildcardSubscriptionAvailable(p) => p,
            Property::SubscriptionIdentifierAvailable(p) => p,
            Property::SharedSubscriptionAvailable(p) => p,
        };
        write!(f, "{inner}")
    }
}
/// Typed, optional access to the value held by a property.
///
/// Each accessor returns `Some` only when the property stores a value of the
/// requested kind and `None` otherwise.
pub trait PropertyValueAccess {
/// Returns the value when it is a single byte.
fn as_u8(&self) -> Option<u8>;
/// Returns the value when it is a 16-bit integer.
fn as_u16(&self) -> Option<u16>;
/// Returns the value when it is a 32-bit integer.
fn as_u32(&self) -> Option<u32>;
/// Returns the value when it is a string.
fn as_str(&self) -> Option<&str>;
/// Returns the value when it is binary data.
fn as_bytes(&self) -> Option<&[u8]>;
/// Returns the key and value when the property is a string pair.
fn as_key_value(&self) -> Option<(&str, &str)>;
}
impl PropertyValueAccess for Property {
    /// Returns the value of the one-byte properties, `None` for all others.
    fn as_u8(&self) -> Option<u8> {
        let byte = match self {
            Property::PayloadFormatIndicator(prop) => prop.val(),
            Property::MaximumQos(prop) => prop.val(),
            Property::RetainAvailable(prop) => prop.val(),
            Property::RequestProblemInformation(prop) => prop.val(),
            Property::RequestResponseInformation(prop) => prop.val(),
            Property::WildcardSubscriptionAvailable(prop) => prop.val(),
            Property::SubscriptionIdentifierAvailable(prop) => prop.val(),
            Property::SharedSubscriptionAvailable(prop) => prop.val(),
            _ => return None,
        };
        Some(byte)
    }

    /// Returns the value of the two-byte properties, `None` for all others.
    fn as_u16(&self) -> Option<u16> {
        let number = match self {
            Property::TopicAlias(prop) => prop.val(),
            Property::ReceiveMaximum(prop) => prop.val(),
            Property::TopicAliasMaximum(prop) => prop.val(),
            Property::ServerKeepAlive(prop) => prop.val(),
            _ => return None,
        };
        Some(number)
    }

    /// Returns the value of the four-byte properties and of
    /// `SubscriptionIdentifier`, `None` for all others.
    fn as_u32(&self) -> Option<u32> {
        let number = match self {
            Property::MessageExpiryInterval(prop) => prop.val(),
            Property::SessionExpiryInterval(prop) => prop.val(),
            Property::WillDelayInterval(prop) => prop.val(),
            Property::MaximumPacketSize(prop) => prop.val(),
            Property::SubscriptionIdentifier(prop) => prop.val(),
            _ => return None,
        };
        Some(number)
    }

    /// Returns the value of the string properties, `None` for all others.
    fn as_str(&self) -> Option<&str> {
        let text = match self {
            Property::ContentType(prop) => prop.val(),
            Property::ResponseTopic(prop) => prop.val(),
            Property::AssignedClientIdentifier(prop) => prop.val(),
            Property::AuthenticationMethod(prop) => prop.val(),
            Property::ResponseInformation(prop) => prop.val(),
            Property::ServerReference(prop) => prop.val(),
            Property::ReasonString(prop) => prop.val(),
            _ => return None,
        };
        Some(text)
    }

    /// Returns the value of the binary properties, `None` for all others.
    fn as_bytes(&self) -> Option<&[u8]> {
        let data = match self {
            Property::CorrelationData(prop) => prop.val(),
            Property::AuthenticationData(prop) => prop.val(),
            _ => return None,
        };
        Some(data)
    }

    /// Returns the key and value of a `UserProperty`, `None` for all others.
    fn as_key_value(&self) -> Option<(&str, &str)> {
        if let Property::UserProperty(pair) = self {
            Some((pair.key(), pair.val()))
        } else {
            None
        }
    }
}
// Expands to a `match` over every `Property` variant, binding the wrapped
// property struct to the given identifier and evaluating the given expression
// for it. Keeps the per-variant forwarding methods below in one place so a new
// variant only has to be added here.
macro_rules! dispatch_property {
    ($property:expr, $inner:ident => $action:expr) => {
        match $property {
            Property::PayloadFormatIndicator($inner) => $action,
            Property::MessageExpiryInterval($inner) => $action,
            Property::ContentType($inner) => $action,
            Property::ResponseTopic($inner) => $action,
            Property::CorrelationData($inner) => $action,
            Property::SubscriptionIdentifier($inner) => $action,
            Property::SessionExpiryInterval($inner) => $action,
            Property::AssignedClientIdentifier($inner) => $action,
            Property::ServerKeepAlive($inner) => $action,
            Property::AuthenticationMethod($inner) => $action,
            Property::AuthenticationData($inner) => $action,
            Property::RequestProblemInformation($inner) => $action,
            Property::WillDelayInterval($inner) => $action,
            Property::RequestResponseInformation($inner) => $action,
            Property::ResponseInformation($inner) => $action,
            Property::ServerReference($inner) => $action,
            Property::ReasonString($inner) => $action,
            Property::ReceiveMaximum($inner) => $action,
            Property::TopicAliasMaximum($inner) => $action,
            Property::TopicAlias($inner) => $action,
            Property::MaximumQos($inner) => $action,
            Property::RetainAvailable($inner) => $action,
            Property::UserProperty($inner) => $action,
            Property::MaximumPacketSize($inner) => $action,
            Property::WildcardSubscriptionAvailable($inner) => $action,
            Property::SubscriptionIdentifierAvailable($inner) => $action,
            Property::SharedSubscriptionAvailable($inner) => $action,
        }
    };
}

impl Property {
    /// Returns the identifier of the wrapped property.
    pub fn id(&self) -> PropertyId {
        dispatch_property!(self, p => p.id())
    }

    /// Returns the encoded size of the wrapped property, identifier byte included.
    pub fn size(&self) -> usize {
        dispatch_property!(self, p => p.size())
    }

    /// Returns the wrapped property's encoding as a list of borrowed slices.
    #[cfg(feature = "std")]
    pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
        dispatch_property!(self, p => p.to_buffers())
    }

    /// Returns the wrapped property's encoding as one contiguous vector.
    pub fn to_continuous_buffer(&self) -> Vec<u8> {
        dispatch_property!(self, p => p.to_continuous_buffer())
    }

    /// Parses one property from the start of `bytes`.
    ///
    /// The first byte selects the property type and the remaining bytes are
    /// handed to that type's `parse`. Returns the property together with the
    /// total number of bytes consumed, identifier byte included.
    ///
    /// # Errors
    ///
    /// Returns `MalformedPacket` when `bytes` is empty or the first byte is not
    /// a known `PropertyId`; otherwise propagates the error of the selected
    /// property parser.
    pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
        let (&id_byte, payload) = bytes.split_first().ok_or(MqttError::MalformedPacket)?;
        let id = PropertyId::try_from(id_byte).map_err(|_| MqttError::MalformedPacket)?;
        match id {
            PropertyId::PayloadFormatIndicator => {
                Self::parse_payload(PayloadFormatIndicator::parse, payload)
            }
            PropertyId::MessageExpiryInterval => {
                Self::parse_payload(MessageExpiryInterval::parse, payload)
            }
            PropertyId::ContentType => Self::parse_payload(ContentType::parse, payload),
            PropertyId::ResponseTopic => Self::parse_payload(ResponseTopic::parse, payload),
            PropertyId::CorrelationData => Self::parse_payload(CorrelationData::parse, payload),
            PropertyId::SubscriptionIdentifier => {
                Self::parse_payload(SubscriptionIdentifier::parse, payload)
            }
            PropertyId::SessionExpiryInterval => {
                Self::parse_payload(SessionExpiryInterval::parse, payload)
            }
            PropertyId::AssignedClientIdentifier => {
                Self::parse_payload(AssignedClientIdentifier::parse, payload)
            }
            PropertyId::ServerKeepAlive => Self::parse_payload(ServerKeepAlive::parse, payload),
            PropertyId::AuthenticationMethod => {
                Self::parse_payload(AuthenticationMethod::parse, payload)
            }
            PropertyId::AuthenticationData => {
                Self::parse_payload(AuthenticationData::parse, payload)
            }
            PropertyId::RequestProblemInformation => {
                Self::parse_payload(RequestProblemInformation::parse, payload)
            }
            PropertyId::WillDelayInterval => {
                Self::parse_payload(WillDelayInterval::parse, payload)
            }
            PropertyId::RequestResponseInformation => {
                Self::parse_payload(RequestResponseInformation::parse, payload)
            }
            PropertyId::ResponseInformation => {
                Self::parse_payload(ResponseInformation::parse, payload)
            }
            PropertyId::ServerReference => Self::parse_payload(ServerReference::parse, payload),
            PropertyId::ReasonString => Self::parse_payload(ReasonString::parse, payload),
            PropertyId::ReceiveMaximum => Self::parse_payload(ReceiveMaximum::parse, payload),
            PropertyId::TopicAliasMaximum => {
                Self::parse_payload(TopicAliasMaximum::parse, payload)
            }
            PropertyId::TopicAlias => Self::parse_payload(TopicAlias::parse, payload),
            PropertyId::MaximumQos => Self::parse_payload(MaximumQos::parse, payload),
            PropertyId::RetainAvailable => Self::parse_payload(RetainAvailable::parse, payload),
            PropertyId::UserProperty => Self::parse_payload(UserProperty::parse, payload),
            PropertyId::MaximumPacketSize => {
                Self::parse_payload(MaximumPacketSize::parse, payload)
            }
            PropertyId::WildcardSubscriptionAvailable => {
                Self::parse_payload(WildcardSubscriptionAvailable::parse, payload)
            }
            PropertyId::SubscriptionIdentifierAvailable => {
                Self::parse_payload(SubscriptionIdentifierAvailable::parse, payload)
            }
            PropertyId::SharedSubscriptionAvailable => {
                Self::parse_payload(SharedSubscriptionAvailable::parse, payload)
            }
        }
    }

    /// Runs a concrete property parser on `payload` (the bytes after the
    /// identifier), wraps the result in the matching `Property` variant through
    /// its `From` implementation, and adds one to the consumed length to
    /// account for the identifier byte.
    fn parse_payload<T>(
        parse: fn(&[u8]) -> Result<(T, usize), MqttError>,
        payload: &[u8],
    ) -> Result<(Self, usize), MqttError>
    where
        T: Into<Self>,
    {
        let (property, consumed) = parse(payload)?;
        Ok((property.into(), consumed + 1))
    }
}
/// A list of properties, in the order they are encoded and parsed.
pub type Properties = Vec<Property>;
/// Encoding of a property list into a single owned byte vector.
pub trait PropertiesToContinuousBuffer {
/// Returns the encoded bytes of all properties, concatenated in order.
fn to_continuous_buffer(&self) -> Vec<u8>;
}
/// Encoding of a property list into borrowed slices (requires the `std` feature).
#[cfg(feature = "std")]
pub trait PropertiesToBuffers {
/// Returns the buffers of all properties, concatenated in order.
fn to_buffers(&self) -> Vec<IoSlice<'_>>;
}
impl PropertiesToContinuousBuffer for Properties {
    /// Concatenates the encoded bytes of every property, in list order.
    fn to_continuous_buffer(&self) -> Vec<u8> {
        self.iter()
            .flat_map(|prop| prop.to_continuous_buffer())
            .collect()
    }
}
#[cfg(feature = "std")]
impl PropertiesToBuffers for Properties {
    /// Concatenates the buffer lists of every property, in list order.
    fn to_buffers(&self) -> Vec<IoSlice<'_>> {
        self.iter().flat_map(|prop| prop.to_buffers()).collect()
    }
}
/// Total encoded size of a property list.
pub trait PropertiesSize {
/// Returns the sum of the encoded sizes of all properties in the list.
fn size(&self) -> usize;
}
impl PropertiesSize for Properties {
    /// Adds up `Property::size` over every property in the list.
    fn size(&self) -> usize {
        self.iter().fold(0, |total, prop| total + prop.size())
    }
}
/// Decoding of a length-prefixed property list from raw bytes.
pub trait PropertiesParse {
/// Parses a property list from the start of `data` and returns it together
/// with the number of bytes consumed.
fn parse(data: &[u8]) -> Result<(Self, usize), MqttError>
where
Self: Sized;
}
impl PropertiesParse for Properties {
    /// Parses a property list that starts with a variable-byte-integer length.
    ///
    /// Returns the parsed properties and the number of bytes consumed (length
    /// prefix plus property bytes).
    ///
    /// # Errors
    ///
    /// Returns `MalformedPacket` when `data` is empty, the length prefix cannot
    /// be decoded, or the declared length runs past the end of `data`;
    /// otherwise propagates the error of the failing property parser.
    fn parse(data: &[u8]) -> Result<(Self, usize), MqttError> {
        if data.is_empty() {
            return Err(MqttError::MalformedPacket);
        }

        let (declared_len, header_len) = match VariableByteInteger::decode_stream(data) {
            DecodeResult::Ok(vbi, used) => (vbi.to_u32() as usize, used),
            _ => return Err(MqttError::MalformedPacket),
        };

        let mut props = Properties::new();
        if declared_len == 0 {
            return Ok((props, header_len));
        }

        // Restrict parsing to the declared region so a property can never read
        // past the end of the property list.
        let end = header_len + declared_len;
        let body = data
            .get(header_len..end)
            .ok_or(MqttError::MalformedPacket)?;

        let mut offset = 0;
        while offset < body.len() {
            let (prop, used) = Property::parse(&body[offset..])?;
            props.push(prop);
            offset += used;
        }
        Ok((props, header_len + offset))
    }
}