// rmqtt_codec/v5/packet/mod.rs
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytestring::ByteString;

pub use crate::types::{ConnectAckFlags, ConnectFlags, QoS};

use super::{encode::*, property_type as pt, UserProperties};
use crate::error::{DecodeError, EncodeError};
use crate::types::packet_type;
use crate::utils::{take_properties, write_variable_length, Decode, Property};
10
11mod auth;
12mod connack;
13mod connect;
14mod disconnect;
15mod pubacks;
16mod publish;
17mod subscribe;
18
19pub use auth::*;
20pub use connack::*;
21pub use connect::*;
22pub use disconnect::*;
23pub use pubacks::*;
24pub use publish::*;
25pub use subscribe::*;
26
27#[derive(Debug, PartialEq, Eq, Clone)]
28pub enum Packet {
30 Connect(Box<Connect>),
32 ConnectAck(Box<ConnectAck>),
34 Publish(Box<Publish>),
36 PublishAck(PublishAck),
38 PublishReceived(PublishAck),
40 PublishRelease(PublishAck2),
42 PublishComplete(PublishAck2),
44 Subscribe(Subscribe),
46 SubscribeAck(SubscribeAck),
48 Unsubscribe(Unsubscribe),
50 UnsubscribeAck(UnsubscribeAck),
52 PingRequest,
54 PingResponse,
56 Disconnect(Disconnect),
58 Auth(Auth),
60}
61
62impl Packet {
63 pub fn packet_type(&self) -> u8 {
64 match self {
65 Packet::Connect(_) => packet_type::CONNECT,
66 Packet::ConnectAck(_) => packet_type::CONNACK,
67 Packet::Publish(_) => packet_type::PUBLISH_START,
68 Packet::PublishAck(_) => packet_type::PUBACK,
69 Packet::PublishReceived(_) => packet_type::PUBREC,
70 Packet::PublishRelease(_) => packet_type::PUBREL,
71 Packet::PublishComplete(_) => packet_type::PUBCOMP,
72 Packet::Subscribe(_) => packet_type::SUBSCRIBE,
73 Packet::SubscribeAck(_) => packet_type::SUBACK,
74 Packet::Unsubscribe(_) => packet_type::UNSUBSCRIBE,
75 Packet::UnsubscribeAck(_) => packet_type::UNSUBACK,
76 Packet::PingRequest => packet_type::PINGREQ,
77 Packet::PingResponse => packet_type::PINGRESP,
78 Packet::Disconnect(_) => packet_type::DISCONNECT,
79 Packet::Auth(_) => packet_type::AUTH,
80 }
81 }
82}
83
84impl From<Connect> for Packet {
85 fn from(pkt: Connect) -> Self {
86 Self::Connect(Box::new(pkt))
87 }
88}
89
90impl From<Box<Connect>> for Packet {
91 fn from(pkt: Box<Connect>) -> Self {
92 Self::Connect(pkt)
93 }
94}
95
96impl From<ConnectAck> for Packet {
97 fn from(pkt: ConnectAck) -> Self {
98 Self::ConnectAck(Box::new(pkt))
99 }
100}
101
102impl From<Box<ConnectAck>> for Packet {
103 fn from(pkt: Box<ConnectAck>) -> Self {
104 Self::ConnectAck(pkt)
105 }
106}
107
108impl From<Publish> for Packet {
109 fn from(pkt: Publish) -> Self {
110 Self::Publish(Box::new(pkt))
111 }
112}
113
114impl From<PublishAck> for Packet {
115 fn from(pkt: PublishAck) -> Self {
116 Self::PublishAck(pkt)
117 }
118}
119
120impl From<Subscribe> for Packet {
121 fn from(pkt: Subscribe) -> Self {
122 Self::Subscribe(pkt)
123 }
124}
125
126impl From<SubscribeAck> for Packet {
127 fn from(pkt: SubscribeAck) -> Self {
128 Self::SubscribeAck(pkt)
129 }
130}
131
132impl From<Unsubscribe> for Packet {
133 fn from(pkt: Unsubscribe) -> Self {
134 Self::Unsubscribe(pkt)
135 }
136}
137
138impl From<UnsubscribeAck> for Packet {
139 fn from(pkt: UnsubscribeAck) -> Self {
140 Self::UnsubscribeAck(pkt)
141 }
142}
143
144impl From<Disconnect> for Packet {
145 fn from(pkt: Disconnect) -> Self {
146 Self::Disconnect(pkt)
147 }
148}
149
150impl From<Auth> for Packet {
151 fn from(pkt: Auth) -> Self {
152 Self::Auth(pkt)
153 }
154}
155
156pub(super) mod property_type {
157 pub(crate) const UTF8_PAYLOAD: u8 = 0x01;
158 pub(crate) const MSG_EXPIRY_INT: u8 = 0x02;
159 pub(crate) const CONTENT_TYPE: u8 = 0x03;
160 pub(crate) const RESP_TOPIC: u8 = 0x08;
161 pub(crate) const CORR_DATA: u8 = 0x09;
162 pub(crate) const SUB_ID: u8 = 0x0B;
163 pub(crate) const SESS_EXPIRY_INT: u8 = 0x11;
164 pub(crate) const ASSND_CLIENT_ID: u8 = 0x12;
165 pub(crate) const SERVER_KA: u8 = 0x13;
166 pub(crate) const AUTH_METHOD: u8 = 0x15;
167 pub(crate) const AUTH_DATA: u8 = 0x16;
168 pub(crate) const REQ_PROB_INFO: u8 = 0x17;
169 pub(crate) const WILL_DELAY_INT: u8 = 0x18;
170 pub(crate) const REQ_RESP_INFO: u8 = 0x19;
171 pub(crate) const RESP_INFO: u8 = 0x1A;
172 pub(crate) const SERVER_REF: u8 = 0x1C;
173 pub(crate) const REASON_STRING: u8 = 0x1F;
174 pub(crate) const RECEIVE_MAX: u8 = 0x21;
175 pub(crate) const TOPIC_ALIAS_MAX: u8 = 0x22;
176 pub(crate) const TOPIC_ALIAS: u8 = 0x23;
177 pub(crate) const MAX_QOS: u8 = 0x24;
178 pub(crate) const RETAIN_AVAIL: u8 = 0x25;
179 pub(crate) const USER: u8 = 0x26;
180 pub(crate) const MAX_PACKET_SIZE: u8 = 0x27;
181 pub(crate) const WILDCARD_SUB_AVAIL: u8 = 0x28;
182 pub(crate) const SUB_IDS_AVAIL: u8 = 0x29;
183 pub(crate) const SHARED_SUB_AVAIL: u8 = 0x2A;
184}
185
186mod ack_props {
187 use super::*;
188 use crate::v5::UserProperty;
189
190 pub(crate) fn encoded_size(
191 properties: &[UserProperty],
192 reason_string: &Option<ByteString>,
193 limit: u32,
194 ) -> usize {
195 if limit < 4 {
196 return 1; }
199
200 let len = encoded_size_opt_props(properties, reason_string, limit - 4);
201 var_int_len(len) as usize + len
202 }
203
204 pub(crate) fn encode(
205 properties: &[UserProperty],
206 reason_string: &Option<ByteString>,
207 buf: &mut BytesMut,
208 size: u32,
209 ) -> Result<(), EncodeError> {
210 debug_assert!(size > 0); if size == 1 {
213 buf.put_u8(0);
215 return Ok(());
216 }
217
218 let size = var_int_len_from_size(size);
219 write_variable_length(size, buf);
220 encode_opt_props(properties, reason_string, buf, size)
221 }
222
223 pub(crate) fn decode(src: &mut Bytes) -> Result<(UserProperties, Option<ByteString>), DecodeError> {
225 let prop_src = &mut take_properties(src)?;
226 let mut reason_string = None;
227 let mut user_props = Vec::new();
228 while prop_src.has_remaining() {
229 let prop_id = prop_src.get_u8();
230 match prop_id {
231 pt::REASON_STRING => reason_string.read_value(prop_src)?,
232 pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
233 _ => return Err(DecodeError::MalformedPacket),
234 }
235 }
236
237 Ok((user_props, reason_string))
238 }
239}