use std::convert::TryFrom;
use super::{
property::check_multiple_subscription_identifiers, property::check_property_type_list,
Properties, PropertyType,
};
use crate::{
ByteArray, DecodeError, DecodePacket, EncodeError, EncodePacket, FixedHeader, Packet, PacketId,
PacketType, QoS, SubTopic, VarIntError,
};
/// Retain Handling option of a subscription, stored as its numeric value.
///
/// Per the MQTT 5.0 Subscription Options definition, this option controls
/// whether retained messages are sent when a subscription is established.
#[repr(u8)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RetainHandling {
    /// Value `0` (the default): send retained messages at the time of the subscribe.
    #[default]
    Send = 0,

    /// Value `1`: send retained messages only if the subscription does not already exist.
    SendFirst = 1,

    /// Value `2`: do not send retained messages at the time of the subscribe.
    NoSend = 2,
}
impl TryFrom<u8> for RetainHandling {
type Error = DecodeError;
fn try_from(v: u8) -> Result<Self, Self::Error> {
match v {
0 => Ok(Self::Send),
1 => Ok(Self::SendFirst),
2 => Ok(Self::NoSend),
_ => Err(DecodeError::OtherErrors),
}
}
}
/// One topic-filter entry of a SUBSCRIBE packet together with its subscription options.
///
/// On the wire an entry is the topic filter followed by a single options byte.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SubscribeTopic {
/// Topic filter being subscribed to.
topic: SubTopic,
/// Requested QoS; occupies bits 0-1 of the options byte.
qos: QoS,
/// No Local option; bit 2 of the options byte.
no_local: bool,
/// Retain As Published option; bit 3 of the options byte.
retain_as_published: bool,
/// Retain Handling option.
retain_handling: RetainHandling,
}
impl SubscribeTopic {
    /// Builds a subscription entry for `topic` with the requested `qos`.
    ///
    /// Every other option is taken from `Self::default()`.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `SubTopic::new` when it rejects `topic`.
    pub fn new(topic: &str, qos: QoS) -> Result<Self, EncodeError> {
        let filter = SubTopic::new(topic)?;
        let defaults = Self::default();
        Ok(Self {
            topic: filter,
            qos,
            ..defaults
        })
    }

    /// Replaces the topic filter.
    ///
    /// The current filter is left untouched when the new one is rejected.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `SubTopic::new` when it rejects `topic`.
    pub fn set_topic(&mut self, topic: &str) -> Result<&mut Self, EncodeError> {
        let replacement = SubTopic::new(topic)?;
        self.topic = replacement;
        Ok(self)
    }

    /// Returns the topic filter as a string slice.
    #[must_use]
    pub fn topic(&self) -> &str {
        self.topic.as_ref()
    }

    /// Sets the requested QoS and returns `self` for chaining.
    pub fn set_qos(&mut self, qos: QoS) -> &mut Self {
        self.qos = qos;
        self
    }

    /// Returns the requested QoS.
    #[must_use]
    pub const fn qos(&self) -> QoS {
        self.qos
    }

    /// Sets the No Local option and returns `self` for chaining.
    pub fn set_no_local(&mut self, no_local: bool) -> &mut Self {
        self.no_local = no_local;
        self
    }

    /// Returns the No Local option.
    #[must_use]
    pub const fn no_local(&self) -> bool {
        self.no_local
    }

    /// Sets the Retain As Published option and returns `self` for chaining.
    pub fn set_retain_as_published(&mut self, retain_as_published: bool) -> &mut Self {
        self.retain_as_published = retain_as_published;
        self
    }

    /// Returns the Retain As Published option.
    #[must_use]
    pub const fn retain_as_published(&self) -> bool {
        self.retain_as_published
    }

    /// Sets the Retain Handling option and returns `self` for chaining.
    pub fn set_retain_handling(&mut self, retain_handling: RetainHandling) -> &mut Self {
        self.retain_handling = retain_handling;
        self
    }

    /// Returns the Retain Handling option.
    #[must_use]
    pub const fn retain_handling(&self) -> RetainHandling {
        self.retain_handling
    }

