mqtt_proto/v5/
connect.rs

1use core::convert::TryFrom;
2
3use alloc::string::String;
4use alloc::sync::Arc;
5use alloc::vec::Vec;
6
7use bytes::Bytes;
8use simdutf8::basic::from_utf8;
9
10use crate::{
11    from_read_exact_error, read_bytes, read_string, read_u16, read_u8, write_bytes, write_u16,
12    write_u8, AsyncRead, Encodable, Error, Protocol, QoS, SyncWrite, TopicName,
13};
14
15use super::{
16    decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
17    UserProperty,
18};
19
/// Body type of CONNECT packet.
///
/// The fields mirror what `Connect::decode_with_protocol` reads from the wire
/// and what the `Encodable` implementation writes back: protocol, connect
/// flags (from which `clean_start` is derived), keep alive, properties, client
/// identifier and the optional will, user name and password.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connect {
    /// The [protocol version](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901036).
    pub protocol: Protocol,

    /// [Clean start] flag. This value specifies whether the Connection starts a new Session or is a continuation of an existing Session.
    ///
    /// [Clean start]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901039
    pub clean_start: bool,

    /// The [keep alive]. A time interval measured in seconds. It is the
    /// maximum time interval that is permitted to elapse between the point at
    /// which the Client finishes transmitting one MQTT Control Packet and the
    /// point it starts sending the next.
    ///
    /// [keep alive]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901045
    pub keep_alive: u16,

    /// The property list of the CONNECT packet (see [`ConnectProperties`]).
    pub properties: ConnectProperties,

    /// The [client identifier] (ClientID).
    ///
    /// [client identifier]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
    pub client_id: Arc<String>,

    /// The [will] message. `None` when the will flag is not set.
    ///
    /// [will]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901060
    pub last_will: Option<LastWill>,

    /// The [user name](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901071).
    /// `None` when the user name flag is not set.
    pub username: Option<Arc<String>>,

    /// The [password](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901072).
    /// `None` when the password flag is not set.
    pub password: Option<Bytes>,
}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for Connect {
    /// Builds an arbitrary `Connect`, drawing each field from `u` one after another.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // The draw order below is the same as the field order of the struct literal.
        let protocol = u.arbitrary()?;
        let clean_start = u.arbitrary()?;
        let keep_alive = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let client_id = u.arbitrary()?;
        let last_will = u.arbitrary()?;
        let username = u.arbitrary()?;
        // The password is generated as an optional `Vec<u8>` and converted to `Bytes`.
        let password = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);

        Ok(Connect {
            protocol,
            clean_start,
            keep_alive,
            properties,
            client_id,
            last_will,
            username,
            password,
        })
    }
}
73
74impl Connect {
75    pub fn new(client_id: Arc<String>, keep_alive: u16) -> Self {
76        Connect {
77            protocol: Protocol::V500,
78            clean_start: true,
79            keep_alive,
80            properties: ConnectProperties::default(),
81            client_id,
82            last_will: None,
83            username: None,
84            password: None,
85        }
86    }
87
88    pub async fn decode_async<T: AsyncRead + Unpin>(
89        reader: &mut T,
90        header: Header,
91    ) -> Result<Self, ErrorV5> {
92        let protocol = Protocol::decode_async(reader).await?;
93        Self::decode_with_protocol(reader, header, protocol).await
94    }
95
96    #[inline]
97    pub async fn decode_with_protocol<T: AsyncRead + Unpin>(
98        reader: &mut T,
99        header: Header,
100        protocol: Protocol,
101    ) -> Result<Self, ErrorV5> {
102        if protocol != Protocol::V500 {
103            return Err(Error::UnexpectedProtocol(protocol).into());
104        }
105        let connect_flags: u8 = read_u8(reader).await?;
106        if connect_flags & 1 != 0 {
107            return Err(Error::InvalidConnectFlags(connect_flags).into());
108        }
109        let keep_alive = read_u16(reader).await?;
110
111        // FIXME: check remaining length
112
113        let properties = ConnectProperties::decode_async(reader, header.typ).await?;
114        let client_id = Arc::new(read_string(reader).await?);
115        let last_will = if connect_flags & 0b100 != 0 {
116            let qos = QoS::from_u8((connect_flags & 0b11000) >> 3)?;
117            let retain = (connect_flags & 0b00100000) != 0;
118            Some(LastWill::decode_async(reader, qos, retain).await?)
119        } else if connect_flags & 0b11000 != 0 {
120            return Err(Error::InvalidConnectFlags(connect_flags).into());
121        } else {
122            None
123        };
124        let username = if connect_flags & 0b10000000 != 0 {
125            Some(Arc::new(read_string(reader).await?))
126        } else {
127            None
128        };
129        let password = if connect_flags & 0b01000000 != 0 {
130            Some(Bytes::from(read_bytes(reader).await?))
131        } else {
132            None
133        };
134        let clean_start = (connect_flags & 0b10) != 0;
135
136        Ok(Connect {
137            protocol,
138            clean_start,
139            properties,
140            keep_alive,
141            client_id,
142            last_will,
143            username,
144            password,
145        })
146    }
147}
148
149impl Encodable for Connect {
150    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
151        let mut connect_flags: u8 = 0b00000000;
152        if self.clean_start {
153            connect_flags |= 0b10;
154        }
155        if self.username.is_some() {
156            connect_flags |= 0b10000000;
157        }
158        if self.password.is_some() {
159            connect_flags |= 0b01000000;
160        }
161        if let Some(last_will) = self.last_will.as_ref() {
162            connect_flags |= 0b00000100;
163            connect_flags |= (last_will.qos as u8) << 3;
164            if last_will.retain {
165                connect_flags |= 0b00100000;
166            }
167        }
168
169        self.protocol.encode(writer)?;
170        write_u8(writer, connect_flags)?;
171        write_u16(writer, self.keep_alive)?;
172        self.properties.encode(writer)?;
173        write_bytes(writer, self.client_id.as_bytes())?;
174        if let Some(last_will) = self.last_will.as_ref() {
175            last_will.encode(writer)?;
176        }
177        if let Some(username) = self.username.as_ref() {
178            write_bytes(writer, username.as_bytes())?;
179        }
180        if let Some(password) = self.password.as_ref() {
181            write_bytes(writer, password.as_ref())?;
182        }
183        Ok(())
184    }
185
186    fn encode_len(&self) -> usize {
187        let mut len = self.protocol.encode_len();
188        // flags + keep-alive
189        len += 1 + 2;
190        // properties
191        len += self.properties.encode_len();
192        // client identifier
193        len += 2 + self.client_id.len();
194        if let Some(last_will) = self.last_will.as_ref() {
195            len += last_will.encode_len();
196        }
197        if let Some(username) = self.username.as_ref() {
198            len += 2 + username.len();
199        }
200        if let Some(password) = self.password.as_ref() {
201            len += 2 + password.len();
202        }
203        len
204    }
205}
206
/// Property list for CONNECT packet.
///
/// Every optional property is `None` (and `user_properties` is empty) in the
/// value produced by `Default`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnectProperties {
    /// Session Expiry Interval
    pub session_expiry_interval: Option<u32>,
    /// Receive Maximum
    pub receive_max: Option<u16>,
    /// Maximum Packet Size
    pub max_packet_size: Option<u32>,
    /// Topic Alias Maximum
    pub topic_alias_max: Option<u16>,
    /// Request Response Information. If absent the default value should be false.
    pub request_response_info: Option<bool>,
    /// Request Problem Information. If absent the default value should be true.
    pub request_problem_info: Option<bool>,
    /// User Property (may hold any number of entries)
    pub user_properties: Vec<UserProperty>,
    /// Authentication Method
    pub auth_method: Option<Arc<String>>,
    /// Authentication Data
    pub auth_data: Option<Bytes>,
}
229
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnectProperties {
    /// Builds an arbitrary `ConnectProperties`, drawing each field from `u` in turn.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let request_response_info = u.arbitrary()?;
        let request_problem_info = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // `Bytes` is produced from an optional `Vec<u8>`.
        let auth_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);

        Ok(ConnectProperties {
            session_expiry_interval,
            receive_max,
            max_packet_size,
            topic_alias_max,
            request_response_info,
            request_problem_info,
            user_properties,
            auth_method,
            auth_data,
        })
    }
}
246
impl ConnectProperties {
    /// Decodes the CONNECT property list from `reader`.
    ///
    /// Starts from `ConnectProperties::default()` and hands the parsing over
    /// to the `decode_properties!` macro together with `packet_type` and the
    /// property identifiers listed in the invocation.
    ///
    /// # Errors
    ///
    /// The function returns `Result<Self, ErrorV5>`; failures originate from
    /// the macro expansion.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = ConnectProperties::default();
        // NOTE(review): `user_properties` is not named in this list; presumably
        // the macro handles user properties on its own — confirm against the
        // macro definition.
        decode_properties!(
            packet_type,
            properties,
            reader,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumPacketSize,
            TopicAliasMaximum,
            RequestResponseInformation,
            RequestProblemInformation,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(properties)
    }
}
269
impl Encodable for ConnectProperties {
    /// Writes the property list to `writer` through the `encode_properties!`
    /// macro, using the same property identifiers as `encode_len`.
    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
        encode_properties!(
            self,
            writer,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumPacketSize,
            TopicAliasMaximum,
            RequestResponseInformation,
            RequestProblemInformation,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(())
    }

