1use std::time::Duration;
9
10use super::property::{
11 Property, PropertyFrame, property_decode, property_decode_non_zero, property_encode,
12 property_len,
13};
14use crate::Error;
15use crate::codec::util::{decode_byte, decode_variable_integer, encode_variable_integer};
16use crate::codec::{Decode, Encode, RawPacket};
17use crate::protocol::util::len_bytes;
18use crate::protocol::v5::reason::ReasonCode;
19use crate::protocol::{FixedHeader, PacketType, QoS};
20use bit_field::BitField;
21use bytes::{Buf, BufMut, Bytes, BytesMut};
22
23#[derive(Debug, Default, Clone, PartialEq, Eq)]
42pub struct ConnAckProperties {
43 pub session_expiry_interval: Option<Duration>,
45 pub receive_maximum: Option<u16>,
47 pub maximum_qos: Option<QoS>,
49 pub retain_available: Option<bool>,
51 pub maximum_packet_size: Option<u32>,
53 pub assigned_client_id: Option<String>,
55 pub topic_alias_maximum: Option<u16>,
57 pub reason: Option<String>,
59 pub user_properties: Vec<(String, String)>,
61 pub wildcard_subscription_available: Option<bool>,
63 pub subscription_id_available: Option<bool>,
65 pub shared_subscription_available: Option<bool>,
67 pub server_keep_alive: Option<u16>,
69 pub response_info: Option<String>,
71 pub server_reference: Option<String>,
73 pub auth_method: Option<String>,
75 pub auth_data: Option<Bytes>,
77}
78
79impl PropertyFrame for ConnAckProperties {
80 fn encoded_len(&self) -> usize {
82 let mut len = 0;
83
84 len += property_len!(&self.session_expiry_interval);
85 len += property_len!(&self.receive_maximum);
86 len += property_len!(&self.maximum_qos);
87 len += property_len!(&self.retain_available);
88 len += property_len!(&self.maximum_packet_size);
89 len += property_len!(&self.assigned_client_id);
90 len += property_len!(&self.topic_alias_maximum);
91 len += property_len!(&self.reason);
92 len += property_len!(&self.user_properties);
93 len += property_len!(&self.wildcard_subscription_available);
94 len += property_len!(&self.subscription_id_available);
95 len += property_len!(&self.shared_subscription_available);
96 len += property_len!(&self.server_keep_alive);
97 len += property_len!(&self.response_info);
98 len += property_len!(&self.server_reference);
99 len += property_len!(&self.auth_method);
100 len += property_len!(&self.auth_data);
101
102 len
103 }
104
105 fn encode(&self, buf: &mut BytesMut) {
107 property_encode!(
108 &self.session_expiry_interval,
109 Property::SessionExpiryInterval,
110 buf
111 );
112 property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
113 property_encode!(&self.maximum_qos, Property::MaximumQoS, buf);
114 property_encode!(&self.retain_available, Property::RetainAvailable, buf);
115 property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
116 property_encode!(
117 &self.assigned_client_id,
118 Property::AssignedClientIdentifier,
119 buf
120 );
121 property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
122 property_encode!(&self.reason, Property::ReasonString, buf);
123 property_encode!(&self.user_properties, Property::UserProp, buf);
124 property_encode!(
125 &self.wildcard_subscription_available,
126 Property::WildcardSubscriptionAvailable,
127 buf
128 );
129 property_encode!(
130 &self.subscription_id_available,
131 Property::SubscriptionIdentifierAvailable,
132 buf
133 );
134 property_encode!(
135 &self.shared_subscription_available,
136 Property::SharedSubscriptionAvailable,
137 buf
138 );
139 property_encode!(&self.server_keep_alive, Property::ServerKeepAlive, buf);
140 property_encode!(&self.response_info, Property::ResponseInformation, buf);
141 property_encode!(&self.server_reference, Property::ServerReference, buf);
142 property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
143 property_encode!(&self.auth_data, Property::AuthenticationData, buf);
144 }
145
146 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
148 if buf.is_empty() {
149 return Ok(None);
150 }
151
152 let mut properties = ConnAckProperties::default();
153
154 while buf.has_remaining() {
155 let property: Property = decode_byte(buf)?.try_into()?;
156 match property {
157 Property::SessionExpiryInterval => {
158 property_decode!(&mut properties.session_expiry_interval, buf);
159 }
160 Property::ReceiveMaximum => {
161 property_decode_non_zero!(&mut properties.receive_maximum, buf);
162 }
163 Property::MaximumQoS => {
164 property_decode!(&mut properties.maximum_qos, buf);
165 }
166 Property::RetainAvailable => {
167 property_decode!(&mut properties.retain_available, buf);
168 }
169 Property::MaximumPacketSize => {
170 property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
171 }
172 Property::AssignedClientIdentifier => {
173 property_decode!(&mut properties.assigned_client_id, buf);
174 }
175 Property::TopicAliasMaximum => {
176 property_decode!(&mut properties.topic_alias_maximum, buf);
177 }
178 Property::ReasonString => {
179 property_decode!(&mut properties.reason, buf);
180 }
181 Property::UserProp => {
182 property_decode!(&mut properties.user_properties, buf);
183 }
184 Property::WildcardSubscriptionAvailable => {
185 property_decode!(&mut properties.wildcard_subscription_available, buf);
186 }
187 Property::SubscriptionIdentifierAvailable => {
188 property_decode!(&mut properties.subscription_id_available, buf);
189 }
190 Property::SharedSubscriptionAvailable => {
191 property_decode!(&mut properties.shared_subscription_available, buf);
192 }
193 Property::ServerKeepAlive => {
194 property_decode!(&mut properties.server_keep_alive, buf);
195 }
196 Property::ResponseInformation => {
197 property_decode!(&mut properties.response_info, buf);
198 }
199 Property::ServerReference => {
200 property_decode!(&mut properties.server_reference, buf);
201 }
202 Property::AuthenticationMethod => {
203 property_decode!(&mut properties.auth_method, buf);
204 }
205 Property::AuthenticationData => {
206 property_decode!(&mut properties.auth_data, buf);
207 }
208 _ => return Err(Error::PropertyMismatch),
209 }
210 }
211
212 Ok(Some(properties))
213 }
214}
215
216fn validate_connack_reason_code(code: ReasonCode) -> bool {
218 matches!(code.into(), 0 | 128..=138 | 140 | 144 | 149 | 151 | 153..=157 | 159)
219}
220
221#[derive(Debug, Clone, PartialEq, Eq)]
222struct ConnAckHeader {
223 code: ReasonCode,
224 session_present: bool,
225 properties: Option<ConnAckProperties>,
226}
227
228impl ConnAckHeader {
229 fn new(code: ReasonCode, session_present: bool, properties: Option<ConnAckProperties>) -> Self {
230 if !validate_connack_reason_code(code) {
231 panic!("Invalid reason code {code}");
232 }
233 ConnAckHeader {
234 code,
235 session_present,
236 properties,
237 }
238 }
239
240 fn encoded_len(&self) -> usize {
241 let properties_len = self
242 .properties
243 .as_ref()
244 .map(|properties| properties.encoded_len())
245 .unwrap_or(0);
246 1 + 1 + len_bytes(properties_len) + properties_len
247 }
248
249 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
250 let mut flags = 0u8;
251 flags.set_bit(0, self.session_present);
252
253 buf.put_u8(flags);
255
256 buf.put_u8(self.code.into());
258
259 let properties_len = self
260 .properties
261 .as_ref()
262 .map(|properties| properties.encoded_len())
263 .unwrap_or(0) as u32;
264
265 encode_variable_integer(buf, properties_len)?;
266
267 if let Some(properties) = self.properties.as_ref() {
268 properties.encode(buf);
269 }
270
271 Ok(())
272 }
273
274 fn decode(payload: &mut Bytes) -> Result<Self, Error> {
275 let conn_ack_flag = decode_byte(payload)?;
276 let code = decode_byte(payload)?.try_into()?;
277
278 if !validate_connack_reason_code(code) {
279 return Err(Error::InvalidReasonCode(code.into()));
280 }
281
282 let session_present = conn_ack_flag.get_bit(0);
283
284 let properties_len = decode_variable_integer(payload)? as usize;
285 if payload.len() < properties_len + len_bytes(properties_len) {
286 return Err(Error::MalformedPacket);
287 }
288
289 payload.advance(len_bytes(properties_len));
291
292 let mut frame = payload.split_to(properties_len);
293 let properties = ConnAckProperties::decode(&mut frame)?;
294
295 Ok(ConnAckHeader {
296 code,
297 session_present,
298 properties,
299 })
300 }
301}
302
303#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct ConnAck {
331 header: ConnAckHeader,
332}
333
334impl ConnAck {
335 pub fn new(
337 code: ReasonCode,
338 session_present: bool,
339 properties: Option<ConnAckProperties>,
340 ) -> Self {
341 ConnAck {
342 header: ConnAckHeader::new(code, session_present, properties),
343 }
344 }
345
346 pub fn code(&self) -> ReasonCode {
348 self.header.code
349 }
350
351 pub fn session_present(&self) -> bool {
353 self.header.session_present
354 }
355
356 pub fn properties(&self) -> Option<ConnAckProperties> {
358 self.header.properties.clone()
359 }
360}
361
362impl Encode for ConnAck {
363 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
365 let header = FixedHeader::new(PacketType::ConnAck, self.payload_len());
366 header.encode(buf)?;
367 self.header.encode(buf)
368 }
369
370 fn payload_len(&self) -> usize {
372 self.header.encoded_len()
373 }
374}
375
376impl Decode for ConnAck {
377 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
379 if packet.header.packet_type() != PacketType::ConnAck || !packet.header.flags().is_default()
380 {
381 return Err(Error::MalformedPacket);
382 }
383
384 let header = ConnAckHeader::decode(&mut packet.payload)?;
385 Ok(ConnAck { header })
386 }
387}