mqtt_proto/v5/
packet.rs

1use std::convert::AsRef;
2use std::fmt;
3use std::io;
4
5use futures_lite::future::block_on;
6use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
7
8use super::{
9    Auth, Connack, Connect, Disconnect, ErrorV5, Puback, Pubcomp, Publish, Pubrec, Pubrel, Suback,
10    Subscribe, Unsuback, Unsubscribe,
11};
12use crate::{
13    decode_raw_header, encode_packet, packet_from, total_len, Encodable, Error, QoS, QosPid,
14    VarBytes,
15};
16
17/// MQTT v5.0 packet types.
18#[derive(Debug, Clone, PartialEq, Eq)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20pub enum Packet {
21    /// [MQTT 3.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901033)
22    Connect(Connect),
23    /// [MQTT 3.2](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901074)
24    Connack(Connack),
25    /// [MQTT 3.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901100)
26    Publish(Publish),
27    /// [MQTT 3.4](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901121)
28    Puback(Puback),
29    /// [MQTT 3.5](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901131)
30    Pubrec(Pubrec),
31    /// [MQTT 3.6](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901141)
32    Pubrel(Pubrel),
33    /// [MQTT 3.7](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901151)
34    Pubcomp(Pubcomp),
35    /// [MQTT 3.8](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901161)
36    Subscribe(Subscribe),
37    /// [MQTT 3.9](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901171)
38    Suback(Suback),
39    /// [MQTT 3.10](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901179)
40    Unsubscribe(Unsubscribe),
41    /// [MQTT 3.11](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901187)
42    Unsuback(Unsuback),
43    /// [MQTT 3.12](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901195)
44    Pingreq,
45    /// [MQTT 3.13](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901200)
46    Pingresp,
47    /// [MQTT 3.14](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205)
48    Disconnect(Disconnect),
49    /// [MQTT 3.15](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901217)
50    Auth(Auth),
51}
52
53impl Packet {
54    /// Return the packet type variant.
55    ///
56    /// This can be used for matching, categorising, debuging, etc. Most users
57    /// will match directly on `Packet` instead.
58    pub fn get_type(&self) -> PacketType {
59        match self {
60            Packet::Pingreq => PacketType::Pingreq,
61            Packet::Pingresp => PacketType::Pingresp,
62            Packet::Connect(_) => PacketType::Connect,
63            Packet::Connack(_) => PacketType::Connack,
64            Packet::Publish(_) => PacketType::Publish,
65            Packet::Puback(_) => PacketType::Puback,
66            Packet::Pubrec(_) => PacketType::Pubrec,
67            Packet::Pubrel(_) => PacketType::Pubrel,
68            Packet::Pubcomp(_) => PacketType::Pubcomp,
69            Packet::Subscribe(_) => PacketType::Subscribe,
70            Packet::Suback(_) => PacketType::Suback,
71            Packet::Unsubscribe(_) => PacketType::Unsubscribe,
72            Packet::Unsuback(_) => PacketType::Unsuback,
73            Packet::Disconnect(_) => PacketType::Disconnect,
74            Packet::Auth(_) => PacketType::Auth,
75        }
76    }
77
78    /// Asynchronously decode a packet from an async reader.
79    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
80        let header = Header::decode_async(reader).await?;
81        Ok(match header.typ {
82            PacketType::Pingreq => Packet::Pingreq,
83            PacketType::Pingresp => Packet::Pingresp,
84            PacketType::Connect => Connect::decode_async(reader, header).await?.into(),
85            PacketType::Connack => Connack::decode_async(reader, header).await?.into(),
86            PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
87            PacketType::Puback => Puback::decode_async(reader, header).await?.into(),
88            PacketType::Pubrec => Pubrec::decode_async(reader, header).await?.into(),
89            PacketType::Pubrel => Pubrel::decode_async(reader, header).await?.into(),
90            PacketType::Pubcomp => Pubcomp::decode_async(reader, header).await?.into(),
91            PacketType::Subscribe => Subscribe::decode_async(reader, header).await?.into(),
92            PacketType::Suback => Suback::decode_async(reader, header).await?.into(),
93            PacketType::Unsubscribe => Unsubscribe::decode_async(reader, header).await?.into(),
94            PacketType::Unsuback => Unsuback::decode_async(reader, header).await?.into(),
95            PacketType::Disconnect => Disconnect::decode_async(reader, header).await?.into(),
96            PacketType::Auth => Auth::decode_async(reader, header).await?.into(),
97        })
98    }
99
100    /// Asynchronously encode the packet to an async writer.
101    pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), ErrorV5> {
102        let data = self.encode()?;
103        writer
104            .write_all(data.as_ref())
105            .await
106            .map_err(|err| Error::IoError(err.kind(), err.to_string()))?;
107        Ok(())
108    }
109
110    /// Decode a packet from some bytes. If not enough bytes to decode a packet,
111    /// it will return `Ok(None)`.
112    pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, ErrorV5> {
113        match block_on(Self::decode_async(&mut bytes)) {
114            Ok(pkt) => Ok(Some(pkt)),
115            Err(ErrorV5::Common(Error::IoError(kind, info))) => {
116                if kind == io::ErrorKind::UnexpectedEof {
117                    Ok(None)
118                } else {
119                    Err(Error::IoError(kind, info).into())
120                }
121            }
122            Err(err) => Err(err),
123        }
124    }
125
126    /// Encode the packet to a dynamic vector or fixed array.
127    pub fn encode(&self) -> Result<VarBytes, Error> {
128        const VOID_PACKET_REMAINING_LEN: u8 = 0;
129        let data = match self {
130            Packet::Pingreq => {
131                const CONTROL_BYTE: u8 = 0b11000000;
132                return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
133            }
134            Packet::Pingresp => {
135                const CONTROL_BYTE: u8 = 0b11010000;
136                return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
137            }
138            Packet::Publish(publish) => {
139                let mut control_byte: u8 = match publish.qos_pid {
140                    QosPid::Level0 => 0b00110000,
141                    QosPid::Level1(_) => 0b00110010,
142                    QosPid::Level2(_) => 0b00110100,
143                };
144                if publish.dup {
145                    control_byte |= 0b00001000;
146                }
147                if publish.retain {
148                    control_byte |= 0b00000001;
149                }
150                encode_packet(control_byte, publish)?
151            }
152            Packet::Connect(inner) => {
153                const CONTROL_BYTE: u8 = 0b00010000;
154                encode_packet(CONTROL_BYTE, inner)?
155            }
156            Packet::Connack(inner) => {
157                const CONTROL_BYTE: u8 = 0b00100000;
158                encode_packet(CONTROL_BYTE, inner)?
159            }
160            Packet::Puback(inner) => {
161                const CONTROL_BYTE: u8 = 0b01000000;
162                encode_packet(CONTROL_BYTE, inner)?
163            }
164            Packet::Pubrec(inner) => {
165                const CONTROL_BYTE: u8 = 0b01010000;
166                encode_packet(CONTROL_BYTE, inner)?
167            }
168            Packet::Pubrel(inner) => {
169                const CONTROL_BYTE: u8 = 0b01100010;
170                encode_packet(CONTROL_BYTE, inner)?
171            }
172            Packet::Pubcomp(inner) => {
173                const CONTROL_BYTE: u8 = 0b01110000;
174                encode_packet(CONTROL_BYTE, inner)?
175            }
176            Packet::Subscribe(inner) => {
177                const CONTROL_BYTE: u8 = 0b10000010;
178                encode_packet(CONTROL_BYTE, inner)?
179            }
180            Packet::Suback(inner) => {
181                const CONTROL_BYTE: u8 = 0b10010000;
182                encode_packet(CONTROL_BYTE, inner)?
183            }
184            Packet::Unsubscribe(inner) => {
185                const CONTROL_BYTE: u8 = 0b10100010;
186                encode_packet(CONTROL_BYTE, inner)?
187            }
188            Packet::Unsuback(inner) => {
189                const CONTROL_BYTE: u8 = 0b10110000;
190                encode_packet(CONTROL_BYTE, inner)?
191            }
192            Packet::Disconnect(inner) => {
193                const CONTROL_BYTE: u8 = 0b11100000;
194                encode_packet(CONTROL_BYTE, inner)?
195            }
196            Packet::Auth(inner) => {
197                const CONTROL_BYTE: u8 = 0b11110000;
198                encode_packet(CONTROL_BYTE, inner)?
199            }
200        };
201        Ok(VarBytes::Dynamic(data))
202    }
203
204    /// Return the total length of bytes the packet encoded into.
205    pub fn encode_len(&self) -> Result<usize, ErrorV5> {
206        let remaining_len = match self {
207            Packet::Pingreq => return Ok(2),
208            Packet::Pingresp => return Ok(2),
209            Packet::Connect(inner) => inner.encode_len(),
210            Packet::Connack(inner) => inner.encode_len(),
211            Packet::Publish(inner) => inner.encode_len(),
212            Packet::Puback(inner) => inner.encode_len(),
213            Packet::Pubrec(inner) => inner.encode_len(),
214            Packet::Pubrel(inner) => inner.encode_len(),
215            Packet::Pubcomp(inner) => inner.encode_len(),
216            Packet::Subscribe(inner) => inner.encode_len(),
217            Packet::Suback(inner) => inner.encode_len(),
218            Packet::Unsubscribe(inner) => inner.encode_len(),
219            Packet::Unsuback(inner) => inner.encode_len(),
220            Packet::Disconnect(inner) => inner.encode_len(),
221            Packet::Auth(inner) => inner.encode_len(),
222        };
223        Ok(total_len(remaining_len)?)
224    }
225}
226
227/// MQTT v5.0 packet type variant, without the associated data.
228#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
229pub enum PacketType {
230    Connect,
231    Connack,
232    Publish,
233    Puback,
234    Pubrec,
235    Pubrel,
236    Pubcomp,
237    Subscribe,
238    Suback,
239    Unsubscribe,
240    Unsuback,
241    Pingreq,
242    Pingresp,
243    Disconnect,
244    Auth,
245}
246
247impl fmt::Display for PacketType {
248    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249        write!(f, "{self:?}")
250    }
251}
252
253/// Fixed header type.
254#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
255pub struct Header {
256    pub typ: PacketType,
257    pub dup: bool,
258    pub qos: QoS,
259    pub retain: bool,
260    pub remaining_len: u32,
261}
262
263impl Header {
264    pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
265        Self {
266            typ,
267            dup,
268            qos,
269            retain,
270            remaining_len,
271        }
272    }
273
274    pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, ErrorV5> {
275        const FLAGS_MASK: u8 = 0b1111;
276        let (typ, flags_ok) = match hd >> 4 {
277            1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
278            2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
279            3 => {
280                return Ok(Header {
281                    typ: PacketType::Publish,
282                    dup: hd & 0b1000 != 0,
283                    qos: QoS::from_u8((hd & 0b110) >> 1)?,
284                    retain: hd & 1 == 1,
285                    remaining_len,
286                });
287            }
288            4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
289            5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
290            6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
291            7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
292            8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
293            9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
294            10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
295            11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
296            12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
297            13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
298            14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
299            15 => (PacketType::Auth, hd & FLAGS_MASK == 0),
300            _ => return Err(Error::InvalidHeader.into()),
301        };
302        if !flags_ok {
303            return Err(Error::InvalidHeader.into());
304        }
305        Ok(Header {
306            typ,
307            dup: false,
308            qos: QoS::Level0,
309            retain: false,
310            remaining_len,
311        })
312    }
313
314    pub fn decode(mut reader: &[u8]) -> Result<Self, ErrorV5> {
315        block_on(Self::decode_async(&mut reader))
316    }
317
318    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
319        let (typ, remaining_len) = decode_raw_header(reader).await?;
320        Header::new_with(typ, remaining_len)
321    }
322}
323
324packet_from!(
325    Connect,
326    Connack,
327    Publish,
328    Puback,
329    Pubrec,
330    Pubrel,
331    Pubcomp,
332    Subscribe,
333    Suback,
334    Unsubscribe,
335    Unsuback,
336    Disconnect,
337    Auth
338);