mqtt_proto/v3/
packet.rs

1use futures_lite::future::block_on;
2use std::convert::AsRef;
3use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
4
5use super::{Connack, Connect, Publish, Suback, Subscribe, Unsubscribe};
6use crate::{
7    decode_raw_header, encode_packet, packet_from, read_u16, total_len, Encodable, Error, Pid, QoS,
8    QosPid, VarBytes,
9};
10
11/// MQTT v3.x packet types.
12#[derive(Debug, Clone, PartialEq, Eq)]
13#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
14pub enum Packet {
15    /// [MQTT 3.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028)
16    Connect(Connect),
17    /// [MQTT 3.2](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033)
18    Connack(Connack),
19    /// [MQTT 3.3](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037)
20    Publish(Publish),
21    /// [MQTT 3.4](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043)
22    Puback(Pid),
23    /// [MQTT 3.5](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048)
24    Pubrec(Pid),
25    /// [MQTT 3.6](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053)
26    Pubrel(Pid),
27    /// [MQTT 3.7](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058)
28    Pubcomp(Pid),
29    /// [MQTT 3.8](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063)
30    Subscribe(Subscribe),
31    /// [MQTT 3.9](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068)
32    Suback(Suback),
33    /// [MQTT 3.10](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072)
34    Unsubscribe(Unsubscribe),
35    /// [MQTT 3.11](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077)
36    Unsuback(Pid),
37    /// [MQTT 3.12](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081)
38    Pingreq,
39    /// [MQTT 3.13](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718086)
40    Pingresp,
41    /// [MQTT 3.14](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090)
42    Disconnect,
43}
44
45impl Packet {
46    /// Return the packet type variant.
47    ///
48    /// This can be used for matching, categorising, debuging, etc. Most users
49    /// will match directly on `Packet` instead.
50    pub fn get_type(&self) -> PacketType {
51        match self {
52            Packet::Pingreq => PacketType::Pingreq,
53            Packet::Pingresp => PacketType::Pingresp,
54            Packet::Connect(_) => PacketType::Connect,
55            Packet::Connack(_) => PacketType::Connack,
56            Packet::Publish(_) => PacketType::Publish,
57            Packet::Puback(_) => PacketType::Puback,
58            Packet::Pubrec(_) => PacketType::Pubrec,
59            Packet::Pubrel(_) => PacketType::Pubrel,
60            Packet::Pubcomp(_) => PacketType::Pubcomp,
61            Packet::Subscribe(_) => PacketType::Subscribe,
62            Packet::Suback(_) => PacketType::Suback,
63            Packet::Unsubscribe(_) => PacketType::Unsubscribe,
64            Packet::Unsuback(_) => PacketType::Unsuback,
65            Packet::Disconnect => PacketType::Disconnect,
66        }
67    }
68
69    /// Asynchronously decode a packet from an async reader.
70    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
71        let header = Header::decode_async(reader).await?;
72        Ok(match header.typ {
73            PacketType::Pingreq => Packet::Pingreq,
74            PacketType::Pingresp => Packet::Pingresp,
75            PacketType::Disconnect => Packet::Disconnect,
76
77            PacketType::Connect => Connect::decode_async(reader).await?.into(),
78            PacketType::Connack => Connack::decode_async(reader).await?.into(),
79            PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
80            PacketType::Puback => Packet::Puback(Pid::try_from(read_u16(reader).await?)?),
81            PacketType::Pubrec => Packet::Pubrec(Pid::try_from(read_u16(reader).await?)?),
82            PacketType::Pubrel => Packet::Pubrel(Pid::try_from(read_u16(reader).await?)?),
83            PacketType::Pubcomp => Packet::Pubcomp(Pid::try_from(read_u16(reader).await?)?),
84            PacketType::Subscribe => Subscribe::decode_async(reader, header.remaining_len as usize)
85                .await?
86                .into(),
87            PacketType::Suback => Suback::decode_async(reader, header.remaining_len as usize)
88                .await?
89                .into(),
90            PacketType::Unsubscribe => {
91                Unsubscribe::decode_async(reader, header.remaining_len as usize)
92                    .await?
93                    .into()
94            }
95            PacketType::Unsuback => Packet::Unsuback(Pid::try_from(read_u16(reader).await?)?),
96        })
97    }
98
99    /// Asynchronously encode the packet to an async writer.
100    pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), Error> {
101        let data = self.encode()?;
102        writer.write_all(data.as_ref()).await?;
103        Ok(())
104    }
105
106    /// Decode a packet from some bytes. If not enough bytes to decode a packet,
107    /// it will return `Ok(None)`.
108    pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, Error> {
109        match block_on(Self::decode_async(&mut bytes)) {
110            Ok(pkt) => Ok(Some(pkt)),
111            Err(err) => {
112                if err.is_eof() {
113                    Ok(None)
114                } else {
115                    Err(err)
116                }
117            }
118        }
119    }
120
121    /// Encode the packet to a dynamic vector or fixed array.
122    pub fn encode(&self) -> Result<VarBytes, Error> {
123        const VOID_PACKET_REMAINING_LEN: u8 = 0;
124        let data = match self {
125            Packet::Pingreq => {
126                const CONTROL_BYTE: u8 = 0b11000000;
127                VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN])
128            }
129            Packet::Pingresp => {
130                const CONTROL_BYTE: u8 = 0b11010000;
131                VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN])
132            }
133            Packet::Connect(connect) => {
134                const CONTROL_BYTE: u8 = 0b00010000;
135                VarBytes::Dynamic(encode_packet(CONTROL_BYTE, connect)?)
136            }
137            Packet::Connack(connack) => {
138                const CONTROL_BYTE: u8 = 0b00100000;
139                const REMAINING_LEN: u8 = 2;
140                let flags: u8 = connack.session_present.into();
141                let rc: u8 = connack.code as u8;
142                VarBytes::Fixed4([CONTROL_BYTE, REMAINING_LEN, flags, rc])
143            }
144            Packet::Publish(publish) => {
145                let mut control_byte: u8 = match publish.qos_pid {
146                    QosPid::Level0 => 0b00110000,
147                    QosPid::Level1(_) => 0b00110010,
148                    QosPid::Level2(_) => 0b00110100,
149                };
150                if publish.dup {
151                    control_byte |= 0b00001000;
152                }
153                if publish.retain {
154                    control_byte |= 0b00000001;
155                }
156                VarBytes::Dynamic(encode_packet(control_byte, publish)?)
157            }
158            Packet::Puback(pid) => {
159                const CONTROL_BYTE: u8 = 0b01000000;
160                VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
161            }
162            Packet::Pubrec(pid) => {
163                const CONTROL_BYTE: u8 = 0b01010000;
164                VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
165            }
166            Packet::Pubrel(pid) => {
167                const CONTROL_BYTE: u8 = 0b01100010;
168                VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
169            }
170            Packet::Pubcomp(pid) => {
171                const CONTROL_BYTE: u8 = 0b01110000;
172                VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
173            }
174            Packet::Subscribe(subscribe) => {
175                const CONTROL_BYTE: u8 = 0b10000010;
176                VarBytes::Dynamic(encode_packet(CONTROL_BYTE, subscribe)?)
177            }
178            Packet::Suback(suback) => {
179                const CONTROL_BYTE: u8 = 0b10010000;
180                VarBytes::Dynamic(encode_packet(CONTROL_BYTE, suback)?)
181            }
182            Packet::Unsubscribe(unsubscribe) => {
183                const CONTROL_BYTE: u8 = 0b10100010;
184                VarBytes::Dynamic(encode_packet(CONTROL_BYTE, unsubscribe)?)
185            }
186            Packet::Unsuback(pid) => {
187                const CONTROL_BYTE: u8 = 0b10110000;
188                VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
189            }
190            Packet::Disconnect => {
191                const CONTROL_BYTE: u8 = 0b11100000;
192                VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN])
193            }
194        };
195        Ok(data)
196    }
197
198    /// Return the total length of bytes the packet encoded into.
199    pub fn encode_len(&self) -> Result<usize, Error> {
200        let remaining_len = match self {
201            Packet::Pingreq => return Ok(2),
202            Packet::Pingresp => return Ok(2),
203            Packet::Disconnect => return Ok(2),
204            Packet::Connack(_) => return Ok(4),
205            Packet::Puback(_) => return Ok(4),
206            Packet::Pubrec(_) => return Ok(4),
207            Packet::Pubrel(_) => return Ok(4),
208            Packet::Pubcomp(_) => return Ok(4),
209            Packet::Unsuback(_) => return Ok(4),
210            Packet::Connect(inner) => inner.encode_len(),
211            Packet::Publish(inner) => inner.encode_len(),
212            Packet::Subscribe(inner) => inner.encode_len(),
213            Packet::Suback(inner) => inner.encode_len(),
214            Packet::Unsubscribe(inner) => inner.encode_len(),
215        };
216        total_len(remaining_len)
217    }
218}
219
220/// MQTT v3.x packet type variant, without the associated data.
221#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
222pub enum PacketType {
223    Connect,
224    Connack,
225    Publish,
226    Puback,
227    Pubrec,
228    Pubrel,
229    Pubcomp,
230    Subscribe,
231    Suback,
232    Unsubscribe,
233    Unsuback,
234    Pingreq,
235    Pingresp,
236    Disconnect,
237}
238
239/// Fixed header type.
240#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
241pub struct Header {
242    pub typ: PacketType,
243    pub dup: bool,
244    pub qos: QoS,
245    pub retain: bool,
246    pub remaining_len: u32,
247}
248
249impl Header {
250    pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
251        Self {
252            typ,
253            dup,
254            qos,
255            retain,
256            remaining_len,
257        }
258    }
259
260    pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, Error> {
261        const FLAGS_MASK: u8 = 0b1111;
262        let (typ, flags_ok) = match hd >> 4 {
263            1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
264            2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
265            3 => {
266                return Ok(Header {
267                    typ: PacketType::Publish,
268                    dup: hd & 0b1000 != 0,
269                    qos: QoS::from_u8((hd & 0b110) >> 1)?,
270                    retain: hd & 1 == 1,
271                    remaining_len,
272                });
273            }
274            4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
275            5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
276            6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
277            7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
278            8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
279            9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
280            10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
281            11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
282            12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
283            13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
284            14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
285            _ => return Err(Error::InvalidHeader),
286        };
287        if !flags_ok {
288            return Err(Error::InvalidHeader);
289        }
290        Ok(Header {
291            typ,
292            dup: false,
293            qos: QoS::Level0,
294            retain: false,
295            remaining_len,
296        })
297    }
298
299    pub fn decode(mut reader: &[u8]) -> Result<Self, Error> {
300        block_on(Self::decode_async(&mut reader))
301    }
302
303    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
304        let (typ, remaining_len) = decode_raw_header(reader).await?;
305        Header::new_with(typ, remaining_len)
306    }
307}
308
309#[inline]
310fn encode_with_pid(control_byte: u8, pid: Pid) -> [u8; 4] {
311    const REMAINING_LEN: u8 = 2;
312    let val = pid.value();
313    [
314        control_byte,
315        REMAINING_LEN,
316        (val >> 8) as u8,
317        (val & 0xFF) as u8,
318    ]
319}
320
321packet_from!(Connect, Publish, Suback, Connack, Subscribe, Unsubscribe);