    /// Returns the encoded size of this entry in bytes.
    pub fn bytes(&self) -> usize {
        // The encoded topic filter is followed by exactly one options byte.
        self.topic.bytes() + 1
    }
}
impl EncodePacket for SubscribeTopic {
    /// Appends the topic filter followed by the one-byte subscription options to `buf`.
    ///
    /// Options byte layout (MQTT 5.0, section 3.8.3.1):
    ///
    /// | bits | meaning             |
    /// |------|---------------------|
    /// | 0-1  | maximum QoS         |
    /// | 2    | No Local            |
    /// | 3    | Retain As Published |
    /// | 4-5  | Retain Handling     |
    /// | 6-7  | reserved, always 0  |
    ///
    /// Returns the number of bytes this entry occupies, as reported by `bytes()`.
    ///
    /// # Errors
    ///
    /// Propagates any error from encoding the topic filter.
    fn encode(&self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
        self.topic.encode(buf)?;

        let mut flag: u8 = 0b0000_0011 & (self.qos as u8);
        if self.no_local {
            flag |= 0b0000_0100;
        }
        if self.retain_as_published {
            flag |= 0b0000_1000;
        }
        // Retain Handling is a two-bit value (0..=2) that lives in bits 4-5, so it
        // has to be shifted into place; masking the unshifted value always gives 0.
        flag |= 0b0011_0000 & ((self.retain_handling as u8) << 4);
        buf.push(flag);

        Ok(self.bytes())
    }
}
impl DecodePacket for SubscribeTopic {
    /// Reads one topic filter and its subscription options byte from `ba`.
    ///
    /// Options byte layout (MQTT 5.0, section 3.8.3.1): bits 0-1 QoS, bit 2
    /// No Local, bit 3 Retain As Published, bits 4-5 Retain Handling, bits 6-7
    /// reserved.
    ///
    /// # Errors
    ///
    /// - Propagates errors from decoding the topic filter, reading the options
    ///   byte, or converting the QoS bits.
    /// - Returns `DecodeError::OtherErrors` when Retain Handling is `3` or when
    ///   any reserved bit is set.
    fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
        let topic = SubTopic::decode(ba)?;
        let flag = ba.read_byte()?;

        let qos = QoS::try_from(flag & 0b0000_0011)?;
        let no_local = (flag & 0b0000_0100) == 0b0000_0100;
        let retain_as_published = (flag & 0b0000_1000) == 0b0000_1000;
        // Bring bits 4-5 down to the 0..=3 range expected by `RetainHandling::try_from`;
        // without the shift every non-zero Retain Handling value would be rejected.
        let retain_handling = RetainHandling::try_from((flag & 0b0011_0000) >> 4)?;

        // Bits 6 and 7 are reserved and must be zero.
        if flag & 0b1100_0000 != 0b0000_0000 {
            return Err(DecodeError::OtherErrors);
        }

        Ok(Self {
            topic,
            qos,
            no_local,
            retain_as_published,
            retain_handling,
        })
    }
}
/// MQTT v5 SUBSCRIBE packet: a packet identifier, a properties block and one
/// or more topic entries.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SubscribePacket {
/// Packet identifier; decoding rejects a value of 0.
packet_id: PacketId,
/// Properties read right after the packet identifier when decoding.
properties: Properties,
/// Topic entries; decoding rejects a packet that carries none.
topics: Vec<SubscribeTopic>,
}
/// Property types associated with a SUBSCRIBE packet.
///
/// `SubscribePacket::decode` passes this list to `check_property_type_list`
/// and fails with `DecodeError::InvalidPropertyType` when that check reports
/// an offending property type.
pub const SUBSCRIBE_PROPERTIES: &[PropertyType] = &[
PropertyType::SubscriptionIdentifier,
PropertyType::UserProperty,
];
impl SubscribePacket {
    /// Creates a packet subscribing to a single `topic` with the given `qos`.
    ///
    /// The properties block starts out as `Properties::new()`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `SubscribeTopic::new` when `topic` is rejected.
    pub fn new(topic: &str, qos: QoS, packet_id: PacketId) -> Result<Self, EncodeError> {
        let topic = SubscribeTopic::new(topic, qos)?;
        Ok(Self {
            packet_id,
            properties: Properties::new(),
            topics: vec![topic],
        })
    }

    /// Sets the packet identifier and returns `self` for chaining.
    pub fn set_packet_id(&mut self, packet_id: PacketId) -> &mut Self {
        self.packet_id = packet_id;
        self
    }

    /// Returns the packet identifier.
    #[must_use]
    pub const fn packet_id(&self) -> PacketId {
        self.packet_id
    }

    /// Returns a mutable reference to the properties block.
    pub fn properties_mut(&mut self) -> &mut Properties {
        &mut self.properties
    }

    /// Returns the properties block.
    #[must_use]
    pub const fn properties(&self) -> &Properties {
        &self.properties
    }

    /// Replaces all topic entries with clones of `topics`.
    pub fn set_topics(&mut self, topics: &[SubscribeTopic]) -> &mut Self {
        self.topics.clear();
        self.topics.extend_from_slice(topics);
        self
    }