    /// Computes the encoded length of the property list through the
    /// `encode_properties_len!` macro, which accumulates into `len`.
    // NOTE(review): the identifier list must stay in sync with `encode`;
    // presumably the result includes the property-length prefix — confirm
    // against the macro definition.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumPacketSize,
            TopicAliasMaximum,
            RequestResponseInformation,
            RequestProblemInformation,
            AuthenticationMethod,
            AuthenticationData,
        );
        len
    }
}
304
/// The will message for CONNECT packet.
///
/// `qos` and `retain` are not part of the will's own encoded bytes; they are
/// carried in the connect flags of the enclosing CONNECT packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LastWill {
    /// QoS of the will message (taken from the connect flags).
    pub qos: QoS,
    /// Retain flag of the will message (taken from the connect flags).
    pub retain: bool,
    /// Topic name of the will message.
    pub topic_name: TopicName,
    /// Payload of the will message.
    pub payload: Bytes,
    /// Property list of the will message.
    pub properties: WillProperties,
}
314
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for LastWill {
    /// Builds an arbitrary `LastWill`, drawing each field from `u` in turn.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Draw order: qos, retain, properties, topic name, payload.
        let qos = u.arbitrary()?;
        let retain = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let topic_name = u.arbitrary()?;
        let raw_payload = Vec::<u8>::arbitrary(u)?;

        Ok(LastWill {
            qos,
            retain,
            properties,
            topic_name,
            payload: Bytes::from(raw_payload),
        })
    }
}
327
328impl LastWill {
329    pub fn new(qos: QoS, topic_name: TopicName, payload: Bytes) -> Self {
330        LastWill {
331            qos,
332            retain: false,
333            topic_name,
334            payload,
335            properties: WillProperties::default(),
336        }
337    }
338
339    pub async fn decode_async<T: AsyncRead + Unpin>(
340        reader: &mut T,
341        qos: QoS,
342        retain: bool,
343    ) -> Result<Self, ErrorV5> {
344        let properties = WillProperties::decode_async(reader).await?;
345        let topic_name = TopicName::try_from(read_string(reader).await?)?;
346        let payload = read_bytes(reader).await?;
347        if properties.payload_is_utf8 == Some(true) && from_utf8(&payload).is_err() {
348            return Err(ErrorV5::InvalidPayloadFormat);
349        }
350        Ok(LastWill {
351            qos,
352            retain,
353            properties,
354            topic_name,
355            payload: Bytes::from(payload),
356        })
357    }
358}
359
360impl Encodable for LastWill {
361    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
362        self.properties.encode(writer)?;
363        write_bytes(writer, self.topic_name.as_bytes())?;
364        write_bytes(writer, self.payload.as_ref())?;
365        Ok(())
366    }
367
368    fn encode_len(&self) -> usize {
369        let mut len = self.properties.encode_len();
370        len += 4;
371        len += self.topic_name.len();
372        len += self.payload.len();
373        len
374    }
375}
376
/// Property list for will message.
///
/// Every optional property is `None` (and `user_properties` is empty) in the
/// value produced by `Default`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WillProperties {
    /// Will Delay Interval
    pub delay_interval: Option<u32>,
    /// Payload Format Indicator. `Some(true)` makes `LastWill::decode_async`
    /// verify that the will payload is valid UTF-8.
    pub payload_is_utf8: Option<bool>,
    /// Message Expiry Interval
    pub message_expiry_interval: Option<u32>,
    /// Content Type
    pub content_type: Option<Arc<String>>,
    /// Response Topic
    pub response_topic: Option<TopicName>,
    /// Correlation Data
    pub correlation_data: Option<Bytes>,
    /// User Property (may hold any number of entries)
    pub user_properties: Vec<UserProperty>,
}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for WillProperties {
    /// Builds an arbitrary `WillProperties`, drawing each field from `u` in turn.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let delay_interval = u.arbitrary()?;
        let payload_is_utf8 = u.arbitrary()?;
        let message_expiry_interval = u.arbitrary()?;
        let content_type = u.arbitrary()?;
        let response_topic = u.arbitrary()?;
        // `Bytes` is produced from an optional `Vec<u8>`.
        let correlation_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);
        let user_properties = u.arbitrary()?;

        Ok(WillProperties {
            delay_interval,
            payload_is_utf8,
            message_expiry_interval,
            content_type,
            response_topic,
            correlation_data,
            user_properties,
        })
    }
}
402
impl WillProperties {
    /// Decodes the will property list from `reader`.
    ///
    /// Starts from `WillProperties::default()` and hands the parsing over to
    /// the `decode_properties!` macro. Unlike the packet-level property
    /// decoders in this file, the first macro argument is the literal
    /// `LastWill` token rather than a `PacketType` value.
    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
        let mut properties = WillProperties::default();
        // NOTE(review): `user_properties` is not named in this list; presumably
        // the macro handles user properties on its own — confirm against the
        // macro definition.
        decode_properties!(
            LastWill,
            properties,
            reader,
            WillDelayInterval,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            ContentType,
            ResponseTopic,
            CorrelationData,
        );
        Ok(properties)
    }
}
420
impl Encodable for WillProperties {
    /// Writes the will property list to `writer` through the
    /// `encode_properties!` macro, using the same property identifiers as
    /// `encode_len`.
    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
        encode_properties!(
            self,
            writer,
            WillDelayInterval,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            ContentType,
            ResponseTopic,
            CorrelationData,
        );
        Ok(())
    }

    /// Computes the encoded length of the will property list through the
    /// `encode_properties_len!` macro, which accumulates into `len`.
    // NOTE(review): the identifier list must stay in sync with `encode`.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            WillDelayInterval,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            ContentType,
            ResponseTopic,
            CorrelationData,
        );
        len
    }
}
451
/// Body type of CONNACK packet.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Connack {
    /// Session present flag; encoded as the first byte of the body (0 or 1).
    pub session_present: bool,
    /// Connect reason code; encoded as the second byte of the body.
    pub reason_code: ConnectReasonCode,
    /// Property list of the CONNACK packet.
    pub properties: ConnackProperties,
}
460
461impl Connack {
462    pub fn new(session_present: bool, reason_code: ConnectReasonCode) -> Self {
463        Connack {
464            session_present,
465            reason_code,
466            properties: ConnackProperties::default(),
467        }
468    }
469    pub async fn decode_async<T: AsyncRead + Unpin>(
470        reader: &mut T,
471        header: Header,
472    ) -> Result<Self, ErrorV5> {
473        let mut payload = [0u8; 2];
474        reader
475            .read_exact(&mut payload)
476            .await
477            .map_err(from_read_exact_error)?;
478        let session_present = match payload[0] {
479            0 => false,
480            1 => true,
481            _ => return Err(Error::InvalidConnackFlags(payload[0]).into()),
482        };
483        let reason_code = ConnectReasonCode::from_u8(payload[1])
484            .ok_or(ErrorV5::InvalidReasonCode(header.typ, payload[1]))?;
485        let properties = ConnackProperties::decode_async(reader, header.typ).await?;
486        Ok(Connack {
487            session_present,
488            reason_code,
489            properties,
490        })
491    }
492}
493
494impl Encodable for Connack {
495    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
496        write_u8(writer, u8::from(self.session_present))?;
497        write_u8(writer, self.reason_code as u8)?;
498        self.properties.encode(writer)?;
499        Ok(())
500    }
501
502    fn encode_len(&self) -> usize {
503        2 + self.properties.encode_len()
504    }
505}
506
/// Reason code answering a CONNECT packet (carried in the CONNACK body).
///
/// | Dec |  Hex | Reason Code name              | Description                                                                                              |
/// |-----|------|-------------------------------|----------------------------------------------------------------------------------------------------------|
/// |   0 | 0x00 | Success                       | The Connection is accepted.                                                                              |
/// | 128 | 0x80 | Unspecified error             | The Server does not wish to reveal the reason for the failure, or none of the other Reason Codes apply.  |
/// | 129 | 0x81 | Malformed Packet              | Data within the CONNECT packet could not be correctly parsed.                                            |
/// | 130 | 0x82 | Protocol Error                | Data in the CONNECT packet does not conform to this specification.                                       |
/// | 131 | 0x83 | Implementation specific error | The CONNECT is valid but is not accepted by this Server.                                                 |
/// | 132 | 0x84 | Unsupported Protocol Version  | The Server does not support the version of the MQTT protocol requested by the Client.                    |
/// | 133 | 0x85 | Client Identifier not valid   | The Client Identifier is a valid string but is not allowed by the Server.                                |
/// | 134 | 0x86 | Bad User Name or Password     | The Server does not accept the User Name or Password specified by the Client                             |
/// | 135 | 0x87 | Not authorized                | The Client is not authorized to connect.                                                                 |
/// | 136 | 0x88 | Server unavailable            | The MQTT Server is not available.                                                                        |
/// | 137 | 0x89 | Server busy                   | The Server is busy. Try again later.                                                                     |
/// | 138 | 0x8A | Banned                        | This Client has been banned by administrative action. Contact the server administrator.                  |
/// | 140 | 0x8C | Bad authentication method     | The authentication method is not supported or does not match the authentication method currently in use. |
/// | 144 | 0x90 | Topic Name invalid            | The Will Topic Name is not malformed, but is not accepted by this Server.                                |
/// | 149 | 0x95 | Packet too large              | The CONNECT packet exceeded the maximum permissible size.                                                |
/// | 151 | 0x97 | Quota exceeded                | An implementation or administrative imposed limit has been exceeded.                                     |
/// | 153 | 0x99 | Payload format invalid        | The Will Payload does not match the specified Payload Format Indicator.                                  |
/// | 154 | 0x9A | Retain not supported          | The Server does not support retained messages, and Will Retain was set to 1.                             |
/// | 155 | 0x9B | QoS not supported             | The Server does not support the QoS set in Will QoS.                                                     |
/// | 156 | 0x9C | Use another server            | The Client should temporarily use another server.                                                        |
/// | 157 | 0x9D | Server moved                  | The Client should permanently use another server.                                                        |
/// | 159 | 0x9F | Connection rate exceeded      | The connection rate limit has been exceeded.                                                             |
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum ConnectReasonCode {
    Success = 0x00,
    UnspecifiedError = 0x80,
    MalformedPacket = 0x81,
    ProtocolError = 0x82,
    ImplementationSpecificError = 0x83,
    UnsupportedProtocolVersion = 0x84,
    ClientIdentifierNotValid = 0x85,
    BadUserNameOrPassword = 0x86,
    NotAuthorized = 0x87,
    ServerUnavailable = 0x88,
    ServerBusy = 0x89,
    Banned = 0x8A,
    BadAuthMethod = 0x8C,
    TopicNameInvalid = 0x90,
    PacketTooLarge = 0x95,
    QuotaExceeded = 0x97,
    PayloadFormatInvalid = 0x99,
    RetainNotSupported = 0x9A,
    QoSNotSupported = 0x9B,
    UseAnotherServer = 0x9C,
    ServerMoved = 0x9D,
    ConnectionRateExceeded = 0x9F,
}

