1use futures_lite::future::block_on;
2use std::convert::AsRef;
3use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
4
5use super::{Connack, Connect, Publish, Suback, Subscribe, Unsubscribe};
6use crate::{
7 decode_raw_header, encode_packet, packet_from, read_u16, total_len, Encodable, Error, Pid, QoS,
8 QosPid, VarBytes,
9};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
13#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
14pub enum Packet {
15 Connect(Connect),
17 Connack(Connack),
19 Publish(Publish),
21 Puback(Pid),
23 Pubrec(Pid),
25 Pubrel(Pid),
27 Pubcomp(Pid),
29 Subscribe(Subscribe),
31 Suback(Suback),
33 Unsubscribe(Unsubscribe),
35 Unsuback(Pid),
37 Pingreq,
39 Pingresp,
41 Disconnect,
43}
44
45impl Packet {
46 pub fn get_type(&self) -> PacketType {
51 match self {
52 Packet::Pingreq => PacketType::Pingreq,
53 Packet::Pingresp => PacketType::Pingresp,
54 Packet::Connect(_) => PacketType::Connect,
55 Packet::Connack(_) => PacketType::Connack,
56 Packet::Publish(_) => PacketType::Publish,
57 Packet::Puback(_) => PacketType::Puback,
58 Packet::Pubrec(_) => PacketType::Pubrec,
59 Packet::Pubrel(_) => PacketType::Pubrel,
60 Packet::Pubcomp(_) => PacketType::Pubcomp,
61 Packet::Subscribe(_) => PacketType::Subscribe,
62 Packet::Suback(_) => PacketType::Suback,
63 Packet::Unsubscribe(_) => PacketType::Unsubscribe,
64 Packet::Unsuback(_) => PacketType::Unsuback,
65 Packet::Disconnect => PacketType::Disconnect,
66 }
67 }
68
69 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
71 let header = Header::decode_async(reader).await?;
72 Ok(match header.typ {
73 PacketType::Pingreq => Packet::Pingreq,
74 PacketType::Pingresp => Packet::Pingresp,
75 PacketType::Disconnect => Packet::Disconnect,
76
77 PacketType::Connect => Connect::decode_async(reader).await?.into(),
78 PacketType::Connack => Connack::decode_async(reader).await?.into(),
79 PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
80 PacketType::Puback => Packet::Puback(Pid::try_from(read_u16(reader).await?)?),
81 PacketType::Pubrec => Packet::Pubrec(Pid::try_from(read_u16(reader).await?)?),
82 PacketType::Pubrel => Packet::Pubrel(Pid::try_from(read_u16(reader).await?)?),
83 PacketType::Pubcomp => Packet::Pubcomp(Pid::try_from(read_u16(reader).await?)?),
84 PacketType::Subscribe => Subscribe::decode_async(reader, header.remaining_len as usize)
85 .await?
86 .into(),
87 PacketType::Suback => Suback::decode_async(reader, header.remaining_len as usize)
88 .await?
89 .into(),
90 PacketType::Unsubscribe => {
91 Unsubscribe::decode_async(reader, header.remaining_len as usize)
92 .await?
93 .into()
94 }
95 PacketType::Unsuback => Packet::Unsuback(Pid::try_from(read_u16(reader).await?)?),
96 })
97 }
98
99 pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), Error> {
101 let data = self.encode()?;
102 writer.write_all(data.as_ref()).await?;
103 Ok(())
104 }
105
106 pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, Error> {
109 match block_on(Self::decode_async(&mut bytes)) {
110 Ok(pkt) => Ok(Some(pkt)),
111 Err(err) => {
112 if err.is_eof() {
113 Ok(None)
114 } else {
115 Err(err)
116 }
117 }
118 }
119 }
120
121 pub fn encode(&self) -> Result<VarBytes, Error> {
123 const VOID_PACKET_REMAINING_LEN: u8 = 0;
124 let data = match self {
125 Packet::Pingreq => {
126 const CONTROL_BYTE: u8 = 0b11000000;
127 VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN])
128 }
129 Packet::Pingresp => {
130 const CONTROL_BYTE: u8 = 0b11010000;
131 VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN])
132 }
133 Packet::Connect(connect) => {
134 const CONTROL_BYTE: u8 = 0b00010000;
135 VarBytes::Dynamic(encode_packet(CONTROL_BYTE, connect)?)
136 }
137 Packet::Connack(connack) => {
138 const CONTROL_BYTE: u8 = 0b00100000;
139 const REMAINING_LEN: u8 = 2;
140 let flags: u8 = connack.session_present.into();
141 let rc: u8 = connack.code as u8;
142 VarBytes::Fixed4([CONTROL_BYTE, REMAINING_LEN, flags, rc])
143 }
144 Packet::Publish(publish) => {
145 let mut control_byte: u8 = match publish.qos_pid {
146 QosPid::Level0 => 0b00110000,
147 QosPid::Level1(_) => 0b00110010,
148 QosPid::Level2(_) => 0b00110100,
149 };
150 if publish.dup {
151 control_byte |= 0b00001000;
152 }
153 if publish.retain {
154 control_byte |= 0b00000001;
155 }
156 VarBytes::Dynamic(encode_packet(control_byte, publish)?)
157 }
158 Packet::Puback(pid) => {
159 const CONTROL_BYTE: u8 = 0b01000000;
160 VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
161 }
162 Packet::Pubrec(pid) => {
163 const CONTROL_BYTE: u8 = 0b01010000;
164 VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
165 }
166 Packet::Pubrel(pid) => {
167 const CONTROL_BYTE: u8 = 0b01100010;
168 VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
169 }
170 Packet::Pubcomp(pid) => {
171 const CONTROL_BYTE: u8 = 0b01110000;
172 VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
173 }
174 Packet::Subscribe(subscribe) => {
175 const CONTROL_BYTE: u8 = 0b10000010;
176 VarBytes::Dynamic(encode_packet(CONTROL_BYTE, subscribe)?)
177 }
178 Packet::Suback(suback) => {
179 const CONTROL_BYTE: u8 = 0b10010000;
180 VarBytes::Dynamic(encode_packet(CONTROL_BYTE, suback)?)
181 }
182 Packet::Unsubscribe(unsubscribe) => {
183 const CONTROL_BYTE: u8 = 0b10100010;
184 VarBytes::Dynamic(encode_packet(CONTROL_BYTE, unsubscribe)?)
185 }
186 Packet::Unsuback(pid) => {
187 const CONTROL_BYTE: u8 = 0b10110000;
188 VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid))
189 }
190 Packet::Disconnect => {
191 const CONTROL_BYTE: u8 = 0b11100000;
192 VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN])
193 }
194 };
195 Ok(data)
196 }
197
198 pub fn encode_len(&self) -> Result<usize, Error> {
200 let remaining_len = match self {
201 Packet::Pingreq => return Ok(2),
202 Packet::Pingresp => return Ok(2),
203 Packet::Disconnect => return Ok(2),
204 Packet::Connack(_) => return Ok(4),
205 Packet::Puback(_) => return Ok(4),
206 Packet::Pubrec(_) => return Ok(4),
207 Packet::Pubrel(_) => return Ok(4),
208 Packet::Pubcomp(_) => return Ok(4),
209 Packet::Unsuback(_) => return Ok(4),
210 Packet::Connect(inner) => inner.encode_len(),
211 Packet::Publish(inner) => inner.encode_len(),
212 Packet::Subscribe(inner) => inner.encode_len(),
213 Packet::Suback(inner) => inner.encode_len(),
214 Packet::Unsubscribe(inner) => inner.encode_len(),
215 };
216 total_len(remaining_len)
217 }
218}
219
/// Payload-free tag identifying the kind of a control packet.
///
/// There is exactly one variant per [`Packet`] variant; `Header::new_with`
/// derives this tag from the high nibble of the first header byte.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PacketType {
    /// `CONNECT`
    Connect,
    /// `CONNACK`
    Connack,
    /// `PUBLISH`
    Publish,
    /// `PUBACK`
    Puback,
    /// `PUBREC`
    Pubrec,
    /// `PUBREL`
    Pubrel,
    /// `PUBCOMP`
    Pubcomp,
    /// `SUBSCRIBE`
    Subscribe,
    /// `SUBACK`
    Suback,
    /// `UNSUBSCRIBE`
    Unsubscribe,
    /// `UNSUBACK`
    Unsuback,
    /// `PINGREQ`
    Pingreq,
    /// `PINGRESP`
    Pingresp,
    /// `DISCONNECT`
    Disconnect,
}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
241pub struct Header {
242 pub typ: PacketType,
243 pub dup: bool,
244 pub qos: QoS,
245 pub retain: bool,
246 pub remaining_len: u32,
247}
248
249impl Header {
250 pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
251 Self {
252 typ,
253 dup,
254 qos,
255 retain,
256 remaining_len,
257 }
258 }
259
260 pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, Error> {
261 const FLAGS_MASK: u8 = 0b1111;
262 let (typ, flags_ok) = match hd >> 4 {
263 1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
264 2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
265 3 => {
266 return Ok(Header {
267 typ: PacketType::Publish,
268 dup: hd & 0b1000 != 0,
269 qos: QoS::from_u8((hd & 0b110) >> 1)?,
270 retain: hd & 1 == 1,
271 remaining_len,
272 });
273 }
274 4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
275 5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
276 6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
277 7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
278 8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
279 9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
280 10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
281 11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
282 12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
283 13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
284 14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
285 _ => return Err(Error::InvalidHeader),
286 };
287 if !flags_ok {
288 return Err(Error::InvalidHeader);
289 }
290 Ok(Header {
291 typ,
292 dup: false,
293 qos: QoS::Level0,
294 retain: false,
295 remaining_len,
296 })
297 }
298
299 pub fn decode(mut reader: &[u8]) -> Result<Self, Error> {
300 block_on(Self::decode_async(&mut reader))
301 }
302
303 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
304 let (typ, remaining_len) = decode_raw_header(reader).await?;
305 Header::new_with(typ, remaining_len)
306 }
307}
308
309#[inline]
310fn encode_with_pid(control_byte: u8, pid: Pid) -> [u8; 4] {
311 const REMAINING_LEN: u8 = 2;
312 let val = pid.value();
313 [
314 control_byte,
315 REMAINING_LEN,
316 (val >> 8) as u8,
317 (val & 0xFF) as u8,
318 ]
319}
320
321packet_from!(Connect, Publish, Suback, Connack, Subscribe, Unsubscribe);