Skip to main content

rumqttc/mqttbytes/v5/
connack.rs

1use super::{
2    Error, FixedHeader, PropertyType, len_len, length, property, read_mqtt_bytes, read_mqtt_string,
3    read_u8, read_u16, read_u32, write_mqtt_bytes, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
/// Reason code carried by a CONNACK packet.
///
/// The variant set is shared between MQTT 3.1.1 and MQTT 5. Where a variant lists a wire
/// value, that is the byte this module's `connect_return` decoder maps to the variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectReturnCode {
    /// Wire value 0.
    Success,
    /// Never produced by this module's `connect_return` decoder
    /// (presumably the MQTT 3.1.1 "unacceptable protocol version" code).
    RefusedProtocolVersion,
    /// Never produced by this module's `connect_return` decoder
    /// (presumably the MQTT 3.1.1 "identifier rejected" code).
    BadClientId,
    /// Never produced by this module's `connect_return` decoder
    /// (presumably the MQTT 3.1.1 "server unavailable" code).
    ServiceUnavailable,
    /// Wire value 128.
    UnspecifiedError,
    /// Wire value 129.
    MalformedPacket,
    /// Wire value 130.
    ProtocolError,
    /// Wire value 131.
    ImplementationSpecificError,
    /// Wire value 132.
    UnsupportedProtocolVersion,
    /// Wire value 133.
    ClientIdentifierNotValid,
    /// Wire value 134.
    BadUserNamePassword,
    /// Wire value 135.
    NotAuthorized,
    /// Wire value 136.
    ServerUnavailable,
    /// Wire value 137.
    ServerBusy,
    /// Wire value 138.
    Banned,
    /// Wire value 140.
    BadAuthenticationMethod,
    /// Wire value 144.
    TopicNameInvalid,
    /// Wire value 149.
    PacketTooLarge,
    /// Wire value 151.
    QuotaExceeded,
    /// Wire value 153.
    PayloadFormatInvalid,
    /// Wire value 154.
    RetainNotSupported,
    /// Wire value 155.
    QoSNotSupported,
    /// Wire value 156.
    UseAnotherServer,
    /// Wire value 157.
    ServerMoved,
    /// Wire value 159.
    ConnectionRateExceeded,
}
37
38/// Acknowledgement to connect packet
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct ConnAck {
41    pub session_present: bool,
42    pub code: ConnectReturnCode,
43    pub properties: Option<ConnAckProperties>,
44}
45
46impl ConnAck {
47    fn len(&self) -> usize {
48        let mut len = 1  // session present
49                    + 1; // code
50
51        if let Some(p) = &self.properties {
52            let properties_len = p.len();
53            let properties_len_len = len_len(properties_len);
54            len += properties_len_len + properties_len;
55        } else {
56            len += 1;
57        }
58
59        len
60    }
61
62    pub fn size(&self) -> usize {
63        let len = self.len();
64        let remaining_len_size = len_len(len);
65
66        1 + remaining_len_size + len
67    }
68
69    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
70        let variable_header_index = fixed_header.header_len;
71        bytes.advance(variable_header_index);
72
73        let flags = read_u8(&mut bytes)?;
74        let return_code = read_u8(&mut bytes)?;
75        let properties = ConnAckProperties::read(&mut bytes)?;
76
77        let session_present = (flags & 0x01) == 1;
78        let code = connect_return(return_code)?;
79        let connack = Self {
80            session_present,
81            code,
82            properties,
83        };
84
85        Ok(connack)
86    }
87
88    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
89        let len = Self::len(self);
90        buffer.put_u8(0x20);
91
92        let count = write_remaining_length(buffer, len)?;
93        buffer.put_u8(u8::from(self.session_present));
94        buffer.put_u8(connect_code(self.code));
95
96        if let Some(p) = &self.properties {
97            p.write(buffer)?;
98        } else {
99            write_remaining_length(buffer, 0)?;
100        }
101
102        Ok(1 + count + len)
103    }
104}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub struct ConnAckProperties {
108    pub session_expiry_interval: Option<u32>,
109    pub receive_max: Option<u16>,
110    pub max_qos: Option<u8>,
111    pub retain_available: Option<u8>,
112    pub max_packet_size: Option<u32>,
113    pub assigned_client_identifier: Option<String>,
114    pub topic_alias_max: Option<u16>,
115    pub reason_string: Option<String>,
116    pub user_properties: Vec<(String, String)>,
117    pub wildcard_subscription_available: Option<u8>,
118    pub subscription_identifiers_available: Option<u8>,
119    pub shared_subscription_available: Option<u8>,
120    pub server_keep_alive: Option<u16>,
121    pub response_information: Option<String>,
122    pub server_reference: Option<String>,
123    pub authentication_method: Option<String>,
124    pub authentication_data: Option<Bytes>,
125}
126
127impl ConnAckProperties {
128    fn len(&self) -> usize {
129        let mut len = 0;
130
131        if self.session_expiry_interval.is_some() {
132            len += 1 + 4;
133        }
134
135        if self.receive_max.is_some() {
136            len += 1 + 2;
137        }
138
139        if self.max_qos.is_some() {
140            len += 1 + 1;
141        }
142
143        if self.retain_available.is_some() {
144            len += 1 + 1;
145        }
146
147        if self.max_packet_size.is_some() {
148            len += 1 + 4;
149        }
150
151        if let Some(id) = &self.assigned_client_identifier {
152            len += 1 + 2 + id.len();
153        }
154
155        if self.topic_alias_max.is_some() {
156            len += 1 + 2;
157        }
158
159        if let Some(reason) = &self.reason_string {
160            len += 1 + 2 + reason.len();
161        }
162
163        for (key, value) in &self.user_properties {
164            len += 1 + 2 + key.len() + 2 + value.len();
165        }
166
167        if self.wildcard_subscription_available.is_some() {
168            len += 1 + 1;
169        }
170
171        if self.subscription_identifiers_available.is_some() {
172            len += 1 + 1;
173        }
174
175        if self.shared_subscription_available.is_some() {
176            len += 1 + 1;
177        }
178
179        if self.server_keep_alive.is_some() {
180            len += 1 + 2;
181        }
182
183        if let Some(info) = &self.response_information {
184            len += 1 + 2 + info.len();
185        }
186
187        if let Some(reference) = &self.server_reference {
188            len += 1 + 2 + reference.len();
189        }
190
191        if let Some(authentication_method) = &self.authentication_method {
192            len += 1 + 2 + authentication_method.len();
193        }
194
195        if let Some(authentication_data) = &self.authentication_data {
196            len += 1 + 2 + authentication_data.len();
197        }
198
199        len
200    }
201
202    pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
203        let mut properties = Self {
204            session_expiry_interval: None,
205            receive_max: None,
206            max_qos: None,
207            retain_available: None,
208            max_packet_size: None,
209            assigned_client_identifier: None,
210            topic_alias_max: None,
211            reason_string: None,
212            user_properties: Vec::new(),
213            wildcard_subscription_available: None,
214            subscription_identifiers_available: None,
215            shared_subscription_available: None,
216            server_keep_alive: None,
217            response_information: None,
218            server_reference: None,
219            authentication_method: None,
220            authentication_data: None,
221        };
222
223        let (properties_len_len, properties_len) = length(bytes.iter())?;
224        bytes.advance(properties_len_len);
225        if properties_len == 0 {
226            return Ok(None);
227        }
228
229        let mut cursor = 0;
230        // read until cursor reaches property length. properties_len = 0 will skip this loop
231        while cursor < properties_len {
232            let prop = read_u8(bytes)?;
233            cursor += 1;
234            read_connack_property(property(prop)?, bytes, &mut cursor, &mut properties)?;
235        }
236
237        Ok(Some(properties))
238    }
239
240    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
241        let len = self.len();
242        write_remaining_length(buffer, len)?;
243
244        if let Some(session_expiry_interval) = self.session_expiry_interval {
245            buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
246            buffer.put_u32(session_expiry_interval);
247        }
248
249        if let Some(receive_maximum) = self.receive_max {
250            buffer.put_u8(PropertyType::ReceiveMaximum as u8);
251            buffer.put_u16(receive_maximum);
252        }
253
254        if let Some(qos) = self.max_qos {
255            buffer.put_u8(PropertyType::MaximumQos as u8);
256            buffer.put_u8(qos);
257        }
258
259        if let Some(retain_available) = self.retain_available {
260            buffer.put_u8(PropertyType::RetainAvailable as u8);
261            buffer.put_u8(retain_available);
262        }
263
264        if let Some(max_packet_size) = self.max_packet_size {
265            buffer.put_u8(PropertyType::MaximumPacketSize as u8);
266            buffer.put_u32(max_packet_size);
267        }
268
269        if let Some(id) = &self.assigned_client_identifier {
270            buffer.put_u8(PropertyType::AssignedClientIdentifier as u8);
271            write_mqtt_string(buffer, id);
272        }
273
274        if let Some(topic_alias_max) = self.topic_alias_max {
275            buffer.put_u8(PropertyType::TopicAliasMaximum as u8);
276            buffer.put_u16(topic_alias_max);
277        }
278
279        if let Some(reason) = &self.reason_string {
280            buffer.put_u8(PropertyType::ReasonString as u8);
281            write_mqtt_string(buffer, reason);
282        }
283
284        for (key, value) in &self.user_properties {
285            buffer.put_u8(PropertyType::UserProperty as u8);
286            write_mqtt_string(buffer, key);
287            write_mqtt_string(buffer, value);
288        }
289
290        if let Some(w) = self.wildcard_subscription_available {
291            buffer.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
292            buffer.put_u8(w);
293        }
294
295        if let Some(s) = self.subscription_identifiers_available {
296            buffer.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
297            buffer.put_u8(s);
298        }
299
300        if let Some(s) = self.shared_subscription_available {
301            buffer.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
302            buffer.put_u8(s);
303        }
304
305        if let Some(keep_alive) = self.server_keep_alive {
306            buffer.put_u8(PropertyType::ServerKeepAlive as u8);
307            buffer.put_u16(keep_alive);
308        }
309
310        if let Some(info) = &self.response_information {
311            buffer.put_u8(PropertyType::ResponseInformation as u8);
312            write_mqtt_string(buffer, info);
313        }
314
315        if let Some(reference) = &self.server_reference {
316            buffer.put_u8(PropertyType::ServerReference as u8);
317            write_mqtt_string(buffer, reference);
318        }
319
320        if let Some(authentication_method) = &self.authentication_method {
321            buffer.put_u8(PropertyType::AuthenticationMethod as u8);
322            write_mqtt_string(buffer, authentication_method);
323        }
324
325        if let Some(authentication_data) = &self.authentication_data {
326            buffer.put_u8(PropertyType::AuthenticationData as u8);
327            write_mqtt_bytes(buffer, authentication_data);
328        }
329
330        Ok(())
331    }
332}
333
334fn read_connack_property(
335    property_type: PropertyType,
336    bytes: &mut Bytes,
337    cursor: &mut usize,
338    properties: &mut ConnAckProperties,
339) -> Result<(), Error> {
340    match property_type {
341        PropertyType::SessionExpiryInterval => {
342            properties.session_expiry_interval = Some(read_u32(bytes)?);
343            *cursor += 4;
344        }
345        PropertyType::ReceiveMaximum => {
346            let receive_max = read_u16(bytes)?;
347            if receive_max == 0 {
348                return Err(Error::ProtocolError);
349            }
350            properties.receive_max = Some(receive_max);
351            *cursor += 2;
352        }
353        PropertyType::MaximumQos => {
354            properties.max_qos = Some(read_u8(bytes)?);
355            *cursor += 1;
356        }
357        PropertyType::RetainAvailable => {
358            properties.retain_available = Some(read_u8(bytes)?);
359            *cursor += 1;
360        }
361        PropertyType::AssignedClientIdentifier => {
362            let id = read_mqtt_string(bytes)?;
363            *cursor += 2 + id.len();
364            properties.assigned_client_identifier = Some(id);
365        }
366        PropertyType::MaximumPacketSize => {
367            properties.max_packet_size = Some(read_u32(bytes)?);
368            *cursor += 4;
369        }
370        PropertyType::TopicAliasMaximum => {
371            properties.topic_alias_max = Some(read_u16(bytes)?);
372            *cursor += 2;
373        }
374        PropertyType::ReasonString => {
375            let reason = read_mqtt_string(bytes)?;
376            *cursor += 2 + reason.len();
377            properties.reason_string = Some(reason);
378        }
379        PropertyType::UserProperty => {
380            let key = read_mqtt_string(bytes)?;
381            let value = read_mqtt_string(bytes)?;
382            *cursor += 2 + key.len() + 2 + value.len();
383            properties.user_properties.push((key, value));
384        }
385        PropertyType::WildcardSubscriptionAvailable => {
386            properties.wildcard_subscription_available = Some(read_u8(bytes)?);
387            *cursor += 1;
388        }
389        PropertyType::SubscriptionIdentifierAvailable => {
390            properties.subscription_identifiers_available = Some(read_u8(bytes)?);
391            *cursor += 1;
392        }
393        PropertyType::SharedSubscriptionAvailable => {
394            properties.shared_subscription_available = Some(read_u8(bytes)?);
395            *cursor += 1;
396        }
397        PropertyType::ServerKeepAlive => {
398            properties.server_keep_alive = Some(read_u16(bytes)?);
399            *cursor += 2;
400        }
401        PropertyType::ResponseInformation => {
402            let info = read_mqtt_string(bytes)?;
403            *cursor += 2 + info.len();
404            properties.response_information = Some(info);
405        }
406        PropertyType::ServerReference => {
407            let reference = read_mqtt_string(bytes)?;
408            *cursor += 2 + reference.len();
409            properties.server_reference = Some(reference);
410        }
411        PropertyType::AuthenticationMethod => {
412            let method = read_mqtt_string(bytes)?;
413            *cursor += 2 + method.len();
414            properties.authentication_method = Some(method);
415        }
416        PropertyType::AuthenticationData => {
417            let data = read_mqtt_bytes(bytes)?;
418            *cursor += 2 + data.len();
419            properties.authentication_data = Some(data);
420        }
421        _ => return Err(Error::InvalidPropertyType(property_type as u8)),
422    }
423
424    Ok(())
425}
426
427/// Connection return code type
428const fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
429    let code = match num {
430        0 => ConnectReturnCode::Success,
431        128 => ConnectReturnCode::UnspecifiedError,
432        129 => ConnectReturnCode::MalformedPacket,
433        130 => ConnectReturnCode::ProtocolError,
434        131 => ConnectReturnCode::ImplementationSpecificError,
435        132 => ConnectReturnCode::UnsupportedProtocolVersion,
436        133 => ConnectReturnCode::ClientIdentifierNotValid,
437        134 => ConnectReturnCode::BadUserNamePassword,
438        135 => ConnectReturnCode::NotAuthorized,
439        136 => ConnectReturnCode::ServerUnavailable,
440        137 => ConnectReturnCode::ServerBusy,
441        138 => ConnectReturnCode::Banned,
442        140 => ConnectReturnCode::BadAuthenticationMethod,
443        144 => ConnectReturnCode::TopicNameInvalid,
444        149 => ConnectReturnCode::PacketTooLarge,
445        151 => ConnectReturnCode::QuotaExceeded,
446        153 => ConnectReturnCode::PayloadFormatInvalid,
447        154 => ConnectReturnCode::RetainNotSupported,
448        155 => ConnectReturnCode::QoSNotSupported,
449        156 => ConnectReturnCode::UseAnotherServer,
450        157 => ConnectReturnCode::ServerMoved,
451        159 => ConnectReturnCode::ConnectionRateExceeded,
452        num => return Err(Error::InvalidConnectReturnCode(num)),
453    };
454
455    Ok(code)
456}
457
458fn connect_code(return_code: ConnectReturnCode) -> u8 {
459    match return_code {
460        ConnectReturnCode::Success => 0,
461        ConnectReturnCode::UnspecifiedError => 128,
462        ConnectReturnCode::MalformedPacket => 129,
463        ConnectReturnCode::ProtocolError => 130,
464        ConnectReturnCode::ImplementationSpecificError => 131,
465        ConnectReturnCode::UnsupportedProtocolVersion => 132,
466        ConnectReturnCode::ClientIdentifierNotValid => 133,
467        ConnectReturnCode::BadUserNamePassword => 134,
468        ConnectReturnCode::NotAuthorized => 135,
469        ConnectReturnCode::ServerUnavailable => 136,
470        ConnectReturnCode::ServerBusy => 137,
471        ConnectReturnCode::Banned => 138,
472        ConnectReturnCode::BadAuthenticationMethod => 140,
473        ConnectReturnCode::TopicNameInvalid => 144,
474        ConnectReturnCode::PacketTooLarge => 149,
475        ConnectReturnCode::QuotaExceeded => 151,
476        ConnectReturnCode::PayloadFormatInvalid => 153,
477        ConnectReturnCode::RetainNotSupported => 154,
478        ConnectReturnCode::QoSNotSupported => 155,
479        ConnectReturnCode::UseAnotherServer => 156,
480        ConnectReturnCode::ServerMoved => 157,
481        ConnectReturnCode::ConnectionRateExceeded => 159,
482        _ => unreachable!(),
483    }
484}
485
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::{Bytes, BytesMut};
    use pretty_assertions::assert_eq;

    /// `size()`, the count returned by `write()` and the bytes actually written must agree.
    #[test]
    fn length_calculation() {
        // The user property is the only payload. It is meant to push the packet past
        // ~128 bytes so that the remaining-length field takes two bytes.
        let properties = ConnAckProperties {
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            session_expiry_interval: None,
            receive_max: None,
            max_qos: None,
            retain_available: None,
            max_packet_size: None,
            assigned_client_identifier: None,
            topic_alias_max: None,
            reason_string: None,
            wildcard_subscription_available: None,
            subscription_identifiers_available: None,
            shared_subscription_available: None,
            server_keep_alive: None,
            response_information: None,
            server_reference: None,
            authentication_method: None,
            authentication_data: None,
        };
        let packet = ConnAck {
            session_present: false,
            code: ConnectReturnCode::Success,
            properties: Some(properties),
        };

        let mut written = BytesMut::new();
        let reported_by_size = packet.size();
        let reported_by_write = packet.write(&mut written).unwrap();

        assert_eq!(reported_by_write, written.len());
        assert_eq!(reported_by_size, written.len());
    }

    /// A Receive Maximum of zero must be refused with a protocol error.
    #[test]
    fn read_rejects_receive_maximum_zero() {
        // property length 3, then ReceiveMaximum (0x21) followed by a two-byte zero value
        let mut encoded = Bytes::from_static(&[0x03, 0x21, 0x00, 0x00]);

        assert!(matches!(
            ConnAckProperties::read(&mut encoded),
            Err(Error::ProtocolError)
        ));
    }
}