1use core::convert::TryFrom;
2
3use alloc::string::String;
4use alloc::sync::Arc;
5use alloc::vec::Vec;
6
7use bytes::Bytes;
8use simdutf8::basic::from_utf8;
9
10use crate::{
11 from_read_exact_error, read_bytes, read_string, read_u16, read_u8, write_bytes, write_u16,
12 write_u8, AsyncRead, Encodable, Error, Protocol, QoS, SyncWrite, TopicName,
13};
14
15use super::{
16 decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
17 UserProperty,
18};
19
20#[derive(Debug, Clone, PartialEq, Eq)]
22pub struct Connect {
23 pub protocol: Protocol,
25
26 pub clean_start: bool,
30
31 pub keep_alive: u16,
38
39 pub properties: ConnectProperties,
41
42 pub client_id: Arc<String>,
46
47 pub last_will: Option<LastWill>,
51
52 pub username: Option<Arc<String>>,
54
55 pub password: Option<Bytes>,
57}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for Connect {
    /// Builds a `Connect` from unstructured fuzzer input, drawing the fields in
    /// declaration order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let protocol = u.arbitrary()?;
        let clean_start = u.arbitrary()?;
        let keep_alive = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let client_id = u.arbitrary()?;
        let last_will = u.arbitrary()?;
        let username = u.arbitrary()?;
        // The password is drawn as an optional `Vec<u8>` and converted to `Bytes`.
        let password = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);
        Ok(Self {
            protocol,
            clean_start,
            keep_alive,
            properties,
            client_id,
            last_will,
            username,
            password,
        })
    }
}
73
74impl Connect {
75 pub fn new(client_id: Arc<String>, keep_alive: u16) -> Self {
76 Connect {
77 protocol: Protocol::V500,
78 clean_start: true,
79 keep_alive,
80 properties: ConnectProperties::default(),
81 client_id,
82 last_will: None,
83 username: None,
84 password: None,
85 }
86 }
87
88 pub async fn decode_async<T: AsyncRead + Unpin>(
89 reader: &mut T,
90 header: Header,
91 ) -> Result<Self, ErrorV5> {
92 let protocol = Protocol::decode_async(reader).await?;
93 Self::decode_with_protocol(reader, header, protocol).await
94 }
95
96 #[inline]
97 pub async fn decode_with_protocol<T: AsyncRead + Unpin>(
98 reader: &mut T,
99 header: Header,
100 protocol: Protocol,
101 ) -> Result<Self, ErrorV5> {
102 if protocol != Protocol::V500 {
103 return Err(Error::UnexpectedProtocol(protocol).into());
104 }
105 let connect_flags: u8 = read_u8(reader).await?;
106 if connect_flags & 1 != 0 {
107 return Err(Error::InvalidConnectFlags(connect_flags).into());
108 }
109 let keep_alive = read_u16(reader).await?;
110
111 let properties = ConnectProperties::decode_async(reader, header.typ).await?;
114 let client_id = Arc::new(read_string(reader).await?);
115 let last_will = if connect_flags & 0b100 != 0 {
116 let qos = QoS::from_u8((connect_flags & 0b11000) >> 3)?;
117 let retain = (connect_flags & 0b00100000) != 0;
118 Some(LastWill::decode_async(reader, qos, retain).await?)
119 } else if connect_flags & 0b11000 != 0 {
120 return Err(Error::InvalidConnectFlags(connect_flags).into());
121 } else {
122 None
123 };
124 let username = if connect_flags & 0b10000000 != 0 {
125 Some(Arc::new(read_string(reader).await?))
126 } else {
127 None
128 };
129 let password = if connect_flags & 0b01000000 != 0 {
130 Some(Bytes::from(read_bytes(reader).await?))
131 } else {
132 None
133 };
134 let clean_start = (connect_flags & 0b10) != 0;
135
136 Ok(Connect {
137 protocol,
138 clean_start,
139 properties,
140 keep_alive,
141 client_id,
142 last_will,
143 username,
144 password,
145 })
146 }
147}
148
149impl Encodable for Connect {
150 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
151 let mut connect_flags: u8 = 0b00000000;
152 if self.clean_start {
153 connect_flags |= 0b10;
154 }
155 if self.username.is_some() {
156 connect_flags |= 0b10000000;
157 }
158 if self.password.is_some() {
159 connect_flags |= 0b01000000;
160 }
161 if let Some(last_will) = self.last_will.as_ref() {
162 connect_flags |= 0b00000100;
163 connect_flags |= (last_will.qos as u8) << 3;
164 if last_will.retain {
165 connect_flags |= 0b00100000;
166 }
167 }
168
169 self.protocol.encode(writer)?;
170 write_u8(writer, connect_flags)?;
171 write_u16(writer, self.keep_alive)?;
172 self.properties.encode(writer)?;
173 write_bytes(writer, self.client_id.as_bytes())?;
174 if let Some(last_will) = self.last_will.as_ref() {
175 last_will.encode(writer)?;
176 }
177 if let Some(username) = self.username.as_ref() {
178 write_bytes(writer, username.as_bytes())?;
179 }
180 if let Some(password) = self.password.as_ref() {
181 write_bytes(writer, password.as_ref())?;
182 }
183 Ok(())
184 }
185
186 fn encode_len(&self) -> usize {
187 let mut len = self.protocol.encode_len();
188 len += 1 + 2;
190 len += self.properties.encode_len();
192 len += 2 + self.client_id.len();
194 if let Some(last_will) = self.last_will.as_ref() {
195 len += last_will.encode_len();
196 }
197 if let Some(username) = self.username.as_ref() {
198 len += 2 + username.len();
199 }
200 if let Some(password) = self.password.as_ref() {
201 len += 2 + password.len();
202 }
203 len
204 }
205}
206
207#[derive(Debug, Clone, PartialEq, Eq, Default)]
209pub struct ConnectProperties {
210 pub session_expiry_interval: Option<u32>,
212 pub receive_max: Option<u16>,
214 pub max_packet_size: Option<u32>,
216 pub topic_alias_max: Option<u16>,
218 pub request_response_info: Option<bool>,
220 pub request_problem_info: Option<bool>,
222 pub user_properties: Vec<UserProperty>,
224 pub auth_method: Option<Arc<String>>,
226 pub auth_data: Option<Bytes>,
228}
229
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnectProperties {
    /// Builds `ConnectProperties` from unstructured fuzzer input, drawing the
    /// fields in declaration order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let request_response_info = u.arbitrary()?;
        let request_problem_info = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // `Bytes` is produced from an optional `Vec<u8>`.
        let auth_data = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);
        Ok(Self {
            session_expiry_interval,
            receive_max,
            max_packet_size,
            topic_alias_max,
            request_response_info,
            request_problem_info,
            user_properties,
            auth_method,
            auth_data,
        })
    }
}
246
247impl ConnectProperties {
248 pub async fn decode_async<T: AsyncRead + Unpin>(
249 reader: &mut T,
250 packet_type: PacketType,
251 ) -> Result<Self, ErrorV5> {
252 let mut properties = ConnectProperties::default();
253 decode_properties!(
254 packet_type,
255 properties,
256 reader,
257 SessionExpiryInterval,
258 ReceiveMaximum,
259 MaximumPacketSize,
260 TopicAliasMaximum,
261 RequestResponseInformation,
262 RequestProblemInformation,
263 AuthenticationMethod,
264 AuthenticationData,
265 );
266 Ok(properties)
267 }
268}
269
270impl Encodable for ConnectProperties {
271 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
272 encode_properties!(
273 self,
274 writer,
275 SessionExpiryInterval,
276 ReceiveMaximum,
277 MaximumPacketSize,
278 TopicAliasMaximum,
279 RequestResponseInformation,
280 RequestProblemInformation,
281 AuthenticationMethod,
282 AuthenticationData,
283 );
284 Ok(())
285 }
286
287 fn encode_len(&self) -> usize {
288 let mut len = 0;
289 encode_properties_len!(
290 self,
291 len,
292 SessionExpiryInterval,
293 ReceiveMaximum,
294 MaximumPacketSize,
295 TopicAliasMaximum,
296 RequestResponseInformation,
297 RequestProblemInformation,
298 AuthenticationMethod,
299 AuthenticationData,
300 );
301 len
302 }
303}
304
305#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct LastWill {
308 pub qos: QoS,
309 pub retain: bool,
310 pub topic_name: TopicName,
311 pub payload: Bytes,
312 pub properties: WillProperties,
313}
314
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for LastWill {
    /// Builds a `LastWill` from unstructured fuzzer input.
    ///
    /// Values are drawn in the order qos, retain, properties, topic name,
    /// payload.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let qos = u.arbitrary()?;
        let retain = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let topic_name = u.arbitrary()?;
        // `Bytes` is produced from a generated `Vec<u8>`.
        let payload = Bytes::from(u.arbitrary::<Vec<u8>>()?);
        Ok(Self {
            qos,
            retain,
            topic_name,
            payload,
            properties,
        })
    }
}
327
328impl LastWill {
329 pub fn new(qos: QoS, topic_name: TopicName, payload: Bytes) -> Self {
330 LastWill {
331 qos,
332 retain: false,
333 topic_name,
334 payload,
335 properties: WillProperties::default(),
336 }
337 }
338
339 pub async fn decode_async<T: AsyncRead + Unpin>(
340 reader: &mut T,
341 qos: QoS,
342 retain: bool,
343 ) -> Result<Self, ErrorV5> {
344 let properties = WillProperties::decode_async(reader).await?;
345 let topic_name = TopicName::try_from(read_string(reader).await?)?;
346 let payload = read_bytes(reader).await?;
347 if properties.payload_is_utf8 == Some(true) && from_utf8(&payload).is_err() {
348 return Err(ErrorV5::InvalidPayloadFormat);
349 }
350 Ok(LastWill {
351 qos,
352 retain,
353 properties,
354 topic_name,
355 payload: Bytes::from(payload),
356 })
357 }
358}
359
360impl Encodable for LastWill {
361 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
362 self.properties.encode(writer)?;
363 write_bytes(writer, self.topic_name.as_bytes())?;
364 write_bytes(writer, self.payload.as_ref())?;
365 Ok(())
366 }
367
368 fn encode_len(&self) -> usize {
369 let mut len = self.properties.encode_len();
370 len += 4;
371 len += self.topic_name.len();
372 len += self.payload.len();
373 len
374 }
375}
376
377#[derive(Debug, Clone, PartialEq, Eq, Default)]
379pub struct WillProperties {
380 pub delay_interval: Option<u32>,
381 pub payload_is_utf8: Option<bool>,
382 pub message_expiry_interval: Option<u32>,
383 pub content_type: Option<Arc<String>>,
384 pub response_topic: Option<TopicName>,
385 pub correlation_data: Option<Bytes>,
386 pub user_properties: Vec<UserProperty>,
387}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for WillProperties {
    /// Builds `WillProperties` from unstructured fuzzer input, drawing the
    /// fields in declaration order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let delay_interval = u.arbitrary()?;
        let payload_is_utf8 = u.arbitrary()?;
        let message_expiry_interval = u.arbitrary()?;
        let content_type = u.arbitrary()?;
        let response_topic = u.arbitrary()?;
        // `Bytes` is produced from an optional `Vec<u8>`.
        let correlation_data = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);
        let user_properties = u.arbitrary()?;
        Ok(Self {
            delay_interval,
            payload_is_utf8,
            message_expiry_interval,
            content_type,
            response_topic,
            correlation_data,
            user_properties,
        })
    }
}
402
403impl WillProperties {
404 pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
405 let mut properties = WillProperties::default();
406 decode_properties!(
407 LastWill,
408 properties,
409 reader,
410 WillDelayInterval,
411 PayloadFormatIndicator,
412 MessageExpiryInterval,
413 ContentType,
414 ResponseTopic,
415 CorrelationData,
416 );
417 Ok(properties)
418 }
419}
420
421impl Encodable for WillProperties {
422 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
423 encode_properties!(
424 self,
425 writer,
426 WillDelayInterval,
427 PayloadFormatIndicator,
428 MessageExpiryInterval,
429 ContentType,
430 ResponseTopic,
431 CorrelationData,
432 );
433 Ok(())
434 }
435
436 fn encode_len(&self) -> usize {
437 let mut len = 0;
438 encode_properties_len!(
439 self,
440 len,
441 WillDelayInterval,
442 PayloadFormatIndicator,
443 MessageExpiryInterval,
444 ContentType,
445 ResponseTopic,
446 CorrelationData,
447 );
448 len
449 }
450}
451
452#[derive(Debug, Clone, PartialEq, Eq)]
454#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
455pub struct Connack {
456 pub session_present: bool,
457 pub reason_code: ConnectReasonCode,
458 pub properties: ConnackProperties,
459}
460
461impl Connack {
462 pub fn new(session_present: bool, reason_code: ConnectReasonCode) -> Self {
463 Connack {
464 session_present,
465 reason_code,
466 properties: ConnackProperties::default(),
467 }
468 }
469 pub async fn decode_async<T: AsyncRead + Unpin>(
470 reader: &mut T,
471 header: Header,
472 ) -> Result<Self, ErrorV5> {
473 let mut payload = [0u8; 2];
474 reader
475 .read_exact(&mut payload)
476 .await
477 .map_err(from_read_exact_error)?;
478 let session_present = match payload[0] {
479 0 => false,
480 1 => true,
481 _ => return Err(Error::InvalidConnackFlags(payload[0]).into()),
482 };
483 let reason_code = ConnectReasonCode::from_u8(payload[1])
484 .ok_or(ErrorV5::InvalidReasonCode(header.typ, payload[1]))?;
485 let properties = ConnackProperties::decode_async(reader, header.typ).await?;
486 Ok(Connack {
487 session_present,
488 reason_code,
489 properties,
490 })
491 }
492}
493
494impl Encodable for Connack {
495 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
496 write_u8(writer, u8::from(self.session_present))?;
497 write_u8(writer, self.reason_code as u8)?;
498 self.properties.encode(writer)?;
499 Ok(())
500 }
501
502 fn encode_len(&self) -> usize {
503 2 + self.properties.encode_len()
504 }
505}
506
/// Reason codes that a `CONNACK` packet can carry.
///
/// The discriminant of each variant is its wire value, so `code as u8` yields
/// the byte to send.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum ConnectReasonCode {
    /// `0x00`
    Success = 0x00,
    /// `0x80`
    UnspecifiedError = 0x80,
    /// `0x81`
    MalformedPacket = 0x81,
    /// `0x82`
    ProtocolError = 0x82,
    /// `0x83`
    ImplementationSpecificError = 0x83,
    /// `0x84`
    UnsupportedProtocolVersion = 0x84,
    /// `0x85`
    ClientIdentifierNotValid = 0x85,
    /// `0x86`
    BadUserNameOrPassword = 0x86,
    /// `0x87`
    NotAuthorized = 0x87,
    /// `0x88`
    ServerUnavailable = 0x88,
    /// `0x89`
    ServerBusy = 0x89,
    /// `0x8A`
    Banned = 0x8A,
    /// `0x8C`
    BadAuthMethod = 0x8C,
    /// `0x90`
    TopicNameInvalid = 0x90,
    /// `0x95`
    PacketTooLarge = 0x95,
    /// `0x97`
    QuotaExceeded = 0x97,
    /// `0x99`
    PayloadFormatInvalid = 0x99,
    /// `0x9A`
    RetainNotSupported = 0x9A,
    /// `0x9B`
    QoSNotSupported = 0x9B,
    /// `0x9C`
    UseAnotherServer = 0x9C,
    /// `0x9D`
    ServerMoved = 0x9D,
    /// `0x9F`
    ConnectionRateExceeded = 0x9F,
}

