mqtt_proto/v3/
packet.rs

1use core::convert::AsRef;
2
3use crate::{
4    block_on, decode_raw_header, encode_packet, packet_from, read_u16, total_len, AsyncRead,
5    AsyncWrite, Encodable, Error, Pid, QoS, QosPid, VarBytes,
6};
7
8use super::{Connack, Connect, Publish, Suback, Subscribe, Unsubscribe};
9
10/// MQTT v3.x packet types.
11#[derive(Debug, Clone, PartialEq, Eq)]
12#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
13pub enum Packet {
14    /// [MQTT 3.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028)
15    Connect(Connect),
16    /// [MQTT 3.2](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033)
17    Connack(Connack),
18    /// [MQTT 3.3](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037)
19    Publish(Publish),
20    /// [MQTT 3.4](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043)
21    Puback(Pid),
22    /// [MQTT 3.5](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048)
23    Pubrec(Pid),
24    /// [MQTT 3.6](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053)
25    Pubrel(Pid),
26    /// [MQTT 3.7](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058)
27    Pubcomp(Pid),
28    /// [MQTT 3.8](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063)
29    Subscribe(Subscribe),
30    /// [MQTT 3.9](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068)
31    Suback(Suback),
32    /// [MQTT 3.10](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072)
33    Unsubscribe(Unsubscribe),
34    /// [MQTT 3.11](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077)
35    Unsuback(Pid),
36    /// [MQTT 3.12](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081)
37    Pingreq,
38    /// [MQTT 3.13](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718086)
39    Pingresp,
40    /// [MQTT 3.14](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090)
41    Disconnect,
42}
43
44impl Packet {
45    /// Return the packet type variant.
46    ///
47    /// This can be used for matching, categorising, debuging, etc. Most users
48    /// will match directly on `Packet` instead.
49    pub fn get_type(&self) -> PacketType {
50        match self {
51            Packet::Pingreq => PacketType::Pingreq,
52            Packet::Pingresp => PacketType::Pingresp,
53            Packet::Connect(_) => PacketType::Connect,
54            Packet::Connack(_) => PacketType::Connack,
55            Packet::Publish(_) => PacketType::Publish,
56            Packet::Puback(_) => PacketType::Puback,
57            Packet::Pubrec(_) => PacketType::Pubrec,
58            Packet::Pubrel(_) => PacketType::Pubrel,
59            Packet::Pubcomp(_) => PacketType::Pubcomp,
60            Packet::Subscribe(_) => PacketType::Subscribe,
61            Packet::Suback(_) => PacketType::Suback,
62            Packet::Unsubscribe(_) => PacketType::Unsubscribe,
63            Packet::Unsuback(_) => PacketType::Unsuback,
64            Packet::Disconnect => PacketType::Disconnect,
65        }
66    }
67
68    /// Asynchronously decode a packet from an async reader.
69    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
70        let header = Header::decode_async(reader).await?;
71        Ok(match header.typ {
72            PacketType::Pingreq => Packet::Pingreq,
73            PacketType::Pingresp => Packet::Pingresp,
74            PacketType::Disconnect => Packet::Disconnect,
75
76            PacketType::Connect => Connect::decode_async(reader).await?.into(),
77            PacketType::Connack => Connack::decode_async(reader).await?.into(),
78            PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
79            PacketType::Puback => Packet::Puback(Pid::try_from(read_u16(reader).await?)?),
80            PacketType::Pubrec => Packet::Pubrec(Pid::try_from(read_u16(reader).await?)?),
81            PacketType::Pubrel => Packet::Pubrel(Pid::try_from(read_u16(reader).await?)?),
82            PacketType::Pubcomp => Packet::Pubcomp(Pid::try_from(read_u16(reader).await?)?),
83            PacketType::Subscribe => Subscribe::decode_async(reader, header.remaining_len as usize)
84                .await?
85                .into(),
86            PacketType::Suback => Suback::decode_async(reader, header.remaining_len as usize)
87                .await?
88                .into(),
89            PacketType::Unsubscribe => {
90                Unsubscribe::decode_async(reader, header.remaining_len as usize)
91                    .await?
92                    .into()
93            }
94            PacketType::Unsuback => Packet::Unsuback(Pid::try_from(read_u16(reader).await?)?),
95        })
96    }
97
98    /// Asynchronously encode the packet to an async writer.
99    pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), Error> {
100        let data = self.encode()?;
101        writer.write_all(data.as_ref()).await?;
102        Ok(())
103    }
104
105    /// Decode a packet from some bytes. If not enough bytes to decode a packet,
106    /// it will return `Ok(None)`.
107    pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, Error> {
108        match block_on(Self::decode_async(&mut bytes)) {
109            Ok(pkt) => Ok(Some(pkt)),
110            Err(err) => {
111                if err.is_eof() {
112                    Ok(None)
113                } else {
114                    Err(err)
115                }
116            }
117        }
118    }
119
120    /// Encode the packet to a dynamic vector or fixed array.
121    pub fn encode(&self) -> Result<VarBytes, Error> {
122        const VOID_PACKET_REMAINING_LEN: u8 = 0;
123        let data = match self {
124            Packet::Pingreq => {
125                const CONTROL_BYTE: u8 = 0b11000000;
126                return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
127            }
128            Packet::Pingresp => {
129                const CONTROL_BYTE: u8 = 0b11010000;
130                return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
131            }
132            Packet::Connect(connect) => {
133                const CONTROL_BYTE: u8 = 0b00010000;
134                encode_packet(CONTROL_BYTE, connect)?
135            }
136            Packet::Connack(connack) => {
137                const CONTROL_BYTE: u8 = 0b00100000;
138                const REMAINING_LEN: u8 = 2;
139                let flags: u8 = connack.session_present.into();
140                let rc: u8 = connack.code as u8;
141                return Ok(VarBytes::Fixed4([CONTROL_BYTE, REMAINING_LEN, flags, rc]));
142            }
143            Packet::Publish(publish) => {
144                let mut control_byte: u8 = match publish.qos_pid {
145                    QosPid::Level0 => 0b00110000,
146                    QosPid::Level1(_) => 0b00110010,
147                    QosPid::Level2(_) => 0b00110100,
148                };
149                if publish.dup {
150                    control_byte |= 0b00001000;
151                }
152                if publish.retain {
153                    control_byte |= 0b00000001;
154                }
155                encode_packet(control_byte, publish)?
156            }
157            Packet::Puback(pid) => {
158                const CONTROL_BYTE: u8 = 0b01000000;
159                return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
160            }
161            Packet::Pubrec(pid) => {
162                const CONTROL_BYTE: u8 = 0b01010000;
163                return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
164            }
165            Packet::Pubrel(pid) => {
166                const CONTROL_BYTE: u8 = 0b01100010;
167                return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
168            }
169            Packet::Pubcomp(pid) => {
170                const CONTROL_BYTE: u8 = 0b01110000;
171                return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
172            }
173            Packet::Subscribe(subscribe) => {
174                const CONTROL_BYTE: u8 = 0b10000010;
175                encode_packet(CONTROL_BYTE, subscribe)?
176            }
177            Packet::Suback(suback) => {
178                const CONTROL_BYTE: u8 = 0b10010000;
179                encode_packet(CONTROL_BYTE, suback)?
180            }
181            Packet::Unsubscribe(unsubscribe) => {
182                const CONTROL_BYTE: u8 = 0b10100010;
183                encode_packet(CONTROL_BYTE, unsubscribe)?
184            }
185            Packet::Unsuback(pid) => {
186                const CONTROL_BYTE: u8 = 0b10110000;
187                return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
188            }
189            Packet::Disconnect => {
190                const CONTROL_BYTE: u8 = 0b11100000;
191                return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
192            }
193        };
194        Ok(VarBytes::Dynamic(data))
195    }
196
197    /// Return the total length of bytes the packet encoded into.
198    pub fn encode_len(&self) -> Result<usize, Error> {
199        let remaining_len = match self {
200            Packet::Pingreq => return Ok(2),
201            Packet::Pingresp => return Ok(2),
202            Packet::Disconnect => return Ok(2),
203            Packet::Connack(_) => return Ok(4),
204            Packet::Puback(_) => return Ok(4),
205            Packet::Pubrec(_) => return Ok(4),
206            Packet::Pubrel(_) => return Ok(4),
207            Packet::Pubcomp(_) => return Ok(4),
208            Packet::Unsuback(_) => return Ok(4),
209            Packet::Connect(inner) => inner.encode_len(),
210            Packet::Publish(inner) => inner.encode_len(),
211            Packet::Subscribe(inner) => inner.encode_len(),
212            Packet::Suback(inner) => inner.encode_len(),
213            Packet::Unsubscribe(inner) => inner.encode_len(),
214        };
215        total_len(remaining_len)
216    }
217}
218
219/// MQTT v3.x packet type variant, without the associated data.
220#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
221pub enum PacketType {
222    Connect,
223    Connack,
224    Publish,
225    Puback,
226    Pubrec,
227    Pubrel,
228    Pubcomp,
229    Subscribe,
230    Suback,
231    Unsubscribe,
232    Unsuback,
233    Pingreq,
234    Pingresp,
235    Disconnect,
236}
237
238/// Fixed header type.
239#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
240pub struct Header {
241    pub typ: PacketType,
242    pub dup: bool,
243    pub qos: QoS,
244    pub retain: bool,
245    pub remaining_len: u32,
246}
247
248impl Header {
249    pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
250        Self {
251            typ,
252            dup,
253            qos,
254            retain,
255            remaining_len,
256        }
257    }
258
259    pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, Error> {
260        const FLAGS_MASK: u8 = 0b1111;
261        let (typ, flags_ok) = match hd >> 4 {
262            1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
263            2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
264            3 => {
265                return Ok(Header {
266                    typ: PacketType::Publish,
267                    dup: hd & 0b1000 != 0,
268                    qos: QoS::from_u8((hd & 0b110) >> 1)?,
269                    retain: hd & 1 == 1,
270                    remaining_len,
271                });
272            }
273            4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
274            5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
275            6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
276            7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
277            8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
278            9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
279            10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
280            11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
281            12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
282            13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
283            14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
284            _ => return Err(Error::InvalidHeader),
285        };
286        if !flags_ok {
287            return Err(Error::InvalidHeader);
288        }
289        Ok(Header {
290            typ,
291            dup: false,
292            qos: QoS::Level0,
293            retain: false,
294            remaining_len,
295        })
296    }
297
298    pub fn decode(mut reader: &[u8]) -> Result<Self, Error> {
299        block_on(Self::decode_async(&mut reader))
300    }
301
302    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
303        let (typ, remaining_len) = decode_raw_header(reader).await?;
304        Header::new_with(typ, remaining_len)
305    }
306}
307
308#[inline]
309fn encode_with_pid(control_byte: u8, pid: Pid) -> [u8; 4] {
310    const REMAINING_LEN: u8 = 2;
311    let val = pid.value();
312    [
313        control_byte,
314        REMAINING_LEN,
315        (val >> 8) as u8,
316        (val & 0xFF) as u8,
317    ]
318}
319
320packet_from!(Connect, Publish, Suback, Connack, Subscribe, Unsubscribe);