mqtt_proto/v5/
packet.rs

1use core::convert::AsRef;
2
3use crate::{
4    block_on, decode_raw_header, encode_packet, packet_from, total_len, AsyncRead, AsyncWrite,
5    Encodable, Error, QoS, QosPid, VarBytes,
6};
7
8use super::{
9    Auth, Connack, Connect, Disconnect, ErrorV5, Puback, Pubcomp, Publish, Pubrec, Pubrel, Suback,
10    Subscribe, Unsuback, Unsubscribe,
11};
12
13/// MQTT v5.0 packet types.
14#[derive(Debug, Clone, PartialEq, Eq)]
15#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
16pub enum Packet {
17    /// [MQTT 3.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901033)
18    Connect(Connect),
19    /// [MQTT 3.2](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901074)
20    Connack(Connack),
21    /// [MQTT 3.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901100)
22    Publish(Publish),
23    /// [MQTT 3.4](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901121)
24    Puback(Puback),
25    /// [MQTT 3.5](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901131)
26    Pubrec(Pubrec),
27    /// [MQTT 3.6](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901141)
28    Pubrel(Pubrel),
29    /// [MQTT 3.7](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901151)
30    Pubcomp(Pubcomp),
31    /// [MQTT 3.8](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901161)
32    Subscribe(Subscribe),
33    /// [MQTT 3.9](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901171)
34    Suback(Suback),
35    /// [MQTT 3.10](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901179)
36    Unsubscribe(Unsubscribe),
37    /// [MQTT 3.11](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901187)
38    Unsuback(Unsuback),
39    /// [MQTT 3.12](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901195)
40    Pingreq,
41    /// [MQTT 3.13](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901200)
42    Pingresp,
43    /// [MQTT 3.14](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205)
44    Disconnect(Disconnect),
45    /// [MQTT 3.15](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901217)
46    Auth(Auth),
47}
48
49impl Packet {
50    /// Return the packet type variant.
51    ///
52    /// This can be used for matching, categorising, debuging, etc. Most users
53    /// will match directly on `Packet` instead.
54    pub fn get_type(&self) -> PacketType {
55        match self {
56            Packet::Pingreq => PacketType::Pingreq,
57            Packet::Pingresp => PacketType::Pingresp,
58            Packet::Connect(_) => PacketType::Connect,
59            Packet::Connack(_) => PacketType::Connack,
60            Packet::Publish(_) => PacketType::Publish,
61            Packet::Puback(_) => PacketType::Puback,
62            Packet::Pubrec(_) => PacketType::Pubrec,
63            Packet::Pubrel(_) => PacketType::Pubrel,
64            Packet::Pubcomp(_) => PacketType::Pubcomp,
65            Packet::Subscribe(_) => PacketType::Subscribe,
66            Packet::Suback(_) => PacketType::Suback,
67            Packet::Unsubscribe(_) => PacketType::Unsubscribe,
68            Packet::Unsuback(_) => PacketType::Unsuback,
69            Packet::Disconnect(_) => PacketType::Disconnect,
70            Packet::Auth(_) => PacketType::Auth,
71        }
72    }
73
74    /// Asynchronously decode a packet from an async reader.
75    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
76        let header = Header::decode_async(reader).await?;
77        Ok(match header.typ {
78            PacketType::Pingreq => Packet::Pingreq,
79            PacketType::Pingresp => Packet::Pingresp,
80            PacketType::Connect => Connect::decode_async(reader, header).await?.into(),
81            PacketType::Connack => Connack::decode_async(reader, header).await?.into(),
82            PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
83            PacketType::Puback => Puback::decode_async(reader, header).await?.into(),
84            PacketType::Pubrec => Pubrec::decode_async(reader, header).await?.into(),
85            PacketType::Pubrel => Pubrel::decode_async(reader, header).await?.into(),
86            PacketType::Pubcomp => Pubcomp::decode_async(reader, header).await?.into(),
87            PacketType::Subscribe => Subscribe::decode_async(reader, header).await?.into(),
88            PacketType::Suback => Suback::decode_async(reader, header).await?.into(),
89            PacketType::Unsubscribe => Unsubscribe::decode_async(reader, header).await?.into(),
90            PacketType::Unsuback => Unsuback::decode_async(reader, header).await?.into(),
91            PacketType::Disconnect => Disconnect::decode_async(reader, header).await?.into(),
92            PacketType::Auth => Auth::decode_async(reader, header).await?.into(),
93        })
94    }
95
96    /// Asynchronously encode the packet to an async writer.
97    pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), ErrorV5> {
98        let data = self.encode().map_err(ErrorV5::Common)?;
99        writer.write_all(data.as_ref()).await?;
100        Ok(())
101    }
102
103    /// Decode a packet from some bytes. If not enough bytes to decode a packet,
104    /// it will return `Ok(None)`.
105    pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, ErrorV5> {
106        match block_on(Self::decode_async(&mut bytes)) {
107            Ok(pkt) => Ok(Some(pkt)),
108            Err(err) => {
109                if let ErrorV5::Common(e) = &err {
110                    if e.is_eof() {
111                        return Ok(None);
112                    }
113                }
114                Err(err)
115            }
116        }
117    }
118
119    /// Encode the packet to a dynamic vector or fixed array.
120    pub fn encode(&self) -> Result<VarBytes, Error> {
121        const VOID_PACKET_REMAINING_LEN: u8 = 0;
122        let data = match self {
123            Packet::Pingreq => {
124                const CONTROL_BYTE: u8 = 0b11000000;
125                return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
126            }
127            Packet::Pingresp => {
128                const CONTROL_BYTE: u8 = 0b11010000;
129                return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
130            }
131            Packet::Publish(publish) => {
132                let mut control_byte: u8 = match publish.qos_pid {
133                    QosPid::Level0 => 0b00110000,
134                    QosPid::Level1(_) => 0b00110010,
135                    QosPid::Level2(_) => 0b00110100,
136                };
137                if publish.dup {
138                    control_byte |= 0b00001000;
139                }
140                if publish.retain {
141                    control_byte |= 0b00000001;
142                }
143                encode_packet(control_byte, publish)?
144            }
145            Packet::Connect(inner) => {
146                const CONTROL_BYTE: u8 = 0b00010000;
147                encode_packet(CONTROL_BYTE, inner)?
148            }
149            Packet::Connack(inner) => {
150                const CONTROL_BYTE: u8 = 0b00100000;
151                encode_packet(CONTROL_BYTE, inner)?
152            }
153            Packet::Puback(inner) => {
154                const CONTROL_BYTE: u8 = 0b01000000;
155                encode_packet(CONTROL_BYTE, inner)?
156            }
157            Packet::Pubrec(inner) => {
158                const CONTROL_BYTE: u8 = 0b01010000;
159                encode_packet(CONTROL_BYTE, inner)?
160            }
161            Packet::Pubrel(inner) => {
162                const CONTROL_BYTE: u8 = 0b01100010;
163                encode_packet(CONTROL_BYTE, inner)?
164            }
165            Packet::Pubcomp(inner) => {
166                const CONTROL_BYTE: u8 = 0b01110000;
167                encode_packet(CONTROL_BYTE, inner)?
168            }
169            Packet::Subscribe(inner) => {
170                const CONTROL_BYTE: u8 = 0b10000010;
171                encode_packet(CONTROL_BYTE, inner)?
172            }
173            Packet::Suback(inner) => {
174                const CONTROL_BYTE: u8 = 0b10010000;
175                encode_packet(CONTROL_BYTE, inner)?
176            }
177            Packet::Unsubscribe(inner) => {
178                const CONTROL_BYTE: u8 = 0b10100010;
179                encode_packet(CONTROL_BYTE, inner)?
180            }
181            Packet::Unsuback(inner) => {
182                const CONTROL_BYTE: u8 = 0b10110000;
183                encode_packet(CONTROL_BYTE, inner)?
184            }
185            Packet::Disconnect(inner) => {
186                const CONTROL_BYTE: u8 = 0b11100000;
187                encode_packet(CONTROL_BYTE, inner)?
188            }
189            Packet::Auth(inner) => {
190                const CONTROL_BYTE: u8 = 0b11110000;
191                encode_packet(CONTROL_BYTE, inner)?
192            }
193        };
194        Ok(VarBytes::Dynamic(data))
195    }
196
197    /// Return the total length of bytes the packet encoded into.
198    pub fn encode_len(&self) -> Result<usize, ErrorV5> {
199        let remaining_len = match self {
200            Packet::Pingreq => return Ok(2),
201            Packet::Pingresp => return Ok(2),
202            Packet::Connect(inner) => inner.encode_len(),
203            Packet::Connack(inner) => inner.encode_len(),
204            Packet::Publish(inner) => inner.encode_len(),
205            Packet::Puback(inner) => inner.encode_len(),
206            Packet::Pubrec(inner) => inner.encode_len(),
207            Packet::Pubrel(inner) => inner.encode_len(),
208            Packet::Pubcomp(inner) => inner.encode_len(),
209            Packet::Subscribe(inner) => inner.encode_len(),
210            Packet::Suback(inner) => inner.encode_len(),
211            Packet::Unsubscribe(inner) => inner.encode_len(),
212            Packet::Unsuback(inner) => inner.encode_len(),
213            Packet::Disconnect(inner) => inner.encode_len(),
214            Packet::Auth(inner) => inner.encode_len(),
215        };
216        Ok(total_len(remaining_len)?)
217    }
218}
219
220/// MQTT v5.0 packet type variant, without the associated data.
221#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
222pub enum PacketType {
223    Connect,
224    Connack,
225    Publish,
226    Puback,
227    Pubrec,
228    Pubrel,
229    Pubcomp,
230    Subscribe,
231    Suback,
232    Unsubscribe,
233    Unsuback,
234    Pingreq,
235    Pingresp,
236    Disconnect,
237    Auth,
238}
239
240impl core::fmt::Display for PacketType {
241    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
242        write!(f, "{self:?}")
243    }
244}
245
246/// Fixed header type.
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
248pub struct Header {
249    pub typ: PacketType,
250    pub dup: bool,
251    pub qos: QoS,
252    pub retain: bool,
253    pub remaining_len: u32,
254}
255
256impl Header {
257    pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
258        Self {
259            typ,
260            dup,
261            qos,
262            retain,
263            remaining_len,
264        }
265    }
266
267    pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, ErrorV5> {
268        const FLAGS_MASK: u8 = 0b1111;
269        let (typ, flags_ok) = match hd >> 4 {
270            1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
271            2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
272            3 => {
273                return Ok(Header {
274                    typ: PacketType::Publish,
275                    dup: hd & 0b1000 != 0,
276                    qos: QoS::from_u8((hd & 0b110) >> 1)?,
277                    retain: hd & 1 == 1,
278                    remaining_len,
279                });
280            }
281            4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
282            5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
283            6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
284            7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
285            8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
286            9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
287            10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
288            11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
289            12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
290            13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
291            14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
292            15 => (PacketType::Auth, hd & FLAGS_MASK == 0),
293            _ => return Err(Error::InvalidHeader.into()),
294        };
295        if !flags_ok {
296            return Err(Error::InvalidHeader.into());
297        }
298        Ok(Header {
299            typ,
300            dup: false,
301            qos: QoS::Level0,
302            retain: false,
303            remaining_len,
304        })
305    }
306
307    pub fn decode(mut reader: &[u8]) -> Result<Self, ErrorV5> {
308        block_on(Self::decode_async(&mut reader))
309    }
310
311    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
312        let (typ, remaining_len) = decode_raw_header(reader).await?;
313        Header::new_with(typ, remaining_len)
314    }
315}
316
317packet_from!(
318    Connect,
319    Connack,
320    Publish,
321    Puback,
322    Pubrec,
323    Pubrel,
324    Pubcomp,
325    Subscribe,
326    Suback,
327    Unsubscribe,
328    Unsuback,
329    Disconnect,
330    Auth
331);