sage_mqtt/
packet.rs

1use crate::{
2    codec, Auth, ConnAck, Connect, Disconnect, PacketType, PingReq, PingResp, PubAck, PubComp,
3    PubRec, PubRel, Publish, ReasonCode::ProtocolError, Result as SageResult, SubAck, Subscribe,
4    UnSubAck, UnSubscribe,
5};
6use std::{fmt, marker::Unpin};
7use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
8
9#[derive(Debug)]
10struct FixedHeader {
11    pub packet_type: PacketType,
12    pub remaining_size: usize,
13}
14
15impl FixedHeader {
16    async fn encode<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
17        let mut n = codec::write_control_packet_type(self.packet_type, writer).await?;
18        n += codec::write_variable_byte_integer(self.remaining_size as u32, writer).await?;
19        Ok(n)
20    }
21
22    async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
23        let packet_type = codec::read_control_packet_type(reader).await?;
24        let remaining_size = codec::read_variable_byte_integer(reader).await? as usize;
25        Ok(FixedHeader {
26            packet_type,
27            remaining_size,
28        })
29    }
30}
31
32/// The standard type to manipulate a AsyncRead/AsyncWrite-able MQTT packet. Each packet
33/// is an enum value with its own type.
34#[derive(Debug, Clone)]
35pub enum Packet {
36    /// CONNECT MQTT packet. Opens a connection request.
37    Connect(Connect),
38
39    /// CONNACK MQTT packet. Aknowledge a connectio request.
40    ConnAck(ConnAck),
41
42    /// PUBLISH MQTT packet. Delivery a message to or from a server.
43    Publish(Publish),
44
45    /// PUBACK MQTT packet. Ackowledge a QoS 1 or QoS 2 message.
46    PubAck(PubAck),
47
48    /// PUBREC MQTT packet. Ackowledge a QoS 2 message.
49    PubRec(PubRec),
50
51    /// PUBREL MQTT packet. Ackowledge a QoS 2 message.
52    PubRel(PubRel),
53
54    /// PUBCOMP MQTT packet. Ackowledge a QoS 2 message.
55    PubComp(PubComp),
56
57    /// SUBSCRIBE MQTT packet. Subscribe a client to topics.
58    Subscribe(Subscribe),
59
60    /// SUBACK MQTT packet. Acknowledge a client SUBSCRIBE packet.
61    SubAck(SubAck),
62
63    /// UNSUBSCRIBE MQTT packet. Unsubscribe a client from topics.
64    UnSubscribe(UnSubscribe),
65
66    /// UNSUBACK MQTT packet. Acknowledge a client UNSUBSCRIBE packet.
67    UnSubAck(UnSubAck),
68
69    /// PINGREQ MQTT packet. Send a ping request.
70    PingReq,
71
72    /// PINGRESP MQTT packet. Respond to a ping request.
73    PingResp,
74
75    /// DISCONNECT MQTT packet. Disconnect a connextion and optionally a session.
76    Disconnect(Disconnect),
77
78    /// AUTH MQTT packet. Performs authentication exchanges between clients and server.
79    Auth(Auth),
80}
81
82impl fmt::Display for Packet {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            Packet::Connect(_) => write!(f, "Connect"),
86            Packet::ConnAck(connack) => write!(f, "ConnAck [{:?}]", connack.reason_code),
87            Packet::Publish(_) => write!(f, "Publish"),
88            Packet::PubAck(_) => write!(f, "PubAck"),
89            Packet::PubRec(_) => write!(f, "PubRec"),
90            Packet::PubRel(_) => write!(f, "PubRel"),
91            Packet::PubComp(_) => write!(f, "PubComp"),
92            Packet::Subscribe(_) => write!(f, "Subscribe"),
93            Packet::SubAck(_) => write!(f, "SubAck"),
94            Packet::UnSubscribe(_) => write!(f, "UnSubscribe"),
95            Packet::UnSubAck(_) => write!(f, "UnSubAck"),
96            Packet::PingReq => write!(f, "PingReq"),
97            Packet::PingResp => write!(f, "PingResp"),
98            Packet::Disconnect(disconnect) => {
99                write!(f, "Disconnect [{:?}]", disconnect.reason_code)
100            }
101            Packet::Auth(_) => write!(f, "Auth"),
102        }
103    }
104}
105
106impl From<Connect> for Packet {
107    fn from(control: Connect) -> Self {
108        Packet::Connect(control)
109    }
110}
111impl From<ConnAck> for Packet {
112    fn from(control: ConnAck) -> Self {
113        Packet::ConnAck(control)
114    }
115}
116impl From<Publish> for Packet {
117    fn from(control: Publish) -> Self {
118        Packet::Publish(control)
119    }
120}
121impl From<PubAck> for Packet {
122    fn from(control: PubAck) -> Self {
123        Packet::PubAck(control)
124    }
125}
126impl From<PubRec> for Packet {
127    fn from(control: PubRec) -> Self {
128        Packet::PubRec(control)
129    }
130}
131impl From<PubRel> for Packet {
132    fn from(control: PubRel) -> Self {
133        Packet::PubRel(control)
134    }
135}
136impl From<PubComp> for Packet {
137    fn from(control: PubComp) -> Self {
138        Packet::PubComp(control)
139    }
140}
141impl From<Subscribe> for Packet {
142    fn from(control: Subscribe) -> Self {
143        Packet::Subscribe(control)
144    }
145}
146impl From<SubAck> for Packet {
147    fn from(control: SubAck) -> Self {
148        Packet::SubAck(control)
149    }
150}
151impl From<UnSubscribe> for Packet {
152    fn from(control: UnSubscribe) -> Self {
153        Packet::UnSubscribe(control)
154    }
155}
156impl From<UnSubAck> for Packet {
157    fn from(control: UnSubAck) -> Self {
158        Packet::UnSubAck(control)
159    }
160}
161impl From<PingReq> for Packet {
162    fn from(_: PingReq) -> Self {
163        Packet::PingReq
164    }
165}
166impl From<PingResp> for Packet {
167    fn from(_: PingResp) -> Self {
168        Packet::PingResp
169    }
170}
171impl From<Disconnect> for Packet {
172    fn from(control: Disconnect) -> Self {
173        Packet::Disconnect(control)
174    }
175}
176impl From<Auth> for Packet {
177    fn from(control: Auth) -> Self {
178        Packet::Auth(control)
179    }
180}
181
182impl Packet {
183    /// Write the entire `Packet` to `writer`, returning the number of
184    /// bytes written.
185    /// In case of failure, the operation will return any MQTT-related error, or
186    /// `std::io::Error`.
187    pub async fn encode<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
188        let mut variable_and_payload = Vec::new();
189        let (packet_type, remaining_size) = match self {
190            Packet::Connect(packet) => (
191                PacketType::Connect,
192                packet.write(&mut variable_and_payload).await?,
193            ),
194            Packet::ConnAck(packet) => (
195                PacketType::ConnAck,
196                packet.write(&mut variable_and_payload).await?,
197            ),
198            Packet::PingReq => (PacketType::PingReq, 0),
199            Packet::PingResp => (PacketType::PingResp, 0),
200            Packet::UnSubAck(packet) => (
201                PacketType::UnSubAck,
202                packet.write(&mut variable_and_payload).await?,
203            ),
204            Packet::Auth(packet) => (
205                PacketType::Auth,
206                packet.write(&mut variable_and_payload).await?,
207            ),
208            Packet::PubAck(packet) => (
209                PacketType::PubAck,
210                packet.write(&mut variable_and_payload).await?,
211            ),
212            Packet::UnSubscribe(packet) => (
213                PacketType::UnSubscribe,
214                packet.write(&mut variable_and_payload).await?,
215            ),
216            Packet::PubRec(packet) => (
217                PacketType::PubRec,
218                packet.write(&mut variable_and_payload).await?,
219            ),
220            Packet::Disconnect(packet) => (
221                PacketType::Disconnect,
222                packet.write(&mut variable_and_payload).await?,
223            ),
224            Packet::PubRel(packet) => (
225                PacketType::PubRel,
226                packet.write(&mut variable_and_payload).await?,
227            ),
228            Packet::SubAck(packet) => (
229                PacketType::SubAck,
230                packet.write(&mut variable_and_payload).await?,
231            ),
232            Packet::PubComp(packet) => (
233                PacketType::PubComp,
234                packet.write(&mut variable_and_payload).await?,
235            ),
236            Packet::Subscribe(packet) => (
237                PacketType::Subscribe,
238                packet.write(&mut variable_and_payload).await?,
239            ),
240            Packet::Publish(packet) => (
241                PacketType::Publish {
242                    duplicate: packet.duplicate,
243                    qos: packet.qos,
244                    retain: packet.retain,
245                },
246                packet.write(&mut variable_and_payload).await?,
247            ),
248        };
249
250        let mut fixed_header_buffer = Vec::new();
251
252        let fixed_size = FixedHeader {
253            packet_type,
254            remaining_size,
255        }
256        .encode(&mut fixed_header_buffer)
257        .await?;
258
259        writer.write_all(&fixed_header_buffer).await?;
260        writer.write_all(&variable_and_payload).await?;
261        Ok(fixed_size + remaining_size)
262    }
263
264    /// Read a control packet from `reader`, returning a new `Packet`.
265    /// In case of failure, the operation will return any MQTT-related error, or
266    /// `std::io::Error`.
267    pub async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
268        let fixed_header = FixedHeader::decode(reader).await?;
269
270        let packet = match fixed_header.packet_type {
271            PacketType::Connect => Packet::Connect(Connect::read(reader).await?),
272            PacketType::ConnAck => Packet::ConnAck(ConnAck::read(reader).await?),
273            PacketType::PubAck => {
274                Packet::PubAck(PubAck::read(reader, fixed_header.remaining_size == 2).await?)
275            }
276            PacketType::PubRec => {
277                Packet::PubRec(PubRec::read(reader, fixed_header.remaining_size == 2).await?)
278            }
279            PacketType::PingReq => Packet::PingReq,
280            PacketType::PingResp => Packet::PingResp,
281            PacketType::SubAck => {
282                Packet::SubAck(SubAck::read(reader, fixed_header.remaining_size).await?)
283            }
284            PacketType::UnSubscribe => {
285                Packet::UnSubscribe(UnSubscribe::read(reader, fixed_header.remaining_size).await?)
286            }
287            PacketType::Auth => Packet::Auth(Auth::read(reader).await?),
288            PacketType::PubRel => {
289                Packet::PubRel(PubRel::read(reader, fixed_header.remaining_size == 2).await?)
290            }
291            PacketType::Disconnect => Packet::Disconnect(Disconnect::read(reader).await?),
292            PacketType::PubComp => {
293                Packet::PubComp(PubComp::read(reader, fixed_header.remaining_size == 2).await?)
294            }
295
296            PacketType::Subscribe => {
297                Packet::Subscribe(Subscribe::read(reader, fixed_header.remaining_size).await?)
298            }
299
300            PacketType::UnSubAck => {
301                Packet::UnSubAck(UnSubAck::read(reader, fixed_header.remaining_size).await?)
302            }
303
304            PacketType::Publish {
305                duplicate,
306                qos,
307                retain,
308            } => Packet::Publish(
309                Publish::read(
310                    reader,
311                    duplicate,
312                    qos,
313                    retain,
314                    fixed_header.remaining_size as u64,
315                )
316                .await?,
317            ),
318            _ => return Err(ProtocolError.into()),
319        };
320
321        Ok(packet)
322    }
323}