impl ConnectReasonCode {
    /// Maps a wire byte to its reason code, or `None` when the byte is not one
    /// of the values listed on the enum.
    pub fn from_u8(value: u8) -> Option<ConnectReasonCode> {
        match value {
            0x00 => Some(Self::Success),
            0x80 => Some(Self::UnspecifiedError),
            0x81 => Some(Self::MalformedPacket),
            0x82 => Some(Self::ProtocolError),
            0x83 => Some(Self::ImplementationSpecificError),
            0x84 => Some(Self::UnsupportedProtocolVersion),
            0x85 => Some(Self::ClientIdentifierNotValid),
            0x86 => Some(Self::BadUserNameOrPassword),
            0x87 => Some(Self::NotAuthorized),
            0x88 => Some(Self::ServerUnavailable),
            0x89 => Some(Self::ServerBusy),
            0x8A => Some(Self::Banned),
            0x8C => Some(Self::BadAuthMethod),
            0x90 => Some(Self::TopicNameInvalid),
            0x95 => Some(Self::PacketTooLarge),
            0x97 => Some(Self::QuotaExceeded),
            0x99 => Some(Self::PayloadFormatInvalid),
            0x9A => Some(Self::RetainNotSupported),
            0x9B => Some(Self::QoSNotSupported),
            0x9C => Some(Self::UseAnotherServer),
            0x9D => Some(Self::ServerMoved),
            0x9F => Some(Self::ConnectionRateExceeded),
            _ => None,
        }
    }
}
591
592#[derive(Debug, Clone, PartialEq, Eq, Default)]
594pub struct ConnackProperties {
595 pub session_expiry_interval: Option<u32>,
596 pub receive_max: Option<u16>,
597 pub max_qos: Option<QoS>,
598 pub retain_available: Option<bool>,
599 pub max_packet_size: Option<u32>,
600 pub assigned_client_id: Option<Arc<String>>,
601 pub topic_alias_max: Option<u16>,
602 pub reason_string: Option<Arc<String>>,
603 pub user_properties: Vec<UserProperty>,
604 pub wildcard_subscription_available: Option<bool>,
605 pub subscription_id_available: Option<bool>,
606 pub shared_subscription_available: Option<bool>,
607 pub server_keep_alive: Option<u16>,
608 pub response_info: Option<Arc<String>>,
609 pub server_reference: Option<Arc<String>>,
610 pub auth_method: Option<Arc<String>>,
611 pub auth_data: Option<Bytes>,
612}
613
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnackProperties {
    /// Builds `ConnackProperties` from unstructured fuzzer input, drawing the
    /// fields in declaration order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_qos = u.arbitrary()?;
        let retain_available = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let assigned_client_id = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let reason_string = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let wildcard_subscription_available = u.arbitrary()?;
        let subscription_id_available = u.arbitrary()?;
        let shared_subscription_available = u.arbitrary()?;
        let server_keep_alive = u.arbitrary()?;
        let response_info = u.arbitrary()?;
        let server_reference = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // `Bytes` is produced from an optional `Vec<u8>`.
        let auth_data = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);
        Ok(Self {
            session_expiry_interval,
            receive_max,
            max_qos,
            retain_available,
            max_packet_size,
            assigned_client_id,
            topic_alias_max,
            reason_string,
            user_properties,
            wildcard_subscription_available,
            subscription_id_available,
            shared_subscription_available,
            server_keep_alive,
            response_info,
            server_reference,
            auth_method,
            auth_data,
        })
    }
}
638
639impl ConnackProperties {
640 pub async fn decode_async<T: AsyncRead + Unpin>(
641 reader: &mut T,
642 packet_type: PacketType,
643 ) -> Result<Self, ErrorV5> {
644 let mut properties = ConnackProperties::default();
645 decode_properties!(
646 packet_type,
647 properties,
648 reader,
649 SessionExpiryInterval,
650 ReceiveMaximum,
651 MaximumQoS,
652 RetainAvailable,
653 MaximumPacketSize,
654 AssignedClientIdentifier,
655 TopicAliasMaximum,
656 ReasonString,
657 WildcardSubscriptionAvailable,
658 SubscriptionIdentifierAvailable,
659 SharedSubscriptionAvailable,
660 ServerKeepAlive,
661 ResponseInformation,
662 ServerReference,
663 AuthenticationMethod,
664 AuthenticationData,
665 );
666 Ok(properties)
667 }
668}
669
670impl Encodable for ConnackProperties {
671 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
672 encode_properties!(
673 self,
674 writer,
675 SessionExpiryInterval,
676 ReceiveMaximum,
677 MaximumQoS,
678 RetainAvailable,
679 MaximumPacketSize,
680 AssignedClientIdentifier,
681 TopicAliasMaximum,
682 ReasonString,
683 WildcardSubscriptionAvailable,
684 SubscriptionIdentifierAvailable,
685 SharedSubscriptionAvailable,
686 ServerKeepAlive,
687 ResponseInformation,
688 ServerReference,
689 AuthenticationMethod,
690 AuthenticationData,
691 );
692 Ok(())
693 }
694
695 fn encode_len(&self) -> usize {
696 let mut len = 0;
697 encode_properties_len!(
698 self,
699 len,
700 SessionExpiryInterval,
701 ReceiveMaximum,
702 MaximumQoS,
703 RetainAvailable,
704 MaximumPacketSize,
705 AssignedClientIdentifier,
706 TopicAliasMaximum,
707 ReasonString,
708 WildcardSubscriptionAvailable,
709 SubscriptionIdentifierAvailable,
710 SharedSubscriptionAvailable,
711 ServerKeepAlive,
712 ResponseInformation,
713 ServerReference,
714 AuthenticationMethod,
715 AuthenticationData,
716 );
717 len
718 }
719}
720
721#[derive(Debug, Clone, PartialEq, Eq)]
723#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
724pub struct Disconnect {
725 pub reason_code: DisconnectReasonCode,
726 pub properties: DisconnectProperties,
727}
728
729impl Disconnect {
730 pub fn new(reason_code: DisconnectReasonCode) -> Self {
731 Disconnect {
732 reason_code,
733 properties: DisconnectProperties::default(),
734 }
735 }
736
737 pub fn new_normal() -> Self {
738 Self::new(DisconnectReasonCode::NormalDisconnect)
739 }
740
741 pub async fn decode_async<T: AsyncRead + Unpin>(
742 reader: &mut T,
743 header: Header,
744 ) -> Result<Self, ErrorV5> {
745 let (reason_code, properties) = if header.remaining_len == 0 {
746 (DisconnectReasonCode::NormalDisconnect, Default::default())
747 } else if header.remaining_len == 1 {
748 let reason_byte = read_u8(reader).await?;
749 let reason_code = DisconnectReasonCode::from_u8(reason_byte)
750 .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
751 (reason_code, Default::default())
752 } else {
753 let reason_byte = read_u8(reader).await?;
754 let reason_code = DisconnectReasonCode::from_u8(reason_byte)
755 .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
756 let properties = DisconnectProperties::decode_async(reader, header.typ).await?;
757 (reason_code, properties)
758 };
759 Ok(Disconnect {
760 reason_code,
761 properties,
762 })
763 }
764}
765
766impl Encodable for Disconnect {
767 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
768 if self.properties == DisconnectProperties::default() {
769 if self.reason_code != DisconnectReasonCode::NormalDisconnect {
770 write_u8(writer, self.reason_code as u8)?;
771 }
772 } else {
773 write_u8(writer, self.reason_code as u8)?;
774 self.properties.encode(writer)?;
775 }
776 Ok(())
777 }
778
779 fn encode_len(&self) -> usize {
780 if self.properties == DisconnectProperties::default() {
781 if self.reason_code == DisconnectReasonCode::NormalDisconnect {
782 0
783 } else {
784 1
785 }
786 } else {
787 1 + self.properties.encode_len()
788 }
789 }
790}
791
/// Reason codes that a `DISCONNECT` packet can carry.
///
/// The discriminant of each variant is its wire value, so `code as u8` yields
/// the byte to send.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum DisconnectReasonCode {
    /// `0x00`
    NormalDisconnect = 0x00,
    /// `0x04`
    DisconnectWithWillMessage = 0x04,
    /// `0x80`
    UnspecifiedError = 0x80,
    /// `0x81`
    MalformedPacket = 0x81,
    /// `0x82`
    ProtocolError = 0x82,
    /// `0x83`
    ImplementationSpecificError = 0x83,
    /// `0x87`
    NotAuthorized = 0x87,
    /// `0x89`
    ServerBusy = 0x89,
    /// `0x8B`
    ServerShuttingDown = 0x8B,
    /// `0x8D`
    KeepAliveTimeout = 0x8D,
    /// `0x8E`
    SessionTakenOver = 0x8E,
    /// `0x8F`
    TopicFilterInvalid = 0x8F,
    /// `0x90`
    TopicNameInvalid = 0x90,
    /// `0x93`
    ReceiveMaximumExceeded = 0x93,
    /// `0x94`
    TopicAliasInvalid = 0x94,
    /// `0x95`
    PacketTooLarge = 0x95,
    /// `0x96`
    MessageRateTooHigh = 0x96,
    /// `0x97`
    QuotaExceeded = 0x97,
    /// `0x98`
    AdministrativeAction = 0x98,
    /// `0x99`
    PayloadFormatInvalid = 0x99,
    /// `0x9A`
    RetainNotSupported = 0x9A,
    /// `0x9B`
    QoSNotSupported = 0x9B,
    /// `0x9C`. The spelling differs from `ConnectReasonCode::UseAnotherServer`;
    /// it is kept as-is because the name is part of the public API.
    UserAnotherServer = 0x9C,
    /// `0x9D`
    ServerMoved = 0x9D,
    /// `0x9E`
    SharedSubscriptionNotSupported = 0x9E,
    /// `0x9F`
    ConnectionRateExceeded = 0x9F,
    /// `0xA0`
    MaximumConnectTime = 0xA0,
    /// `0xA1`
    SubscriptionIdentifiersNotSupported = 0xA1,
    /// `0xA2`
    WildcardSubscriptionsNotSupported = 0xA2,
}

