use crate::error::{MqttError, Result};
use crate::packet::{FixedHeader, MqttPacket, PacketType};
use crate::prelude::{format, String, ToString, Vec};
use crate::protocol::v5::properties::Properties;
use crate::types::ProtocolVersion;
use crate::QoS;
use bytes::{Buf, BufMut};
/// In-memory representation of an MQTT SUBACK packet.
///
/// The same type is used for both protocol flavours handled in this file;
/// `protocol_version` decides which wire layout is produced by the encoder.
#[derive(Debug, Clone)]
pub struct SubAckPacket {
    /// Packet identifier, written first in the packet body as a `u16`.
    pub packet_id: u16,

    /// Reason codes in the order they are written to / read from the wire.
    ///
    /// Encoding fails if this list is empty.
    pub reason_codes: Vec<SubAckReasonCode>,

    /// Packet properties; only encoded and decoded when `protocol_version` is 5.
    pub properties: Properties,

    /// Protocol level this packet is encoded for (`new` uses 5, `new_v311` uses 4).
    pub protocol_version: u8,
}
/// Per-subscription result carried in a SUBACK packet.
///
/// Each variant's discriminant is the byte used on the wire, so `code as u8`
/// yields the encoded value. Values below `0x80` grant a QoS level; the
/// remaining variants report a failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SubAckReasonCode {
    /// Subscription granted with QoS 0 (`0x00`).
    GrantedQoS0 = 0x00,
    /// Subscription granted with QoS 1 (`0x01`).
    GrantedQoS1 = 0x01,
    /// Subscription granted with QoS 2 (`0x02`).
    GrantedQoS2 = 0x02,
    /// Unspecified error (`0x80`); also the value every failure is mapped to
    /// when encoding for MQTT v3.1.1.
    UnspecifiedError = 0x80,
    /// Implementation specific error (`0x83`).
    ImplementationSpecificError = 0x83,
    /// Not authorized (`0x87`).
    NotAuthorized = 0x87,
    /// Topic filter invalid (`0x8F`).
    TopicFilterInvalid = 0x8F,
    /// Packet identifier in use (`0x91`).
    PacketIdentifierInUse = 0x91,
    /// Quota exceeded (`0x97`).
    QuotaExceeded = 0x97,
    /// Shared subscriptions not supported (`0x9E`).
    SharedSubscriptionsNotSupported = 0x9E,
    /// Subscription identifiers not supported (`0xA1`).
    SubscriptionIdentifiersNotSupported = 0xA1,
    /// Wildcard subscriptions not supported (`0xA2`).
    WildcardSubscriptionsNotSupported = 0xA2,
}
impl SubAckReasonCode {
    /// Returns the "granted" reason code that corresponds to `qos`.
    #[must_use]
    pub fn from_qos(qos: QoS) -> Self {
        match qos {
            QoS::AtMostOnce => Self::GrantedQoS0,
            QoS::AtLeastOnce => Self::GrantedQoS1,
            QoS::ExactlyOnce => Self::GrantedQoS2,
        }
    }

    /// Parses a wire byte into a reason code.
    ///
    /// Returns `None` when `value` is not the discriminant of any variant.
    #[must_use]
    pub fn from_u8(value: u8) -> Option<Self> {
        let code = match value {
            0x00 => Self::GrantedQoS0,
            0x01 => Self::GrantedQoS1,
            0x02 => Self::GrantedQoS2,
            0x80 => Self::UnspecifiedError,
            0x83 => Self::ImplementationSpecificError,
            0x87 => Self::NotAuthorized,
            0x8F => Self::TopicFilterInvalid,
            0x91 => Self::PacketIdentifierInUse,
            0x97 => Self::QuotaExceeded,
            0x9E => Self::SharedSubscriptionsNotSupported,
            0xA1 => Self::SubscriptionIdentifiersNotSupported,
            0xA2 => Self::WildcardSubscriptionsNotSupported,
            _ => return None,
        };
        Some(code)
    }

    /// Reports whether this code grants the subscription.
    ///
    /// True exactly for the three `GrantedQoS*` variants, i.e. whenever
    /// [`Self::granted_qos`] yields a value.
    #[must_use]
    pub fn is_success(&self) -> bool {
        self.granted_qos().is_some()
    }

    /// Returns the granted QoS level, or `None` for every failure code.
    #[must_use]
    pub fn granted_qos(&self) -> Option<QoS> {
        let qos = match *self {
            Self::GrantedQoS0 => QoS::AtMostOnce,
            Self::GrantedQoS1 => QoS::AtLeastOnce,
            Self::GrantedQoS2 => QoS::ExactlyOnce,
            _ => return None,
        };
        Some(qos)
    }
}
impl SubAckPacket {
    /// Creates an empty SUBACK for protocol level 5.
    ///
    /// The packet has no reason codes yet; add at least one before encoding.
    #[must_use]
    pub fn new(packet_id: u16) -> Self {
        Self::with_protocol_version(packet_id, 5)
    }

