use crate::mqtt::packet::MqttString;
use crate::mqtt::packet::Qos;
use crate::mqtt::packet::RetainHandling;
use crate::mqtt::result_code::MqttError;
use alloc::string::ToString;
use alloc::{string::String, vec::Vec};
use core::convert::TryInto;
use core::fmt;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
#[cfg(feature = "std")]
use std::io::IoSlice;
/// Subscription options stored as a single packed byte.
///
/// The accessors on this type read the byte as follows: bits 0-1 hold the
/// QoS value, bit 2 the `nl` flag, bit 3 the `rap` flag and bits 4-5 the
/// retain-handling value. `from_u8` rejects any byte with bits 6-7 set.
///
/// The derived comparison traits operate on the raw byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SubOpts {
/// The packed options byte, kept as a one-element array so it can be
/// handed out by reference from `to_buffer`.
sub_opts_buf: [u8; 1],
}
impl SubOpts {
    /// Mask selecting the QoS field (bits 0-1).
    const QOS_MASK: u8 = 0b0000_0011;
    /// Bit holding the `nl` flag (bit 2).
    const NL_BIT: u8 = 0b0000_0100;
    /// Bit holding the `rap` flag (bit 3).
    const RAP_BIT: u8 = 0b0000_1000;
    /// Mask selecting the retain-handling field (bits 4-5).
    const RH_MASK: u8 = 0b0011_0000;
    /// Distance of the retain-handling field from bit 0.
    const RH_SHIFT: u32 = 4;
    /// Bits 6-7, which `from_u8` requires to be zero.
    const RESERVED_MASK: u8 = 0b1100_0000;

    /// Creates options with every bit cleared.
    ///
    /// With a zero byte the accessors report `Qos::AtMostOnce`, `nl == false`,
    /// `rap == false` and `RetainHandling::SendRetained`.
    pub fn new() -> Self {
        Self { sub_opts_buf: [0] }
    }

    /// Builds options from a raw byte, validating every field.
    ///
    /// # Errors
    ///
    /// Returns `MqttError::MalformedPacket` when any of bits 6-7 is set, when
    /// the QoS field is 3, or when the retain-handling field is 3.
    pub fn from_u8(value: u8) -> Result<Self, MqttError> {
        let reserved_bits_set = (value & Self::RESERVED_MASK) != 0;
        let qos_out_of_range = (value & Self::QOS_MASK) > 2;
        let rh_out_of_range = ((value & Self::RH_MASK) >> Self::RH_SHIFT) > 2;

        // Every rejection reports the same error, so the checks can be merged.
        if reserved_bits_set || qos_out_of_range || rh_out_of_range {
            Err(MqttError::MalformedPacket)
        } else {
            Ok(Self {
                sub_opts_buf: [value],
            })
        }
    }

    /// Returns the QoS level encoded in bits 0-1.
    ///
    /// # Panics
    ///
    /// Panics if the stored field is 3; `from_u8` never accepts such a byte.
    pub fn qos(&self) -> Qos {
        match self.sub_opts_buf[0] & Self::QOS_MASK {
            0 => Qos::AtMostOnce,
            1 => Qos::AtLeastOnce,
            2 => Qos::ExactlyOnce,
            invalid => unreachable!("Invalid QoS value: {}, this should never happen", invalid),
        }
    }

    /// Returns a copy of these options with the QoS field replaced by `qos`.
    pub fn set_qos(self, qos: Qos) -> Self {
        let without_qos = self.sub_opts_buf[0] & !Self::QOS_MASK;
        Self {
            sub_opts_buf: [without_qos | (qos as u8)],
        }
    }

    /// Returns whether the `nl` flag (bit 2) is set.
    pub fn nl(&self) -> bool {
        self.has_bit(Self::NL_BIT)
    }

    /// Returns a copy of these options with the `nl` flag set to `nl`.
    pub fn set_nl(self, nl: bool) -> Self {
        self.with_bit(Self::NL_BIT, nl)
    }

    /// Returns whether the `rap` flag (bit 3) is set.
    pub fn rap(&self) -> bool {
        self.has_bit(Self::RAP_BIT)
    }

    /// Returns a copy of these options with the `rap` flag set to `rap`.
    pub fn set_rap(self, rap: bool) -> Self {
        self.with_bit(Self::RAP_BIT, rap)
    }

    /// Returns the retain-handling mode encoded in bits 4-5.
    ///
    /// # Panics
    ///
    /// Panics if the stored field is 3; `from_u8` never accepts such a byte.
    pub fn rh(&self) -> RetainHandling {
        match (self.sub_opts_buf[0] & Self::RH_MASK) >> Self::RH_SHIFT {
            0 => RetainHandling::SendRetained,
            1 => RetainHandling::SendRetainedIfNotExists,
            2 => RetainHandling::DoNotSendRetained,
            invalid => unreachable!(
                "Invalid RetainHandling value: {}, this should never happen",
                invalid
            ),
        }
    }

    /// Returns a copy of these options with the retain-handling field
    /// replaced by `rh`.
    pub fn set_rh(self, rh: RetainHandling) -> Self {
        let without_rh = self.sub_opts_buf[0] & !Self::RH_MASK;
        Self {
            sub_opts_buf: [without_rh | ((rh as u8) << Self::RH_SHIFT)],
        }
    }

    /// Borrows the packed options byte as a one-element array.
    pub fn to_buffer(&self) -> &[u8; 1] {
        &self.sub_opts_buf
    }

    /// Tests whether any bit of `mask` is set in the options byte.
    fn has_bit(&self, mask: u8) -> bool {
        (self.sub_opts_buf[0] & mask) != 0
    }