impl DisconnectReasonCode {
    /// Maps a wire byte to its reason code, or `None` when the byte is not one
    /// of the values listed on the enum.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::NormalDisconnect),
            0x04 => Some(Self::DisconnectWithWillMessage),
            0x80 => Some(Self::UnspecifiedError),
            0x81 => Some(Self::MalformedPacket),
            0x82 => Some(Self::ProtocolError),
            0x83 => Some(Self::ImplementationSpecificError),
            0x87 => Some(Self::NotAuthorized),
            0x89 => Some(Self::ServerBusy),
            0x8B => Some(Self::ServerShuttingDown),
            0x8D => Some(Self::KeepAliveTimeout),
            0x8E => Some(Self::SessionTakenOver),
            0x8F => Some(Self::TopicFilterInvalid),
            0x90 => Some(Self::TopicNameInvalid),
            0x93 => Some(Self::ReceiveMaximumExceeded),
            0x94 => Some(Self::TopicAliasInvalid),
            0x95 => Some(Self::PacketTooLarge),
            0x96 => Some(Self::MessageRateTooHigh),
            0x97 => Some(Self::QuotaExceeded),
            0x98 => Some(Self::AdministrativeAction),
            0x99 => Some(Self::PayloadFormatInvalid),
            0x9A => Some(Self::RetainNotSupported),
            0x9B => Some(Self::QoSNotSupported),
            0x9C => Some(Self::UserAnotherServer),
            0x9D => Some(Self::ServerMoved),
            0x9E => Some(Self::SharedSubscriptionNotSupported),
            0x9F => Some(Self::ConnectionRateExceeded),
            0xA0 => Some(Self::MaximumConnectTime),
            0xA1 => Some(Self::SubscriptionIdentifiersNotSupported),
            0xA2 => Some(Self::WildcardSubscriptionsNotSupported),
            _ => None,
        }
    }
}
900
901#[derive(Debug, Clone, PartialEq, Eq, Default)]
903#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
904pub struct DisconnectProperties {
905 pub session_expiry_interval: Option<u32>,
906 pub reason_string: Option<Arc<String>>,
907 pub user_properties: Vec<UserProperty>,
908 pub server_reference: Option<Arc<String>>,
909}
910
911impl DisconnectProperties {
912 pub async fn decode_async<T: AsyncRead + Unpin>(
913 reader: &mut T,
914 packet_type: PacketType,
915 ) -> Result<Self, ErrorV5> {
916 let mut properties = DisconnectProperties::default();
917 decode_properties!(
918 packet_type,
919 properties,
920 reader,
921 SessionExpiryInterval,
922 ReasonString,
923 ServerReference,
924 );
925 Ok(properties)
926 }
927}
928
929impl Encodable for DisconnectProperties {
930 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
931 encode_properties!(
932 self,
933 writer,
934 SessionExpiryInterval,
935 ReasonString,
936 ServerReference,
937 );
938 Ok(())
939 }
940
941 fn encode_len(&self) -> usize {
942 let mut len = 0;
943 encode_properties_len!(
944 self,
945 len,
946 SessionExpiryInterval,
947 ReasonString,
948 ServerReference,
949 );
950 len
951 }
952}
953
954#[derive(Debug, Clone, PartialEq, Eq)]
956#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
957pub struct Auth {
958 pub reason_code: AuthReasonCode,
959 pub properties: AuthProperties,
960}
961
962impl Auth {
963 pub fn new(reason_code: AuthReasonCode) -> Self {
964 Auth {
965 reason_code,
966 properties: AuthProperties::default(),
967 }
968 }
969
970 pub fn new_success() -> Self {
971 Self::new(AuthReasonCode::Success)
972 }
973
974 pub async fn decode_async<T: AsyncRead + Unpin>(
975 reader: &mut T,
976 header: Header,
977 ) -> Result<Self, ErrorV5> {
978 let auth = if header.remaining_len == 0 {
979 Auth {
980 reason_code: AuthReasonCode::Success,
981 properties: AuthProperties::default(),
982 }
983 } else {
984 let reason_byte = read_u8(reader).await?;
985 let reason_code = AuthReasonCode::from_u8(reason_byte)
986 .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
987 let properties = AuthProperties::decode_async(reader, header.typ).await?;
988 Auth {
989 reason_code,
990 properties,
991 }
992 };
993 Ok(auth)
994 }
995}
996
997impl Encodable for Auth {
998 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
999 if self.reason_code != AuthReasonCode::Success
1000 || self.properties != AuthProperties::default()
1001 {
1002 write_u8(writer, self.reason_code as u8)?;
1003 self.properties.encode(writer)?;
1004 }
1005 Ok(())
1006 }
1007
1008 fn encode_len(&self) -> usize {
1009 if self.reason_code == AuthReasonCode::Success
1010 && self.properties == AuthProperties::default()
1011 {
1012 0
1013 } else {
1014 1 + self.properties.encode_len()
1015 }
1016 }
1017}
1018
/// Reason codes that an `AUTH` packet can carry.
///
/// The discriminant of each variant is its wire value, so `code as u8` yields
/// the byte to send.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum AuthReasonCode {
    /// `0x00`
    Success = 0x00,
    /// `0x18`
    ContinueAuthentication = 0x18,
    /// `0x19`
    ReAuthentication = 0x19,
}

