1use std::convert::AsRef;
2use std::fmt;
3use std::io;
4
5use futures_lite::future::block_on;
6use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
7
8use super::{
9 Auth, Connack, Connect, Disconnect, ErrorV5, Puback, Pubcomp, Publish, Pubrec, Pubrel, Suback,
10 Subscribe, Unsuback, Unsubscribe,
11};
12use crate::{
13 decode_raw_header, encode_packet, packet_from, total_len, Encodable, Error, QoS, QosPid,
14 VarBytes,
15};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20pub enum Packet {
21 Connect(Connect),
23 Connack(Connack),
25 Publish(Publish),
27 Puback(Puback),
29 Pubrec(Pubrec),
31 Pubrel(Pubrel),
33 Pubcomp(Pubcomp),
35 Subscribe(Subscribe),
37 Suback(Suback),
39 Unsubscribe(Unsubscribe),
41 Unsuback(Unsuback),
43 Pingreq,
45 Pingresp,
47 Disconnect(Disconnect),
49 Auth(Auth),
51}
52
53impl Packet {
54 pub fn get_type(&self) -> PacketType {
59 match self {
60 Packet::Pingreq => PacketType::Pingreq,
61 Packet::Pingresp => PacketType::Pingresp,
62 Packet::Connect(_) => PacketType::Connect,
63 Packet::Connack(_) => PacketType::Connack,
64 Packet::Publish(_) => PacketType::Publish,
65 Packet::Puback(_) => PacketType::Puback,
66 Packet::Pubrec(_) => PacketType::Pubrec,
67 Packet::Pubrel(_) => PacketType::Pubrel,
68 Packet::Pubcomp(_) => PacketType::Pubcomp,
69 Packet::Subscribe(_) => PacketType::Subscribe,
70 Packet::Suback(_) => PacketType::Suback,
71 Packet::Unsubscribe(_) => PacketType::Unsubscribe,
72 Packet::Unsuback(_) => PacketType::Unsuback,
73 Packet::Disconnect(_) => PacketType::Disconnect,
74 Packet::Auth(_) => PacketType::Auth,
75 }
76 }
77
78 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
80 let header = Header::decode_async(reader).await?;
81 Ok(match header.typ {
82 PacketType::Pingreq => Packet::Pingreq,
83 PacketType::Pingresp => Packet::Pingresp,
84 PacketType::Connect => Connect::decode_async(reader, header).await?.into(),
85 PacketType::Connack => Connack::decode_async(reader, header).await?.into(),
86 PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
87 PacketType::Puback => Puback::decode_async(reader, header).await?.into(),
88 PacketType::Pubrec => Pubrec::decode_async(reader, header).await?.into(),
89 PacketType::Pubrel => Pubrel::decode_async(reader, header).await?.into(),
90 PacketType::Pubcomp => Pubcomp::decode_async(reader, header).await?.into(),
91 PacketType::Subscribe => Subscribe::decode_async(reader, header).await?.into(),
92 PacketType::Suback => Suback::decode_async(reader, header).await?.into(),
93 PacketType::Unsubscribe => Unsubscribe::decode_async(reader, header).await?.into(),
94 PacketType::Unsuback => Unsuback::decode_async(reader, header).await?.into(),
95 PacketType::Disconnect => Disconnect::decode_async(reader, header).await?.into(),
96 PacketType::Auth => Auth::decode_async(reader, header).await?.into(),
97 })
98 }
99
100 pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), ErrorV5> {
102 let data = self.encode()?;
103 writer
104 .write_all(data.as_ref())
105 .await
106 .map_err(|err| Error::IoError(err.kind(), err.to_string()))?;
107 Ok(())
108 }
109
110 pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, ErrorV5> {
113 match block_on(Self::decode_async(&mut bytes)) {
114 Ok(pkt) => Ok(Some(pkt)),
115 Err(ErrorV5::Common(Error::IoError(kind, info))) => {
116 if kind == io::ErrorKind::UnexpectedEof {
117 Ok(None)
118 } else {
119 Err(Error::IoError(kind, info).into())
120 }
121 }
122 Err(err) => Err(err),
123 }
124 }
125
126 pub fn encode(&self) -> Result<VarBytes, Error> {
128 const VOID_PACKET_REMAINING_LEN: u8 = 0;
129 let data = match self {
130 Packet::Pingreq => {
131 const CONTROL_BYTE: u8 = 0b11000000;
132 return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
133 }
134 Packet::Pingresp => {
135 const CONTROL_BYTE: u8 = 0b11010000;
136 return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
137 }
138 Packet::Publish(publish) => {
139 let mut control_byte: u8 = match publish.qos_pid {
140 QosPid::Level0 => 0b00110000,
141 QosPid::Level1(_) => 0b00110010,
142 QosPid::Level2(_) => 0b00110100,
143 };
144 if publish.dup {
145 control_byte |= 0b00001000;
146 }
147 if publish.retain {
148 control_byte |= 0b00000001;
149 }
150 encode_packet(control_byte, publish)?
151 }
152 Packet::Connect(inner) => {
153 const CONTROL_BYTE: u8 = 0b00010000;
154 encode_packet(CONTROL_BYTE, inner)?
155 }
156 Packet::Connack(inner) => {
157 const CONTROL_BYTE: u8 = 0b00100000;
158 encode_packet(CONTROL_BYTE, inner)?
159 }
160 Packet::Puback(inner) => {
161 const CONTROL_BYTE: u8 = 0b01000000;
162 encode_packet(CONTROL_BYTE, inner)?
163 }
164 Packet::Pubrec(inner) => {
165 const CONTROL_BYTE: u8 = 0b01010000;
166 encode_packet(CONTROL_BYTE, inner)?
167 }
168 Packet::Pubrel(inner) => {
169 const CONTROL_BYTE: u8 = 0b01100010;
170 encode_packet(CONTROL_BYTE, inner)?
171 }
172 Packet::Pubcomp(inner) => {
173 const CONTROL_BYTE: u8 = 0b01110000;
174 encode_packet(CONTROL_BYTE, inner)?
175 }
176 Packet::Subscribe(inner) => {
177 const CONTROL_BYTE: u8 = 0b10000010;
178 encode_packet(CONTROL_BYTE, inner)?
179 }
180 Packet::Suback(inner) => {
181 const CONTROL_BYTE: u8 = 0b10010000;
182 encode_packet(CONTROL_BYTE, inner)?
183 }
184 Packet::Unsubscribe(inner) => {
185 const CONTROL_BYTE: u8 = 0b10100010;
186 encode_packet(CONTROL_BYTE, inner)?
187 }
188 Packet::Unsuback(inner) => {
189 const CONTROL_BYTE: u8 = 0b10110000;
190 encode_packet(CONTROL_BYTE, inner)?
191 }
192 Packet::Disconnect(inner) => {
193 const CONTROL_BYTE: u8 = 0b11100000;
194 encode_packet(CONTROL_BYTE, inner)?
195 }
196 Packet::Auth(inner) => {
197 const CONTROL_BYTE: u8 = 0b11110000;
198 encode_packet(CONTROL_BYTE, inner)?
199 }
200 };
201 Ok(VarBytes::Dynamic(data))
202 }
203
204 pub fn encode_len(&self) -> Result<usize, ErrorV5> {
206 let remaining_len = match self {
207 Packet::Pingreq => return Ok(2),
208 Packet::Pingresp => return Ok(2),
209 Packet::Connect(inner) => inner.encode_len(),
210 Packet::Connack(inner) => inner.encode_len(),
211 Packet::Publish(inner) => inner.encode_len(),
212 Packet::Puback(inner) => inner.encode_len(),
213 Packet::Pubrec(inner) => inner.encode_len(),
214 Packet::Pubrel(inner) => inner.encode_len(),
215 Packet::Pubcomp(inner) => inner.encode_len(),
216 Packet::Subscribe(inner) => inner.encode_len(),
217 Packet::Suback(inner) => inner.encode_len(),
218 Packet::Unsubscribe(inner) => inner.encode_len(),
219 Packet::Unsuback(inner) => inner.encode_len(),
220 Packet::Disconnect(inner) => inner.encode_len(),
221 Packet::Auth(inner) => inner.encode_len(),
222 };
223 Ok(total_len(remaining_len)?)
224 }
225}
226
227#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
229pub enum PacketType {
230 Connect,
231 Connack,
232 Publish,
233 Puback,
234 Pubrec,
235 Pubrel,
236 Pubcomp,
237 Subscribe,
238 Suback,
239 Unsubscribe,
240 Unsuback,
241 Pingreq,
242 Pingresp,
243 Disconnect,
244 Auth,
245}
246
247impl fmt::Display for PacketType {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 write!(f, "{self:?}")
250 }
251}
252
253#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
255pub struct Header {
256 pub typ: PacketType,
257 pub dup: bool,
258 pub qos: QoS,
259 pub retain: bool,
260 pub remaining_len: u32,
261}
262
263impl Header {
264 pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
265 Self {
266 typ,
267 dup,
268 qos,
269 retain,
270 remaining_len,
271 }
272 }
273
274 pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, ErrorV5> {
275 const FLAGS_MASK: u8 = 0b1111;
276 let (typ, flags_ok) = match hd >> 4 {
277 1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
278 2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
279 3 => {
280 return Ok(Header {
281 typ: PacketType::Publish,
282 dup: hd & 0b1000 != 0,
283 qos: QoS::from_u8((hd & 0b110) >> 1)?,
284 retain: hd & 1 == 1,
285 remaining_len,
286 });
287 }
288 4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
289 5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
290 6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
291 7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
292 8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
293 9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
294 10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
295 11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
296 12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
297 13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
298 14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
299 15 => (PacketType::Auth, hd & FLAGS_MASK == 0),
300 _ => return Err(Error::InvalidHeader.into()),
301 };
302 if !flags_ok {
303 return Err(Error::InvalidHeader.into());
304 }
305 Ok(Header {
306 typ,
307 dup: false,
308 qos: QoS::Level0,
309 retain: false,
310 remaining_len,
311 })
312 }
313
314 pub fn decode(mut reader: &[u8]) -> Result<Self, ErrorV5> {
315 block_on(Self::decode_async(&mut reader))
316 }
317
318 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
319 let (typ, remaining_len) = decode_raw_header(reader).await?;
320 Header::new_with(typ, remaining_len)
321 }
322}
323
324packet_from!(
325 Connect,
326 Connack,
327 Publish,
328 Puback,
329 Pubrec,
330 Pubrel,
331 Pubcomp,
332 Subscribe,
333 Suback,
334 Unsubscribe,
335 Unsuback,
336 Disconnect,
337 Auth
338);