ntex_mqtt/v5/codec/packet/
mod.rs1use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
2
3pub use crate::types::{ConnectAckFlags, ConnectFlags, QoS};
4
5use super::{UserProperties, encode::*, property_type as pt};
6use crate::error::{DecodeError, EncodeError};
7use crate::types::packet_type;
8use crate::utils::{Decode, Property, take_properties, write_variable_length};
9
10mod auth;
11mod connack;
12mod connect;
13mod disconnect;
14mod pubacks;
15mod publish;
16mod subscribe;
17
18pub use auth::*;
19pub use connack::*;
20pub use connect::*;
21pub use disconnect::*;
22pub use pubacks::*;
23pub use publish::*;
24pub use subscribe::*;
25
26#[derive(Debug, PartialEq, Eq, Clone)]
27pub enum Packet {
29 Connect(Box<Connect>),
31 ConnectAck(Box<ConnectAck>),
33 PublishAck(PublishAck),
35 PublishReceived(PublishAck),
37 PublishRelease(PublishAck2),
39 PublishComplete(PublishAck2),
41 Subscribe(Subscribe),
43 SubscribeAck(SubscribeAck),
45 Unsubscribe(Unsubscribe),
47 UnsubscribeAck(UnsubscribeAck),
49 PingRequest,
51 PingResponse,
53 Disconnect(Disconnect),
55 Auth(Auth),
57}
58
59impl Packet {
60 pub fn packet_type(&self) -> u8 {
61 match self {
62 Packet::Connect(_) => packet_type::CONNECT,
63 Packet::ConnectAck(_) => packet_type::CONNACK,
64 Packet::PublishAck(_) => packet_type::PUBACK,
65 Packet::PublishReceived(_) => packet_type::PUBREC,
66 Packet::PublishRelease(_) => packet_type::PUBREL,
67 Packet::PublishComplete(_) => packet_type::PUBCOMP,
68 Packet::Subscribe(_) => packet_type::SUBSCRIBE,
69 Packet::SubscribeAck(_) => packet_type::SUBACK,
70 Packet::Unsubscribe(_) => packet_type::UNSUBSCRIBE,
71 Packet::UnsubscribeAck(_) => packet_type::UNSUBACK,
72 Packet::PingRequest => packet_type::PINGREQ,
73 Packet::PingResponse => packet_type::PINGRESP,
74 Packet::Disconnect(_) => packet_type::DISCONNECT,
75 Packet::Auth(_) => packet_type::AUTH,
76 }
77 }
78}
79
80impl From<Connect> for Packet {
81 fn from(pkt: Connect) -> Self {
82 Self::Connect(Box::new(pkt))
83 }
84}
85
86impl From<Box<Connect>> for Packet {
87 fn from(pkt: Box<Connect>) -> Self {
88 Self::Connect(pkt)
89 }
90}
91
92impl From<ConnectAck> for Packet {
93 fn from(pkt: ConnectAck) -> Self {
94 Self::ConnectAck(Box::new(pkt))
95 }
96}
97
98impl From<Box<ConnectAck>> for Packet {
99 fn from(pkt: Box<ConnectAck>) -> Self {
100 Self::ConnectAck(pkt)
101 }
102}
103
104impl From<PublishAck> for Packet {
105 fn from(pkt: PublishAck) -> Self {
106 Self::PublishAck(pkt)
107 }
108}
109
110impl From<Subscribe> for Packet {
111 fn from(pkt: Subscribe) -> Self {
112 Self::Subscribe(pkt)
113 }
114}
115
116impl From<SubscribeAck> for Packet {
117 fn from(pkt: SubscribeAck) -> Self {
118 Self::SubscribeAck(pkt)
119 }
120}
121
122impl From<Unsubscribe> for Packet {
123 fn from(pkt: Unsubscribe) -> Self {
124 Self::Unsubscribe(pkt)
125 }
126}
127
128impl From<UnsubscribeAck> for Packet {
129 fn from(pkt: UnsubscribeAck) -> Self {
130 Self::UnsubscribeAck(pkt)
131 }
132}
133
134impl From<Disconnect> for Packet {
135 fn from(pkt: Disconnect) -> Self {
136 Self::Disconnect(pkt)
137 }
138}
139
140impl From<Auth> for Packet {
141 fn from(pkt: Auth) -> Self {
142 Self::Auth(pkt)
143 }
144}
145
146pub(super) mod property_type {
147 pub(crate) const UTF8_PAYLOAD: u8 = 0x01;
148 pub(crate) const MSG_EXPIRY_INT: u8 = 0x02;
149 pub(crate) const CONTENT_TYPE: u8 = 0x03;
150 pub(crate) const RESP_TOPIC: u8 = 0x08;
151 pub(crate) const CORR_DATA: u8 = 0x09;
152 pub(crate) const SUB_ID: u8 = 0x0B;
153 pub(crate) const SESS_EXPIRY_INT: u8 = 0x11;
154 pub(crate) const ASSND_CLIENT_ID: u8 = 0x12;
155 pub(crate) const SERVER_KA: u8 = 0x13;
156 pub(crate) const AUTH_METHOD: u8 = 0x15;
157 pub(crate) const AUTH_DATA: u8 = 0x16;
158 pub(crate) const REQ_PROB_INFO: u8 = 0x17;
159 pub(crate) const WILL_DELAY_INT: u8 = 0x18;
160 pub(crate) const REQ_RESP_INFO: u8 = 0x19;
161 pub(crate) const RESP_INFO: u8 = 0x1A;
162 pub(crate) const SERVER_REF: u8 = 0x1C;
163 pub(crate) const REASON_STRING: u8 = 0x1F;
164 pub(crate) const RECEIVE_MAX: u8 = 0x21;
165 pub(crate) const TOPIC_ALIAS_MAX: u8 = 0x22;
166 pub(crate) const TOPIC_ALIAS: u8 = 0x23;
167 pub(crate) const MAX_QOS: u8 = 0x24;
168 pub(crate) const RETAIN_AVAIL: u8 = 0x25;
169 pub(crate) const USER: u8 = 0x26;
170 pub(crate) const MAX_PACKET_SIZE: u8 = 0x27;
171 pub(crate) const WILDCARD_SUB_AVAIL: u8 = 0x28;
172 pub(crate) const SUB_IDS_AVAIL: u8 = 0x29;
173 pub(crate) const SHARED_SUB_AVAIL: u8 = 0x2A;
174}
175
176mod ack_props {
177 use super::*;
178 use crate::v5::codec::UserProperty;
179
180 pub(crate) fn encoded_size(
181 properties: &[UserProperty],
182 reason_string: &Option<ByteString>,
183 limit: u32,
184 ) -> usize {
185 if limit < 4 {
186 return 1; }
189
190 let len = encoded_size_opt_props(properties, reason_string, limit - 4);
191 var_int_len(len) as usize + len
192 }
193
194 pub(crate) fn encode(
195 properties: &[UserProperty],
196 reason_string: &Option<ByteString>,
197 buf: &mut BytesMut,
198 size: u32,
199 ) -> Result<(), EncodeError> {
200 debug_assert!(size > 0); if size == 1 {
203 buf.put_u8(0);
205 return Ok(());
206 }
207
208 let size = var_int_len_from_size(size);
209 write_variable_length(size, buf);
210 encode_opt_props(properties, reason_string, buf, size)
211 }
212
213 pub(crate) fn decode(
215 src: &mut Bytes,
216 ) -> Result<(UserProperties, Option<ByteString>), DecodeError> {
217 let prop_src = &mut take_properties(src)?;
218 let mut reason_string = None;
219 let mut user_props = Vec::new();
220 while prop_src.has_remaining() {
221 let prop_id = prop_src.get_u8();
222 match prop_id {
223 pt::REASON_STRING => reason_string.read_value(prop_src)?,
224 pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
225 _ => return Err(DecodeError::MalformedPacket),
226 }
227 }
228
229 Ok((user_props, reason_string))
230 }
231}