1use crate::client::MqttVersion;
8use crate::error::{MqttError, ProtocolError};
9use crate::transport;
10use crate::util::{
11 self, read_utf8_string, read_variable_byte_integer, write_utf8_string,
12};
13use core::marker::PhantomData;
14use heapless::Vec;
15
16#[cfg(feature = "v5")]
18use crate::util::{read_properties, write_properties};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
22#[cfg_attr(feature = "defmt", derive(defmt::Format))]
23#[repr(u8)]
24pub enum QoS {
25 AtMostOnce = 0,
26 AtLeastOnce = 1,
27 ExactlyOnce = 2,
28}
29
30pub trait EncodePacket {
32 fn encode(
33 &self,
34 buf: &mut [u8],
35 version: MqttVersion,
36 ) -> Result<usize, MqttError<transport::ErrorPlaceHolder>>;
37}
38
39pub trait DecodePacket<'a>: Sized {
41 fn decode(
42 buf: &'a [u8],
43 version: MqttVersion,
44 ) -> Result<Self, MqttError<transport::ErrorPlaceHolder>>;
45}
46
47#[derive(Debug)]
49pub enum MqttPacket<'a> {
50 Connect(Connect<'a>),
51 ConnAck(ConnAck<'a>),
52 Publish(Publish<'a>),
53 PubAck(PubAck<'a>),
54 Subscribe(Subscribe<'a>),
55 SubAck(SubAck<'a>),
56 PingReq,
57 PingResp,
58 Disconnect(Disconnect<'a>),
59}
60
61pub fn decode<'a, T>(
63 buf: &'a [u8],
64 version: MqttVersion,
65) -> Result<Option<MqttPacket<'a>>, MqttError<T>>
66where
67 T: transport::TransportError,
68{
69 if buf.is_empty() { return Ok(None); }
70
71 let packet_type = buf[0] >> 4;
72 let packet = match packet_type {
73 1 => MqttPacket::Connect(Connect::decode(buf, version).map_err(MqttError::cast_transport_error)?),
74 2 => MqttPacket::ConnAck(ConnAck::decode(buf, version).map_err(MqttError::cast_transport_error)?),
75 3 => MqttPacket::Publish(Publish::decode(buf, version).map_err(MqttError::cast_transport_error)?),
76 4 => MqttPacket::PubAck(PubAck::decode(buf, version).map_err(MqttError::cast_transport_error)?),
77 8 => MqttPacket::Subscribe(Subscribe::decode(buf, version).map_err(MqttError::cast_transport_error)?),
78 9 => MqttPacket::SubAck(SubAck::decode(buf, version).map_err(MqttError::cast_transport_error)?),
79 12 => MqttPacket::PingReq,
80 13 => MqttPacket::PingResp,
81 14 => MqttPacket::Disconnect(Disconnect::decode(buf, version).map_err(MqttError::cast_transport_error)?),
82 _ => return Err(MqttError::Protocol(ProtocolError::InvalidPacketType(packet_type))),
83 };
84
85 Ok(Some(packet))
86}
87
88#[cfg(feature = "v5")]
89#[derive(Debug)]
90pub struct Property<'a> {
91 pub id: u8,
92 pub data: &'a [u8],
93}
94
95#[derive(Debug)]
97pub struct Connect<'a> {
98 pub clean_session: bool,
99 pub keep_alive: u16,
100 pub client_id: &'a str,
101 #[cfg(feature = "v5")]
102 pub properties: Vec<Property<'a>, 8>,
103 #[cfg(not(feature = "v5"))]
104 _phantom: PhantomData<&'a ()>,
105}
106
107impl<'a> Connect<'a> {
108 pub fn new(client_id: &'a str, keep_alive: u16, clean_session: bool) -> Self {
109 Self { client_id, keep_alive, clean_session, #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData }
110 }
111}
112
113impl<'a> EncodePacket for Connect<'a> {
114 fn encode(&self, buf: &mut [u8], version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
116 let mut cursor = 0;
117 buf[cursor] = 0x10; cursor += 1;
118 let remaining_len_pos = cursor;
119 cursor += 4;
120 let content_start = cursor;
121 let protocol_name = if version == MqttVersion::V5 { "MQTT" } else { "MQIsdp" };
122 cursor += write_utf8_string(&mut buf[cursor..], protocol_name)?;
123 buf[cursor] = if version == MqttVersion::V5 { 5 } else { 3 }; cursor += 1;
124 let mut flags = 0;
125 if self.clean_session { flags |= 0x02; }
126 buf[cursor] = flags; cursor += 1;
127 buf[cursor..cursor + 2].copy_from_slice(&self.keep_alive.to_be_bytes()); cursor += 2;
128 #[cfg(feature = "v5")]
129 if version == MqttVersion::V5 { write_properties(&mut cursor, buf, &self.properties)?; }
130 cursor += write_utf8_string(&mut buf[cursor..], self.client_id)?;
131 let remaining_len = cursor - content_start;
132 let len_bytes = util::write_variable_byte_integer_len(&mut buf[remaining_len_pos..], remaining_len)?;
133 let header_len = 1 + len_bytes;
134 buf.copy_within(content_start..cursor, header_len);
135 Ok(header_len + remaining_len)
136 }
137}
138impl<'a> DecodePacket<'a> for Connect<'a> {
139 fn decode(buf: &'a [u8], version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
141 let mut cursor = 2; cursor += 6;
142 let connect_flags = buf[cursor];
143 let clean_session = (connect_flags & 0x02) != 0; cursor += 1;
144 let keep_alive = u16::from_be_bytes([buf[cursor], buf[cursor + 1]]); cursor += 2;
145 #[cfg(feature = "v5")]
146 let properties = if version == MqttVersion::V5 { read_properties(&mut cursor, buf)? } else { Vec::new() };
147 let client_id = read_utf8_string(&mut cursor, buf)?;
148 Ok(Self { clean_session, keep_alive, client_id, #[cfg(feature = "v5")] properties, #[cfg(not(feature = "v5"))] _phantom: PhantomData })
149 }
150}
151
152#[derive(Debug)]
154pub struct ConnAck<'a> {
155 pub session_present: bool,
156 pub reason_code: u8,
157 #[cfg(feature = "v5")]
158 pub properties: Vec<Property<'a>, 8>,
159 #[cfg(not(feature = "v5"))]
160 _phantom: PhantomData<&'a ()>,
161}
162impl<'a> DecodePacket<'a> for ConnAck<'a> {
163 fn decode(buf: &'a [u8], version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
164 let mut cursor = 2;
165 let session_present = (buf[cursor] & 0x01) != 0; cursor += 1;
166 let reason_code = buf[cursor]; cursor += 1;
167 #[cfg(feature = "v5")]
168 let properties = if version == MqttVersion::V5 { read_properties(&mut cursor, buf)? } else { Vec::new() };
169 Ok(Self { session_present, reason_code, #[cfg(feature = "v5")] properties, #[cfg(not(feature = "v5"))] _phantom: PhantomData })
170 }
171}
172
173#[derive(Debug)]
175pub struct Publish<'a> {
176 pub topic: &'a str,
177 pub qos: QoS,
178 pub payload: &'a [u8],
179 pub packet_id: Option<u16>,
180 #[cfg(feature = "v5")]
181 pub properties: Vec<Property<'a>, 8>,
182}
183impl<'a> DecodePacket<'a> for Publish<'a> {
184 fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
185 Ok(Publish { topic: "", qos: QoS::AtMostOnce, payload: &[], packet_id: None, #[cfg(feature = "v5")] properties: Vec::new() })
186 }
187}
188impl<'a> EncodePacket for Publish<'a> {
189 fn encode(&self, _buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
190 Ok(0) }
192}
193
194#[derive(Debug)]
196pub struct PubAck<'a> {
197 pub packet_id: u16,
198 #[cfg(feature = "v5")]
199 pub properties: Vec<Property<'a>, 8>,
200 #[cfg(not(feature = "v5"))]
201 _phantom: PhantomData<&'a ()>,
202}
203impl<'a> DecodePacket<'a> for PubAck<'a> {
204 fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
205 Ok(PubAck { packet_id: 0, #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData })
206 }
207}
208
209#[derive(Debug)]
211pub struct Subscribe<'a> {
212 pub packet_id: u16,
213 pub topics: Vec<(&'a str, QoS), 8>,
214 #[cfg(feature = "v5")]
215 pub properties: Vec<Property<'a>, 8>,
216}
217impl<'a> DecodePacket<'a> for Subscribe<'a> {
218 fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
219 Ok(Subscribe { packet_id: 0, topics: Vec::new(), #[cfg(feature = "v5")] properties: Vec::new() })
220 }
221}
222impl<'a> EncodePacket for Subscribe<'a> {
223 fn encode(&self, _buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
224 Ok(0) }
226}
227
228#[derive(Debug)]
230pub struct SubAck<'a> {
231 pub packet_id: u16,
232 pub reason_codes: Vec<u8, 8>,
233 #[cfg(feature = "v5")]
234 pub properties: Vec<Property<'a>, 8>,
235 #[cfg(not(feature = "v5"))]
236 _phantom: PhantomData<&'a ()>,
237}
238impl<'a> DecodePacket<'a> for SubAck<'a> {
239 fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
240 Ok(SubAck { packet_id: 0, reason_codes: Vec::new(), #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData })
241 }
242}
243
244#[derive(Debug)]
246pub struct PingReq;
247impl EncodePacket for PingReq {
248 fn encode(&self, buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
249 if buf.len() < 2 { return Err(MqttError::BufferTooSmall); }
250 buf[0] = 0xC0;
251 buf[1] = 0x00;
252 Ok(2)
253 }
254}
255
256#[derive(Debug)]
258pub struct PingResp;
259
260#[derive(Debug)]
262pub struct Disconnect<'a> {
263 #[cfg(feature = "v5")]
264 pub reason_code: u8,
265 #[cfg(feature = "v5")]
266 pub properties: Vec<Property<'a>, 8>,
267 #[cfg(not(feature = "v5"))]
268 pub _phantom: PhantomData<&'a ()>,
269}
270impl<'a> DecodePacket<'a> for Disconnect<'a> {
271 fn decode(_buf: &'a [u8], _version: MqttVersion) -> Result<Self, MqttError<transport::ErrorPlaceHolder>> {
272 Ok(Disconnect { #[cfg(feature = "v5")] reason_code: 0, #[cfg(feature = "v5")] properties: Vec::new(), #[cfg(not(feature = "v5"))] _phantom: PhantomData })
273 }
274}
275impl<'a> EncodePacket for Disconnect<'a> {
276 fn encode(&self, buf: &mut [u8], _version: MqttVersion) -> Result<usize, MqttError<transport::ErrorPlaceHolder>> {
277 if buf.len() < 2 { return Err(MqttError::BufferTooSmall); }
278 buf[0] = 0xE0;
279 buf[1] = 0x00;
280 Ok(2)
281 }
282}
283