impl ConnectReasonCode {
    /// Maps a raw byte to its reason code, or `None` when the byte is not one
    /// of the values listed on the enum.
    pub fn from_u8(value: u8) -> Option<ConnectReasonCode> {
        match value {
            0x00 => Some(Self::Success),
            0x80 => Some(Self::UnspecifiedError),
            0x81 => Some(Self::MalformedPacket),
            0x82 => Some(Self::ProtocolError),
            0x83 => Some(Self::ImplementationSpecificError),
            0x84 => Some(Self::UnsupportedProtocolVersion),
            0x85 => Some(Self::ClientIdentifierNotValid),
            0x86 => Some(Self::BadUserNameOrPassword),
            0x87 => Some(Self::NotAuthorized),
            0x88 => Some(Self::ServerUnavailable),
            0x89 => Some(Self::ServerBusy),
            0x8A => Some(Self::Banned),
            0x8C => Some(Self::BadAuthMethod),
            0x90 => Some(Self::TopicNameInvalid),
            0x95 => Some(Self::PacketTooLarge),
            0x97 => Some(Self::QuotaExceeded),
            0x99 => Some(Self::PayloadFormatInvalid),
            0x9A => Some(Self::RetainNotSupported),
            0x9B => Some(Self::QoSNotSupported),
            0x9C => Some(Self::UseAnotherServer),
            0x9D => Some(Self::ServerMoved),
            0x9F => Some(Self::ConnectionRateExceeded),
            _ => None,
        }
    }
}
591
/// Property list for CONNACK packet.
///
/// Every optional property is `None` (and `user_properties` is empty) in the
/// value produced by `Default`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnackProperties {
    /// Session Expiry Interval
    pub session_expiry_interval: Option<u32>,
    /// Receive Maximum
    pub receive_max: Option<u16>,
    /// Maximum QoS
    pub max_qos: Option<QoS>,
    /// Retain Available
    pub retain_available: Option<bool>,
    /// Maximum Packet Size
    pub max_packet_size: Option<u32>,
    /// Assigned Client Identifier
    pub assigned_client_id: Option<Arc<String>>,
    /// Topic Alias Maximum
    pub topic_alias_max: Option<u16>,
    /// Reason String
    pub reason_string: Option<Arc<String>>,
    /// User Property (may hold any number of entries)
    pub user_properties: Vec<UserProperty>,
    /// Wildcard Subscription Available
    pub wildcard_subscription_available: Option<bool>,
    /// Subscription Identifier Available
    pub subscription_id_available: Option<bool>,
    /// Shared Subscription Available
    pub shared_subscription_available: Option<bool>,
    /// Server Keep Alive
    pub server_keep_alive: Option<u16>,
    /// Response Information
    pub response_info: Option<Arc<String>>,
    /// Server Reference
    pub server_reference: Option<Arc<String>>,
    /// Authentication Method
    pub auth_method: Option<Arc<String>>,
    /// Authentication Data
    pub auth_data: Option<Bytes>,
}
613
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnackProperties {
    /// Builds an arbitrary `ConnackProperties`, drawing each field from `u` in turn.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_qos = u.arbitrary()?;
        let retain_available = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let assigned_client_id = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let reason_string = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let wildcard_subscription_available = u.arbitrary()?;
        let subscription_id_available = u.arbitrary()?;
        let shared_subscription_available = u.arbitrary()?;
        let server_keep_alive = u.arbitrary()?;
        let response_info = u.arbitrary()?;
        let server_reference = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // `Bytes` is produced from an optional `Vec<u8>`.
        let auth_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);

        Ok(ConnackProperties {
            session_expiry_interval,
            receive_max,
            max_qos,
            retain_available,
            max_packet_size,
            assigned_client_id,
            topic_alias_max,
            reason_string,
            user_properties,
            wildcard_subscription_available,
            subscription_id_available,
            shared_subscription_available,
            server_keep_alive,
            response_info,
            server_reference,
            auth_method,
            auth_data,
        })
    }
}
638
impl ConnackProperties {
    /// Decodes the CONNACK property list from `reader`.
    ///
    /// Starts from `ConnackProperties::default()` and hands the parsing over
    /// to the `decode_properties!` macro together with `packet_type` and the
    /// property identifiers listed in the invocation.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = ConnackProperties::default();
        // NOTE(review): `user_properties` is not named in this list; presumably
        // the macro handles user properties on its own — confirm against the
        // macro definition.
        decode_properties!(
            packet_type,
            properties,
            reader,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumQoS,
            RetainAvailable,
            MaximumPacketSize,
            AssignedClientIdentifier,
            TopicAliasMaximum,
            ReasonString,
            WildcardSubscriptionAvailable,
            SubscriptionIdentifierAvailable,
            SharedSubscriptionAvailable,
            ServerKeepAlive,
            ResponseInformation,
            ServerReference,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(properties)
    }
}
669
impl Encodable for ConnackProperties {
    /// Writes the CONNACK property list to `writer` through the
    /// `encode_properties!` macro, using the same property identifiers as
    /// `encode_len`.
    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
        encode_properties!(
            self,
            writer,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumQoS,
            RetainAvailable,
            MaximumPacketSize,
            AssignedClientIdentifier,
            TopicAliasMaximum,
            ReasonString,
            WildcardSubscriptionAvailable,
            SubscriptionIdentifierAvailable,
            SharedSubscriptionAvailable,
            ServerKeepAlive,
            ResponseInformation,
            ServerReference,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(())
    }

