mqtt_async_embedded/
packet.rs

1//! # MQTT Packet Structures and Serialization
2//!
3//! This module defines the core MQTT packet types and the traits for encoding and
4//! decoding them to and from a byte buffer. It supports both MQTT v3.1.1 and v5
5//! through conditional compilation.
6
7use crate::client::MqttVersion;
8use crate::error::{MqttError, ProtocolError};
9use crate::transport;
10use crate::util::{
11    self, read_utf8_string, read_variable_byte_integer, write_utf8_string,
12};
13use core::marker::PhantomData;
14use heapless::Vec;
15
16// Conditionally import v5-specific utilities only when the feature is enabled.
17#[cfg(feature = "v5")]
18use crate::util::{read_properties, write_properties};
19
20/// Represents the Quality of Service (QoS) levels for MQTT messages.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
22#[cfg_attr(feature = "defmt", derive(defmt::Format))]
23#[repr(u8)]
24pub enum QoS {
25    AtMostOnce = 0,
26    AtLeastOnce = 1,
27    ExactlyOnce = 2,
28}
29
30/// A trait for packets that can be encoded into a byte buffer.
31pub trait EncodePacket {
32    fn encode(
33        &self,
34        buf: &mut [u8],
35        version: MqttVersion,
36    ) -> Result<usize, MqttError<transport::ErrorPlaceHolder>>;
37}
38
39/// A trait for packets that can be decoded from a byte buffer.
40pub trait DecodePacket<'a>: Sized {
41    fn decode(
42        buf: &'a [u8],
43        version: MqttVersion,
44    ) -> Result<Self, MqttError<transport::ErrorPlaceHolder>>;
45}
46
47/// An enumeration of all possible MQTT control packets.
48#[derive(Debug)]
49pub enum MqttPacket<'a> {
50    Connect(Connect<'a>),
51    ConnAck(ConnAck<'a>),
52    Publish(Publish<'a>),
53    PubAck(PubAck<'a>),
54    Subscribe(Subscribe<'a>),
55    SubAck(SubAck<'a>),
56    PingReq,
57    PingResp,
58    Disconnect(Disconnect<'a>),
59}
60
61/// Decodes a raw byte buffer into a specific `MqttPacket`.
62pub fn decode<'a, T>(
63    buf: &'a [u8],
64    version: MqttVersion,
65) -> Result<Option<MqttPacket<'a>>, MqttError<T>>
66where
67    T: transport::TransportError,
68{
69    if buf.is_empty() { return Ok(None); }
70
71    let packet_type = buf[0] >> 4;
72    let packet = match packet_type {
73        1 => MqttPacket::Connect(Connect::decode(buf, version).map_err(MqttError::cast_transport_error)?),
74        2 => MqttPacket::ConnAck(ConnAck::decode(buf, version).map_err(MqttError::cast_transport_error)?),
75        3 => MqttPacket::Publish(Publish::decode(buf, version).map_err(MqttError::cast_transport_error)?),
76        4 => MqttPacket::PubAck(PubAck::decode(buf, version).map_err(MqttError::cast_transport_error)?),
77        8 => MqttPacket::Subscribe(Subscribe::decode(buf, version).map_err(MqttError::cast_transport_error)?),
78        9 => MqttPacket::SubAck(SubAck::decode(buf, version).map_err(MqttError::cast_transport_error)?),
79        12 => MqttPacket::PingReq,
80        13 => MqttPacket::PingResp,
81        14 => MqttPacket::Disconnect(Disconnect::decode(buf, version).map_err(MqttError::cast_transport_error)?),
82        _ => return Err(MqttError::Protocol(ProtocolError::InvalidPacketType(packet_type))),
83    };
84
85    Ok(Some(packet))
86}
87
88#[cfg(feature = "v5")]
89#[derive(Debug)]
90pub struct Property<'a> {
91    pub id: u8,
92    pub data: &'a [u8],
93}
94
95// --- CONNECT Packet ---
96#[derive(Debug)]
97pub struct Connect<'a> {
98    pub clean_session: bool,
99    pub keep_alive: u16,
100    pub client_id: &'a str,
101    #[cfg(feature = "v5")]
102    pub properties: Vec<Property<'a>, 8>,
103    #[cfg(not(feature = "v5"))]
104    _phantom: PhantomData<&'a ()>,
105}
106
107impl<'a> Connect<'a> {
108    pub fn new(client_id: &'a str, keep_alive: u16, clean_session: bool) -> Self {
109        Self { client_id, keep_alive, clean_session, #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData }
110    }
111}
112
113impl<'a> EncodePacket for Connect<'a> {
114    // ... (implementation as before)
115    fn encode(&self, buf: &mut [u8], version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
116        let mut cursor = 0;
117        buf[cursor] = 0x10; cursor += 1;
118        let remaining_len_pos = cursor;
119        cursor += 4;
120        let content_start = cursor;
121        let protocol_name = if version == MqttVersion::V5 { "MQTT" } else { "MQIsdp" };
122        cursor += write_utf8_string(&mut buf[cursor..], protocol_name)?;
123        buf[cursor] = if version == MqttVersion::V5 { 5 } else { 3 }; cursor += 1;
124        let mut flags = 0;
125        if self.clean_session { flags |= 0x02; }
126        buf[cursor] = flags; cursor += 1;
127        buf[cursor..cursor + 2].copy_from_slice(&self.keep_alive.to_be_bytes()); cursor += 2;
128        #[cfg(feature = "v5")]
129        if version == MqttVersion::V5 { write_properties(&mut cursor, buf, &self.properties)?; }
130        cursor += write_utf8_string(&mut buf[cursor..], self.client_id)?;
131        let remaining_len = cursor - content_start;
132        let len_bytes = util::write_variable_byte_integer_len(&mut buf[remaining_len_pos..], remaining_len)?;
133        let header_len = 1 + len_bytes;
134        buf.copy_within(content_start..cursor, header_len);
135        Ok(header_len + remaining_len)
136    }
137}
138impl<'a> DecodePacket<'a> for Connect<'a> {
139    // ... (implementation as before)
140    fn decode(buf: &'a [u8], version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
141        let mut cursor = 2; cursor += 6;
142        let connect_flags = buf[cursor];
143        let clean_session = (connect_flags & 0x02) != 0; cursor += 1;
144        let keep_alive = u16::from_be_bytes([buf[cursor], buf[cursor + 1]]); cursor += 2;
145        #[cfg(feature = "v5")]
146        let properties = if version == MqttVersion::V5 { read_properties(&mut cursor, buf)? } else { Vec::new() };
147        let client_id = read_utf8_string(&mut cursor, buf)?;
148        Ok(Self { clean_session, keep_alive, client_id, #[cfg(feature = "v5")] properties, #[cfg(not(feature = "v5"))] _phantom: PhantomData })
149    }
150}
151
152// --- CONNACK Packet ---
153#[derive(Debug)]
154pub struct ConnAck<'a> {
155    pub session_present: bool,
156    pub reason_code: u8,
157    #[cfg(feature = "v5")]
158    pub properties: Vec<Property<'a>, 8>,
159    #[cfg(not(feature = "v5"))]
160    _phantom: PhantomData<&'a ()>,
161}
162impl<'a> DecodePacket<'a> for ConnAck<'a> {
163    fn decode(buf: &'a [u8], version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
164        let mut cursor = 2;
165        let session_present = (buf[cursor] & 0x01) != 0; cursor += 1;
166        let reason_code = buf[cursor]; cursor += 1;
167        #[cfg(feature = "v5")]
168        let properties = if version == MqttVersion::V5 { read_properties(&mut cursor, buf)? } else { Vec::new() };
169        Ok(Self { session_present, reason_code, #[cfg(feature = "v5")] properties, #[cfg(not(feature = "v5"))] _phantom: PhantomData })
170    }
171}
172
173// --- PUBLISH Packet ---
174#[derive(Debug)]
175pub struct Publish<'a> {
176    pub topic: &'a str,
177    pub qos: QoS,
178    pub payload: &'a [u8],
179    pub packet_id: Option<u16>,
180    #[cfg(feature = "v5")]
181    pub properties: Vec<Property<'a>, 8>,
182}
183impl<'a> DecodePacket<'a> for Publish<'a> {
184    fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
185        Ok(Publish { topic: "", qos: QoS::AtMostOnce, payload: &[], packet_id: None, #[cfg(feature = "v5")] properties: Vec::new() })
186    }
187}
188impl<'a> EncodePacket for Publish<'a> {
189    fn encode(&self, _buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
190        Ok(0) // Placeholder
191    }
192}
193
194// --- PUBACK Packet ---
195#[derive(Debug)]
196pub struct PubAck<'a> {
197    pub packet_id: u16,
198    #[cfg(feature = "v5")]
199    pub properties: Vec<Property<'a>, 8>,
200    #[cfg(not(feature = "v5"))]
201    _phantom: PhantomData<&'a ()>,
202}
203impl<'a> DecodePacket<'a> for PubAck<'a> {
204    fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
205        Ok(PubAck { packet_id: 0, #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData })
206    }
207}
208
209// --- SUBSCRIBE Packet ---
210#[derive(Debug)]
211pub struct Subscribe<'a> {
212    pub packet_id: u16,
213    pub topics: Vec<(&'a str, QoS), 8>,
214    #[cfg(feature = "v5")]
215    pub properties: Vec<Property<'a>, 8>,
216}
217impl<'a> DecodePacket<'a> for Subscribe<'a> {
218    fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
219        Ok(Subscribe { packet_id: 0, topics: Vec::new(), #[cfg(feature = "v5")] properties: Vec::new() })
220    }
221}
222impl<'a> EncodePacket for Subscribe<'a> {
223    fn encode(&self, _buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
224        Ok(0) // Placeholder
225    }
226}
227
228// --- SUBACK Packet ---
229#[derive(Debug)]
230pub struct SubAck<'a> {
231    pub packet_id: u16,
232    pub reason_codes: Vec<u8, 8>,
233    #[cfg(feature = "v5")]
234    pub properties: Vec<Property<'a>, 8>,
235    #[cfg(not(feature = "v5"))]
236    _phantom: PhantomData<&'a ()>,
237}
238impl<'a> DecodePacket<'a> for SubAck<'a> {
239    fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
240        Ok(SubAck { packet_id: 0, reason_codes: Vec::new(), #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData })
241    }
242}
243
244// --- PINGREQ Packet ---
245#[derive(Debug)]
246pub struct PingReq;
247impl EncodePacket for PingReq {
248    fn encode(&self, buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
249        if buf.len() < 2 { return Err(MqttError::BufferTooSmall); }
250        buf[0] = 0xC0;
251        buf[1] = 0x00;
252        Ok(2)
253    }
254}
255
256// --- PINGRESP Packet ---
257#[derive(Debug)]
258pub struct PingResp;
259
260// --- DISCONNECT Packet ---
261#[derive(Debug)]
262pub struct Disconnect<'a> {
263    #[cfg(feature = "v5")]
264    pub reason_code: u8,
265    #[cfg(feature = "v5")]
266    pub properties: Vec<Property<'a>, 8>,
267    #[cfg(not(feature = "v5"))]
268    pub _phantom: PhantomData<&'a ()>,
269}
270impl<'a> DecodePacket<'a> for Disconnect<'a> {
271    fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
272        Ok(Disconnect { #[cfg(feature = "v5")] reason_code: 0, #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData })
273    }
274}
275impl<'a> EncodePacket for Disconnect<'a> {
276    fn encode(&self, buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
277        if buf.len() < 2 { return Err(MqttError::BufferTooSmall); }
278        buf[0] = 0xE0;
279        buf[1] = 0x00;
280        Ok(2)
281    }
282}
283