    /// Creates an empty SUBACK for protocol level 4 (MQTT v3.1.1).
    ///
    /// The packet has no reason codes yet; add at least one before encoding.
    #[must_use]
    pub fn new_v311(packet_id: u16) -> Self {
        Self::with_protocol_version(packet_id, 4)
    }

    /// Shared constructor: empty reason-code list, default properties.
    fn with_protocol_version(packet_id: u16, protocol_version: u8) -> Self {
        Self {
            packet_id,
            reason_codes: Vec::new(),
            properties: Properties::default(),
            protocol_version,
        }
    }

    /// Maps a reason code to the byte written for non-v5 packets.
    ///
    /// Granted-QoS codes keep their own value; every failure code is sent as
    /// `0x80`.
    fn reason_code_to_v311(code: SubAckReasonCode) -> u8 {
        if code.is_success() {
            code as u8
        } else {
            SubAckReasonCode::UnspecifiedError as u8
        }
    }

    /// Appends `code` to the list of reason codes and returns the packet.
    #[must_use]
    pub fn add_reason_code(mut self, code: SubAckReasonCode) -> Self {
        self.reason_codes.push(code);
        self
    }

    /// Appends the "granted" reason code for `qos` and returns the packet.
    #[must_use]
    pub fn add_granted_qos(self, qos: QoS) -> Self {
        let granted = SubAckReasonCode::from_qos(qos);
        self.add_reason_code(granted)
    }

    /// Sets the reason-string property and returns the packet.
    #[must_use]
    pub fn with_reason_string(mut self, reason: String) -> Self {
        let props = &mut self.properties;
        props.set_reason_string(reason);
        self
    }

    /// Adds a user property (`key`, `value`) and returns the packet.
    #[must_use]
    pub fn with_user_property(mut self, key: String, value: String) -> Self {
        let props = &mut self.properties;
        props.add_user_property(key, value);
        self
    }
}
impl MqttPacket for SubAckPacket {
    /// Identifies this packet as a SUBACK.
    fn packet_type(&self) -> PacketType {
        PacketType::SubAck
    }

    /// Writes the SUBACK body into `buf`.
    ///
    /// Layout: packet identifier (`u16`), then the properties when
    /// `protocol_version` is 5, then one byte per reason code. For any other
    /// protocol version the properties are omitted and each reason code is
    /// passed through `reason_code_to_v311`, which sends every failure as
    /// `0x80`.
    ///
    /// # Errors
    ///
    /// Returns `MqttError::MalformedPacket` when `reason_codes` is empty; in
    /// that case nothing is written to `buf`. Errors from encoding the
    /// properties are propagated (the packet identifier has already been
    /// written at that point).
    fn encode_body<B: BufMut>(&self, buf: &mut B) -> Result<()> {
        // Validate before touching the buffer so a rejected packet does not
        // leave a half-written body behind in the caller's buffer.
        if self.reason_codes.is_empty() {
            return Err(MqttError::MalformedPacket(
                "SUBACK packet must contain at least one reason code".to_string(),
            ));
        }

        let is_v5 = self.protocol_version == 5;

        buf.put_u16(self.packet_id);
        if is_v5 {
            self.properties.encode(buf)?;
        }

        for &code in &self.reason_codes {
            let byte = if is_v5 {
                code as u8
            } else {
                Self::reason_code_to_v311(code)
            };
            buf.put_u8(byte);
        }
        Ok(())
    }

