1use core::convert::AsRef;
2
3use crate::{
4 block_on, decode_raw_header, encode_packet, packet_from, read_u16, total_len, AsyncRead,
5 AsyncWrite, Encodable, Error, Pid, QoS, QosPid, VarBytes,
6};
7
8use super::{Connack, Connect, Publish, Suback, Subscribe, Unsubscribe};
9
/// An MQTT control packet, one variant per packet type handled in this module.
///
/// Variants either wrap a dedicated body type, wrap a [`Pid`], or carry no
/// data at all. The numeric type codes mentioned below are the values of the
/// upper nibble of the first header byte, as used by `Header::new_with` and
/// `Packet::encode` in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum Packet {
    /// Type code 1; body is a [`Connect`].
    Connect(Connect),
    /// Type code 2; body is a [`Connack`].
    Connack(Connack),
    /// Type code 3; body is a [`Publish`].
    Publish(Publish),
    /// Type code 4; carries only a [`Pid`].
    Puback(Pid),
    /// Type code 5; carries only a [`Pid`].
    Pubrec(Pid),
    /// Type code 6 (flag nibble `0b0010`); carries only a [`Pid`].
    Pubrel(Pid),
    /// Type code 7; carries only a [`Pid`].
    Pubcomp(Pid),
    /// Type code 8 (flag nibble `0b0010`); body is a [`Subscribe`].
    Subscribe(Subscribe),
    /// Type code 9; body is a [`Suback`].
    Suback(Suback),
    /// Type code 10 (flag nibble `0b0010`); body is an [`Unsubscribe`].
    Unsubscribe(Unsubscribe),
    /// Type code 11; carries only a [`Pid`].
    Unsuback(Pid),
    /// Type code 12; no body.
    Pingreq,
    /// Type code 13; no body.
    Pingresp,
    /// Type code 14; no body.
    Disconnect,
}
43
44impl Packet {
45 pub fn get_type(&self) -> PacketType {
50 match self {
51 Packet::Pingreq => PacketType::Pingreq,
52 Packet::Pingresp => PacketType::Pingresp,
53 Packet::Connect(_) => PacketType::Connect,
54 Packet::Connack(_) => PacketType::Connack,
55 Packet::Publish(_) => PacketType::Publish,
56 Packet::Puback(_) => PacketType::Puback,
57 Packet::Pubrec(_) => PacketType::Pubrec,
58 Packet::Pubrel(_) => PacketType::Pubrel,
59 Packet::Pubcomp(_) => PacketType::Pubcomp,
60 Packet::Subscribe(_) => PacketType::Subscribe,
61 Packet::Suback(_) => PacketType::Suback,
62 Packet::Unsubscribe(_) => PacketType::Unsubscribe,
63 Packet::Unsuback(_) => PacketType::Unsuback,
64 Packet::Disconnect => PacketType::Disconnect,
65 }
66 }
67
68 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
70 let header = Header::decode_async(reader).await?;
71 Ok(match header.typ {
72 PacketType::Pingreq => Packet::Pingreq,
73 PacketType::Pingresp => Packet::Pingresp,
74 PacketType::Disconnect => Packet::Disconnect,
75
76 PacketType::Connect => Connect::decode_async(reader).await?.into(),
77 PacketType::Connack => Connack::decode_async(reader).await?.into(),
78 PacketType::Publish => Publish::decode_async(reader, header).await?.into(),
79 PacketType::Puback => Packet::Puback(Pid::try_from(read_u16(reader).await?)?),
80 PacketType::Pubrec => Packet::Pubrec(Pid::try_from(read_u16(reader).await?)?),
81 PacketType::Pubrel => Packet::Pubrel(Pid::try_from(read_u16(reader).await?)?),
82 PacketType::Pubcomp => Packet::Pubcomp(Pid::try_from(read_u16(reader).await?)?),
83 PacketType::Subscribe => Subscribe::decode_async(reader, header.remaining_len as usize)
84 .await?
85 .into(),
86 PacketType::Suback => Suback::decode_async(reader, header.remaining_len as usize)
87 .await?
88 .into(),
89 PacketType::Unsubscribe => {
90 Unsubscribe::decode_async(reader, header.remaining_len as usize)
91 .await?
92 .into()
93 }
94 PacketType::Unsuback => Packet::Unsuback(Pid::try_from(read_u16(reader).await?)?),
95 })
96 }
97
98 pub async fn encode_async<T: AsyncWrite + Unpin>(&self, writer: &mut T) -> Result<(), Error> {
100 let data = self.encode()?;
101 writer.write_all(data.as_ref()).await?;
102 Ok(())
103 }
104
105 pub fn decode(mut bytes: &[u8]) -> Result<Option<Self>, Error> {
108 match block_on(Self::decode_async(&mut bytes)) {
109 Ok(pkt) => Ok(Some(pkt)),
110 Err(err) => {
111 if err.is_eof() {
112 Ok(None)
113 } else {
114 Err(err)
115 }
116 }
117 }
118 }
119
120 pub fn encode(&self) -> Result<VarBytes, Error> {
122 const VOID_PACKET_REMAINING_LEN: u8 = 0;
123 let data = match self {
124 Packet::Pingreq => {
125 const CONTROL_BYTE: u8 = 0b11000000;
126 return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
127 }
128 Packet::Pingresp => {
129 const CONTROL_BYTE: u8 = 0b11010000;
130 return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
131 }
132 Packet::Connect(connect) => {
133 const CONTROL_BYTE: u8 = 0b00010000;
134 encode_packet(CONTROL_BYTE, connect)?
135 }
136 Packet::Connack(connack) => {
137 const CONTROL_BYTE: u8 = 0b00100000;
138 const REMAINING_LEN: u8 = 2;
139 let flags: u8 = connack.session_present.into();
140 let rc: u8 = connack.code as u8;
141 return Ok(VarBytes::Fixed4([CONTROL_BYTE, REMAINING_LEN, flags, rc]));
142 }
143 Packet::Publish(publish) => {
144 let mut control_byte: u8 = match publish.qos_pid {
145 QosPid::Level0 => 0b00110000,
146 QosPid::Level1(_) => 0b00110010,
147 QosPid::Level2(_) => 0b00110100,
148 };
149 if publish.dup {
150 control_byte |= 0b00001000;
151 }
152 if publish.retain {
153 control_byte |= 0b00000001;
154 }
155 encode_packet(control_byte, publish)?
156 }
157 Packet::Puback(pid) => {
158 const CONTROL_BYTE: u8 = 0b01000000;
159 return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
160 }
161 Packet::Pubrec(pid) => {
162 const CONTROL_BYTE: u8 = 0b01010000;
163 return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
164 }
165 Packet::Pubrel(pid) => {
166 const CONTROL_BYTE: u8 = 0b01100010;
167 return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
168 }
169 Packet::Pubcomp(pid) => {
170 const CONTROL_BYTE: u8 = 0b01110000;
171 return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
172 }
173 Packet::Subscribe(subscribe) => {
174 const CONTROL_BYTE: u8 = 0b10000010;
175 encode_packet(CONTROL_BYTE, subscribe)?
176 }
177 Packet::Suback(suback) => {
178 const CONTROL_BYTE: u8 = 0b10010000;
179 encode_packet(CONTROL_BYTE, suback)?
180 }
181 Packet::Unsubscribe(unsubscribe) => {
182 const CONTROL_BYTE: u8 = 0b10100010;
183 encode_packet(CONTROL_BYTE, unsubscribe)?
184 }
185 Packet::Unsuback(pid) => {
186 const CONTROL_BYTE: u8 = 0b10110000;
187 return Ok(VarBytes::Fixed4(encode_with_pid(CONTROL_BYTE, *pid)));
188 }
189 Packet::Disconnect => {
190 const CONTROL_BYTE: u8 = 0b11100000;
191 return Ok(VarBytes::Fixed2([CONTROL_BYTE, VOID_PACKET_REMAINING_LEN]));
192 }
193 };
194 Ok(VarBytes::Dynamic(data))
195 }
196
197 pub fn encode_len(&self) -> Result<usize, Error> {
199 let remaining_len = match self {
200 Packet::Pingreq => return Ok(2),
201 Packet::Pingresp => return Ok(2),
202 Packet::Disconnect => return Ok(2),
203 Packet::Connack(_) => return Ok(4),
204 Packet::Puback(_) => return Ok(4),
205 Packet::Pubrec(_) => return Ok(4),
206 Packet::Pubrel(_) => return Ok(4),
207 Packet::Pubcomp(_) => return Ok(4),
208 Packet::Unsuback(_) => return Ok(4),
209 Packet::Connect(inner) => inner.encode_len(),
210 Packet::Publish(inner) => inner.encode_len(),
211 Packet::Subscribe(inner) => inner.encode_len(),
212 Packet::Suback(inner) => inner.encode_len(),
213 Packet::Unsubscribe(inner) => inner.encode_len(),
214 };
215 total_len(remaining_len)
216 }
217}
218
/// Payload-free tag naming the type of a packet.
///
/// The type codes below are the values of the upper nibble of the first
/// header byte that `Header::new_with` maps to each variant.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum PacketType {
    /// Type code 1.
    Connect,
    /// Type code 2.
    Connack,
    /// Type code 3.
    Publish,
    /// Type code 4.
    Puback,
    /// Type code 5.
    Pubrec,
    /// Type code 6.
    Pubrel,
    /// Type code 7.
    Pubcomp,
    /// Type code 8.
    Subscribe,
    /// Type code 9.
    Suback,
    /// Type code 10.
    Unsubscribe,
    /// Type code 11.
    Unsuback,
    /// Type code 12.
    Pingreq,
    /// Type code 13.
    Pingresp,
    /// Type code 14.
    Disconnect,
}
237
/// Decoded fixed header of a packet: its type, the per-type flags and the
/// declared remaining length.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Header {
    /// Packet type taken from the upper nibble of the first header byte.
    pub typ: PacketType,
    /// Bit 3 of the first header byte; `new_with` only reads it for
    /// `Publish` and sets it to `false` for every other type.
    pub dup: bool,
    /// Bits 1-2 of the first header byte; `new_with` only reads them for
    /// `Publish` and uses `QoS::Level0` for every other type.
    pub qos: QoS,
    /// Bit 0 of the first header byte; `new_with` only reads it for
    /// `Publish` and sets it to `false` for every other type.
    pub retain: bool,
    /// Remaining-length value that accompanied the first header byte.
    pub remaining_len: u32,
}
247
248impl Header {
249 pub fn new(typ: PacketType, dup: bool, qos: QoS, retain: bool, remaining_len: u32) -> Self {
250 Self {
251 typ,
252 dup,
253 qos,
254 retain,
255 remaining_len,
256 }
257 }
258
259 pub fn new_with(hd: u8, remaining_len: u32) -> Result<Header, Error> {
260 const FLAGS_MASK: u8 = 0b1111;
261 let (typ, flags_ok) = match hd >> 4 {
262 1 => (PacketType::Connect, hd & FLAGS_MASK == 0),
263 2 => (PacketType::Connack, hd & FLAGS_MASK == 0),
264 3 => {
265 return Ok(Header {
266 typ: PacketType::Publish,
267 dup: hd & 0b1000 != 0,
268 qos: QoS::from_u8((hd & 0b110) >> 1)?,
269 retain: hd & 1 == 1,
270 remaining_len,
271 });
272 }
273 4 => (PacketType::Puback, hd & FLAGS_MASK == 0),
274 5 => (PacketType::Pubrec, hd & FLAGS_MASK == 0),
275 6 => (PacketType::Pubrel, hd & FLAGS_MASK == 0b0010),
276 7 => (PacketType::Pubcomp, hd & FLAGS_MASK == 0),
277 8 => (PacketType::Subscribe, hd & FLAGS_MASK == 0b0010),
278 9 => (PacketType::Suback, hd & FLAGS_MASK == 0),
279 10 => (PacketType::Unsubscribe, hd & FLAGS_MASK == 0b0010),
280 11 => (PacketType::Unsuback, hd & FLAGS_MASK == 0),
281 12 => (PacketType::Pingreq, hd & FLAGS_MASK == 0),
282 13 => (PacketType::Pingresp, hd & FLAGS_MASK == 0),
283 14 => (PacketType::Disconnect, hd & FLAGS_MASK == 0),
284 _ => return Err(Error::InvalidHeader),
285 };
286 if !flags_ok {
287 return Err(Error::InvalidHeader);
288 }
289 Ok(Header {
290 typ,
291 dup: false,
292 qos: QoS::Level0,
293 retain: false,
294 remaining_len,
295 })
296 }
297
298 pub fn decode(mut reader: &[u8]) -> Result<Self, Error> {
299 block_on(Self::decode_async(&mut reader))
300 }
301
302 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, Error> {
303 let (typ, remaining_len) = decode_raw_header(reader).await?;
304 Header::new_with(typ, remaining_len)
305 }
306}
307
308#[inline]
309fn encode_with_pid(control_byte: u8, pid: Pid) -> [u8; 4] {
310 const REMAINING_LEN: u8 = 2;
311 let val = pid.value();
312 [
313 control_byte,
314 REMAINING_LEN,
315 (val >> 8) as u8,
316 (val & 0xFF) as u8,
317 ]
318}
319
// NOTE(review): presumably generates `From<T> for Packet` for each listed body
// type (the `.into()` calls in `Packet::decode_async` need such conversions);
// confirm against the `packet_from!` definition in the crate root.
packet_from!(Connect, Publish, Suback, Connack, Subscribe, Unsubscribe);