Skip to main content

rumqttc/mqttbytes/v5/
disconnect.rs

1use std::convert::{TryFrom, TryInto};
2
3use bytes::{BufMut, Bytes, BytesMut};
4
5use super::{
6    Buf, Error, FixedHeader, PacketType, len_len, length, read_mqtt_string, read_u8, read_u32,
7    write_mqtt_string, write_remaining_length,
8};
9
10use super::{PropertyType, property};
11
/// Reason codes that can be carried by an MQTT v5 DISCONNECT packet.
///
/// The enum is `#[repr(u8)]`: each variant's discriminant is the byte that is
/// written for it when the packet is encoded, and the `TryFrom<u8>`
/// implementation performs the reverse mapping when decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DisconnectReasonCode {
    /// Close the connection normally. Do not send the Will Message.
    NormalDisconnection = 0x00,
    /// The Client wishes to disconnect but requires that the Server also publishes its Will Message.
    DisconnectWithWillMessage = 0x04,
    /// The Connection is closed but the sender either does not wish to reveal the reason, or none of the other Reason Codes apply.
    UnspecifiedError = 0x80,
    /// The received packet does not conform to this specification.
    MalformedPacket = 0x81,
    /// An unexpected or out of order packet was received.
    ProtocolError = 0x82,
    /// The packet received is valid but cannot be processed by this implementation.
    ImplementationSpecificError = 0x83,
    /// The request is not authorized.
    NotAuthorized = 0x87,
    /// The Server is busy and cannot continue processing requests from this Client.
    ServerBusy = 0x89,
    /// The Server is shutting down.
    ServerShuttingDown = 0x8B,
    /// The Connection is closed because no packet has been received for 1.5 times the Keepalive time.
    KeepAliveTimeout = 0x8D,
    /// Another Connection using the same `ClientID` has connected causing this Connection to be closed.
    SessionTakenOver = 0x8E,
    /// The Topic Filter is correctly formed, but is not accepted by this Server.
    TopicFilterInvalid = 0x8F,
    /// The Topic Name is correctly formed, but is not accepted by this Client or Server.
    TopicNameInvalid = 0x90,
    /// The Client or Server has received more than Receive Maximum publication for which it has not sent PUBACK or PUBCOMP.
    ReceiveMaximumExceeded = 0x93,
    /// The Client or Server has received a PUBLISH packet containing a Topic Alias which is greater than the Maximum Topic Alias it sent in the CONNECT or CONNACK packet.
    TopicAliasInvalid = 0x94,
    /// The packet size is greater than Maximum Packet Size for this Client or Server.
    PacketTooLarge = 0x95,
    /// The received data rate is too high.
    MessageRateTooHigh = 0x96,
    /// An implementation or administrative imposed limit has been exceeded.
    QuotaExceeded = 0x97,
    /// The Connection is closed due to an administrative action.
    AdministrativeAction = 0x98,
    /// The payload format does not match the one specified by the Payload Format Indicator.
    PayloadFormatInvalid = 0x99,
    /// The Server does not support retained messages.
    RetainNotSupported = 0x9A,
    /// The Client specified a `QoS` greater than the `QoS` specified in a Maximum `QoS` in the CONNACK.
    QoSNotSupported = 0x9B,
    /// The Client should temporarily change its Server.
    UseAnotherServer = 0x9C,
    /// The Server is moved and the Client should permanently change its server location.
    ServerMoved = 0x9D,
    /// The Server does not support Shared Subscriptions.
    SharedSubscriptionNotSupported = 0x9E,
    /// This connection is closed because the connection rate is too high.
    ConnectionRateExceeded = 0x9F,
    /// The maximum connection time authorized for this connection has been exceeded.
    MaximumConnectTime = 0xA0,
    /// The Server does not support Subscription Identifiers; the subscription is not accepted.
    SubscriptionIdentifiersNotSupported = 0xA1,
    /// The Server does not support Wildcard subscription; the subscription is not accepted.
    WildcardSubscriptionsNotSupported = 0xA2,
}
74
75impl TryFrom<u8> for DisconnectReasonCode {
76    type Error = Error;
77
78    fn try_from(value: u8) -> Result<Self, Self::Error> {
79        let rc = match value {
80            0x00 => Self::NormalDisconnection,
81            0x04 => Self::DisconnectWithWillMessage,
82            0x80 => Self::UnspecifiedError,
83            0x81 => Self::MalformedPacket,
84            0x82 => Self::ProtocolError,
85            0x83 => Self::ImplementationSpecificError,
86            0x87 => Self::NotAuthorized,
87            0x89 => Self::ServerBusy,
88            0x8B => Self::ServerShuttingDown,
89            0x8D => Self::KeepAliveTimeout,
90            0x8E => Self::SessionTakenOver,
91            0x8F => Self::TopicFilterInvalid,
92            0x90 => Self::TopicNameInvalid,
93            0x93 => Self::ReceiveMaximumExceeded,
94            0x94 => Self::TopicAliasInvalid,
95            0x95 => Self::PacketTooLarge,
96            0x96 => Self::MessageRateTooHigh,
97            0x97 => Self::QuotaExceeded,
98            0x98 => Self::AdministrativeAction,
99            0x99 => Self::PayloadFormatInvalid,
100            0x9A => Self::RetainNotSupported,
101            0x9B => Self::QoSNotSupported,
102            0x9C => Self::UseAnotherServer,
103            0x9D => Self::ServerMoved,
104            0x9E => Self::SharedSubscriptionNotSupported,
105            0x9F => Self::ConnectionRateExceeded,
106            0xA0 => Self::MaximumConnectTime,
107            0xA1 => Self::SubscriptionIdentifiersNotSupported,
108            0xA2 => Self::WildcardSubscriptionsNotSupported,
109            other => return Err(Error::InvalidConnectReturnCode(other)),
110        };
111
112        Ok(rc)
113    }
114}
115
/// Optional properties that may accompany an MQTT v5 DISCONNECT packet.
///
/// Every field is optional, so the type implements [`Default`]: the default
/// value has no properties set, which lets callers fill in only what they need
/// with struct-update syntax (`DisconnectProperties { .., ..Default::default() }`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DisconnectProperties {
    /// Session Expiry Interval in seconds
    pub session_expiry_interval: Option<u32>,

    /// Human readable reason for the disconnect
    pub reason_string: Option<String>,

    /// List of user properties, as `(key, value)` pairs
    pub user_properties: Vec<(String, String)>,

    /// String which can be used by the Client to identify another Server to use.
    pub server_reference: Option<String>,
}
130
/// An MQTT v5 DISCONNECT packet: a reason code plus an optional set of
/// properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Disconnect {
    /// Disconnect Reason Code
    pub reason_code: DisconnectReasonCode,

    /// Disconnect Properties; `None` when the packet carries no properties
    pub properties: Option<DisconnectProperties>,
}
139
140impl DisconnectProperties {
141    // pub fn new() -> Self {
142    //     Self {
143    //         session_expiry_interval: None,
144    //         reason_string: None,
145    //         user_properties: Vec::new(),
146    //         server_reference: None,
147    //     }
148    // }
149
150    fn len(&self) -> usize {
151        let mut length = 0;
152
153        if self.session_expiry_interval.is_some() {
154            length += 1 + 4;
155        }
156
157        if let Some(reason) = &self.reason_string {
158            length += 1 + 2 + reason.len();
159        }
160
161        for (key, value) in &self.user_properties {
162            length += 1 + 2 + key.len() + 2 + value.len();
163        }
164
165        if let Some(server_reference) = &self.server_reference {
166            length += 1 + 2 + server_reference.len();
167        }
168
169        length
170    }
171
172    pub fn extract(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
173        let (properties_len_len, properties_len) = length(bytes.iter())?;
174
175        bytes.advance(properties_len_len);
176
177        if properties_len == 0 {
178            return Ok(None);
179        }
180
181        let mut session_expiry_interval = None;
182        let mut reason_string = None;
183        let mut user_properties = Vec::new();
184        let mut server_reference = None;
185
186        let mut cursor = 0;
187
188        // read until cursor reaches property length. properties_len = 0 will skip this loop
189        while cursor < properties_len {
190            let prop = read_u8(bytes)?;
191            cursor += 1;
192
193            match property(prop)? {
194                PropertyType::SessionExpiryInterval => {
195                    session_expiry_interval = Some(read_u32(bytes)?);
196                    cursor += 4;
197                }
198                PropertyType::ReasonString => {
199                    let reason = read_mqtt_string(bytes)?;
200                    cursor += 2 + reason.len();
201                    reason_string = Some(reason);
202                }
203                PropertyType::UserProperty => {
204                    let key = read_mqtt_string(bytes)?;
205                    let value = read_mqtt_string(bytes)?;
206                    cursor += 2 + key.len() + 2 + value.len();
207                    user_properties.push((key, value));
208                }
209                PropertyType::ServerReference => {
210                    let reference = read_mqtt_string(bytes)?;
211                    cursor += 2 + reference.len();
212                    server_reference = Some(reference);
213                }
214                _ => return Err(Error::InvalidPropertyType(prop)),
215            }
216        }
217
218        let properties = Self {
219            session_expiry_interval,
220            reason_string,
221            user_properties,
222            server_reference,
223        };
224
225        Ok(Some(properties))
226    }
227
228    fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
229        let length = self.len();
230        write_remaining_length(buffer, length)?;
231
232        if let Some(session_expiry_interval) = self.session_expiry_interval {
233            buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
234            buffer.put_u32(session_expiry_interval);
235        }
236
237        if let Some(reason) = &self.reason_string {
238            buffer.put_u8(PropertyType::ReasonString as u8);
239            write_mqtt_string(buffer, reason);
240        }
241
242        for (key, value) in &self.user_properties {
243            buffer.put_u8(PropertyType::UserProperty as u8);
244            write_mqtt_string(buffer, key);
245            write_mqtt_string(buffer, value);
246        }
247
248        if let Some(reference) = &self.server_reference {
249            buffer.put_u8(PropertyType::ServerReference as u8);
250            write_mqtt_string(buffer, reference);
251        }
252
253        Ok(())
254    }
255}
256
257impl Disconnect {
258    #[must_use]
259    pub const fn new(reason: DisconnectReasonCode) -> Self {
260        Self {
261            reason_code: reason,
262            properties: None,
263        }
264    }
265
266    #[must_use]
267    pub const fn new_with_properties(
268        reason: DisconnectReasonCode,
269        properties: DisconnectProperties,
270    ) -> Self {
271        Self {
272            reason_code: reason,
273            properties: Some(properties),
274        }
275    }
276
277    fn len(&self) -> usize {
278        if self.reason_code == DisconnectReasonCode::NormalDisconnection
279            && self.properties.is_none()
280        {
281            return 2; // Packet type + 0x00
282        }
283
284        let mut length = 1; // Disconnect Reason Code
285
286        if let Some(properties) = &self.properties {
287            let properties_len = properties.len();
288            let properties_len_len = len_len(properties_len);
289            length += properties_len_len + properties_len;
290        }
291
292        length
293    }
294
295    #[must_use]
296    pub fn size(&self) -> usize {
297        let len = self.len();
298        if len == 2 {
299            return len;
300        }
301
302        let remaining_len_size = len_len(len);
303
304        1 + remaining_len_size + len
305    }
306
307    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
308        let packet_type = fixed_header.byte1 >> 4;
309        let flags = fixed_header.byte1 & 0b0000_1111;
310
311        bytes.advance(fixed_header.header_len);
312
313        if packet_type != PacketType::Disconnect as u8 {
314            return Err(Error::InvalidPacketType(packet_type));
315        }
316
317        if flags != 0x00 {
318            return Err(Error::MalformedPacket);
319        }
320
321        if fixed_header.remaining_len == 0 {
322            return Ok(Self::new(DisconnectReasonCode::NormalDisconnection));
323        }
324
325        let reason_code = read_u8(&mut bytes)?;
326
327        let disconnect = Self {
328            reason_code: reason_code.try_into()?,
329            properties: DisconnectProperties::extract(&mut bytes)?,
330        };
331
332        Ok(disconnect)
333    }
334
335    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
336        buffer.put_u8(0xE0);
337
338        let length = self.len();
339
340        if length == 2 {
341            buffer.put_u8(0x00);
342            return Ok(length);
343        }
344
345        let len_len = write_remaining_length(buffer, length)?;
346
347        buffer.put_u8(self.reason_code as u8);
348
349        if let Some(properties) = &self.properties {
350            properties.write(buffer)?;
351        } else {
352            write_remaining_length(buffer, 0)?;
353        }
354
355        Ok(1 + len_len + length)
356    }
357}
358
#[cfg(test)]
mod test {
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::{Disconnect, DisconnectProperties, DisconnectReasonCode};
    use crate::mqttbytes::v5::parse_fixed_header;