    /// Returns a copy with the bits of `mask` set or cleared per `enabled`.
    fn with_bit(self, mask: u8, enabled: bool) -> Self {
        let current = self.sub_opts_buf[0];
        let updated = if enabled {
            current | mask
        } else {
            current & !mask
        };
        Self {
            sub_opts_buf: [updated],
        }
    }
}
impl Default for SubOpts {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for SubOpts {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match serde_json::to_string(self) {
Ok(json) => write!(f, "{json}"),
Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
}
}
}
/// Serializes the options as a struct named `SubOpts` with the fields
/// `qos`, `nl`, `rap` and `rh`, in that order.
impl Serialize for SubOpts {
    /// `qos` and `rh` are emitted as their `to_string()` text; `nl` and `rap`
    /// are emitted as booleans.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        const FIELD_COUNT: usize = 4;

        let mut out = serializer.serialize_struct("SubOpts", FIELD_COUNT)?;

        let qos_text = self.qos().to_string();
        out.serialize_field("qos", &qos_text)?;

        let no_local = self.nl();
        out.serialize_field("nl", &no_local)?;

        let retain_as_published = self.rap();
        out.serialize_field("rap", &retain_as_published)?;

        let rh_text = self.rh().to_string();
        out.serialize_field("rh", &rh_text)?;

        out.end()
    }
}
/// A single subscription entry: a topic filter paired with its options.
///
/// `to_continuous_buffer` emits the encoded topic filter followed by the one
/// options byte, and `parse` reads the same layout back.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SubEntry {
/// The topic filter, held as an `MqttString`.
topic_filter: MqttString,
/// The options that apply to this topic filter.
sub_opts: SubOpts,
}
impl SubEntry {
    /// Creates an entry from anything convertible into an `MqttString`.
    ///
    /// # Errors
    ///
    /// Returns the conversion's `MqttError` when `topic_filter` cannot be
    /// turned into an `MqttString`.
    pub fn new<T>(topic_filter: T, sub_opts: SubOpts) -> Result<Self, MqttError>
    where
        T: TryInto<MqttString, Error = MqttError>,
    {
        let topic_filter = topic_filter.try_into()?;
        Ok(Self {
            topic_filter,
            sub_opts,
        })
    }

    /// Returns the topic filter as a string slice.
    pub fn topic_filter(&self) -> &str {
        // `as_str()` already yields a borrow; no extra `&` is needed.
        self.topic_filter.as_str()
    }

    /// Returns the options attached to this entry.
    pub fn sub_opts(&self) -> &SubOpts {
        &self.sub_opts
    }

    /// Replaces the topic filter.
    ///
    /// # Errors
    ///
    /// Returns the error from `MqttString::new` when `topic_filter` is
    /// rejected; the existing filter is left unchanged in that case.
    pub fn set_topic_filter(&mut self, topic_filter: String) -> Result<(), MqttError> {
        self.topic_filter = MqttString::new(topic_filter)?;
        Ok(())
    }

    /// Replaces the options.
    pub fn set_sub_opts(&mut self, sub_opts: SubOpts) {
        self.sub_opts = sub_opts;
    }

    /// Returns the entry as borrowed I/O slices: the topic filter's buffers
    /// followed by the options byte.
    #[cfg(feature = "std")]
    pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
        let mut buffers = self.topic_filter.to_buffers();
        buffers.push(IoSlice::new(self.sub_opts.to_buffer()));
        buffers
    }

    /// Returns the entry as one owned byte vector: the topic filter's
    /// continuous buffer followed by the options byte.
    pub fn to_continuous_buffer(&self) -> Vec<u8> {
        let mut buf = self.topic_filter.to_continuous_buffer();
        buf.extend_from_slice(self.sub_opts.to_buffer());
        buf
    }

    /// Returns the topic filter's reported size plus one for the options byte.
    pub fn size(&self) -> usize {
        self.topic_filter.size() + self.sub_opts.to_buffer().len()
    }

    /// Parses one entry from the start of `data`.
    ///
    /// On success returns the entry together with the number of bytes
    /// consumed (the amount reported by `MqttString::decode` plus one for the
    /// options byte). Bytes after that point are ignored.
    ///
    /// # Errors
    ///
    /// Propagates any error from `MqttString::decode`, returns
    /// `MqttError::MalformedPacket` when no byte follows the topic filter, and
    /// propagates any error from `SubOpts::from_u8` for the options byte.
    pub fn parse(data: &[u8]) -> Result<(Self, usize), MqttError> {
        let (topic_filter, filter_len) = MqttString::decode(data)?;

        // Checked access: a missing options byte is an error, never a panic.
        let opts_byte = *data.get(filter_len).ok_or(MqttError::MalformedPacket)?;
        let sub_opts = SubOpts::from_u8(opts_byte)?;

        Ok((
            Self {
                topic_filter,
                sub_opts,
            },
            filter_len + 1,
        ))
    }
}
/// The default entry pairs an empty topic filter with default options.
impl Default for SubEntry {
    /// Builds the entry from an empty `String` and `SubOpts::default()`.
    ///
    /// # Panics
    ///
    /// Panics if `MqttString::new` rejects the empty string.
    fn default() -> Self {
        let topic_filter = MqttString::new(String::new()).unwrap();
        let sub_opts = SubOpts::default();
        Self {
            topic_filter,
            sub_opts,
        }
    }
}
impl fmt::Display for SubEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match serde_json::to_string(self) {
Ok(json) => write!(f, "{json}"),
Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
}
}
}
impl Serialize for SubEntry {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("SubEntry", 2)?;
state.serialize_field("topic_filter", self.topic_filter())?;
state.serialize_field("options", self.sub_opts())?;
state.end()
}
}