impl AuthReasonCode {
    /// Maps a wire byte to its reason code, or `None` when the byte is not one
    /// of the values listed on the enum.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::Success),
            0x18 => Some(Self::ContinueAuthentication),
            0x19 => Some(Self::ReAuthentication),
            _ => None,
        }
    }
}
1046
1047#[derive(Debug, Clone, PartialEq, Eq, Default)]
1049pub struct AuthProperties {
1050 pub auth_method: Option<Arc<String>>,
1051 pub auth_data: Option<Bytes>,
1052 pub reason_string: Option<Arc<String>>,
1053 pub user_properties: Vec<UserProperty>,
1054}
1055
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for AuthProperties {
    /// Builds `AuthProperties` from unstructured fuzzer input, drawing the
    /// fields in declaration order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let auth_method = u.arbitrary()?;
        // `Bytes` is produced from an optional `Vec<u8>`.
        let auth_data = u.arbitrary::<Option<Vec<u8>>>()?.map(Bytes::from);
        let reason_string = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        Ok(Self {
            auth_method,
            auth_data,
            reason_string,
            user_properties,
        })
    }
}
1067
1068impl AuthProperties {
1069 pub async fn decode_async<T: AsyncRead + Unpin>(
1070 reader: &mut T,
1071 packet_type: PacketType,
1072 ) -> Result<Self, ErrorV5> {
1073 let mut properties = AuthProperties::default();
1074 decode_properties!(
1075 packet_type,
1076 properties,
1077 reader,
1078 AuthenticationMethod,
1079 AuthenticationData,
1080 ReasonString,
1081 );
1082 Ok(properties)
1083 }
1084}
1085
1086impl Encodable for AuthProperties {
1087 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
1088 encode_properties!(
1089 self,
1090 writer,
1091 AuthenticationMethod,
1092 AuthenticationData,
1093 ReasonString,
1094 );
1095 Ok(())
1096 }
1097
1098 fn encode_len(&self) -> usize {
1099 let mut len = 0;
1100 encode_properties_len!(
1101 self,
1102 len,
1103 AuthenticationMethod,
1104 AuthenticationData,
1105 ReasonString,
1106 );
1107 len
1108 }
1109}