    /// Frames `raw` using its fixed header and decodes the resulting packet.
    fn decode(raw: &[u8]) -> Disconnect {
        let mut stream = BytesMut::new();
        stream.extend_from_slice(raw);

        let fixed_header = parse_fixed_header(stream.iter()).unwrap();
        let frame = stream.split_to(fixed_header.frame_length()).freeze();

        Disconnect::read(fixed_header, frame).unwrap()
    }

    /// Encodes `packet` into a fresh buffer.
    fn encode(packet: &Disconnect) -> BytesMut {
        let mut out = BytesMut::new();
        packet.write(&mut out).unwrap();
        out
    }

    #[test]
    fn disconnect1_parsing_works() {
        let wire = [
            0xE0, // Packet type
            0x00, // Remaining length
        ];

        let parsed = decode(&wire[..]);

        assert_eq!(
            parsed,
            Disconnect::new(DisconnectReasonCode::NormalDisconnection)
        );
    }

    #[test]
    fn disconnect1_encoding_works() {
        let expected = [
            0xE0, // Packet type
            0x00, // Remaining length
        ];

        let encoded = encode(&Disconnect::new(DisconnectReasonCode::NormalDisconnection));

        assert_eq!(&encoded[..], &expected);
    }

    /// Packet with every property set; the decoded form of `sample_bytes2`.
    fn sample2() -> Disconnect {
        Disconnect {
            reason_code: DisconnectReasonCode::UnspecifiedError,
            properties: Some(DisconnectProperties {
                session_expiry_interval: Some(1234),
                reason_string: Some("test".to_owned()),
                user_properties: vec![("test".to_owned(), "test".to_owned())],
                server_reference: Some("test".to_owned()),
            }),
        }
    }