    /// Computes the encoded length of the CONNACK property list through the
    /// `encode_properties_len!` macro, which accumulates into `len`.
    // NOTE(review): the identifier list must stay in sync with `encode`.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumQoS,
            RetainAvailable,
            MaximumPacketSize,
            AssignedClientIdentifier,
            TopicAliasMaximum,
            ReasonString,
            WildcardSubscriptionAvailable,
            SubscriptionIdentifierAvailable,
            SharedSubscriptionAvailable,
            ServerKeepAlive,
            ResponseInformation,
            ServerReference,
            AuthenticationMethod,
            AuthenticationData,
        );
        len
    }
}
720
/// Body type for DISCONNECT packet.
///
/// Both parts may be omitted on the wire: a body of length zero decodes to
/// `DisconnectReasonCode::NormalDisconnect` with default properties, and a
/// body of length one carries only the reason code.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Disconnect {
    /// Disconnect reason code.
    pub reason_code: DisconnectReasonCode,
    /// Property list of the DISCONNECT packet.
    pub properties: DisconnectProperties,
}
728
729impl Disconnect {
730    pub fn new(reason_code: DisconnectReasonCode) -> Self {
731        Disconnect {
732            reason_code,
733            properties: DisconnectProperties::default(),
734        }
735    }
736
737    pub fn new_normal() -> Self {
738        Self::new(DisconnectReasonCode::NormalDisconnect)
739    }
740
741    pub async fn decode_async<T: AsyncRead + Unpin>(
742        reader: &mut T,
743        header: Header,
744    ) -> Result<Self, ErrorV5> {
745        let (reason_code, properties) = if header.remaining_len == 0 {
746            (DisconnectReasonCode::NormalDisconnect, Default::default())
747        } else if header.remaining_len == 1 {
748            let reason_byte = read_u8(reader).await?;
749            let reason_code = DisconnectReasonCode::from_u8(reason_byte)
750                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
751            (reason_code, Default::default())
752        } else {
753            let reason_byte = read_u8(reader).await?;
754            let reason_code = DisconnectReasonCode::from_u8(reason_byte)
755                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
756            let properties = DisconnectProperties::decode_async(reader, header.typ).await?;
757            (reason_code, properties)
758        };
759        Ok(Disconnect {
760            reason_code,
761            properties,
762        })
763    }
764}
765
766impl Encodable for Disconnect {
767    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
768        if self.properties == DisconnectProperties::default() {
769            if self.reason_code != DisconnectReasonCode::NormalDisconnect {
770                write_u8(writer, self.reason_code as u8)?;
771            }
772        } else {
773            write_u8(writer, self.reason_code as u8)?;
774            self.properties.encode(writer)?;
775        }
776        Ok(())
777    }
778
779    fn encode_len(&self) -> usize {
780        if self.properties == DisconnectProperties::default() {
781            if self.reason_code == DisconnectReasonCode::NormalDisconnect {
782                0
783            } else {
784                1
785            }
786        } else {
787            1 + self.properties.encode_len()
788        }
789    }
790}
791
/// Reason code for DISCONNECT packet.
///
/// The discriminant of each variant is the one-byte value of the reason
/// code. Every variant lists who may send it ("Sent by") followed by its
/// meaning.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum DisconnectReasonCode {
    /// Normal disconnection (0 / `0x00`). Sent by: Client/Server.
    ///
    /// Close the connection normally. Do not send the Will Message.
    NormalDisconnect = 0x00,

    /// Disconnect with Will Message (4 / `0x04`). Sent by: Client.
    ///
    /// The Client wishes to disconnect but requires that the Server also
    /// publishes its Will Message.
    DisconnectWithWillMessage = 0x04,

    /// Unspecified error (128 / `0x80`). Sent by: Client/Server.
    ///
    /// The Connection is closed but the sender either does not wish to reveal
    /// the reason, or none of the other Reason Codes apply.
    UnspecifiedError = 0x80,

    /// Malformed Packet (129 / `0x81`). Sent by: Client/Server.
    ///
    /// The received packet does not conform to this specification.
    MalformedPacket = 0x81,

    /// Protocol Error (130 / `0x82`). Sent by: Client/Server.
    ///
    /// An unexpected or out of order packet was received.
    ProtocolError = 0x82,

    /// Implementation specific error (131 / `0x83`). Sent by: Client/Server.
    ///
    /// The packet received is valid but cannot be processed by this
    /// implementation.
    ImplementationSpecificError = 0x83,

    /// Not authorized (135 / `0x87`). Sent by: Server.
    ///
    /// The request is not authorized.
    NotAuthorized = 0x87,

    /// Server busy (137 / `0x89`). Sent by: Server.
    ///
    /// The Server is busy and cannot continue processing requests from this
    /// Client.
    ServerBusy = 0x89,

    /// Server shutting down (139 / `0x8B`). Sent by: Server.
    ///
    /// The Server is shutting down.
    ServerShuttingDown = 0x8B,

    /// Keep Alive timeout (141 / `0x8D`). Sent by: Server.
    ///
    /// The Connection is closed because no packet has been received for 1.5
    /// times the Keepalive time.
    KeepAliveTimeout = 0x8D,

    /// Session taken over (142 / `0x8E`). Sent by: Server.
    ///
    /// Another Connection using the same ClientID has connected causing this
    /// Connection to be closed.
    SessionTakenOver = 0x8E,

    /// Topic Filter invalid (143 / `0x8F`). Sent by: Server.
    ///
    /// The Topic Filter is correctly formed, but is not accepted by this
    /// Server.
    TopicFilterInvalid = 0x8F,

    /// Topic Name invalid (144 / `0x90`). Sent by: Client/Server.
    ///
    /// The Topic Name is correctly formed, but is not accepted by this
    /// Client/Server.
    TopicNameInvalid = 0x90,

    /// Receive Maximum exceeded (147 / `0x93`). Sent by: Client/Server.
    ///
    /// The Client/Server has received more than Receive Maximum publication
    /// for which it has not sent PUBACK or PUBCOMP.
    ReceiveMaximumExceeded = 0x93,

    /// Topic Alias invalid (148 / `0x94`). Sent by: Client/Server.
    ///
    /// The Client/Server has received a PUBLISH packet containing a Topic
    /// Alias which is greater than the Maximum Topic Alias it sent in the
    /// CONNECT or CONNACK packet.
    TopicAliasInvalid = 0x94,

    /// Packet too large (149 / `0x95`). Sent by: Client/Server.
    ///
    /// The packet size is greater than Maximum Packet Size for this
    /// Client/Server.
    PacketTooLarge = 0x95,

    /// Message rate too high (150 / `0x96`). Sent by: Client/Server.
    ///
    /// The received data rate is too high.
    MessageRateTooHigh = 0x96,

    /// Quota exceeded (151 / `0x97`). Sent by: Client/Server.
    ///
    /// An implementation or administrative imposed limit has been exceeded.
    QuotaExceeded = 0x97,

    /// Administrative action (152 / `0x98`). Sent by: Client/Server.
    ///
    /// The Connection is closed due to an administrative action.
    AdministrativeAction = 0x98,

    /// Payload format invalid (153 / `0x99`). Sent by: Client/Server.
    ///
    /// The payload format does not match the one specified by the Payload
    /// Format Indicator.
    PayloadFormatInvalid = 0x99,

    /// Retain not supported (154 / `0x9A`). Sent by: Server.
    ///
    /// The Server does not support retained messages.
    RetainNotSupported = 0x9A,

    /// QoS not supported (155 / `0x9B`). Sent by: Server.
    ///
    /// The Client specified a QoS greater than the QoS specified in a Maximum
    /// QoS in the CONNACK.
    QoSNotSupported = 0x9B,

    /// Use another server (156 / `0x9C`). Sent by: Server.
    ///
    /// The Client should temporarily change its Server.
    UserAnotherServer = 0x9C,

    /// Server moved (157 / `0x9D`). Sent by: Server.
    ///
    /// The Server is moved and the Client should permanently change its
    /// server location.
    ServerMoved = 0x9D,

    /// Shared Subscriptions not supported (158 / `0x9E`). Sent by: Server.
    ///
    /// The Server does not support Shared Subscriptions.
    SharedSubscriptionNotSupported = 0x9E,

    /// Connection rate exceeded (159 / `0x9F`). Sent by: Server.
    ///
    /// This connection is closed because the connection rate is too high.
    ConnectionRateExceeded = 0x9F,

    /// Maximum connect time (160 / `0xA0`). Sent by: Server.
    ///
    /// The maximum connection time authorized for this connection has been
    /// exceeded.
    MaximumConnectTime = 0xA0,

    /// Subscription Identifiers not supported (161 / `0xA1`). Sent by: Server.
    ///
    /// The Server does not support Subscription Identifiers; the subscription
    /// is not accepted.
    SubscriptionIdentifiersNotSupported = 0xA1,

    /// Wildcard Subscriptions not supported (162 / `0xA2`). Sent by: Server.
    ///
    /// The Server does not support Wildcard Subscriptions; the subscription
    /// is not accepted.
    WildcardSubscriptionsNotSupported = 0xA2,
}
862
863impl DisconnectReasonCode {
864    pub fn from_u8(value: u8) -> Option<Self> {
865        let code = match value {
866            0x00 => Self::NormalDisconnect,
867            0x04 => Self::DisconnectWithWillMessage,
868            0x80 => Self::UnspecifiedError,
869            0x81 => Self::MalformedPacket,
870            0x82 => Self::ProtocolError,
871            0x83 => Self::ImplementationSpecificError,
872            0x87 => Self::NotAuthorized,
873            0x89 => Self::ServerBusy,
874            0x8B => Self::ServerShuttingDown,
875            0x8D => Self::KeepAliveTimeout,
876            0x8E => Self::SessionTakenOver,
877            0x8F => Self::TopicFilterInvalid,
878            0x90 => Self::TopicNameInvalid,
879            0x93 => Self::ReceiveMaximumExceeded,
880            0x94 => Self::TopicAliasInvalid,
881            0x95 => Self::PacketTooLarge,
882            0x96 => Self::MessageRateTooHigh,
883            0x97 => Self::QuotaExceeded,
884            0x98 => Self::AdministrativeAction,
885            0x99 => Self::PayloadFormatInvalid,
886            0x9A => Self::RetainNotSupported,
887            0x9B => Self::QoSNotSupported,
888            0x9C => Self::UserAnotherServer,
889            0x9D => Self::ServerMoved,
890            0x9E => Self::SharedSubscriptionNotSupported,
891            0x9F => Self::ConnectionRateExceeded,
892            0xA0 => Self::MaximumConnectTime,
893            0xA1 => Self::SubscriptionIdentifiersNotSupported,
894            0xA2 => Self::WildcardSubscriptionsNotSupported,
895            _ => return None,
896        };
897        Some(code)
898    }
899}
900
901/// Property list for DISCONNECT packet.
902#[derive(Debug, Clone, PartialEq, Eq, Default)]
903#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
904pub struct DisconnectProperties {
905    pub session_expiry_interval: Option<u32>,
906    pub reason_string: Option<Arc<String>>,
907    pub user_properties: Vec<UserProperty>,
908    pub server_reference: Option<Arc<String>>,
909}
910
911impl DisconnectProperties {
912    pub async fn decode_async<T: AsyncRead + Unpin>(
913        reader: &mut T,
914        packet_type: PacketType,
915    ) -> Result<Self, ErrorV5> {
916        let mut properties = DisconnectProperties::default();
917        decode_properties!(
918            packet_type,
919            properties,
920            reader,
921            SessionExpiryInterval,
922            ReasonString,
923            ServerReference,
924        );
925        Ok(properties)
926    }
927}
928
929impl Encodable for DisconnectProperties {
930    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
931        encode_properties!(
932            self,
933            writer,
934            SessionExpiryInterval,
935            ReasonString,
936            ServerReference,
937        );
938        Ok(())
939    }
940
941    fn encode_len(&self) -> usize {
942        let mut len = 0;
943        encode_properties_len!(
944            self,
945            len,
946            SessionExpiryInterval,
947            ReasonString,
948            ServerReference,
949        );
950        len
951    }
952}
953
954/// Body type of AUTH packet .
955#[derive(Debug, Clone, PartialEq, Eq)]
956#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
957pub struct Auth {
958    pub reason_code: AuthReasonCode,
959    pub properties: AuthProperties,
960}
961
962impl Auth {
963    pub fn new(reason_code: AuthReasonCode) -> Self {
964        Auth {
965            reason_code,
966            properties: AuthProperties::default(),
967        }
968    }
969
970    pub fn new_success() -> Self {
971        Self::new(AuthReasonCode::Success)
972    }
973
974    pub async fn decode_async<T: AsyncRead + Unpin>(
975        reader: &mut T,
976        header: Header,
977    ) -> Result<Self, ErrorV5> {
978        let auth = if header.remaining_len == 0 {
979            Auth {
980                reason_code: AuthReasonCode::Success,
981                properties: AuthProperties::default(),
982            }
983        } else {
984            let reason_byte = read_u8(reader).await?;
985            let reason_code = AuthReasonCode::from_u8(reason_byte)
986                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
987            let properties = AuthProperties::decode_async(reader, header.typ).await?;
988            Auth {
989                reason_code,
990                properties,
991            }
992        };
993        Ok(auth)
994    }
995}
996
997impl Encodable for Auth {
998    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
999        if self.reason_code != AuthReasonCode::Success
1000            || self.properties != AuthProperties::default()
1001        {
1002            write_u8(writer, self.reason_code as u8)?;
1003            self.properties.encode(writer)?;
1004        }
1005        Ok(())
1006    }
1007
1008    fn encode_len(&self) -> usize {
1009        if self.reason_code == AuthReasonCode::Success
1010            && self.properties == AuthProperties::default()
1011        {
1012            0
1013        } else {
1014            1 + self.properties.encode_len()
1015        }
1016    }
1017}
1018
/// Reason code for AUTH packet.
///
/// The discriminant of each variant is the one-byte value of the reason
/// code. Every variant lists who may send it ("Sent by") followed by its
/// meaning.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum AuthReasonCode {
    /// Success (0 / `0x00`). Sent by: Server.
    ///
    /// Authentication is successful.
    Success = 0x00,

    /// Continue authentication (24 / `0x18`). Sent by: Client/Server.
    ///
    /// Continue the authentication with another step.
    ContinueAuthentication = 0x18,

    /// Re-authenticate (25 / `0x19`). Sent by: Client.
    ///
    /// Initiate a re-authentication.
    ReAuthentication = 0x19,
}
1034
1035impl AuthReasonCode {
1036    pub fn from_u8(value: u8) -> Option<Self> {
1037        let code = match value {
1038            0x00 => Self::Success,
1039            0x18 => Self::ContinueAuthentication,
1040            0x19 => Self::ReAuthentication,
1041            _ => return None,
1042        };
1043        Some(code)
1044    }
1045}
1046
1047/// Property list for AUTH packet.
1048#[derive(Debug, Clone, PartialEq, Eq, Default)]
1049pub struct AuthProperties {
1050    pub auth_method: Option<Arc<String>>,
1051    pub auth_data: Option<Bytes>,
1052    pub reason_string: Option<Arc<String>>,
1053    pub user_properties: Vec<UserProperty>,
1054}
1055
/// Hand-written `Arbitrary` implementation, compiled only when the
/// `arbitrary` feature is enabled.
///
/// `auth_data` is generated as an `Option<Vec<u8>>` and converted with
/// `Bytes::from`; the remaining fields are generated directly.
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for AuthProperties {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Values are drawn in field declaration order so the same input bytes
        // keep producing the same value.
        let auth_method: Option<Arc<String>> = u.arbitrary()?;
        let auth_data: Option<Bytes> = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);
        let reason_string: Option<Arc<String>> = u.arbitrary()?;
        let user_properties: Vec<UserProperty> = u.arbitrary()?;
        Ok(Self {
            auth_method,
            auth_data,
            reason_string,
            user_properties,
        })
    }
}
1067
1068impl AuthProperties {
1069    pub async fn decode_async<T: AsyncRead + Unpin>(
1070        reader: &mut T,
1071        packet_type: PacketType,
1072    ) -> Result<Self, ErrorV5> {
1073        let mut properties = AuthProperties::default();
1074        decode_properties!(
1075            packet_type,
1076            properties,
1077            reader,
1078            AuthenticationMethod,
1079            AuthenticationData,
1080            ReasonString,
1081        );
1082        Ok(properties)
1083    }
1084}
1085
1086impl Encodable for AuthProperties {
1087    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
1088        encode_properties!(
1089            self,
1090            writer,
1091            AuthenticationMethod,
1092            AuthenticationData,
1093            ReasonString,
1094        );
1095        Ok(())
1096    }
1097
1098    fn encode_len(&self) -> usize {
1099        let mut len = 0;
1100        encode_properties_len!(
1101            self,
1102            len,
1103            AuthenticationMethod,
1104            AuthenticationData,
1105            ReasonString,
1106        );
1107        len
1108    }
1109}