1use crate::MqttString;
2
3use super::{
4 len_len, length, property, read_mqtt_bytes, read_mqtt_string, read_u16, read_u32, read_u8,
5 write_mqtt_bytes, write_mqtt_string, write_remaining_length, Debug, Error, FixedHeader,
6 PropertyType,
7};
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9
/// Outcome reported in the reason-code byte of a CONNACK packet.
///
/// The numeric value noted on each variant is the byte that `connect_return`
/// decodes it from and `connect_code` encodes it as.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectReturnCode {
    /// Byte value `0`.
    Success,
    /// Never produced by `connect_return`; no byte value decodes to it.
    RefusedProtocolVersion,
    /// Never produced by `connect_return`; no byte value decodes to it.
    BadClientId,
    /// Never produced by `connect_return`; no byte value decodes to it.
    ServiceUnavailable,
    /// Byte value `128`.
    UnspecifiedError,
    /// Byte value `129`.
    MalformedPacket,
    /// Byte value `130`.
    ProtocolError,
    /// Byte value `131`.
    ImplementationSpecificError,
    /// Byte value `132`.
    UnsupportedProtocolVersion,
    /// Byte value `133`.
    ClientIdentifierNotValid,
    /// Byte value `134`.
    BadUserNamePassword,
    /// Byte value `135`.
    NotAuthorized,
    /// Byte value `136`.
    ServerUnavailable,
    /// Byte value `137`.
    ServerBusy,
    /// Byte value `138`.
    Banned,
    /// Byte value `140`.
    BadAuthenticationMethod,
    /// Byte value `144`.
    TopicNameInvalid,
    /// Byte value `149`.
    PacketTooLarge,
    /// Byte value `151`.
    QuotaExceeded,
    /// Byte value `153`.
    PayloadFormatInvalid,
    /// Byte value `154`.
    RetainNotSupported,
    /// Byte value `155`.
    QoSNotSupported,
    /// Byte value `156`.
    UseAnotherServer,
    /// Byte value `157`.
    ServerMoved,
    /// Byte value `159`.
    ConnectionRateExceeded,
}
40
41#[derive(Debug, Clone, PartialEq, Eq)]
43pub struct ConnAck {
44 pub session_present: bool,
45 pub code: ConnectReturnCode,
46 pub properties: Option<ConnAckProperties>,
47}
48
49impl ConnAck {
50 fn len(&self) -> usize {
51 let mut len = 1 + 1; if let Some(p) = &self.properties {
55 let properties_len = p.len();
56 let properties_len_len = len_len(properties_len);
57 len += properties_len_len + properties_len;
58 } else {
59 len += 1;
60 }
61
62 len
63 }
64
65 pub fn size(&self) -> usize {
66 let len = self.len();
67 let remaining_len_size = len_len(len);
68
69 1 + remaining_len_size + len
70 }
71
72 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<ConnAck, Error> {
73 let variable_header_index = fixed_header.fixed_header_len;
74 bytes.advance(variable_header_index);
75
76 let flags = read_u8(&mut bytes)?;
77 let return_code = read_u8(&mut bytes)?;
78 let properties = ConnAckProperties::read(&mut bytes)?;
79
80 let session_present = (flags & 0x01) == 1;
81 let code = connect_return(return_code)?;
82 let connack = ConnAck {
83 session_present,
84 code,
85 properties,
86 };
87
88 Ok(connack)
89 }
90
91 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
92 let len = Self::len(self);
93 buffer.put_u8(0x20);
94
95 let count = write_remaining_length(buffer, len)?;
96 buffer.put_u8(u8::from(self.session_present));
97 buffer.put_u8(connect_code(self.code));
98
99 if let Some(p) = &self.properties {
100 p.write(buffer)?;
101 } else {
102 write_remaining_length(buffer, 0)?;
103 }
104
105 Ok(1 + count + len)
106 }
107}
108
109#[derive(Debug, Clone, PartialEq, Eq, Default)]
110pub struct ConnAckProperties {
111 pub session_expiry_interval: Option<u32>,
112 pub receive_max: Option<u16>,
113 pub max_qos: Option<u8>,
114 pub retain_available: Option<u8>,
115 pub max_packet_size: Option<u32>,
116 pub assigned_client_identifier: Option<MqttString>,
117 pub topic_alias_max: Option<u16>,
118 pub reason_string: Option<MqttString>,
119 pub user_properties: Vec<(MqttString, MqttString)>,
120 pub wildcard_subscription_available: Option<u8>,
121 pub subscription_identifiers_available: Option<u8>,
122 pub shared_subscription_available: Option<u8>,
123 pub server_keep_alive: Option<u16>,
124 pub response_information: Option<MqttString>,
125 pub server_reference: Option<MqttString>,
126 pub authentication_method: Option<MqttString>,
127 pub authentication_data: Option<Bytes>,
128}
129
130impl ConnAckProperties {
131 #[must_use]
132 pub fn new() -> Self {
133 Self::default()
134 }
135
136 fn len(&self) -> usize {
137 let mut len = 0;
138
139 if self.session_expiry_interval.is_some() {
140 len += 1 + 4;
141 }
142
143 if self.receive_max.is_some() {
144 len += 1 + 2;
145 }
146
147 if self.max_qos.is_some() {
148 len += 1 + 1;
149 }
150
151 if self.retain_available.is_some() {
152 len += 1 + 1;
153 }
154
155 if self.max_packet_size.is_some() {
156 len += 1 + 4;
157 }
158
159 if let Some(id) = &self.assigned_client_identifier {
160 len += 1 + 2 + id.len();
161 }
162
163 if self.topic_alias_max.is_some() {
164 len += 1 + 2;
165 }
166
167 if let Some(reason) = &self.reason_string {
168 len += 1 + 2 + reason.len();
169 }
170
171 for (key, value) in &self.user_properties {
172 len += 1 + 2 + key.len() + 2 + value.len();
173 }
174
175 if self.wildcard_subscription_available.is_some() {
176 len += 1 + 1;
177 }
178
179 if self.subscription_identifiers_available.is_some() {
180 len += 1 + 1;
181 }
182
183 if self.shared_subscription_available.is_some() {
184 len += 1 + 1;
185 }
186
187 if self.server_keep_alive.is_some() {
188 len += 1 + 2;
189 }
190
191 if let Some(info) = &self.response_information {
192 len += 1 + 2 + info.len();
193 }
194
195 if let Some(reference) = &self.server_reference {
196 len += 1 + 2 + reference.len();
197 }
198
199 if let Some(authentication_method) = &self.authentication_method {
200 len += 1 + 2 + authentication_method.len();
201 }
202
203 if let Some(authentication_data) = &self.authentication_data {
204 len += 1 + 2 + authentication_data.len();
205 }
206
207 len
208 }
209
210 #[allow(clippy::too_many_lines)]
211 pub fn read(bytes: &mut Bytes) -> Result<Option<ConnAckProperties>, Error> {
212 let mut session_expiry_interval = None;
213 let mut receive_max = None;
214 let mut max_qos = None;
215 let mut retain_available = None;
216 let mut max_packet_size = None;
217 let mut assigned_client_identifier = None;
218 let mut topic_alias_max = None;
219 let mut reason_string = None;
220 let mut user_properties = Vec::new();
221 let mut wildcard_subscription_available = None;
222 let mut subscription_identifiers_available = None;
223 let mut shared_subscription_available = None;
224 let mut server_keep_alive = None;
225 let mut response_information = None;
226 let mut server_reference = None;
227 let mut authentication_method = None;
228 let mut authentication_data = None;
229
230 let (properties_len_len, properties_len) = length(bytes.iter())?;
231 bytes.advance(properties_len_len);
232 if properties_len == 0 {
233 return Ok(None);
234 }
235
236 let mut cursor = 0;
237 while cursor < properties_len {
239 let prop = read_u8(bytes)?;
240 cursor += 1;
241
242 match property(prop)? {
243 PropertyType::SessionExpiryInterval => {
244 session_expiry_interval = Some(read_u32(bytes)?);
245 cursor += 4;
246 }
247 PropertyType::ReceiveMaximum => {
248 receive_max = Some(read_u16(bytes)?);
249 cursor += 2;
250 }
251 PropertyType::MaximumQos => {
252 max_qos = Some(read_u8(bytes)?);
253 cursor += 1;
254 }
255 PropertyType::RetainAvailable => {
256 retain_available = Some(read_u8(bytes)?);
257 cursor += 1;
258 }
259 PropertyType::AssignedClientIdentifier => {
260 let id = read_mqtt_string(bytes)?;
261 cursor += 2 + id.len();
262 assigned_client_identifier = Some(id);
263 }
264 PropertyType::MaximumPacketSize => {
265 max_packet_size = Some(read_u32(bytes)?);
266 cursor += 4;
267 }
268 PropertyType::TopicAliasMaximum => {
269 topic_alias_max = Some(read_u16(bytes)?);
270 cursor += 2;
271 }
272 PropertyType::ReasonString => {
273 let reason = read_mqtt_string(bytes)?;
274 cursor += 2 + reason.len();
275 reason_string = Some(reason);
276 }
277 PropertyType::UserProperty => {
278 let key = read_mqtt_string(bytes)?;
279 let value = read_mqtt_string(bytes)?;
280 cursor += 2 + key.len() + 2 + value.len();
281 user_properties.push((key, value));
282 }
283 PropertyType::WildcardSubscriptionAvailable => {
284 wildcard_subscription_available = Some(read_u8(bytes)?);
285 cursor += 1;
286 }
287 PropertyType::SubscriptionIdentifierAvailable => {
288 subscription_identifiers_available = Some(read_u8(bytes)?);
289 cursor += 1;
290 }
291 PropertyType::SharedSubscriptionAvailable => {
292 shared_subscription_available = Some(read_u8(bytes)?);
293 cursor += 1;
294 }
295 PropertyType::ServerKeepAlive => {
296 server_keep_alive = Some(read_u16(bytes)?);
297 cursor += 2;
298 }
299 PropertyType::ResponseInformation => {
300 let info = read_mqtt_string(bytes)?;
301 cursor += 2 + info.len();
302 response_information = Some(info);
303 }
304 PropertyType::ServerReference => {
305 let reference = read_mqtt_string(bytes)?;
306 cursor += 2 + reference.len();
307 server_reference = Some(reference);
308 }
309 PropertyType::AuthenticationMethod => {
310 let method = read_mqtt_string(bytes)?;
311 cursor += 2 + method.len();
312 authentication_method = Some(method);
313 }
314 PropertyType::AuthenticationData => {
315 let data = read_mqtt_bytes(bytes)?;
316 cursor += 2 + data.len();
317 authentication_data = Some(data);
318 }
319 _ => return Err(Error::InvalidPropertyType(prop)),
320 }
321 }
322
323 Ok(Some(ConnAckProperties {
324 session_expiry_interval,
325 receive_max,
326 max_qos,
327 retain_available,
328 max_packet_size,
329 assigned_client_identifier,
330 topic_alias_max,
331 reason_string,
332 user_properties,
333 wildcard_subscription_available,
334 subscription_identifiers_available,
335 shared_subscription_available,
336 server_keep_alive,
337 response_information,
338 server_reference,
339 authentication_method,
340 authentication_data,
341 }))
342 }
343
344 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
345 let len = self.len();
346 write_remaining_length(buffer, len)?;
347
348 if let Some(session_expiry_interval) = self.session_expiry_interval {
349 buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
350 buffer.put_u32(session_expiry_interval);
351 }
352
353 if let Some(receive_maximum) = self.receive_max {
354 buffer.put_u8(PropertyType::ReceiveMaximum as u8);
355 buffer.put_u16(receive_maximum);
356 }
357
358 if let Some(qos) = self.max_qos {
359 buffer.put_u8(PropertyType::MaximumQos as u8);
360 buffer.put_u8(qos);
361 }
362
363 if let Some(retain_available) = self.retain_available {
364 buffer.put_u8(PropertyType::RetainAvailable as u8);
365 buffer.put_u8(retain_available);
366 }
367
368 if let Some(max_packet_size) = self.max_packet_size {
369 buffer.put_u8(PropertyType::MaximumPacketSize as u8);
370 buffer.put_u32(max_packet_size);
371 }
372
373 if let Some(id) = &self.assigned_client_identifier {
374 buffer.put_u8(PropertyType::AssignedClientIdentifier as u8);
375 write_mqtt_string(buffer, id)?;
376 }
377
378 if let Some(topic_alias_max) = self.topic_alias_max {
379 buffer.put_u8(PropertyType::TopicAliasMaximum as u8);
380 buffer.put_u16(topic_alias_max);
381 }
382
383 if let Some(reason) = &self.reason_string {
384 buffer.put_u8(PropertyType::ReasonString as u8);
385 write_mqtt_string(buffer, reason)?;
386 }
387
388 for (key, value) in &self.user_properties {
389 buffer.put_u8(PropertyType::UserProperty as u8);
390 write_mqtt_string(buffer, key)?;
391 write_mqtt_string(buffer, value)?;
392 }
393
394 if let Some(w) = self.wildcard_subscription_available {
395 buffer.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
396 buffer.put_u8(w);
397 }
398
399 if let Some(s) = self.subscription_identifiers_available {
400 buffer.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
401 buffer.put_u8(s);
402 }
403
404 if let Some(s) = self.shared_subscription_available {
405 buffer.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
406 buffer.put_u8(s);
407 }
408
409 if let Some(keep_alive) = self.server_keep_alive {
410 buffer.put_u8(PropertyType::ServerKeepAlive as u8);
411 buffer.put_u16(keep_alive);
412 }
413
414 if let Some(info) = &self.response_information {
415 buffer.put_u8(PropertyType::ResponseInformation as u8);
416 write_mqtt_string(buffer, info)?;
417 }
418
419 if let Some(reference) = &self.server_reference {
420 buffer.put_u8(PropertyType::ServerReference as u8);
421 write_mqtt_string(buffer, reference)?;
422 }
423
424 if let Some(authentication_method) = &self.authentication_method {
425 buffer.put_u8(PropertyType::AuthenticationMethod as u8);
426 write_mqtt_string(buffer, authentication_method)?;
427 }
428
429 if let Some(authentication_data) = &self.authentication_data {
430 buffer.put_u8(PropertyType::AuthenticationData as u8);
431 write_mqtt_bytes(buffer, authentication_data)?;
432 }
433
434 Ok(())
435 }
436}
437
438fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
440 let code = match num {
441 0 => ConnectReturnCode::Success,
442 128 => ConnectReturnCode::UnspecifiedError,
443 129 => ConnectReturnCode::MalformedPacket,
444 130 => ConnectReturnCode::ProtocolError,
445 131 => ConnectReturnCode::ImplementationSpecificError,
446 132 => ConnectReturnCode::UnsupportedProtocolVersion,
447 133 => ConnectReturnCode::ClientIdentifierNotValid,
448 134 => ConnectReturnCode::BadUserNamePassword,
449 135 => ConnectReturnCode::NotAuthorized,
450 136 => ConnectReturnCode::ServerUnavailable,
451 137 => ConnectReturnCode::ServerBusy,
452 138 => ConnectReturnCode::Banned,
453 140 => ConnectReturnCode::BadAuthenticationMethod,
454 144 => ConnectReturnCode::TopicNameInvalid,
455 149 => ConnectReturnCode::PacketTooLarge,
456 151 => ConnectReturnCode::QuotaExceeded,
457 153 => ConnectReturnCode::PayloadFormatInvalid,
458 154 => ConnectReturnCode::RetainNotSupported,
459 155 => ConnectReturnCode::QoSNotSupported,
460 156 => ConnectReturnCode::UseAnotherServer,
461 157 => ConnectReturnCode::ServerMoved,
462 159 => ConnectReturnCode::ConnectionRateExceeded,
463 num => return Err(Error::InvalidConnectReturnCode(num)),
464 };
465
466 Ok(code)
467}
468
469fn connect_code(return_code: ConnectReturnCode) -> u8 {
470 match return_code {
471 ConnectReturnCode::Success => 0,
472 ConnectReturnCode::UnspecifiedError => 128,
473 ConnectReturnCode::MalformedPacket => 129,
474 ConnectReturnCode::ProtocolError => 130,
475 ConnectReturnCode::ImplementationSpecificError => 131,
476 ConnectReturnCode::UnsupportedProtocolVersion => 132,
477 ConnectReturnCode::ClientIdentifierNotValid => 133,
478 ConnectReturnCode::BadUserNamePassword => 134,
479 ConnectReturnCode::NotAuthorized => 135,
480 ConnectReturnCode::ServerUnavailable => 136,
481 ConnectReturnCode::ServerBusy => 137,
482 ConnectReturnCode::Banned => 138,
483 ConnectReturnCode::BadAuthenticationMethod => 140,
484 ConnectReturnCode::TopicNameInvalid => 144,
485 ConnectReturnCode::PacketTooLarge => 149,
486 ConnectReturnCode::QuotaExceeded => 151,
487 ConnectReturnCode::PayloadFormatInvalid => 153,
488 ConnectReturnCode::RetainNotSupported => 154,
489 ConnectReturnCode::QoSNotSupported => 155,
490 ConnectReturnCode::UseAnotherServer => 156,
491 ConnectReturnCode::ServerMoved => 157,
492 ConnectReturnCode::ConnectionRateExceeded => 159,
493 _ => unreachable!(),
494 }
495}
496
#[cfg(test)]
mod test {
    use crate::test::read_write_packets;
    use crate::Packet;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()` and the number of bytes
    /// actually written must all agree.
    #[test]
    fn length_calculation() {
        // Only a single user property is set; everything else stays absent.
        let properties = ConnAckProperties {
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            ..ConnAckProperties::default()
        };
        let packet = ConnAck {
            session_present: false,
            code: ConnectReturnCode::Success,
            properties: Some(properties),
        };

        let mut written = BytesMut::new();
        let reported_by_size = packet.size();
        let reported_by_write = packet.write(&mut written).unwrap();
        let actual = written.len();

        assert_eq!(reported_by_write, actual);
        assert_eq!(reported_by_size, actual);
    }

    /// Every packet from the provider must survive a write/read round trip.
    #[test]
    fn test_write_read() {
        read_write_packets(write_read_provider());
    }

    /// Packets covering: a single property, no properties (success and
    /// failure codes), and every property populated.
    fn write_read_provider() -> Vec<Packet> {
        let client_id_only = ConnAckProperties {
            assigned_client_identifier: Some("client".into()),
            ..ConnAckProperties::new()
        };

        let everything = ConnAckProperties {
            session_expiry_interval: Some(10),
            receive_max: Some(20),
            max_qos: Some(1),
            retain_available: Some(1),
            max_packet_size: Some(30),
            assigned_client_identifier: Some("client".into()),
            topic_alias_max: Some(40),
            reason_string: Some("reason".into()),
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            wildcard_subscription_available: Some(1),
            subscription_identifiers_available: Some(1),
            shared_subscription_available: Some(1),
            server_keep_alive: Some(50),
            response_information: Some("info".into()),
            server_reference: Some("ref".into()),
            authentication_method: Some("method".into()),
            authentication_data: Some(Bytes::from_static(&[1, 2, 3])),
        };

        vec![
            Packet::ConnAck(ConnAck {
                session_present: true,
                code: ConnectReturnCode::Success,
                properties: Some(client_id_only),
            }),
            Packet::ConnAck(ConnAck {
                session_present: false,
                code: ConnectReturnCode::Success,
                properties: None,
            }),
            Packet::ConnAck(ConnAck {
                session_present: true,
                code: ConnectReturnCode::BadAuthenticationMethod,
                properties: None,
            }),
            Packet::ConnAck(ConnAck {
                session_present: true,
                code: ConnectReturnCode::Success,
                properties: Some(everything),
            }),
        ]
    }
}