mqtt_proto/v5/
connect.rs

1use std::convert::TryFrom;
2use std::io;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use simdutf8::basic::from_utf8;
7use tokio::io::{AsyncRead, AsyncReadExt};
8
9use super::{
10    decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
11    UserProperty,
12};
13use crate::{
14    read_bytes, read_string, read_u16, read_u8, write_bytes, write_u16, write_u8, Encodable, Error,
15    Protocol, QoS, TopicName,
16};
17
18/// Body type of CONNECT packet.
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct Connect {
21    /// The [protocol version](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901036).
22    pub protocol: Protocol,
23
24    /// [Clean start] flag. This value specifies whether the Connection starts a new Session or is a continuation of an existing Session.
25    ///
26    /// [Clean start]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901039
27    pub clean_start: bool,
28
29    /// The [keep alive]. A time interval measured in seconds. It is the
30    /// maximum time interval that is permitted to elapse between the point at
31    /// which the Client finishes transmitting one MQTT Control Packet and the
32    /// point it starts sending the next.
33    ///
34    /// [keep alive]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901045
35    pub keep_alive: u16,
36
37    /// Properties
38    pub properties: ConnectProperties,
39
40    /// The [client identifier] (ClientID).
41    ///
42    /// [client identifier]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
43    pub client_id: Arc<String>,
44
45    /// The [will] message.
46    ///
47    /// [will]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901060
48    pub last_will: Option<LastWill>,
49
50    /// The [user name](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901071).
51    pub username: Option<Arc<String>>,
52
53    /// The [password](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901072).
54    pub password: Option<Bytes>,
55}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for Connect {
    /// Builds a `Connect` from unstructured input.
    ///
    /// Fields are drawn from `u` one after another in a fixed sequence, so
    /// the same input always produces the same value.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let protocol = u.arbitrary()?;
        let clean_start = u.arbitrary()?;
        let keep_alive = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let client_id = u.arbitrary()?;
        let last_will = u.arbitrary()?;
        let username = u.arbitrary()?;
        // The password is generated as an optional byte vector and then
        // converted into `Bytes`.
        let password = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);

        Ok(Connect {
            protocol,
            clean_start,
            keep_alive,
            properties,
            client_id,
            last_will,
            username,
            password,
        })
    }
}
71
72impl Connect {
73    pub fn new(client_id: Arc<String>, keep_alive: u16) -> Self {
74        Connect {
75            protocol: Protocol::V500,
76            clean_start: true,
77            keep_alive,
78            properties: ConnectProperties::default(),
79            client_id,
80            last_will: None,
81            username: None,
82            password: None,
83        }
84    }
85
86    pub async fn decode_async<T: AsyncRead + Unpin>(
87        reader: &mut T,
88        header: Header,
89    ) -> Result<Self, ErrorV5> {
90        let protocol = Protocol::decode_async(reader).await?;
91        Self::decode_with_protocol(reader, header, protocol).await
92    }
93
94    #[inline]
95    pub async fn decode_with_protocol<T: AsyncRead + Unpin>(
96        reader: &mut T,
97        header: Header,
98        protocol: Protocol,
99    ) -> Result<Self, ErrorV5> {
100        if protocol != Protocol::V500 {
101            return Err(Error::UnexpectedProtocol(protocol).into());
102        }
103        let connect_flags: u8 = read_u8(reader).await?;
104        if connect_flags & 1 != 0 {
105            return Err(Error::InvalidConnectFlags(connect_flags).into());
106        }
107        let keep_alive = read_u16(reader).await?;
108
109        // FIXME: check remaining length
110
111        let properties = ConnectProperties::decode_async(reader, header.typ).await?;
112        let client_id = Arc::new(read_string(reader).await?);
113        let last_will = if connect_flags & 0b100 != 0 {
114            let qos = QoS::from_u8((connect_flags & 0b11000) >> 3)?;
115            let retain = (connect_flags & 0b00100000) != 0;
116            Some(LastWill::decode_async(reader, qos, retain).await?)
117        } else if connect_flags & 0b11000 != 0 {
118            return Err(Error::InvalidConnectFlags(connect_flags).into());
119        } else {
120            None
121        };
122        let username = if connect_flags & 0b10000000 != 0 {
123            Some(Arc::new(read_string(reader).await?))
124        } else {
125            None
126        };
127        let password = if connect_flags & 0b01000000 != 0 {
128            Some(Bytes::from(read_bytes(reader).await?))
129        } else {
130            None
131        };
132        let clean_start = (connect_flags & 0b10) != 0;
133
134        Ok(Connect {
135            protocol,
136            clean_start,
137            properties,
138            keep_alive,
139            client_id,
140            last_will,
141            username,
142            password,
143        })
144    }
145}
146
147impl Encodable for Connect {
148    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
149        let mut connect_flags: u8 = 0b00000000;
150        if self.clean_start {
151            connect_flags |= 0b10;
152        }
153        if self.username.is_some() {
154            connect_flags |= 0b10000000;
155        }
156        if self.password.is_some() {
157            connect_flags |= 0b01000000;
158        }
159        if let Some(last_will) = self.last_will.as_ref() {
160            connect_flags |= 0b00000100;
161            connect_flags |= (last_will.qos as u8) << 3;
162            if last_will.retain {
163                connect_flags |= 0b00100000;
164            }
165        }
166
167        self.protocol.encode(writer)?;
168        write_u8(writer, connect_flags)?;
169        write_u16(writer, self.keep_alive)?;
170        self.properties.encode(writer)?;
171        write_bytes(writer, self.client_id.as_bytes())?;
172        if let Some(last_will) = self.last_will.as_ref() {
173            last_will.encode(writer)?;
174        }
175        if let Some(username) = self.username.as_ref() {
176            write_bytes(writer, username.as_bytes())?;
177        }
178        if let Some(password) = self.password.as_ref() {
179            write_bytes(writer, password.as_ref())?;
180        }
181        Ok(())
182    }
183
184    fn encode_len(&self) -> usize {
185        let mut len = self.protocol.encode_len();
186        // flags + keep-alive
187        len += 1 + 2;
188        // properties
189        len += self.properties.encode_len();
190        // client identifier
191        len += 2 + self.client_id.len();
192        if let Some(last_will) = self.last_will.as_ref() {
193            len += last_will.encode_len();
194        }
195        if let Some(username) = self.username.as_ref() {
196            len += 2 + username.len();
197        }
198        if let Some(password) = self.password.as_ref() {
199            len += 2 + password.len();
200        }
201        len
202    }
203}
204
205/// Property list for CONNECT packet.
206#[derive(Debug, Clone, PartialEq, Eq, Default)]
207pub struct ConnectProperties {
208    /// Session Expiry Interval
209    pub session_expiry_interval: Option<u32>,
210    /// Receive Maximum
211    pub receive_max: Option<u16>,
212    /// Maximum Packet Size
213    pub max_packet_size: Option<u32>,
214    /// Topic Alias Maximum
215    pub topic_alias_max: Option<u16>,
216    /// Request Response Information. If absent the default value should be false.
217    pub request_response_info: Option<bool>,
218    /// Request Problem Information. If absent the default value should be true.
219    pub request_problem_info: Option<bool>,
220    /// User Property
221    pub user_properties: Vec<UserProperty>,
222    /// Authentication Method
223    pub auth_method: Option<Arc<String>>,
224    /// Authentication Data
225    pub auth_data: Option<Bytes>,
226}
227
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnectProperties {
    /// Builds a `ConnectProperties` from unstructured input, drawing the
    /// fields from `u` in a fixed sequence.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let request_response_info = u.arbitrary()?;
        let request_problem_info = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // Generated as an optional byte vector, then converted into `Bytes`.
        let auth_data = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);

        Ok(ConnectProperties {
            session_expiry_interval,
            receive_max,
            max_packet_size,
            topic_alias_max,
            request_response_info,
            request_problem_info,
            user_properties,
            auth_method,
            auth_data,
        })
    }
}
244
245impl ConnectProperties {
246    pub async fn decode_async<T: AsyncRead + Unpin>(
247        reader: &mut T,
248        packet_type: PacketType,
249    ) -> Result<Self, ErrorV5> {
250        let mut properties = ConnectProperties::default();
251        decode_properties!(
252            packet_type,
253            properties,
254            reader,
255            SessionExpiryInterval,
256            ReceiveMaximum,
257            MaximumPacketSize,
258            TopicAliasMaximum,
259            RequestResponseInformation,
260            RequestProblemInformation,
261            AuthenticationMethod,
262            AuthenticationData,
263        );
264        Ok(properties)
265    }
266}
267
268impl Encodable for ConnectProperties {
269    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
270        encode_properties!(
271            self,
272            writer,
273            SessionExpiryInterval,
274            ReceiveMaximum,
275            MaximumPacketSize,
276            TopicAliasMaximum,
277            RequestResponseInformation,
278            RequestProblemInformation,
279            AuthenticationMethod,
280            AuthenticationData,
281        );
282        Ok(())
283    }
284
285    fn encode_len(&self) -> usize {
286        let mut len = 0;
287        encode_properties_len!(
288            self,
289            len,
290            SessionExpiryInterval,
291            ReceiveMaximum,
292            MaximumPacketSize,
293            TopicAliasMaximum,
294            RequestResponseInformation,
295            RequestProblemInformation,
296            AuthenticationMethod,
297            AuthenticationData,
298        );
299        len
300    }
301}
302
303/// The will message for CONNECT packet.
304#[derive(Debug, Clone, PartialEq, Eq)]
305pub struct LastWill {
306    pub qos: QoS,
307    pub retain: bool,
308    pub topic_name: TopicName,
309    pub payload: Bytes,
310    pub properties: WillProperties,
311}
312
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for LastWill {
    /// Builds a `LastWill` from unstructured input.
    ///
    /// The draw sequence is: QoS, retain, properties, topic name, payload.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let qos = u.arbitrary()?;
        let retain = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let topic_name = u.arbitrary()?;
        // Generated as a byte vector, then converted into `Bytes`.
        let payload = Bytes::from(u.arbitrary::<Vec<u8>>()?);

        Ok(LastWill {
            qos,
            retain,
            properties,
            topic_name,
            payload,
        })
    }
}
325
326impl LastWill {
327    pub fn new(qos: QoS, topic_name: TopicName, payload: Bytes) -> Self {
328        LastWill {
329            qos,
330            retain: false,
331            topic_name,
332            payload,
333            properties: WillProperties::default(),
334        }
335    }
336
337    pub async fn decode_async<T: AsyncRead + Unpin>(
338        reader: &mut T,
339        qos: QoS,
340        retain: bool,
341    ) -> Result<Self, ErrorV5> {
342        let properties = WillProperties::decode_async(reader).await?;
343        let topic_name = TopicName::try_from(read_string(reader).await?)?;
344        let payload = read_bytes(reader).await?;
345        if properties.payload_is_utf8 == Some(true) && from_utf8(&payload).is_err() {
346            return Err(ErrorV5::InvalidPayloadFormat);
347        }
348        Ok(LastWill {
349            qos,
350            retain,
351            properties,
352            topic_name,
353            payload: Bytes::from(payload),
354        })
355    }
356}
357
358impl Encodable for LastWill {
359    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
360        self.properties.encode(writer)?;
361        write_bytes(writer, self.topic_name.as_bytes())?;
362        write_bytes(writer, self.payload.as_ref())?;
363        Ok(())
364    }
365
366    fn encode_len(&self) -> usize {
367        let mut len = self.properties.encode_len();
368        len += 4;
369        len += self.topic_name.len();
370        len += self.payload.len();
371        len
372    }
373}
374
375/// Property list for will message.
376#[derive(Debug, Clone, PartialEq, Eq, Default)]
377pub struct WillProperties {
378    pub delay_interval: Option<u32>,
379    pub payload_is_utf8: Option<bool>,
380    pub message_expiry_interval: Option<u32>,
381    pub content_type: Option<Arc<String>>,
382    pub response_topic: Option<TopicName>,
383    pub correlation_data: Option<Bytes>,
384    pub user_properties: Vec<UserProperty>,
385}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for WillProperties {
    /// Builds a `WillProperties` from unstructured input, drawing the fields
    /// from `u` in a fixed sequence.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let delay_interval = u.arbitrary()?;
        let payload_is_utf8 = u.arbitrary()?;
        let message_expiry_interval = u.arbitrary()?;
        let content_type = u.arbitrary()?;
        let response_topic = u.arbitrary()?;
        // Generated as an optional byte vector, then converted into `Bytes`.
        let correlation_data = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);
        let user_properties = u.arbitrary()?;

        Ok(WillProperties {
            delay_interval,
            payload_is_utf8,
            message_expiry_interval,
            content_type,
            response_topic,
            correlation_data,
            user_properties,
        })
    }
}
400
401impl WillProperties {
402    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
403        let mut properties = WillProperties::default();
404        decode_properties!(
405            LastWill,
406            properties,
407            reader,
408            WillDelayInterval,
409            PayloadFormatIndicator,
410            MessageExpiryInterval,
411            ContentType,
412            ResponseTopic,
413            CorrelationData,
414        );
415        Ok(properties)
416    }
417}
418
419impl Encodable for WillProperties {
420    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
421        encode_properties!(
422            self,
423            writer,
424            WillDelayInterval,
425            PayloadFormatIndicator,
426            MessageExpiryInterval,
427            ContentType,
428            ResponseTopic,
429            CorrelationData,
430        );
431        Ok(())
432    }
433
434    fn encode_len(&self) -> usize {
435        let mut len = 0;
436        encode_properties_len!(
437            self,
438            len,
439            WillDelayInterval,
440            PayloadFormatIndicator,
441            MessageExpiryInterval,
442            ContentType,
443            ResponseTopic,
444            CorrelationData,
445        );
446        len
447    }
448}
449
450/// Body type of CONNACK packet.
451#[derive(Debug, Clone, PartialEq, Eq)]
452#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
453pub struct Connack {
454    pub session_present: bool,
455    pub reason_code: ConnectReasonCode,
456    pub properties: ConnackProperties,
457}
458
459impl Connack {
460    pub fn new(session_present: bool, reason_code: ConnectReasonCode) -> Self {
461        Connack {
462            session_present,
463            reason_code,
464            properties: ConnackProperties::default(),
465        }
466    }
467    pub async fn decode_async<T: AsyncRead + Unpin>(
468        reader: &mut T,
469        header: Header,
470    ) -> Result<Self, ErrorV5> {
471        let mut payload = [0u8; 2];
472        reader
473            .read_exact(&mut payload)
474            .await
475            .map_err(|err| Error::IoError(err.kind(), err.to_string()))?;
476        let session_present = match payload[0] {
477            0 => false,
478            1 => true,
479            _ => return Err(Error::InvalidConnackFlags(payload[0]).into()),
480        };
481        let reason_code = ConnectReasonCode::from_u8(payload[1])
482            .ok_or(ErrorV5::InvalidReasonCode(header.typ, payload[1]))?;
483        let properties = ConnackProperties::decode_async(reader, header.typ).await?;
484        Ok(Connack {
485            session_present,
486            reason_code,
487            properties,
488        })
489    }
490}
491
492impl Encodable for Connack {
493    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
494        write_u8(writer, u8::from(self.session_present))?;
495        write_u8(writer, self.reason_code as u8)?;
496        self.properties.encode(writer)?;
497        Ok(())
498    }
499
500    fn encode_len(&self) -> usize {
501        2 + self.properties.encode_len()
502    }
503}
504
/// Reason code reported in a CONNACK packet in response to CONNECT.
///
/// Each variant's discriminant is the byte value used on the wire.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum ConnectReasonCode {
    /// Success: the Connection is accepted.
    Success = 0x00,
    /// Unspecified error: the Server does not wish to reveal the reason for
    /// the failure, or none of the other Reason Codes apply.
    UnspecifiedError = 0x80,
    /// Malformed Packet: data within the CONNECT packet could not be
    /// correctly parsed.
    MalformedPacket = 0x81,
    /// Protocol Error: data in the CONNECT packet does not conform to this
    /// specification.
    ProtocolError = 0x82,
    /// Implementation specific error: the CONNECT is valid but is not
    /// accepted by this Server.
    ImplementationSpecificError = 0x83,
    /// Unsupported Protocol Version: the Server does not support the version
    /// of the MQTT protocol requested by the Client.
    UnsupportedProtocolVersion = 0x84,
    /// Client Identifier not valid: the Client Identifier is a valid string
    /// but is not allowed by the Server.
    ClientIdentifierNotValid = 0x85,
    /// Bad User Name or Password: the Server does not accept the User Name
    /// or Password specified by the Client.
    BadUserNameOrPassword = 0x86,
    /// Not authorized: the Client is not authorized to connect.
    NotAuthorized = 0x87,
    /// Server unavailable: the MQTT Server is not available.
    ServerUnavailable = 0x88,
    /// Server busy: the Server is busy. Try again later.
    ServerBusy = 0x89,
    /// Banned: this Client has been banned by administrative action. Contact
    /// the server administrator.
    Banned = 0x8A,
    /// Bad authentication method: the authentication method is not supported
    /// or does not match the authentication method currently in use.
    BadAuthMethod = 0x8C,
    /// Topic Name invalid: the Will Topic Name is not malformed, but is not
    /// accepted by this Server.
    TopicNameInvalid = 0x90,
    /// Packet too large: the CONNECT packet exceeded the maximum permissible
    /// size.
    PacketTooLarge = 0x95,
    /// Quota exceeded: an implementation or administrative imposed limit has
    /// been exceeded.
    QuotaExceeded = 0x97,
    /// Payload format invalid: the Will Payload does not match the specified
    /// Payload Format Indicator.
    PayloadFormatInvalid = 0x99,
    /// Retain not supported: the Server does not support retained messages,
    /// and Will Retain was set to 1.
    RetainNotSupported = 0x9A,
    /// QoS not supported: the Server does not support the QoS set in Will
    /// QoS.
    QoSNotSupported = 0x9B,
    /// Use another server: the Client should temporarily use another server.
    UseAnotherServer = 0x9C,
    /// Server moved: the Client should permanently use another server.
    ServerMoved = 0x9D,
    /// Connection rate exceeded: the connection rate limit has been exceeded.
    ConnectionRateExceeded = 0x9F,
}
558
559impl ConnectReasonCode {
560    pub fn from_u8(value: u8) -> Option<ConnectReasonCode> {
561        let code = match value {
562            0x00 => ConnectReasonCode::Success,
563            0x80 => ConnectReasonCode::UnspecifiedError,
564            0x81 => ConnectReasonCode::MalformedPacket,
565            0x82 => ConnectReasonCode::ProtocolError,
566            0x83 => ConnectReasonCode::ImplementationSpecificError,
567            0x84 => ConnectReasonCode::UnsupportedProtocolVersion,
568            0x85 => ConnectReasonCode::ClientIdentifierNotValid,
569            0x86 => ConnectReasonCode::BadUserNameOrPassword,
570            0x87 => ConnectReasonCode::NotAuthorized,
571            0x88 => ConnectReasonCode::ServerUnavailable,
572            0x89 => ConnectReasonCode::ServerBusy,
573            0x8A => ConnectReasonCode::Banned,
574            0x8C => ConnectReasonCode::BadAuthMethod,
575            0x90 => ConnectReasonCode::TopicNameInvalid,
576            0x95 => ConnectReasonCode::PacketTooLarge,
577            0x97 => ConnectReasonCode::QuotaExceeded,
578            0x99 => ConnectReasonCode::PayloadFormatInvalid,
579            0x9A => ConnectReasonCode::RetainNotSupported,
580            0x9B => ConnectReasonCode::QoSNotSupported,
581            0x9C => ConnectReasonCode::UseAnotherServer,
582            0x9D => ConnectReasonCode::ServerMoved,
583            0x9F => ConnectReasonCode::ConnectionRateExceeded,
584            _ => return None,
585        };
586        Some(code)
587    }
588}
589
590/// Property list for CONNACK packet.
591#[derive(Debug, Clone, PartialEq, Eq, Default)]
592pub struct ConnackProperties {
593    pub session_expiry_interval: Option<u32>,
594    pub receive_max: Option<u16>,
595    pub max_qos: Option<QoS>,
596    pub retain_available: Option<bool>,
597    pub max_packet_size: Option<u32>,
598    pub assigned_client_id: Option<Arc<String>>,
599    pub topic_alias_max: Option<u16>,
600    pub reason_string: Option<Arc<String>>,
601    pub user_properties: Vec<UserProperty>,
602    pub wildcard_subscription_available: Option<bool>,
603    pub subscription_id_available: Option<bool>,
604    pub shared_subscription_available: Option<bool>,
605    pub server_keep_alive: Option<u16>,
606    pub response_info: Option<Arc<String>>,
607    pub server_reference: Option<Arc<String>>,
608    pub auth_method: Option<Arc<String>>,
609    pub auth_data: Option<Bytes>,
610}
611
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnackProperties {
    /// Builds a `ConnackProperties` from unstructured input, drawing the
    /// fields from `u` in a fixed sequence.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_qos = u.arbitrary()?;
        let retain_available = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let assigned_client_id = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let reason_string = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let wildcard_subscription_available = u.arbitrary()?;
        let subscription_id_available = u.arbitrary()?;
        let shared_subscription_available = u.arbitrary()?;
        let server_keep_alive = u.arbitrary()?;
        let response_info = u.arbitrary()?;
        let server_reference = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // Generated as an optional byte vector, then converted into `Bytes`.
        let auth_data = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);

        Ok(ConnackProperties {
            session_expiry_interval,
            receive_max,
            max_qos,
            retain_available,
            max_packet_size,
            assigned_client_id,
            topic_alias_max,
            reason_string,
            user_properties,
            wildcard_subscription_available,
            subscription_id_available,
            shared_subscription_available,
            server_keep_alive,
            response_info,
            server_reference,
            auth_method,
            auth_data,
        })
    }
}
636
637impl ConnackProperties {
638    pub async fn decode_async<T: AsyncRead + Unpin>(
639        reader: &mut T,
640        packet_type: PacketType,
641    ) -> Result<Self, ErrorV5> {
642        let mut properties = ConnackProperties::default();
643        decode_properties!(
644            packet_type,
645            properties,
646            reader,
647            SessionExpiryInterval,
648            ReceiveMaximum,
649            MaximumQoS,
650            RetainAvailable,
651            MaximumPacketSize,
652            AssignedClientIdentifier,
653            TopicAliasMaximum,
654            ReasonString,
655            WildcardSubscriptionAvailable,
656            SubscriptionIdentifierAvailable,
657            SharedSubscriptionAvailable,
658            ServerKeepAlive,
659            ResponseInformation,
660            ServerReference,
661            AuthenticationMethod,
662            AuthenticationData,
663        );
664        Ok(properties)
665    }
666}
667
668impl Encodable for ConnackProperties {
669    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
670        encode_properties!(
671            self,
672            writer,
673            SessionExpiryInterval,
674            ReceiveMaximum,
675            MaximumQoS,
676            RetainAvailable,
677            MaximumPacketSize,
678            AssignedClientIdentifier,
679            TopicAliasMaximum,
680            ReasonString,
681            WildcardSubscriptionAvailable,
682            SubscriptionIdentifierAvailable,
683            SharedSubscriptionAvailable,
684            ServerKeepAlive,
685            ResponseInformation,
686            ServerReference,
687            AuthenticationMethod,
688            AuthenticationData,
689        );
690        Ok(())
691    }
692
693    fn encode_len(&self) -> usize {
694        let mut len = 0;
695        encode_properties_len!(
696            self,
697            len,
698            SessionExpiryInterval,
699            ReceiveMaximum,
700            MaximumQoS,
701            RetainAvailable,
702            MaximumPacketSize,
703            AssignedClientIdentifier,
704            TopicAliasMaximum,
705            ReasonString,
706            WildcardSubscriptionAvailable,
707            SubscriptionIdentifierAvailable,
708            SharedSubscriptionAvailable,
709            ServerKeepAlive,
710            ResponseInformation,
711            ServerReference,
712            AuthenticationMethod,
713            AuthenticationData,
714        );
715        len
716    }
717}
718
719/// Body type for DISCONNECT packet.
720#[derive(Debug, Clone, PartialEq, Eq)]
721#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
722pub struct Disconnect {
723    pub reason_code: DisconnectReasonCode,
724    pub properties: DisconnectProperties,
725}
726
727impl Disconnect {
728    pub fn new(reason_code: DisconnectReasonCode) -> Self {
729        Disconnect {
730            reason_code,
731            properties: DisconnectProperties::default(),
732        }
733    }
734
735    pub fn new_normal() -> Self {
736        Self::new(DisconnectReasonCode::NormalDisconnect)
737    }
738
739    pub async fn decode_async<T: AsyncRead + Unpin>(
740        reader: &mut T,
741        header: Header,
742    ) -> Result<Self, ErrorV5> {
743        let (reason_code, properties) = if header.remaining_len == 0 {
744            (DisconnectReasonCode::NormalDisconnect, Default::default())
745        } else if header.remaining_len == 1 {
746            let reason_byte = read_u8(reader).await?;
747            let reason_code = DisconnectReasonCode::from_u8(reason_byte)
748                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
749            (reason_code, Default::default())
750        } else {
751            let reason_byte = read_u8(reader).await?;
752            let reason_code = DisconnectReasonCode::from_u8(reason_byte)
753                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
754            let properties = DisconnectProperties::decode_async(reader, header.typ).await?;
755            (reason_code, properties)
756        };
757        Ok(Disconnect {
758            reason_code,
759            properties,
760        })
761    }
762}
763
764impl Encodable for Disconnect {
765    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
766        if self.properties == DisconnectProperties::default() {
767            if self.reason_code != DisconnectReasonCode::NormalDisconnect {
768                write_u8(writer, self.reason_code as u8)?;
769            }
770        } else {
771            write_u8(writer, self.reason_code as u8)?;
772            self.properties.encode(writer)?;
773        }
774        Ok(())
775    }
776
777    fn encode_len(&self) -> usize {
778        if self.properties == DisconnectProperties::default() {
779            if self.reason_code == DisconnectReasonCode::NormalDisconnect {
780                0
781            } else {
782                1
783            }
784        } else {
785            1 + self.properties.encode_len()
786        }
787    }
788}
789
/// Reason code for DISCONNECT packet.
///
/// The enum is `#[repr(u8)]`; each variant's discriminant is the reason code
/// value listed in the table below.
///
/// | Dec |  Hex | Reason Code name                       | Sent by       | Description                                                                                    |
/// |-----|------|----------------------------------------|---------------|------------------------------------------------------------------------------------------------|
/// |   0 | 0x00 | Normal disconnection                   | Client/Server | Close the connection normally. Do not send the Will Message.                                   |
/// |   4 | 0x04 | Disconnect with Will Message           | Client        | The Client wishes to disconnect but requires that the Server also publishes its Will Message.  |
/// | 128 | 0x80 | Unspecified error                      | Client/Server | The Connection is closed but the sender either does not wish to reveal the reason,             |
/// |     |      |                                        |               | or none of the other Reason Codes apply.                                                       |
/// | 129 | 0x81 | Malformed Packet                       | Client/Server | The received packet does not conform to this specification.                                    |
/// | 130 | 0x82 | Protocol Error                         | Client/Server | An unexpected or out of order packet was received.                                             |
/// | 131 | 0x83 | Implementation specific error          | Client/Server | The packet received is valid but cannot be processed by this implementation.                   |
/// | 135 | 0x87 | Not authorized                         | Server        | The request is not authorized.                                                                 |
/// | 137 | 0x89 | Server busy                            | Server        | The Server is busy and cannot continue processing requests from this Client.                   |
/// | 139 | 0x8B | Server shutting down                   | Server        | The Server is shutting down.                                                                   |
/// | 141 | 0x8D | Keep Alive timeout                     | Server        | The Connection is closed because no packet has been received for 1.5 times the Keepalive time. |
/// | 142 | 0x8E | Session taken over                     | Server        | Another Connection using the same ClientID has connected causing this Connection to be closed. |
/// | 143 | 0x8F | Topic Filter invalid                   | Server        | The Topic Filter is correctly formed, but is not accepted by this Server.                      |
/// | 144 | 0x90 | Topic Name invalid                     | Client/Server | The Topic Name is correctly formed, but is not accepted by this Client/Server.                 |
/// | 147 | 0x93 | Receive Maximum exceeded               | Client/Server | The Client/Server has received more than Receive Maximum publication for                       |
/// |     |      |                                        |               | which it has not sent PUBACK or PUBCOMP.                                                       |
/// | 148 | 0x94 | Topic Alias invalid                    | Client/Server | The Client/Server has received a PUBLISH packet containing a Topic Alias                       |
/// |     |      |                                        |               | which is greater than the Maximum Topic Alias it sent in the CONNECT or CONNACK packet.        |
/// | 149 | 0x95 | Packet too large                       | Client/Server | The packet size is greater than Maximum Packet Size for this Client/Server.                    |
/// | 150 | 0x96 | Message rate too high                  | Client/Server | The received data rate is too high.                                                            |
/// | 151 | 0x97 | Quota exceeded                         | Client/Server | An implementation or administrative imposed limit has been exceeded.                           |
/// | 152 | 0x98 | Administrative action                  | Client/Server | The Connection is closed due to an administrative action.                                      |
/// | 153 | 0x99 | Payload format invalid                 | Client/Server | The payload format does not match the one specified by the Payload Format Indicator.           |
/// | 154 | 0x9A | Retain not supported                   | Server        | The Server does not support retained messages.                                                 |
/// | 155 | 0x9B | QoS not supported                      | Server        | The Client specified a QoS greater than the QoS specified in a Maximum QoS in the CONNACK.     |
/// | 156 | 0x9C | Use another server                     | Server        | The Client should temporarily change its Server.                                               |
/// | 157 | 0x9D | Server moved                           | Server        | The Server is moved and the Client should permanently change its server location.              |
/// | 158 | 0x9E | Shared Subscriptions not supported     | Server        | The Server does not support Shared Subscriptions.                                              |
/// | 159 | 0x9F | Connection rate exceeded               | Server        | This connection is closed because the connection rate is too high.                             |
/// | 160 | 0xA0 | Maximum connect time                   | Server        | The maximum connection time authorized for this connection has been exceeded.                  |
/// | 161 | 0xA1 | Subscription Identifiers not supported | Server        | The Server does not support Subscription Identifiers; the subscription is not accepted.        |
/// | 162 | 0xA2 | Wildcard Subscriptions not supported   | Server        | The Server does not support Wildcard Subscriptions; the subscription is not accepted.          |
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum DisconnectReasonCode {
    /// Normal disconnection (0x00).
    NormalDisconnect = 0x00,
    /// Disconnect with Will Message (0x04).
    DisconnectWithWillMessage = 0x04,
    /// Unspecified error (0x80).
    UnspecifiedError = 0x80,
    /// Malformed Packet (0x81).
    MalformedPacket = 0x81,
    /// Protocol Error (0x82).
    ProtocolError = 0x82,
    /// Implementation specific error (0x83).
    ImplementationSpecificError = 0x83,
    /// Not authorized (0x87).
    NotAuthorized = 0x87,
    /// Server busy (0x89).
    ServerBusy = 0x89,
    /// Server shutting down (0x8B).
    ServerShuttingDown = 0x8B,
    /// Keep Alive timeout (0x8D).
    KeepAliveTimeout = 0x8D,
    /// Session taken over (0x8E).
    SessionTakenOver = 0x8E,
    /// Topic Filter invalid (0x8F).
    TopicFilterInvalid = 0x8F,
    /// Topic Name invalid (0x90).
    TopicNameInvalid = 0x90,
    /// Receive Maximum exceeded (0x93).
    ReceiveMaximumExceeded = 0x93,
    /// Topic Alias invalid (0x94).
    TopicAliasInvalid = 0x94,
    /// Packet too large (0x95).
    PacketTooLarge = 0x95,
    /// Message rate too high (0x96).
    MessageRateTooHigh = 0x96,
    /// Quota exceeded (0x97).
    QuotaExceeded = 0x97,
    /// Administrative action (0x98).
    AdministrativeAction = 0x98,
    /// Payload format invalid (0x99).
    PayloadFormatInvalid = 0x99,
    /// Retain not supported (0x9A).
    RetainNotSupported = 0x9A,
    /// QoS not supported (0x9B).
    QoSNotSupported = 0x9B,
    /// Use another server (0x9C).
    ///
    /// NOTE: the variant is spelled `UserAnotherServer`; it is the entry the
    /// table above calls "Use another server".
    UserAnotherServer = 0x9C,
    /// Server moved (0x9D).
    ServerMoved = 0x9D,
    /// Shared Subscriptions not supported (0x9E).
    SharedSubscriptionNotSupported = 0x9E,
    /// Connection rate exceeded (0x9F).
    ConnectionRateExceeded = 0x9F,
    /// Maximum connect time (0xA0).
    MaximumConnectTime = 0xA0,
    /// Subscription Identifiers not supported (0xA1).
    SubscriptionIdentifiersNotSupported = 0xA1,
    /// Wildcard Subscriptions not supported (0xA2).
    WildcardSubscriptionsNotSupported = 0xA2,
}
860
861impl DisconnectReasonCode {
862    pub fn from_u8(value: u8) -> Option<Self> {
863        let code = match value {
864            0x00 => Self::NormalDisconnect,
865            0x04 => Self::DisconnectWithWillMessage,
866            0x80 => Self::UnspecifiedError,
867            0x81 => Self::MalformedPacket,
868            0x82 => Self::ProtocolError,
869            0x83 => Self::ImplementationSpecificError,
870            0x87 => Self::NotAuthorized,
871            0x89 => Self::ServerBusy,
872            0x8B => Self::ServerShuttingDown,
873            0x8D => Self::KeepAliveTimeout,
874            0x8E => Self::SessionTakenOver,
875            0x8F => Self::TopicFilterInvalid,
876            0x90 => Self::TopicNameInvalid,
877            0x93 => Self::ReceiveMaximumExceeded,
878            0x94 => Self::TopicAliasInvalid,
879            0x95 => Self::PacketTooLarge,
880            0x96 => Self::MessageRateTooHigh,
881            0x97 => Self::QuotaExceeded,
882            0x98 => Self::AdministrativeAction,
883            0x99 => Self::PayloadFormatInvalid,
884            0x9A => Self::RetainNotSupported,
885            0x9B => Self::QoSNotSupported,
886            0x9C => Self::UserAnotherServer,
887            0x9D => Self::ServerMoved,
888            0x9E => Self::SharedSubscriptionNotSupported,
889            0x9F => Self::ConnectionRateExceeded,
890            0xA0 => Self::MaximumConnectTime,
891            0xA1 => Self::SubscriptionIdentifiersNotSupported,
892            0xA2 => Self::WildcardSubscriptionsNotSupported,
893            _ => return None,
894        };
895        Some(code)
896    }
897}
898
/// Property list for DISCONNECT packet.
///
/// The derived `Default` value (all options `None`, no user properties) is what
/// the DISCONNECT encoder compares against to decide whether a property list
/// has to be written at all.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct DisconnectProperties {
    /// Session Expiry Interval property, if set.
    pub session_expiry_interval: Option<u32>,
    /// Reason String property, if set.
    pub reason_string: Option<Arc<String>>,
    /// User Property entries attached to the packet.
    pub user_properties: Vec<UserProperty>,
    /// Server Reference property, if set.
    pub server_reference: Option<Arc<String>>,
}
908
909impl DisconnectProperties {
910    pub async fn decode_async<T: AsyncRead + Unpin>(
911        reader: &mut T,
912        packet_type: PacketType,
913    ) -> Result<Self, ErrorV5> {
914        let mut properties = DisconnectProperties::default();
915        decode_properties!(
916            packet_type,
917            properties,
918            reader,
919            SessionExpiryInterval,
920            ReasonString,
921            ServerReference,
922        );
923        Ok(properties)
924    }
925}
926
927impl Encodable for DisconnectProperties {
928    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
929        encode_properties!(
930            self,
931            writer,
932            SessionExpiryInterval,
933            ReasonString,
934            ServerReference,
935        );
936        Ok(())
937    }
938
939    fn encode_len(&self) -> usize {
940        let mut len = 0;
941        encode_properties_len!(
942            self,
943            len,
944            SessionExpiryInterval,
945            ReasonString,
946            ServerReference,
947        );
948        len
949    }
950}
951
/// Body type of AUTH packet.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Auth {
    /// Reason code carried by the packet.
    pub reason_code: AuthReasonCode,
    /// Property list carried by the packet.
    pub properties: AuthProperties,
}
959
960impl Auth {
961    pub fn new(reason_code: AuthReasonCode) -> Self {
962        Auth {
963            reason_code,
964            properties: AuthProperties::default(),
965        }
966    }
967
968    pub fn new_success() -> Self {
969        Self::new(AuthReasonCode::Success)
970    }
971
972    pub async fn decode_async<T: AsyncRead + Unpin>(
973        reader: &mut T,
974        header: Header,
975    ) -> Result<Self, ErrorV5> {
976        let auth = if header.remaining_len == 0 {
977            Auth {
978                reason_code: AuthReasonCode::Success,
979                properties: AuthProperties::default(),
980            }
981        } else {
982            let reason_byte = read_u8(reader).await?;
983            let reason_code = AuthReasonCode::from_u8(reason_byte)
984                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
985            let properties = AuthProperties::decode_async(reader, header.typ).await?;
986            Auth {
987                reason_code,
988                properties,
989            }
990        };
991        Ok(auth)
992    }
993}
994
995impl Encodable for Auth {
996    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
997        if self.reason_code != AuthReasonCode::Success
998            || self.properties != AuthProperties::default()
999        {
1000            write_u8(writer, self.reason_code as u8)?;
1001            self.properties.encode(writer)?;
1002        }
1003        Ok(())
1004    }
1005
1006    fn encode_len(&self) -> usize {
1007        if self.reason_code == AuthReasonCode::Success
1008            && self.properties == AuthProperties::default()
1009        {
1010            0
1011        } else {
1012            1 + self.properties.encode_len()
1013        }
1014    }
1015}
1016
/// Reason code for AUTH packet.
///
/// The enum is `#[repr(u8)]`; each variant's discriminant is the reason code
/// value listed in the table below.
///
/// | Dec |  Hex | Reason Code name        | Sent by       | Description                                   |
/// |-----|------|-------------------------|---------------|-----------------------------------------------|
/// |   0 | 0x00 | Success                 | Server        | Authentication is successful                  |
/// |  24 | 0x18 | Continue authentication | Client/Server | Continue the authentication with another step |
/// |  25 | 0x19 | Re-authenticate         | Client        | Initiate a re-authentication                  |
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum AuthReasonCode {
    /// Success (0x00): authentication is successful.
    Success = 0x00,
    /// Continue authentication (0x18): continue with another step.
    ContinueAuthentication = 0x18,
    /// Re-authenticate (0x19): initiate a re-authentication.
    ReAuthentication = 0x19,
}
1032
1033impl AuthReasonCode {
1034    pub fn from_u8(value: u8) -> Option<Self> {
1035        let code = match value {
1036            0x00 => Self::Success,
1037            0x18 => Self::ContinueAuthentication,
1038            0x19 => Self::ReAuthentication,
1039            _ => return None,
1040        };
1041        Some(code)
1042    }
1043}
1044
/// Property list for AUTH packet.
///
/// The derived `Default` value (all options `None`, no user properties) is what
/// the AUTH encoder compares against to decide whether a body has to be written.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AuthProperties {
    /// Authentication Method property, if set.
    pub auth_method: Option<Arc<String>>,
    /// Authentication Data property (raw bytes), if set.
    pub auth_data: Option<Bytes>,
    /// Reason String property, if set.
    pub reason_string: Option<Arc<String>>,
    /// User Property entries attached to the packet.
    pub user_properties: Vec<UserProperty>,
}
1053
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for AuthProperties {
    /// Generates an arbitrary `AuthProperties` from unstructured input.
    ///
    /// Implemented by hand so that `auth_data` can be produced as an
    /// `Option<Vec<u8>>` and then converted into `Bytes`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Values are drawn in field-declaration order so the input is consumed
        // in the same sequence as the struct layout.
        let auth_method = u.arbitrary()?;
        let raw_data: Option<Vec<u8>> = u.arbitrary()?;
        let reason_string = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        Ok(Self {
            auth_method,
            auth_data: raw_data.map(Bytes::from),
            reason_string,
            user_properties,
        })
    }
}
1065
1066impl AuthProperties {
1067    pub async fn decode_async<T: AsyncRead + Unpin>(
1068        reader: &mut T,
1069        packet_type: PacketType,
1070    ) -> Result<Self, ErrorV5> {
1071        let mut properties = AuthProperties::default();
1072        decode_properties!(
1073            packet_type,
1074            properties,
1075            reader,
1076            AuthenticationMethod,
1077            AuthenticationData,
1078            ReasonString,
1079        );
1080        Ok(properties)
1081    }
1082}
1083
1084impl Encodable for AuthProperties {
1085    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
1086        encode_properties!(
1087            self,
1088            writer,
1089            AuthenticationMethod,
1090            AuthenticationData,
1091            ReasonString,
1092        );
1093        Ok(())
1094    }
1095
1096    fn encode_len(&self) -> usize {
1097        let mut len = 0;
1098        encode_properties_len!(
1099            self,
1100            len,
1101            AuthenticationMethod,
1102            AuthenticationData,
1103            ReasonString,
1104        );
1105        len
1106    }
1107}