    /// Returns the topic entries.
    #[must_use]
    pub fn topics(&self) -> &[SubscribeTopic] {
        &self.topics
    }

    /// Returns a mutable reference to the topic entries.
    pub fn mut_topics(&mut self) -> &mut Vec<SubscribeTopic> {
        &mut self.topics
    }

    /// Builds the fixed header for the current contents.
    ///
    /// The remaining length covers the packet identifier, the properties block
    /// and every topic entry, i.e. exactly what `encode` writes after the fixed
    /// header and what `decode` consumes.
    fn get_fixed_header(&self) -> Result<FixedHeader, VarIntError> {
        let topics_len: usize = self.topics.iter().map(SubscribeTopic::bytes).sum();
        // The properties block is part of the variable header and must be counted,
        // otherwise the declared length is shorter than the bytes on the wire.
        let remaining_length = PacketId::bytes() + self.properties.bytes() + topics_len;
        FixedHeader::new(PacketType::Subscribe, remaining_length)
    }
}

impl DecodePacket for SubscribePacket {
    /// Decodes a SUBSCRIBE packet: fixed header, packet identifier, properties,
    /// then topic entries until the declared remaining length is consumed.
    ///
    /// # Errors
    ///
    /// - `DecodeError::InvalidPacketType` when the fixed header is not SUBSCRIBE.
    /// - `DecodeError::InvalidPacketId` when the packet identifier is 0.
    /// - `DecodeError::InvalidPropertyType` when either property check fails.
    /// - `DecodeError::EmptyTopicFilter` when no topic entry is present.
    /// - Any error propagated from decoding the individual parts.
    fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
        let fixed_header = FixedHeader::decode(ba)?;
        if fixed_header.packet_type() != PacketType::Subscribe {
            return Err(DecodeError::InvalidPacketType);
        }

        let packet_id = PacketId::decode(ba)?;
        if packet_id.value() == 0 {
            return Err(DecodeError::InvalidPacketId);
        }

        let properties = Properties::decode(ba)?;
        if let Err(property_type) =
            check_property_type_list(properties.props(), SUBSCRIBE_PROPERTIES)
        {
            log::error!(
                "v5/SubscribePacket: property type {:?} cannot be used in properties!",
                property_type
            );
            return Err(DecodeError::InvalidPropertyType);
        }
        if let Err(property_type) = check_multiple_subscription_identifiers(properties.props()) {
            log::error!(
                "v5/SubscribePacket: property type {:?} cannot be used in properties!",
                property_type
            );
            return Err(DecodeError::InvalidPropertyType);
        }

        // Track how many bytes of the remaining length have been consumed so far;
        // the rest of the packet is a sequence of topic entries.
        let mut remaining_length = PacketId::bytes() + properties.bytes();
        let mut topics = Vec::new();
        while remaining_length < fixed_header.remaining_length() {
            let topic = SubscribeTopic::decode(ba)?;
            remaining_length += topic.bytes();
            topics.push(topic);
        }

        if topics.is_empty() {
            return Err(DecodeError::EmptyTopicFilter);
        }

        Ok(Self {
            packet_id,
            properties,
            topics,
        })
    }
}

impl EncodePacket for SubscribePacket {
    /// Appends the encoded packet to `buf` and returns the number of bytes written.
    ///
    /// Write order mirrors `decode`: fixed header, packet identifier, properties
    /// block, then every topic entry.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the fixed header or encoding a part.
    fn encode(&self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
        let old_len = buf.len();

        let fixed_header = self.get_fixed_header()?;
        fixed_header.encode(buf)?;

        // Variable header: packet identifier followed by the properties block.
        // `decode` always reads the properties, so they must always be written.
        self.packet_id.encode(buf)?;
        self.properties.encode(buf)?;

        // Payload: the topic entries.
        for topic in &self.topics {
            topic.encode(buf)?;
        }

        Ok(buf.len() - old_len)
    }
}
impl Packet for SubscribePacket {
    /// Always reports `PacketType::Subscribe`.
    fn packet_type(&self) -> PacketType {
        PacketType::Subscribe
    }

    /// Returns the size of the fixed header plus the remaining length it declares.
    ///
    /// # Errors
    ///
    /// Propagates the `VarIntError` raised while building the fixed header.
    fn bytes(&self) -> Result<usize, VarIntError> {
        let header = self.get_fixed_header()?;
        let header_len = header.bytes();
        let body_len = header.remaining_length();
        Ok(header_len + body_len)
    }
}