1use super::{
2 Error, FixedHeader, PropertyType, len_len, length, property, read_mqtt_bytes, read_mqtt_string,
3 read_u8, read_u16, read_u32, write_mqtt_bytes, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
/// Return code carried by a CONNACK packet.
///
/// The "wire value" noted on a variant is the byte that `connect_return` in
/// this file decodes into that variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectReturnCode {
    /// Wire value 0.
    Success,
    /// Never produced by `connect_return` in this file.
    /// NOTE(review): looks like an MQTT 3.1.1-era return code kept next to the
    /// v5 reason codes — confirm.
    RefusedProtocolVersion,
    /// Never produced by `connect_return` in this file.
    /// NOTE(review): looks like an MQTT 3.1.1-era return code — confirm.
    BadClientId,
    /// Never produced by `connect_return` in this file.
    /// NOTE(review): looks like an MQTT 3.1.1-era return code — confirm.
    ServiceUnavailable,
    /// Wire value 128.
    UnspecifiedError,
    /// Wire value 129.
    MalformedPacket,
    /// Wire value 130.
    ProtocolError,
    /// Wire value 131.
    ImplementationSpecificError,
    /// Wire value 132.
    UnsupportedProtocolVersion,
    /// Wire value 133.
    ClientIdentifierNotValid,
    /// Wire value 134.
    BadUserNamePassword,
    /// Wire value 135.
    NotAuthorized,
    /// Wire value 136.
    ServerUnavailable,
    /// Wire value 137.
    ServerBusy,
    /// Wire value 138.
    Banned,
    /// Wire value 140.
    BadAuthenticationMethod,
    /// Wire value 144.
    TopicNameInvalid,
    /// Wire value 149.
    PacketTooLarge,
    /// Wire value 151.
    QuotaExceeded,
    /// Wire value 153.
    PayloadFormatInvalid,
    /// Wire value 154.
    RetainNotSupported,
    /// Wire value 155.
    QoSNotSupported,
    /// Wire value 156.
    UseAnotherServer,
    /// Wire value 157.
    ServerMoved,
    /// Wire value 159.
    ConnectionRateExceeded,
}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct ConnAck {
41 pub session_present: bool,
42 pub code: ConnectReturnCode,
43 pub properties: Option<ConnAckProperties>,
44}
45
46impl ConnAck {
47 fn len(&self) -> usize {
48 let mut len = 1 + 1; if let Some(p) = &self.properties {
52 let properties_len = p.len();
53 let properties_len_len = len_len(properties_len);
54 len += properties_len_len + properties_len;
55 } else {
56 len += 1;
57 }
58
59 len
60 }
61
62 pub fn size(&self) -> usize {
63 let len = self.len();
64 let remaining_len_size = len_len(len);
65
66 1 + remaining_len_size + len
67 }
68
69 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
70 let variable_header_index = fixed_header.header_len;
71 bytes.advance(variable_header_index);
72
73 let flags = read_u8(&mut bytes)?;
74 let return_code = read_u8(&mut bytes)?;
75 let properties = ConnAckProperties::read(&mut bytes)?;
76
77 let session_present = (flags & 0x01) == 1;
78 let code = connect_return(return_code)?;
79 let connack = Self {
80 session_present,
81 code,
82 properties,
83 };
84
85 Ok(connack)
86 }
87
88 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
89 let len = Self::len(self);
90 buffer.put_u8(0x20);
91
92 let count = write_remaining_length(buffer, len)?;
93 buffer.put_u8(u8::from(self.session_present));
94 buffer.put_u8(connect_code(self.code));
95
96 if let Some(p) = &self.properties {
97 p.write(buffer)?;
98 } else {
99 write_remaining_length(buffer, 0)?;
100 }
101
102 Ok(1 + count + len)
103 }
104}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub struct ConnAckProperties {
108 pub session_expiry_interval: Option<u32>,
109 pub receive_max: Option<u16>,
110 pub max_qos: Option<u8>,
111 pub retain_available: Option<u8>,
112 pub max_packet_size: Option<u32>,
113 pub assigned_client_identifier: Option<String>,
114 pub topic_alias_max: Option<u16>,
115 pub reason_string: Option<String>,
116 pub user_properties: Vec<(String, String)>,
117 pub wildcard_subscription_available: Option<u8>,
118 pub subscription_identifiers_available: Option<u8>,
119 pub shared_subscription_available: Option<u8>,
120 pub server_keep_alive: Option<u16>,
121 pub response_information: Option<String>,
122 pub server_reference: Option<String>,
123 pub authentication_method: Option<String>,
124 pub authentication_data: Option<Bytes>,
125}
126
127impl ConnAckProperties {
128 fn len(&self) -> usize {
129 let mut len = 0;
130
131 if self.session_expiry_interval.is_some() {
132 len += 1 + 4;
133 }
134
135 if self.receive_max.is_some() {
136 len += 1 + 2;
137 }
138
139 if self.max_qos.is_some() {
140 len += 1 + 1;
141 }
142
143 if self.retain_available.is_some() {
144 len += 1 + 1;
145 }
146
147 if self.max_packet_size.is_some() {
148 len += 1 + 4;
149 }
150
151 if let Some(id) = &self.assigned_client_identifier {
152 len += 1 + 2 + id.len();
153 }
154
155 if self.topic_alias_max.is_some() {
156 len += 1 + 2;
157 }
158
159 if let Some(reason) = &self.reason_string {
160 len += 1 + 2 + reason.len();
161 }
162
163 for (key, value) in &self.user_properties {
164 len += 1 + 2 + key.len() + 2 + value.len();
165 }
166
167 if self.wildcard_subscription_available.is_some() {
168 len += 1 + 1;
169 }
170
171 if self.subscription_identifiers_available.is_some() {
172 len += 1 + 1;
173 }
174
175 if self.shared_subscription_available.is_some() {
176 len += 1 + 1;
177 }
178
179 if self.server_keep_alive.is_some() {
180 len += 1 + 2;
181 }
182
183 if let Some(info) = &self.response_information {
184 len += 1 + 2 + info.len();
185 }
186
187 if let Some(reference) = &self.server_reference {
188 len += 1 + 2 + reference.len();
189 }
190
191 if let Some(authentication_method) = &self.authentication_method {
192 len += 1 + 2 + authentication_method.len();
193 }
194
195 if let Some(authentication_data) = &self.authentication_data {
196 len += 1 + 2 + authentication_data.len();
197 }
198
199 len
200 }
201
202 pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
203 let mut properties = Self {
204 session_expiry_interval: None,
205 receive_max: None,
206 max_qos: None,
207 retain_available: None,
208 max_packet_size: None,
209 assigned_client_identifier: None,
210 topic_alias_max: None,
211 reason_string: None,
212 user_properties: Vec::new(),
213 wildcard_subscription_available: None,
214 subscription_identifiers_available: None,
215 shared_subscription_available: None,
216 server_keep_alive: None,
217 response_information: None,
218 server_reference: None,
219 authentication_method: None,
220 authentication_data: None,
221 };
222
223 let (properties_len_len, properties_len) = length(bytes.iter())?;
224 bytes.advance(properties_len_len);
225 if properties_len == 0 {
226 return Ok(None);
227 }
228
229 let mut cursor = 0;
230 while cursor < properties_len {
232 let prop = read_u8(bytes)?;
233 cursor += 1;
234 read_connack_property(property(prop)?, bytes, &mut cursor, &mut properties)?;
235 }
236
237 Ok(Some(properties))
238 }
239
240 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
241 let len = self.len();
242 write_remaining_length(buffer, len)?;
243
244 if let Some(session_expiry_interval) = self.session_expiry_interval {
245 buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
246 buffer.put_u32(session_expiry_interval);
247 }
248
249 if let Some(receive_maximum) = self.receive_max {
250 buffer.put_u8(PropertyType::ReceiveMaximum as u8);
251 buffer.put_u16(receive_maximum);
252 }
253
254 if let Some(qos) = self.max_qos {
255 buffer.put_u8(PropertyType::MaximumQos as u8);
256 buffer.put_u8(qos);
257 }
258
259 if let Some(retain_available) = self.retain_available {
260 buffer.put_u8(PropertyType::RetainAvailable as u8);
261 buffer.put_u8(retain_available);
262 }
263
264 if let Some(max_packet_size) = self.max_packet_size {
265 buffer.put_u8(PropertyType::MaximumPacketSize as u8);
266 buffer.put_u32(max_packet_size);
267 }
268
269 if let Some(id) = &self.assigned_client_identifier {
270 buffer.put_u8(PropertyType::AssignedClientIdentifier as u8);
271 write_mqtt_string(buffer, id);
272 }
273
274 if let Some(topic_alias_max) = self.topic_alias_max {
275 buffer.put_u8(PropertyType::TopicAliasMaximum as u8);
276 buffer.put_u16(topic_alias_max);
277 }
278
279 if let Some(reason) = &self.reason_string {
280 buffer.put_u8(PropertyType::ReasonString as u8);
281 write_mqtt_string(buffer, reason);
282 }
283
284 for (key, value) in &self.user_properties {
285 buffer.put_u8(PropertyType::UserProperty as u8);
286 write_mqtt_string(buffer, key);
287 write_mqtt_string(buffer, value);
288 }
289
290 if let Some(w) = self.wildcard_subscription_available {
291 buffer.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
292 buffer.put_u8(w);
293 }
294
295 if let Some(s) = self.subscription_identifiers_available {
296 buffer.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
297 buffer.put_u8(s);
298 }
299
300 if let Some(s) = self.shared_subscription_available {
301 buffer.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
302 buffer.put_u8(s);
303 }
304
305 if let Some(keep_alive) = self.server_keep_alive {
306 buffer.put_u8(PropertyType::ServerKeepAlive as u8);
307 buffer.put_u16(keep_alive);
308 }
309
310 if let Some(info) = &self.response_information {
311 buffer.put_u8(PropertyType::ResponseInformation as u8);
312 write_mqtt_string(buffer, info);
313 }
314
315 if let Some(reference) = &self.server_reference {
316 buffer.put_u8(PropertyType::ServerReference as u8);
317 write_mqtt_string(buffer, reference);
318 }
319
320 if let Some(authentication_method) = &self.authentication_method {
321 buffer.put_u8(PropertyType::AuthenticationMethod as u8);
322 write_mqtt_string(buffer, authentication_method);
323 }
324
325 if let Some(authentication_data) = &self.authentication_data {
326 buffer.put_u8(PropertyType::AuthenticationData as u8);
327 write_mqtt_bytes(buffer, authentication_data);
328 }
329
330 Ok(())
331 }
332}
333
334fn read_connack_property(
335 property_type: PropertyType,
336 bytes: &mut Bytes,
337 cursor: &mut usize,
338 properties: &mut ConnAckProperties,
339) -> Result<(), Error> {
340 match property_type {
341 PropertyType::SessionExpiryInterval => {
342 properties.session_expiry_interval = Some(read_u32(bytes)?);
343 *cursor += 4;
344 }
345 PropertyType::ReceiveMaximum => {
346 let receive_max = read_u16(bytes)?;
347 if receive_max == 0 {
348 return Err(Error::ProtocolError);
349 }
350 properties.receive_max = Some(receive_max);
351 *cursor += 2;
352 }
353 PropertyType::MaximumQos => {
354 properties.max_qos = Some(read_u8(bytes)?);
355 *cursor += 1;
356 }
357 PropertyType::RetainAvailable => {
358 properties.retain_available = Some(read_u8(bytes)?);
359 *cursor += 1;
360 }
361 PropertyType::AssignedClientIdentifier => {
362 let id = read_mqtt_string(bytes)?;
363 *cursor += 2 + id.len();
364 properties.assigned_client_identifier = Some(id);
365 }
366 PropertyType::MaximumPacketSize => {
367 properties.max_packet_size = Some(read_u32(bytes)?);
368 *cursor += 4;
369 }
370 PropertyType::TopicAliasMaximum => {
371 properties.topic_alias_max = Some(read_u16(bytes)?);
372 *cursor += 2;
373 }
374 PropertyType::ReasonString => {
375 let reason = read_mqtt_string(bytes)?;
376 *cursor += 2 + reason.len();
377 properties.reason_string = Some(reason);
378 }
379 PropertyType::UserProperty => {
380 let key = read_mqtt_string(bytes)?;
381 let value = read_mqtt_string(bytes)?;
382 *cursor += 2 + key.len() + 2 + value.len();
383 properties.user_properties.push((key, value));
384 }
385 PropertyType::WildcardSubscriptionAvailable => {
386 properties.wildcard_subscription_available = Some(read_u8(bytes)?);
387 *cursor += 1;
388 }
389 PropertyType::SubscriptionIdentifierAvailable => {
390 properties.subscription_identifiers_available = Some(read_u8(bytes)?);
391 *cursor += 1;
392 }
393 PropertyType::SharedSubscriptionAvailable => {
394 properties.shared_subscription_available = Some(read_u8(bytes)?);
395 *cursor += 1;
396 }
397 PropertyType::ServerKeepAlive => {
398 properties.server_keep_alive = Some(read_u16(bytes)?);
399 *cursor += 2;
400 }
401 PropertyType::ResponseInformation => {
402 let info = read_mqtt_string(bytes)?;
403 *cursor += 2 + info.len();
404 properties.response_information = Some(info);
405 }
406 PropertyType::ServerReference => {
407 let reference = read_mqtt_string(bytes)?;
408 *cursor += 2 + reference.len();
409 properties.server_reference = Some(reference);
410 }
411 PropertyType::AuthenticationMethod => {
412 let method = read_mqtt_string(bytes)?;
413 *cursor += 2 + method.len();
414 properties.authentication_method = Some(method);
415 }
416 PropertyType::AuthenticationData => {
417 let data = read_mqtt_bytes(bytes)?;
418 *cursor += 2 + data.len();
419 properties.authentication_data = Some(data);
420 }
421 _ => return Err(Error::InvalidPropertyType(property_type as u8)),
422 }
423
424 Ok(())
425}
426
427const fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
429 let code = match num {
430 0 => ConnectReturnCode::Success,
431 128 => ConnectReturnCode::UnspecifiedError,
432 129 => ConnectReturnCode::MalformedPacket,
433 130 => ConnectReturnCode::ProtocolError,
434 131 => ConnectReturnCode::ImplementationSpecificError,
435 132 => ConnectReturnCode::UnsupportedProtocolVersion,
436 133 => ConnectReturnCode::ClientIdentifierNotValid,
437 134 => ConnectReturnCode::BadUserNamePassword,
438 135 => ConnectReturnCode::NotAuthorized,
439 136 => ConnectReturnCode::ServerUnavailable,
440 137 => ConnectReturnCode::ServerBusy,
441 138 => ConnectReturnCode::Banned,
442 140 => ConnectReturnCode::BadAuthenticationMethod,
443 144 => ConnectReturnCode::TopicNameInvalid,
444 149 => ConnectReturnCode::PacketTooLarge,
445 151 => ConnectReturnCode::QuotaExceeded,
446 153 => ConnectReturnCode::PayloadFormatInvalid,
447 154 => ConnectReturnCode::RetainNotSupported,
448 155 => ConnectReturnCode::QoSNotSupported,
449 156 => ConnectReturnCode::UseAnotherServer,
450 157 => ConnectReturnCode::ServerMoved,
451 159 => ConnectReturnCode::ConnectionRateExceeded,
452 num => return Err(Error::InvalidConnectReturnCode(num)),
453 };
454
455 Ok(code)
456}
457
458fn connect_code(return_code: ConnectReturnCode) -> u8 {
459 match return_code {
460 ConnectReturnCode::Success => 0,
461 ConnectReturnCode::UnspecifiedError => 128,
462 ConnectReturnCode::MalformedPacket => 129,
463 ConnectReturnCode::ProtocolError => 130,
464 ConnectReturnCode::ImplementationSpecificError => 131,
465 ConnectReturnCode::UnsupportedProtocolVersion => 132,
466 ConnectReturnCode::ClientIdentifierNotValid => 133,
467 ConnectReturnCode::BadUserNamePassword => 134,
468 ConnectReturnCode::NotAuthorized => 135,
469 ConnectReturnCode::ServerUnavailable => 136,
470 ConnectReturnCode::ServerBusy => 137,
471 ConnectReturnCode::Banned => 138,
472 ConnectReturnCode::BadAuthenticationMethod => 140,
473 ConnectReturnCode::TopicNameInvalid => 144,
474 ConnectReturnCode::PacketTooLarge => 149,
475 ConnectReturnCode::QuotaExceeded => 151,
476 ConnectReturnCode::PayloadFormatInvalid => 153,
477 ConnectReturnCode::RetainNotSupported => 154,
478 ConnectReturnCode::QoSNotSupported => 155,
479 ConnectReturnCode::UseAnotherServer => 156,
480 ConnectReturnCode::ServerMoved => 157,
481 ConnectReturnCode::ConnectionRateExceeded => 159,
482 _ => unreachable!(),
483 }
484}
485
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::{Bytes, BytesMut};
    use pretty_assertions::assert_eq;

    /// Properties with every optional field unset and no user properties.
    fn blank_properties() -> ConnAckProperties {
        ConnAckProperties {
            session_expiry_interval: None,
            receive_max: None,
            max_qos: None,
            retain_available: None,
            max_packet_size: None,
            assigned_client_identifier: None,
            topic_alias_max: None,
            reason_string: None,
            user_properties: Vec::new(),
            wildcard_subscription_available: None,
            subscription_identifiers_available: None,
            shared_subscription_available: None,
            server_keep_alive: None,
            response_information: None,
            server_reference: None,
            authentication_method: None,
            authentication_data: None,
        }
    }

    /// `size()`, the value returned by `write()` and the number of bytes
    /// actually written must all agree.
    #[test]
    fn length_calculation() {
        let packet = ConnAck {
            session_present: false,
            code: ConnectReturnCode::Success,
            properties: Some(ConnAckProperties {
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
                ..blank_properties()
            }),
        };

        let reported_by_size = packet.size();

        let mut encoded = BytesMut::new();
        let reported_by_write = packet.write(&mut encoded).unwrap();
        let written = encoded.len();

        assert_eq!(reported_by_write, written);
        assert_eq!(reported_by_size, written);
    }

    /// A properties section holding identifier 0x21 with a two-byte value of
    /// zero must be rejected as a protocol error.
    #[test]
    fn read_rejects_receive_maximum_zero() {
        // Property length 3, identifier 0x21, value 0x0000.
        let mut raw = Bytes::from_static(&[0x03, 0x21, 0x00, 0x00]);

        let outcome = ConnAckProperties::read(&mut raw);

        assert!(matches!(outcome, Err(Error::ProtocolError)));
    }
}