1use core::convert::AsRef;
2
3use crate::{
4 block_on, decode_raw_header, encode_packet, packet_from, total_len, AsyncRead, AsyncWrite,
5 Encodable, Error, QoS, QosPid, VarBytes,
6};
7
8use super::{
9 Auth, Connack, Connect, Disconnect, ErrorV5, Puback, Pubcomp, Publish, Pubrec, Pubrel, Suback,
10 Subscribe, Unsuback, Unsubscribe,
11};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
15#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
16pub enum Packet {
17 Connect(Connect),
19 Connack(Connack),
21 Publish(Publish),
23 Puback(Puback),
25 Pubrec(Pubrec),
27 Pubrel(Pubrel),
29 Pubcomp(Pubcomp),
31 Subscribe(Subscribe),
33 Suback(Suback),
35 Unsubscribe(Unsubscribe),
37 Unsuback(Unsuback),
39 Pingreq,
41 Pingresp,
43 Disconnect(Disconnect),
45 Auth(Auth),
47}
48
49impl Packet {
50 pub fn get_type(&self) -> PacketType {
55 match self {
56 Packet::Pingreq => PacketType::Pingreq,
57 Packet::Pingresp => PacketType::Pingresp,
58 Packet::Connect(_) => PacketType::Connect,
59 Packet::Connack(_) => PacketType::Connack,
60 Packet::Publish(_) => PacketType::Publish,
61 Packet::Puback(_) => PacketType::Puback,
62 Packet::Pubrec(_) => PacketType::Pubrec,
63 Packet::Pubrel(_) => PacketType::Pubrel,
64 Packet::Pubcomp(_) => PacketType::Pubcomp,
65 Packet::Subscribe(_) => PacketType::Subscribe,
66 Packet::Suback(_) => PacketType::Suback,
67 Packet::Unsubscribe(_) => PacketType::Unsubscribe,
68 Packet::Unsuback(_) => PacketType::Unsuback,
69 Packet::Disconnect(_) => PacketType::Disconnect,
70 Packet::Auth(_) => PacketType::Auth,
71 }
72 }
73
74 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
76 let header = Header::decode_async(reader).await?;
77 Ok(match header.typ {
78 PacketType::Pingreq => Packet::Pingreq,
79 PacketType::Pingresp => Packet::Pingresp,
80 PacketType::Connect => Connect::decode_async(reader, header).await?.into(),
81 PacketType::Connack => Connack::decode_async(reader, header).await?.into(),
82 PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
83 PacketType::Puback => Puback::decode_async(reader, header).await?.into(),
84 PacketType::Pubrec => Pubrec::decode_async(reader, header).await?.into(),
85 PacketType::Pubrel => Pubrel::decode_async(reader, header).await?.into(),
86 PacketType::Pubcomp => Pubcomp::decode_async(reader, header).await?.into(),
87 PacketType::Subscribe => Subscribe::decode_async(reader, header).await?.into(),
88 PacketType::Suback => Suback::decode_async(reader, header).await?.into(),
89 PacketType::Unsubscribe => Unsubscribe::decode_async(reader, header).await?.into(),
90 PacketType::Unsuback => Unsuback::decode_async(reader, header).await?.into(),
91 PacketType::Disconnect => Disconnect::decode_async(reader, header).await?.into(),
92 PacketType::Auth => Auth::decode_async(reader, header).await?.into(),
93 })
94 }
95
96 pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), ErrorV5> {
98 let data = self.encode().map_err(ErrorV5::Common)?;
99 writer.write_all(data.as_ref()).await?;
100 Ok(())
101 }
102
103 pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, ErrorV5> {
106 match block_on(Self::decode_async(&mut bytes)) {
107 Ok(pkt) => Ok(Some(pkt)),
108 Err(err) => {
109 if let ErrorV5::Common(e) = &err {
110 if e.is_eof() {
111 return Ok(None);
112 }
113 }
114 Err(err)
115 }
116 }
117 }
118
119 pub fn encode(&self) -> Result<VarBytes, Error> {
121 const VOID_PACKET_REMAINING_LEN: u8 = 0;
122 let data = match self {
123 Packet::Pingreq => {
124 const CONTROL_BYTE: u8 = 0b11000000;
125 return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
126 }
127 Packet::Pingresp => {
128 const CONTROL_BYTE: u8 = 0b11010000;
129 return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
130 }
131 Packet::Publish(publish) => {
132 let mut control_byte: u8 = match publish.qos_pid {
133 QosPid::Level0 => 0b00110000,
134 QosPid::Level1(_) => 0b00110010,
135 QosPid::Level2(_) => 0b00110100,
136 };
137 if publish.dup {
138 control_byte |= 0b00001000;
139 }
140 if publish.retain {
141 control_byte |= 0b00000001;
142 }
143 encode_packet(control_byte, publish)?
144 }
145 Packet::Connect(inner) => {
146 const CONTROL_BYTE: u8 = 0b00010000;
147 encode_packet(CONTROL_BYTE, inner)?
148 }
149 Packet::Connack(inner) => {
150 const CONTROL_BYTE: u8 = 0b00100000;
151 encode_packet(CONTROL_BYTE, inner)?
152 }
153 Packet::Puback(inner) => {
154 const CONTROL_BYTE: u8 = 0b01000000;
155 encode_packet(CONTROL_BYTE, inner)?
156 }
157 Packet::Pubrec(inner) => {
158 const CONTROL_BYTE: u8 = 0b01010000;
159 encode_packet(CONTROL_BYTE, inner)?
160 }
161 Packet::Pubrel(inner) => {
162 const CONTROL_BYTE: u8 = 0b01100010;
163 encode_packet(CONTROL_BYTE, inner)?
164 }
165 Packet::Pubcomp(inner) => {
166 const CONTROL_BYTE: u8 = 0b01110000;
167 encode_packet(CONTROL_BYTE, inner)?
168 }
169 Packet::Subscribe(inner) => {
170 const CONTROL_BYTE: u8 = 0b10000010;
171 encode_packet(CONTROL_BYTE, inner)?
172 }
173 Packet::Suback(inner) => {
174 const CONTROL_BYTE: u8 = 0b10010000;
175 encode_packet(CONTROL_BYTE, inner)?
176 }
177 Packet::Unsubscribe(inner) => {
178 const CONTROL_BYTE: u8 = 0b10100010;
179 encode_packet(CONTROL_BYTE, inner)?
180 }
181 Packet::Unsuback(inner) => {
182 const CONTROL_BYTE: u8 = 0b10110000;
183 encode_packet(CONTROL_BYTE, inner)?
184 }
185 Packet::Disconnect(inner) => {
186 const CONTROL_BYTE: u8 = 0b11100000;
187 encode_packet(CONTROL_BYTE, inner)?
188 }
189 Packet::Auth(inner) => {
190 const CONTROL_BYTE: u8 = 0b11110000;
191 encode_packet(CONTROL_BYTE, inner)?
192 }
193 };
194 Ok(VarBytes::Dynamic(data))
195 }
196
197 pub fn encode_len(&self) -> Result<usize, ErrorV5> {
199 let remaining_len = match self {
200 Packet::Pingreq => return Ok(2),
201 Packet::Pingresp => return Ok(2),
202 Packet::Connect(inner) => inner.encode_len(),
203 Packet::Connack(inner) => inner.encode_len(),
204 Packet::Publish(inner) => inner.encode_len(),
205 Packet::Puback(inner) => inner.encode_len(),
206 Packet::Pubrec(inner) => inner.encode_len(),
207 Packet::Pubrel(inner) => inner.encode_len(),
208 Packet::Pubcomp(inner) => inner.encode_len(),
209 Packet::Subscribe(inner) => inner.encode_len(),
210 Packet::Suback(inner) => inner.encode_len(),
211 Packet::Unsubscribe(inner) => inner.encode_len(),
212 Packet::Unsuback(inner) => inner.encode_len(),
213 Packet::Disconnect(inner) => inner.encode_len(),
214 Packet::Auth(inner) => inner.encode_len(),
215 };
216 Ok(total_len(remaining_len)?)
217 }
218}
219
220#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
222pub enum PacketType {
223 Connect,
224 Connack,
225 Publish,
226 Puback,
227 Pubrec,
228 Pubrel,
229 Pubcomp,
230 Subscribe,
231 Suback,
232 Unsubscribe,
233 Unsuback,
234 Pingreq,
235 Pingresp,
236 Disconnect,
237 Auth,
238}
239
240impl core::fmt::Display for PacketType {
241 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
242 write!(f, "{self:?}")
243 }
244}
245
246#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
248pub struct Header {
249 pub typ: PacketType,
250 pub dup: bool,
251 pub qos: QoS,
252 pub retain: bool,
253 pub remaining_len: u32,
254}
255
256impl Header {
257 pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
258 Self {
259 typ,
260 dup,
261 qos,
262 retain,
263 remaining_len,
264 }
265 }
266
267 pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, ErrorV5> {
268 const FLAGS_MASK: u8 = 0b1111;
269 let (typ, flags_ok) = match hd >> 4 {
270 1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
271 2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
272 3 => {
273 return Ok(Header {
274 typ: PacketType::Publish,
275 dup: hd & 0b1000 != 0,
276 qos: QoS::from_u8((hd & 0b110) >> 1)?,
277 retain: hd & 1 == 1,
278 remaining_len,
279 });
280 }
281 4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
282 5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
283 6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
284 7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
285 8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
286 9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
287 10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
288 11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
289 12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
290 13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
291 14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
292 15 => (PacketType::Auth, hd & FLAGS_MASK == 0),
293 _ => return Err(Error::InvalidHeader.into()),
294 };
295 if !flags_ok {
296 return Err(Error::InvalidHeader.into());
297 }
298 Ok(Header {
299 typ,
300 dup: false,
301 qos: QoS::Level0,
302 retain: false,
303 remaining_len,
304 })
305 }
306
307 pub fn decode(mut reader: &[u8]) -> Result<Self, ErrorV5> {
308 block_on(Self::decode_async(&mut reader))
309 }
310
311 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
312 let (typ, remaining_len) = decode_raw_header(reader).await?;
313 Header::new_with(typ, remaining_len)
314 }
315}
316
317packet_from!(
318 Connect,
319 Connack,
320 Publish,
321 Puback,
322 Pubrec,
323 Pubrel,
324 Pubcomp,
325 Subscribe,
326 Suback,
327 Unsubscribe,
328 Unsuback,
329 Disconnect,
330 Auth
331);