1use super::property::{
9 property_decode, property_decode_non_zero, property_encode, property_len, Property,
10 PropertyFrame,
11};
12use crate::codec::util::{decode_byte, decode_variable_integer, encode_variable_integer};
13use crate::codec::{Decode, Encode, RawPacket};
14use crate::protocol::util::len_bytes;
15use crate::protocol::v5::reason::ReasonCode;
16use crate::protocol::{FixedHeader, PacketType, QoS};
17use crate::Error;
18use bit_field::BitField;
19use bytes::{Buf, BufMut, Bytes, BytesMut};
20
21#[derive(Debug, Default, Clone, PartialEq, Eq)]
39pub struct ConnAckProperties {
40 pub session_expiry_interval: Option<u32>,
42 pub receive_maximum: Option<u16>,
44 pub maximum_qos: Option<QoS>,
46 pub retain_available: Option<bool>,
48 pub maximum_packet_size: Option<u32>,
50 pub assigned_client_id: Option<String>,
52 pub topic_alias_maximum: Option<u16>,
54 pub reason: Option<String>,
56 pub user_properties: Vec<(String, String)>,
58 pub wildcard_subscription_available: Option<bool>,
60 pub subscription_id_available: Option<bool>,
62 pub shared_subscription_available: Option<bool>,
64 pub server_keep_alive: Option<u16>,
66 pub response_info: Option<String>,
68 pub server_reference: Option<String>,
70 pub auth_method: Option<String>,
72 pub auth_data: Option<Bytes>,
74}
75
76impl PropertyFrame for ConnAckProperties {
77 fn encoded_len(&self) -> usize {
79 let mut len = 0;
80
81 len += property_len!(&self.session_expiry_interval);
82 len += property_len!(&self.receive_maximum);
83 len += property_len!(&self.maximum_qos);
84 len += property_len!(&self.retain_available);
85 len += property_len!(&self.maximum_packet_size);
86 len += property_len!(&self.assigned_client_id);
87 len += property_len!(&self.topic_alias_maximum);
88 len += property_len!(&self.reason);
89 len += property_len!(&self.user_properties);
90 len += property_len!(&self.wildcard_subscription_available);
91 len += property_len!(&self.subscription_id_available);
92 len += property_len!(&self.shared_subscription_available);
93 len += property_len!(&self.server_keep_alive);
94 len += property_len!(&self.response_info);
95 len += property_len!(&self.server_reference);
96 len += property_len!(&self.auth_method);
97 len += property_len!(&self.auth_data);
98
99 len
100 }
101
102 fn encode(&self, buf: &mut BytesMut) {
104 property_encode!(
105 &self.session_expiry_interval,
106 Property::SessionExpiryInterval,
107 buf
108 );
109 property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
110 property_encode!(&self.maximum_qos, Property::MaximumQoS, buf);
111 property_encode!(&self.retain_available, Property::RetainAvailable, buf);
112 property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
113 property_encode!(
114 &self.assigned_client_id,
115 Property::AssignedClientIdentifier,
116 buf
117 );
118 property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
119 property_encode!(&self.reason, Property::ReasonString, buf);
120 property_encode!(&self.user_properties, Property::UserProp, buf);
121 property_encode!(
122 &self.wildcard_subscription_available,
123 Property::WildcardSubscriptionAvailable,
124 buf
125 );
126 property_encode!(
127 &self.subscription_id_available,
128 Property::SubscriptionIdentifierAvailable,
129 buf
130 );
131 property_encode!(
132 &self.shared_subscription_available,
133 Property::SharedSubscriptionAvailable,
134 buf
135 );
136 property_encode!(&self.server_keep_alive, Property::ServerKeepAlive, buf);
137 property_encode!(&self.response_info, Property::ResponseInformation, buf);
138 property_encode!(&self.server_reference, Property::ServerReference, buf);
139 property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
140 property_encode!(&self.auth_data, Property::AuthenticationData, buf);
141 }
142
143 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
145 if buf.is_empty() {
146 return Ok(None);
147 }
148
149 let mut properties = ConnAckProperties::default();
150
151 while buf.has_remaining() {
152 let property: Property = decode_byte(buf)?.try_into()?;
153 match property {
154 Property::SessionExpiryInterval => {
155 property_decode!(&mut properties.session_expiry_interval, buf);
156 }
157 Property::ReceiveMaximum => {
158 property_decode_non_zero!(&mut properties.receive_maximum, buf);
159 }
160 Property::MaximumQoS => {
161 property_decode!(&mut properties.maximum_qos, buf);
162 }
163 Property::RetainAvailable => {
164 property_decode!(&mut properties.retain_available, buf);
165 }
166 Property::MaximumPacketSize => {
167 property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
168 }
169 Property::AssignedClientIdentifier => {
170 property_decode!(&mut properties.assigned_client_id, buf);
171 }
172 Property::TopicAliasMaximum => {
173 property_decode!(&mut properties.topic_alias_maximum, buf);
174 }
175 Property::ReasonString => {
176 property_decode!(&mut properties.reason, buf);
177 }
178 Property::UserProp => {
179 property_decode!(&mut properties.user_properties, buf);
180 }
181 Property::WildcardSubscriptionAvailable => {
182 property_decode!(&mut properties.wildcard_subscription_available, buf);
183 }
184 Property::SubscriptionIdentifierAvailable => {
185 property_decode!(&mut properties.subscription_id_available, buf);
186 }
187 Property::SharedSubscriptionAvailable => {
188 property_decode!(&mut properties.shared_subscription_available, buf);
189 }
190 Property::ServerKeepAlive => {
191 property_decode!(&mut properties.server_keep_alive, buf);
192 }
193 Property::ResponseInformation => {
194 property_decode!(&mut properties.response_info, buf);
195 }
196 Property::ServerReference => {
197 property_decode!(&mut properties.server_reference, buf);
198 }
199 Property::AuthenticationMethod => {
200 property_decode!(&mut properties.auth_method, buf);
201 }
202 Property::AuthenticationData => {
203 property_decode!(&mut properties.auth_data, buf);
204 }
205 _ => return Err(Error::PropertyMismatch),
206 }
207 }
208
209 Ok(Some(properties))
210 }
211}
212
213fn validate_connack_reason_code(code: ReasonCode) -> bool {
215 matches!(code.into(), 0 | 128..=138 | 140 | 144 | 149 | 151 | 153..=157 | 159)
216}
217
218#[derive(Debug, Clone, PartialEq, Eq)]
219struct ConnAckHeader {
220 code: ReasonCode,
221 session_present: bool,
222 properties: Option<ConnAckProperties>,
223}
224
225impl ConnAckHeader {
226 fn new(code: ReasonCode, session_present: bool, properties: Option<ConnAckProperties>) -> Self {
227 if !validate_connack_reason_code(code) {
228 panic!("Invalid reason code {code}");
229 }
230 ConnAckHeader {
231 code,
232 session_present,
233 properties,
234 }
235 }
236
237 fn encoded_len(&self) -> usize {
238 let properties_len = self
239 .properties
240 .as_ref()
241 .map(|properties| properties.encoded_len())
242 .unwrap_or(0);
243 1 + 1 + len_bytes(properties_len) + properties_len
244 }
245
246 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
247 let mut flags = 0u8;
248 flags.set_bit(0, self.session_present);
249
250 buf.put_u8(flags);
252
253 buf.put_u8(self.code.into());
255
256 let properties_len = self
257 .properties
258 .as_ref()
259 .map(|properties| properties.encoded_len())
260 .unwrap_or(0) as u32;
261
262 encode_variable_integer(buf, properties_len)?;
263
264 if let Some(properties) = self.properties.as_ref() {
265 properties.encode(buf);
266 }
267
268 Ok(())
269 }
270
271 fn decode(payload: &mut Bytes) -> Result<Self, Error> {
272 let conn_ack_flag = decode_byte(payload)?;
273 let code = decode_byte(payload)?.try_into()?;
274
275 if !validate_connack_reason_code(code) {
276 return Err(Error::InvalidReasonCode(code.into()));
277 }
278
279 let session_present = conn_ack_flag.get_bit(0);
280
281 let properties_len = decode_variable_integer(payload)? as usize;
282 if payload.len() < properties_len + len_bytes(properties_len) {
283 return Err(Error::MalformedPacket);
284 }
285
286 payload.advance(len_bytes(properties_len));
288
289 let mut frame = payload.split_to(properties_len);
290 let properties = ConnAckProperties::decode(&mut frame)?;
291
292 Ok(ConnAckHeader {
293 code,
294 session_present,
295 properties,
296 })
297 }
298}
299
300#[derive(Debug, Clone, PartialEq, Eq)]
326pub struct ConnAck {
327 header: ConnAckHeader,
328}
329
330impl ConnAck {
331 pub fn new(
333 code: ReasonCode,
334 session_present: bool,
335 properties: Option<ConnAckProperties>,
336 ) -> Self {
337 ConnAck {
338 header: ConnAckHeader::new(code, session_present, properties),
339 }
340 }
341
342 pub fn code(&self) -> ReasonCode {
344 self.header.code
345 }
346
347 pub fn session_present(&self) -> bool {
349 self.header.session_present
350 }
351
352 pub fn properties(&self) -> Option<ConnAckProperties> {
354 self.header.properties.clone()
355 }
356}
357
358impl Encode for ConnAck {
359 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
361 let header = FixedHeader::new(PacketType::ConnAck, self.payload_len());
362 header.encode(buf)?;
363 self.header.encode(buf)
364 }
365
366 fn payload_len(&self) -> usize {
368 self.header.encoded_len()
369 }
370}
371
372impl Decode for ConnAck {
373 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
375 if packet.header.packet_type() != PacketType::ConnAck || !packet.header.flags().is_default()
376 {
377 return Err(Error::MalformedPacket);
378 }
379
380 let header = ConnAckHeader::decode(&mut packet.payload)?;
381 Ok(ConnAck { header })
382 }
383}