    /// Encoded form of `sample2`.
    fn sample_bytes2() -> Vec<u8> {
        vec![
            0xE0, // Packet type
            0x22, // Remaining length
            0x80, // Disconnect Reason Code
            0x20, // Properties length
            0x11, 0x00, 0x00, 0x04, 0xd2, // Session expiry interval
            0x1F, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // Reason string
            0x26, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x00, 0x04, 0x74, 0x65, 0x73,
            0x74, // User properties
            0x1C, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // server reference
        ]
    }

    #[test]
    fn disconnect2_parsing_works() {
        let parsed = decode(&sample_bytes2()[..]);

        assert_eq!(parsed, sample2());
    }

    #[test]
    fn disconnect2_encoding_works() {
        let expected = sample_bytes2();

        let encoded = encode(&sample2());

        assert_eq!(&encoded[..], &expected);
    }

    #[test]
    fn length_calculation() {
        // The shared user-property fixture pads the packet; it is intended to
        // push the size past ~128 bytes so that the remaining-length field
        // needs two bytes.
        let padding = DisconnectProperties {
            session_expiry_interval: None,
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            server_reference: None,
        };

        let mut packet = Disconnect::new(DisconnectReasonCode::NormalDisconnection);
        packet.properties = Some(padding);

        let mut sink = BytesMut::new();
        let size_from_size = packet.size();
        let size_from_write = packet.write(&mut sink).unwrap();
        let size_from_bytes = sink.len();

        assert_eq!(size_from_write, size_from_bytes);
        assert_eq!(size_from_size, size_from_bytes);
    }
}