    /// Decodes a SUBACK body assuming protocol level 5.
    ///
    /// Use [`SubAckPacket::decode_body_with_version`] to decode for another
    /// protocol level.
    fn decode_body<B: Buf>(buf: &mut B, fixed_header: &FixedHeader) -> Result<Self> {
        Self::decode_body_with_version(buf, fixed_header, 5)
    }
}
impl SubAckPacket {
    /// Decodes a SUBACK body for the given protocol level.
    ///
    /// Reads the packet identifier, then the properties when
    /// `protocol_version` is 5, then treats **every remaining byte** of `buf`
    /// as a reason code. The fixed header is not consulted, so `buf` must be
    /// limited to this packet's body.
    ///
    /// For protocol levels other than 5 only the return codes `0x00`, `0x01`,
    /// `0x02` and `0x80` are accepted; MQTT v3.1.1 reserves all other values,
    /// and this mirrors the set the encoder emits for those versions.
    ///
    /// # Errors
    ///
    /// * `MqttError::UnsupportedProtocolVersion` if `protocol_version` is not
    ///   accepted by `ProtocolVersion::try_from`.
    /// * `MqttError::MalformedPacket` if fewer than two bytes are available
    ///   for the packet identifier, if no reason code follows, or if a byte is
    ///   not a valid reason code for the selected protocol level.
    /// * Any error returned by `Properties::decode` (protocol level 5 only).
    pub fn decode_body_with_version<B: Buf>(
        buf: &mut B,
        _fixed_header: &FixedHeader,
        protocol_version: u8,
    ) -> Result<Self> {
        ProtocolVersion::try_from(protocol_version)
            .map_err(|()| MqttError::UnsupportedProtocolVersion)?;
        let is_v5 = protocol_version == 5;

        if buf.remaining() < 2 {
            return Err(MqttError::MalformedPacket(
                "SUBACK missing packet identifier".to_string(),
            ));
        }
        let packet_id = buf.get_u16();

        let properties = if is_v5 {
            Properties::decode(buf)?
        } else {
            Properties::default()
        };

        if !buf.has_remaining() {
            return Err(MqttError::MalformedPacket(
                "SUBACK packet must contain at least one reason code".to_string(),
            ));
        }

        // Each remaining byte is exactly one reason code.
        let mut reason_codes = Vec::with_capacity(buf.remaining());
        while buf.has_remaining() {
            let code_byte = buf.get_u8();
            let code = SubAckReasonCode::from_u8(code_byte)
                // Pre-v5 packets may only carry granted QoS 0-2 or 0x80.
                .filter(|_| is_v5 || matches!(code_byte, 0x00..=0x02 | 0x80))
                .ok_or_else(|| {
                    MqttError::MalformedPacket(format!(
                        "Invalid SUBACK reason code: 0x{code_byte:02X}"
                    ))
                })?;
            reason_codes.push(code);
        }

        Ok(Self {
            packet_id,
            reason_codes,
            properties,
            protocol_version,
        })
    }
}
/// Unit tests for SUBACK reason codes, the packet builder, and the
/// encode/decode paths for protocol levels 5 and 4.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::v5::properties::PropertyId;
    use bytes::BytesMut;

    #[test]
    fn test_suback_reason_code_from_qos() {
        assert_eq!(
            SubAckReasonCode::from_qos(QoS::AtMostOnce),
            SubAckReasonCode::GrantedQoS0
        );
        assert_eq!(
            SubAckReasonCode::from_qos(QoS::AtLeastOnce),
            SubAckReasonCode::GrantedQoS1
        );
        assert_eq!(
            SubAckReasonCode::from_qos(QoS::ExactlyOnce),
            SubAckReasonCode::GrantedQoS2
        );
    }

    #[test]
    fn test_suback_reason_code_is_success() {
        assert!(SubAckReasonCode::GrantedQoS0.is_success());
        assert!(SubAckReasonCode::GrantedQoS1.is_success());
        assert!(SubAckReasonCode::GrantedQoS2.is_success());
        assert!(!SubAckReasonCode::NotAuthorized.is_success());
        assert!(!SubAckReasonCode::TopicFilterInvalid.is_success());
    }

    // Every variant must survive a byte round trip, and bytes that are not a
    // discriminant must be rejected.
    #[test]
    fn test_suback_reason_code_from_u8() {
        let all = [
            SubAckReasonCode::GrantedQoS0,
            SubAckReasonCode::GrantedQoS1,
            SubAckReasonCode::GrantedQoS2,
            SubAckReasonCode::UnspecifiedError,
            SubAckReasonCode::ImplementationSpecificError,
            SubAckReasonCode::NotAuthorized,
            SubAckReasonCode::TopicFilterInvalid,
            SubAckReasonCode::PacketIdentifierInUse,
            SubAckReasonCode::QuotaExceeded,
            SubAckReasonCode::SharedSubscriptionsNotSupported,
            SubAckReasonCode::SubscriptionIdentifiersNotSupported,
            SubAckReasonCode::WildcardSubscriptionsNotSupported,
        ];
        for code in all.iter().copied() {
            assert_eq!(SubAckReasonCode::from_u8(code as u8), Some(code));
        }
        assert_eq!(SubAckReasonCode::from_u8(0x03), None);
        assert_eq!(SubAckReasonCode::from_u8(0x7F), None);
        assert_eq!(SubAckReasonCode::from_u8(0xFF), None);
    }

    #[test]
    fn test_suback_reason_code_granted_qos() {
        assert!(matches!(
            SubAckReasonCode::GrantedQoS0.granted_qos(),
            Some(QoS::AtMostOnce)
        ));
        assert!(matches!(
            SubAckReasonCode::GrantedQoS1.granted_qos(),
            Some(QoS::AtLeastOnce)
        ));
        assert!(matches!(
            SubAckReasonCode::GrantedQoS2.granted_qos(),
            Some(QoS::ExactlyOnce)
        ));
        assert!(SubAckReasonCode::UnspecifiedError.granted_qos().is_none());
        assert!(SubAckReasonCode::NotAuthorized.granted_qos().is_none());
    }

    #[test]
    fn test_suback_basic() {
        let packet = SubAckPacket::new(123)
            .add_granted_qos(QoS::AtLeastOnce)
            .add_granted_qos(QoS::ExactlyOnce)
            .add_reason_code(SubAckReasonCode::NotAuthorized);
        assert_eq!(packet.packet_id, 123);
        assert_eq!(packet.reason_codes.len(), 3);
        assert_eq!(packet.reason_codes[0], SubAckReasonCode::GrantedQoS1);
        assert_eq!(packet.reason_codes[1], SubAckReasonCode::GrantedQoS2);
        assert_eq!(packet.reason_codes[2], SubAckReasonCode::NotAuthorized);
    }

    #[test]
    fn test_suback_encode_decode() {
        let packet = SubAckPacket::new(789)
            .add_granted_qos(QoS::AtMostOnce)
            .add_granted_qos(QoS::AtLeastOnce)
            .add_reason_code(SubAckReasonCode::TopicFilterInvalid)
            .with_reason_string("Invalid wildcard usage".to_string());
        let mut buf = BytesMut::new();
        packet.encode(&mut buf).unwrap();
        let fixed_header = FixedHeader::decode(&mut buf).unwrap();
        assert_eq!(fixed_header.packet_type, PacketType::SubAck);
        let decoded = SubAckPacket::decode_body(&mut buf, &fixed_header).unwrap();
        assert_eq!(decoded.packet_id, 789);
        assert_eq!(decoded.reason_codes.len(), 3);
        assert_eq!(decoded.reason_codes[0], SubAckReasonCode::GrantedQoS0);
        assert_eq!(decoded.reason_codes[1], SubAckReasonCode::GrantedQoS1);
        assert_eq!(
            decoded.reason_codes[2],
            SubAckReasonCode::TopicFilterInvalid
        );
        let reason_str = decoded.properties.get(PropertyId::ReasonString);
        assert!(reason_str.is_some());
    }

    // The v3.1.1 encoder sends every failure as 0x80, so a v5-only failure
    // code comes back as `UnspecifiedError` after a round trip.
    #[test]
    fn test_suback_v311_encode_decode() {
        let packet = SubAckPacket::new_v311(42)
            .add_granted_qos(QoS::AtLeastOnce)
            .add_reason_code(SubAckReasonCode::NotAuthorized);
        let mut buf = BytesMut::new();
        packet.encode(&mut buf).unwrap();
        let fixed_header = FixedHeader::decode(&mut buf).unwrap();
        assert_eq!(fixed_header.packet_type, PacketType::SubAck);
        let decoded =
            SubAckPacket::decode_body_with_version(&mut buf, &fixed_header, 4).unwrap();
        assert_eq!(decoded.packet_id, 42);
        assert_eq!(decoded.protocol_version, 4);
        assert_eq!(decoded.reason_codes.len(), 2);
        assert_eq!(decoded.reason_codes[0], SubAckReasonCode::GrantedQoS1);
        assert_eq!(decoded.reason_codes[1], SubAckReasonCode::UnspecifiedError);
    }

    // Truncated bodies must be reported as errors rather than panicking.
    #[test]
    fn test_suback_decode_truncated_body() {
        let packet = SubAckPacket::new_v311(7).add_granted_qos(QoS::AtMostOnce);
        let mut buf = BytesMut::new();
        packet.encode(&mut buf).unwrap();
        let fixed_header = FixedHeader::decode(&mut buf).unwrap();

        // Only one byte: not enough for the packet identifier.
        let mut one_byte = buf.clone().split_to(1);
        assert!(
            SubAckPacket::decode_body_with_version(&mut one_byte, &fixed_header, 4).is_err()
        );

        // Packet identifier only: no reason code follows.
        let mut id_only = buf.clone().split_to(2);
        assert!(
            SubAckPacket::decode_body_with_version(&mut id_only, &fixed_header, 4).is_err()
        );
    }

    #[test]
    fn test_suback_empty_reason_codes() {
        let packet = SubAckPacket::new(123);
        let mut buf = BytesMut::new();
        let result = packet.encode(&mut buf);
        assert!(result.is_err());
    }
}