mqtt_bytes_v5/
connack.rs

1use crate::MqttString;
2
3use super::{
4    len_len, length, property, read_mqtt_bytes, read_mqtt_string, read_u16, read_u32, read_u8,
5    write_mqtt_bytes, write_mqtt_string, write_remaining_length, Debug, Error, FixedHeader,
6    PropertyType,
7};
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9
/// Return code in connack
///
/// The wire values noted on the variants are the bytes that `connect_return`
/// in this module decodes into the respective variant.
// This contains return codes for both MQTT v311 and v5
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectReturnCode {
    /// Wire value 0.
    Success,
    /// Presumably an MQTT v3.1.1 return code; `connect_return` in this module
    /// never decodes a byte into this variant.
    RefusedProtocolVersion,
    /// Presumably an MQTT v3.1.1 return code; `connect_return` in this module
    /// never decodes a byte into this variant.
    BadClientId,
    /// Presumably an MQTT v3.1.1 return code; `connect_return` in this module
    /// never decodes a byte into this variant.
    ServiceUnavailable,
    /// Wire value 128.
    UnspecifiedError,
    /// Wire value 129.
    MalformedPacket,
    /// Wire value 130.
    ProtocolError,
    /// Wire value 131.
    ImplementationSpecificError,
    /// Wire value 132.
    UnsupportedProtocolVersion,
    /// Wire value 133.
    ClientIdentifierNotValid,
    /// Wire value 134.
    BadUserNamePassword,
    /// Wire value 135.
    NotAuthorized,
    /// Wire value 136.
    ServerUnavailable,
    /// Wire value 137.
    ServerBusy,
    /// Wire value 138.
    Banned,
    /// Wire value 140.
    BadAuthenticationMethod,
    /// Wire value 144.
    TopicNameInvalid,
    /// Wire value 149.
    PacketTooLarge,
    /// Wire value 151.
    QuotaExceeded,
    /// Wire value 153.
    PayloadFormatInvalid,
    /// Wire value 154.
    RetainNotSupported,
    /// Wire value 155.
    QoSNotSupported,
    /// Wire value 156.
    UseAnotherServer,
    /// Wire value 157.
    ServerMoved,
    /// Wire value 159.
    ConnectionRateExceeded,
}
40
/// Acknowledgement to connect packet
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnAck {
    /// Session-present flag: read from bit 0 of the first byte after the
    /// fixed header, and written back as a single byte holding 0 or 1.
    pub session_present: bool,
    /// Return code, stored on the wire as the single byte that follows the
    /// session-present flags byte.
    pub code: ConnectReturnCode,
    /// Properties of the packet. `ConnAckProperties::read` yields `None` when
    /// the encoded property length is zero, and `None` is written as a
    /// property length of zero.
    pub properties: Option<ConnAckProperties>,
}
48
49impl ConnAck {
50    fn len(&self) -> usize {
51        let mut len = 1  // session present
52                    + 1; // code
53
54        if let Some(p) = &self.properties {
55            let properties_len = p.len();
56            let properties_len_len = len_len(properties_len);
57            len += properties_len_len + properties_len;
58        } else {
59            len += 1;
60        }
61
62        len
63    }
64
65    pub fn size(&self) -> usize {
66        let len = self.len();
67        let remaining_len_size = len_len(len);
68
69        1 + remaining_len_size + len
70    }
71
72    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<ConnAck, Error> {
73        let variable_header_index = fixed_header.fixed_header_len;
74        bytes.advance(variable_header_index);
75
76        let flags = read_u8(&mut bytes)?;
77        let return_code = read_u8(&mut bytes)?;
78        let properties = ConnAckProperties::read(&mut bytes)?;
79
80        let session_present = (flags & 0x01) == 1;
81        let code = connect_return(return_code)?;
82        let connack = ConnAck {
83            session_present,
84            code,
85            properties,
86        };
87
88        Ok(connack)
89    }
90
91    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
92        let len = Self::len(self);
93        buffer.put_u8(0x20);
94
95        let count = write_remaining_length(buffer, len)?;
96        buffer.put_u8(u8::from(self.session_present));
97        buffer.put_u8(connect_code(self.code));
98
99        if let Some(p) = &self.properties {
100            p.write(buffer)?;
101        } else {
102            write_remaining_length(buffer, 0)?;
103        }
104
105        Ok(1 + count + len)
106    }
107}
108
/// Properties that can accompany a `ConnAck`.
///
/// Every `Option` field that is `Some` is written as one property; each
/// entry of `user_properties` is written as its own property.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnAckProperties {
    /// Encoded under `PropertyType::SessionExpiryInterval` as a `u32`.
    pub session_expiry_interval: Option<u32>,
    /// Encoded under `PropertyType::ReceiveMaximum` as a `u16`.
    pub receive_max: Option<u16>,
    /// Encoded under `PropertyType::MaximumQos` as a single byte.
    pub max_qos: Option<u8>,
    /// Encoded under `PropertyType::RetainAvailable` as a single byte.
    pub retain_available: Option<u8>,
    /// Encoded under `PropertyType::MaximumPacketSize` as a `u32`.
    pub max_packet_size: Option<u32>,
    /// Encoded under `PropertyType::AssignedClientIdentifier` as a string.
    pub assigned_client_identifier: Option<MqttString>,
    /// Encoded under `PropertyType::TopicAliasMaximum` as a `u16`.
    pub topic_alias_max: Option<u16>,
    /// Encoded under `PropertyType::ReasonString` as a string.
    pub reason_string: Option<MqttString>,
    /// Key/value pairs, each encoded under `PropertyType::UserProperty` as
    /// two consecutive strings. Order is preserved when reading and writing.
    pub user_properties: Vec<(MqttString, MqttString)>,
    /// Encoded under `PropertyType::WildcardSubscriptionAvailable` as a single byte.
    pub wildcard_subscription_available: Option<u8>,
    /// Encoded under `PropertyType::SubscriptionIdentifierAvailable` as a single byte.
    pub subscription_identifiers_available: Option<u8>,
    /// Encoded under `PropertyType::SharedSubscriptionAvailable` as a single byte.
    pub shared_subscription_available: Option<u8>,
    /// Encoded under `PropertyType::ServerKeepAlive` as a `u16`.
    pub server_keep_alive: Option<u16>,
    /// Encoded under `PropertyType::ResponseInformation` as a string.
    pub response_information: Option<MqttString>,
    /// Encoded under `PropertyType::ServerReference` as a string.
    pub server_reference: Option<MqttString>,
    /// Encoded under `PropertyType::AuthenticationMethod` as a string.
    pub authentication_method: Option<MqttString>,
    /// Encoded under `PropertyType::AuthenticationData` as length-prefixed bytes.
    pub authentication_data: Option<Bytes>,
}
129
130impl ConnAckProperties {
131    #[must_use]
132    pub fn new() -> Self {
133        Self::default()
134    }
135
136    fn len(&self) -> usize {
137        let mut len = 0;
138
139        if self.session_expiry_interval.is_some() {
140            len += 1 + 4;
141        }
142
143        if self.receive_max.is_some() {
144            len += 1 + 2;
145        }
146
147        if self.max_qos.is_some() {
148            len += 1 + 1;
149        }
150
151        if self.retain_available.is_some() {
152            len += 1 + 1;
153        }
154
155        if self.max_packet_size.is_some() {
156            len += 1 + 4;
157        }
158
159        if let Some(id) = &self.assigned_client_identifier {
160            len += 1 + 2 + id.len();
161        }
162
163        if self.topic_alias_max.is_some() {
164            len += 1 + 2;
165        }
166
167        if let Some(reason) = &self.reason_string {
168            len += 1 + 2 + reason.len();
169        }
170
171        for (key, value) in &self.user_properties {
172            len += 1 + 2 + key.len() + 2 + value.len();
173        }
174
175        if self.wildcard_subscription_available.is_some() {
176            len += 1 + 1;
177        }
178
179        if self.subscription_identifiers_available.is_some() {
180            len += 1 + 1;
181        }
182
183        if self.shared_subscription_available.is_some() {
184            len += 1 + 1;
185        }
186
187        if self.server_keep_alive.is_some() {
188            len += 1 + 2;
189        }
190
191        if let Some(info) = &self.response_information {
192            len += 1 + 2 + info.len();
193        }
194
195        if let Some(reference) = &self.server_reference {
196            len += 1 + 2 + reference.len();
197        }
198
199        if let Some(authentication_method) = &self.authentication_method {
200            len += 1 + 2 + authentication_method.len();
201        }
202
203        if let Some(authentication_data) = &self.authentication_data {
204            len += 1 + 2 + authentication_data.len();
205        }
206
207        len
208    }
209
210    #[allow(clippy::too_many_lines)]
211    pub fn read(bytes: &mut Bytes) -> Result<Option<ConnAckProperties>, Error> {
212        let mut session_expiry_interval = None;
213        let mut receive_max = None;
214        let mut max_qos = None;
215        let mut retain_available = None;
216        let mut max_packet_size = None;
217        let mut assigned_client_identifier = None;
218        let mut topic_alias_max = None;
219        let mut reason_string = None;
220        let mut user_properties = Vec::new();
221        let mut wildcard_subscription_available = None;
222        let mut subscription_identifiers_available = None;
223        let mut shared_subscription_available = None;
224        let mut server_keep_alive = None;
225        let mut response_information = None;
226        let mut server_reference = None;
227        let mut authentication_method = None;
228        let mut authentication_data = None;
229
230        let (properties_len_len, properties_len) = length(bytes.iter())?;
231        bytes.advance(properties_len_len);
232        if properties_len == 0 {
233            return Ok(None);
234        }
235
236        let mut cursor = 0;
237        // read until cursor reaches property length. properties_len = 0 will skip this loop
238        while cursor < properties_len {
239            let prop = read_u8(bytes)?;
240            cursor += 1;
241
242            match property(prop)? {
243                PropertyType::SessionExpiryInterval => {
244                    session_expiry_interval = Some(read_u32(bytes)?);
245                    cursor += 4;
246                }
247                PropertyType::ReceiveMaximum => {
248                    receive_max = Some(read_u16(bytes)?);
249                    cursor += 2;
250                }
251                PropertyType::MaximumQos => {
252                    max_qos = Some(read_u8(bytes)?);
253                    cursor += 1;
254                }
255                PropertyType::RetainAvailable => {
256                    retain_available = Some(read_u8(bytes)?);
257                    cursor += 1;
258                }
259                PropertyType::AssignedClientIdentifier => {
260                    let id = read_mqtt_string(bytes)?;
261                    cursor += 2 + id.len();
262                    assigned_client_identifier = Some(id);
263                }
264                PropertyType::MaximumPacketSize => {
265                    max_packet_size = Some(read_u32(bytes)?);
266                    cursor += 4;
267                }
268                PropertyType::TopicAliasMaximum => {
269                    topic_alias_max = Some(read_u16(bytes)?);
270                    cursor += 2;
271                }
272                PropertyType::ReasonString => {
273                    let reason = read_mqtt_string(bytes)?;
274                    cursor += 2 + reason.len();
275                    reason_string = Some(reason);
276                }
277                PropertyType::UserProperty => {
278                    let key = read_mqtt_string(bytes)?;
279                    let value = read_mqtt_string(bytes)?;
280                    cursor += 2 + key.len() + 2 + value.len();
281                    user_properties.push((key, value));
282                }
283                PropertyType::WildcardSubscriptionAvailable => {
284                    wildcard_subscription_available = Some(read_u8(bytes)?);
285                    cursor += 1;
286                }
287                PropertyType::SubscriptionIdentifierAvailable => {
288                    subscription_identifiers_available = Some(read_u8(bytes)?);
289                    cursor += 1;
290                }
291                PropertyType::SharedSubscriptionAvailable => {
292                    shared_subscription_available = Some(read_u8(bytes)?);
293                    cursor += 1;
294                }
295                PropertyType::ServerKeepAlive => {
296                    server_keep_alive = Some(read_u16(bytes)?);
297                    cursor += 2;
298                }
299                PropertyType::ResponseInformation => {
300                    let info = read_mqtt_string(bytes)?;
301                    cursor += 2 + info.len();
302                    response_information = Some(info);
303                }
304                PropertyType::ServerReference => {
305                    let reference = read_mqtt_string(bytes)?;
306                    cursor += 2 + reference.len();
307                    server_reference = Some(reference);
308                }
309                PropertyType::AuthenticationMethod => {
310                    let method = read_mqtt_string(bytes)?;
311                    cursor += 2 + method.len();
312                    authentication_method = Some(method);
313                }
314                PropertyType::AuthenticationData => {
315                    let data = read_mqtt_bytes(bytes)?;
316                    cursor += 2 + data.len();
317                    authentication_data = Some(data);
318                }
319                _ => return Err(Error::InvalidPropertyType(prop)),
320            }
321        }
322
323        Ok(Some(ConnAckProperties {
324            session_expiry_interval,
325            receive_max,
326            max_qos,
327            retain_available,
328            max_packet_size,
329            assigned_client_identifier,
330            topic_alias_max,
331            reason_string,
332            user_properties,
333            wildcard_subscription_available,
334            subscription_identifiers_available,
335            shared_subscription_available,
336            server_keep_alive,
337            response_information,
338            server_reference,
339            authentication_method,
340            authentication_data,
341        }))
342    }
343
344    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
345        let len = self.len();
346        write_remaining_length(buffer, len)?;
347
348        if let Some(session_expiry_interval) = self.session_expiry_interval {
349            buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
350            buffer.put_u32(session_expiry_interval);
351        }
352
353        if let Some(receive_maximum) = self.receive_max {
354            buffer.put_u8(PropertyType::ReceiveMaximum as u8);
355            buffer.put_u16(receive_maximum);
356        }
357
358        if let Some(qos) = self.max_qos {
359            buffer.put_u8(PropertyType::MaximumQos as u8);
360            buffer.put_u8(qos);
361        }
362
363        if let Some(retain_available) = self.retain_available {
364            buffer.put_u8(PropertyType::RetainAvailable as u8);
365            buffer.put_u8(retain_available);
366        }
367
368        if let Some(max_packet_size) = self.max_packet_size {
369            buffer.put_u8(PropertyType::MaximumPacketSize as u8);
370            buffer.put_u32(max_packet_size);
371        }
372
373        if let Some(id) = &self.assigned_client_identifier {
374            buffer.put_u8(PropertyType::AssignedClientIdentifier as u8);
375            write_mqtt_string(buffer, id)?;
376        }
377
378        if let Some(topic_alias_max) = self.topic_alias_max {
379            buffer.put_u8(PropertyType::TopicAliasMaximum as u8);
380            buffer.put_u16(topic_alias_max);
381        }
382
383        if let Some(reason) = &self.reason_string {
384            buffer.put_u8(PropertyType::ReasonString as u8);
385            write_mqtt_string(buffer, reason)?;
386        }
387
388        for (key, value) in &self.user_properties {
389            buffer.put_u8(PropertyType::UserProperty as u8);
390            write_mqtt_string(buffer, key)?;
391            write_mqtt_string(buffer, value)?;
392        }
393
394        if let Some(w) = self.wildcard_subscription_available {
395            buffer.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
396            buffer.put_u8(w);
397        }
398
399        if let Some(s) = self.subscription_identifiers_available {
400            buffer.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
401            buffer.put_u8(s);
402        }
403
404        if let Some(s) = self.shared_subscription_available {
405            buffer.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
406            buffer.put_u8(s);
407        }
408
409        if let Some(keep_alive) = self.server_keep_alive {
410            buffer.put_u8(PropertyType::ServerKeepAlive as u8);
411            buffer.put_u16(keep_alive);
412        }
413
414        if let Some(info) = &self.response_information {
415            buffer.put_u8(PropertyType::ResponseInformation as u8);
416            write_mqtt_string(buffer, info)?;
417        }
418
419        if let Some(reference) = &self.server_reference {
420            buffer.put_u8(PropertyType::ServerReference as u8);
421            write_mqtt_string(buffer, reference)?;
422        }
423
424        if let Some(authentication_method) = &self.authentication_method {
425            buffer.put_u8(PropertyType::AuthenticationMethod as u8);
426            write_mqtt_string(buffer, authentication_method)?;
427        }
428
429        if let Some(authentication_data) = &self.authentication_data {
430            buffer.put_u8(PropertyType::AuthenticationData as u8);
431            write_mqtt_bytes(buffer, authentication_data)?;
432        }
433
434        Ok(())
435    }
436}
437
438/// Connection return code type
439fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
440    let code = match num {
441        0 => ConnectReturnCode::Success,
442        128 => ConnectReturnCode::UnspecifiedError,
443        129 => ConnectReturnCode::MalformedPacket,
444        130 => ConnectReturnCode::ProtocolError,
445        131 => ConnectReturnCode::ImplementationSpecificError,
446        132 => ConnectReturnCode::UnsupportedProtocolVersion,
447        133 => ConnectReturnCode::ClientIdentifierNotValid,
448        134 => ConnectReturnCode::BadUserNamePassword,
449        135 => ConnectReturnCode::NotAuthorized,
450        136 => ConnectReturnCode::ServerUnavailable,
451        137 => ConnectReturnCode::ServerBusy,
452        138 => ConnectReturnCode::Banned,
453        140 => ConnectReturnCode::BadAuthenticationMethod,
454        144 => ConnectReturnCode::TopicNameInvalid,
455        149 => ConnectReturnCode::PacketTooLarge,
456        151 => ConnectReturnCode::QuotaExceeded,
457        153 => ConnectReturnCode::PayloadFormatInvalid,
458        154 => ConnectReturnCode::RetainNotSupported,
459        155 => ConnectReturnCode::QoSNotSupported,
460        156 => ConnectReturnCode::UseAnotherServer,
461        157 => ConnectReturnCode::ServerMoved,
462        159 => ConnectReturnCode::ConnectionRateExceeded,
463        num => return Err(Error::InvalidConnectReturnCode(num)),
464    };
465
466    Ok(code)
467}
468
469fn connect_code(return_code: ConnectReturnCode) -> u8 {
470    match return_code {
471        ConnectReturnCode::Success => 0,
472        ConnectReturnCode::UnspecifiedError => 128,
473        ConnectReturnCode::MalformedPacket => 129,
474        ConnectReturnCode::ProtocolError => 130,
475        ConnectReturnCode::ImplementationSpecificError => 131,
476        ConnectReturnCode::UnsupportedProtocolVersion => 132,
477        ConnectReturnCode::ClientIdentifierNotValid => 133,
478        ConnectReturnCode::BadUserNamePassword => 134,
479        ConnectReturnCode::NotAuthorized => 135,
480        ConnectReturnCode::ServerUnavailable => 136,
481        ConnectReturnCode::ServerBusy => 137,
482        ConnectReturnCode::Banned => 138,
483        ConnectReturnCode::BadAuthenticationMethod => 140,
484        ConnectReturnCode::TopicNameInvalid => 144,
485        ConnectReturnCode::PacketTooLarge => 149,
486        ConnectReturnCode::QuotaExceeded => 151,
487        ConnectReturnCode::PayloadFormatInvalid => 153,
488        ConnectReturnCode::RetainNotSupported => 154,
489        ConnectReturnCode::QoSNotSupported => 155,
490        ConnectReturnCode::UseAnotherServer => 156,
491        ConnectReturnCode::ServerMoved => 157,
492        ConnectReturnCode::ConnectionRateExceeded => 159,
493        _ => unreachable!(),
494    }
495}
496
#[cfg(test)]
mod test {
    use crate::test::read_write_packets;
    use crate::Packet;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()` and the number of bytes
    /// actually written must all agree.
    #[test]
    fn length_calculation() {
        // The user property is there to pad the packet; the intent is to push
        // it past ~128 bytes so the remaining-length field takes 2 bytes.
        let packet = ConnAck {
            session_present: false,
            code: ConnectReturnCode::Success,
            properties: Some(ConnAckProperties {
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
                ..ConnAckProperties::default()
            }),
        };

        let mut encoded = BytesMut::new();
        let size_from_size = packet.size();
        let size_from_write = packet.write(&mut encoded).unwrap();
        let size_from_bytes = encoded.len();

        assert_eq!(size_from_write, size_from_bytes);
        assert_eq!(size_from_size, size_from_bytes);
    }

    /// Every packet from the provider must survive a write/read round trip.
    #[test]
    fn test_write_read() {
        read_write_packets(write_read_provider());
    }

    /// Packets covering: one property, no properties (success and failure
    /// code), and every property set at once.
    fn write_read_provider() -> Vec<Packet> {
        let connack = |session_present: bool,
                       code: ConnectReturnCode,
                       properties: Option<ConnAckProperties>| {
            Packet::ConnAck(ConnAck {
                session_present,
                code,
                properties,
            })
        };

        let only_client_id = ConnAckProperties {
            assigned_client_identifier: Some("client".into()),
            ..ConnAckProperties::new()
        };

        let every_property = ConnAckProperties {
            session_expiry_interval: Some(10),
            receive_max: Some(20),
            max_qos: Some(1),
            retain_available: Some(1),
            max_packet_size: Some(30),
            assigned_client_identifier: Some("client".into()),
            topic_alias_max: Some(40),
            reason_string: Some("reason".into()),
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            wildcard_subscription_available: Some(1),
            subscription_identifiers_available: Some(1),
            shared_subscription_available: Some(1),
            server_keep_alive: Some(50),
            response_information: Some("info".into()),
            server_reference: Some("ref".into()),
            authentication_method: Some("method".into()),
            authentication_data: Some(Bytes::from_static(&[1, 2, 3])),
        };

        vec![
            connack(true, ConnectReturnCode::Success, Some(only_client_id)),
            connack(false, ConnectReturnCode::Success, None),
            connack(true, ConnectReturnCode::BadAuthenticationMethod, None),
            connack(true, ConnectReturnCode::Success, Some(every_property)),
        ]
    }
}