1use std::convert::TryFrom;
2use std::io;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use simdutf8::basic::from_utf8;
7use tokio::io::{AsyncRead, AsyncReadExt};
8
9use super::{
10 decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
11 UserProperty,
12};
13use crate::{
14 read_bytes, read_string, read_u16, read_u8, write_bytes, write_u16, write_u8, Encodable, Error,
15 Protocol, QoS, TopicName,
16};
17
/// Body of an MQTT v5 CONNECT packet.
///
/// The fields are stored here in a convenient order, not in wire order; the
/// `Encodable` implementation for this type defines the serialized layout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connect {
    /// Protocol of the packet. `Connect::new` sets `Protocol::V500` and
    /// `Connect::decode_with_protocol` rejects any other value.
    pub protocol: Protocol,

    /// Mirrors bit 1 (`0b10`) of the connect flags byte.
    pub clean_start: bool,

    /// 16-bit keep-alive value that follows the connect flags on the wire.
    /// NOTE(review): presumably expressed in seconds, per the MQTT spec; confirm.
    pub keep_alive: u16,

    /// CONNECT properties, encoded right after the keep-alive value.
    pub properties: ConnectProperties,

    /// Client identifier. Always present in the encoded packet.
    pub client_id: Arc<String>,

    /// Optional will message. When present, the will flag (`0b100`) is set and
    /// the will's `qos` and `retain` are packed into the connect flags.
    pub last_will: Option<LastWill>,

    /// Optional user name; its presence is signalled by flag bit `0b1000_0000`.
    pub username: Option<Arc<String>>,

    /// Optional password bytes; presence is signalled by flag bit `0b0100_0000`.
    pub password: Option<Bytes>,
}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for Connect {
    /// Builds a `Connect` from unstructured fuzzer input.
    ///
    /// Values are drawn from `u` one field at a time, in the order listed
    /// below, so a given input always maps to the same packet.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let protocol = u.arbitrary()?;
        let clean_start = u.arbitrary()?;
        let keep_alive = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let client_id = u.arbitrary()?;
        let last_will = u.arbitrary()?;
        let username = u.arbitrary()?;
        // The password is drawn as an optional byte vector and then converted to `Bytes`.
        let password = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);

        Ok(Connect {
            protocol,
            clean_start,
            keep_alive,
            properties,
            client_id,
            last_will,
            username,
            password,
        })
    }
}
71
72impl Connect {
73 pub fn new(client_id: Arc<String>, keep_alive: u16) -> Self {
74 Connect {
75 protocol: Protocol::V500,
76 clean_start: true,
77 keep_alive,
78 properties: ConnectProperties::default(),
79 client_id,
80 last_will: None,
81 username: None,
82 password: None,
83 }
84 }
85
86 pub async fn decode_async<T: AsyncRead + Unpin>(
87 reader: &mut T,
88 header: Header,
89 ) -> Result<Self, ErrorV5> {
90 let protocol = Protocol::decode_async(reader).await?;
91 Self::decode_with_protocol(reader, header, protocol).await
92 }
93
94 #[inline]
95 pub async fn decode_with_protocol<T: AsyncRead + Unpin>(
96 reader: &mut T,
97 header: Header,
98 protocol: Protocol,
99 ) -> Result<Self, ErrorV5> {
100 if protocol != Protocol::V500 {
101 return Err(Error::UnexpectedProtocol(protocol).into());
102 }
103 let connect_flags: u8 = read_u8(reader).await?;
104 if connect_flags & 1 != 0 {
105 return Err(Error::InvalidConnectFlags(connect_flags).into());
106 }
107 let keep_alive = read_u16(reader).await?;
108
109 let properties = ConnectProperties::decode_async(reader, header.typ).await?;
112 let client_id = Arc::new(read_string(reader).await?);
113 let last_will = if connect_flags & 0b100 != 0 {
114 let qos = QoS::from_u8((connect_flags & 0b11000) >> 3)?;
115 let retain = (connect_flags & 0b00100000) != 0;
116 Some(LastWill::decode_async(reader, qos, retain).await?)
117 } else if connect_flags & 0b11000 != 0 {
118 return Err(Error::InvalidConnectFlags(connect_flags).into());
119 } else {
120 None
121 };
122 let username = if connect_flags & 0b10000000 != 0 {
123 Some(Arc::new(read_string(reader).await?))
124 } else {
125 None
126 };
127 let password = if connect_flags & 0b01000000 != 0 {
128 Some(Bytes::from(read_bytes(reader).await?))
129 } else {
130 None
131 };
132 let clean_start = (connect_flags & 0b10) != 0;
133
134 Ok(Connect {
135 protocol,
136 clean_start,
137 properties,
138 keep_alive,
139 client_id,
140 last_will,
141 username,
142 password,
143 })
144 }
145}
146
147impl Encodable for Connect {
148 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
149 let mut connect_flags: u8 = 0b00000000;
150 if self.clean_start {
151 connect_flags |= 0b10;
152 }
153 if self.username.is_some() {
154 connect_flags |= 0b10000000;
155 }
156 if self.password.is_some() {
157 connect_flags |= 0b01000000;
158 }
159 if let Some(last_will) = self.last_will.as_ref() {
160 connect_flags |= 0b00000100;
161 connect_flags |= (last_will.qos as u8) << 3;
162 if last_will.retain {
163 connect_flags |= 0b00100000;
164 }
165 }
166
167 self.protocol.encode(writer)?;
168 write_u8(writer, connect_flags)?;
169 write_u16(writer, self.keep_alive)?;
170 self.properties.encode(writer)?;
171 write_bytes(writer, self.client_id.as_bytes())?;
172 if let Some(last_will) = self.last_will.as_ref() {
173 last_will.encode(writer)?;
174 }
175 if let Some(username) = self.username.as_ref() {
176 write_bytes(writer, username.as_bytes())?;
177 }
178 if let Some(password) = self.password.as_ref() {
179 write_bytes(writer, password.as_ref())?;
180 }
181 Ok(())
182 }
183
184 fn encode_len(&self) -> usize {
185 let mut len = self.protocol.encode_len();
186 len += 1 + 2;
188 len += self.properties.encode_len();
190 len += 2 + self.client_id.len();
192 if let Some(last_will) = self.last_will.as_ref() {
193 len += last_will.encode_len();
194 }
195 if let Some(username) = self.username.as_ref() {
196 len += 2 + username.len();
197 }
198 if let Some(password) = self.password.as_ref() {
199 len += 2 + password.len();
200 }
201 len
202 }
203}
204
/// Properties carried by a CONNECT packet.
///
/// Every property is optional; `Default` yields a value with nothing set.
/// The mapping between these fields and the wire-level property identifiers
/// is done by the property macros, which are defined outside this file.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnectProperties {
    /// Session expiry interval, if supplied.
    pub session_expiry_interval: Option<u32>,
    /// Receive maximum, if supplied.
    pub receive_max: Option<u16>,
    /// Maximum packet size, if supplied.
    pub max_packet_size: Option<u32>,
    /// Topic alias maximum, if supplied.
    pub topic_alias_max: Option<u16>,
    /// Request-response-information flag, if supplied.
    pub request_response_info: Option<bool>,
    /// Request-problem-information flag, if supplied.
    pub request_problem_info: Option<bool>,
    /// User properties; empty when none were supplied.
    pub user_properties: Vec<UserProperty>,
    /// Authentication method name, if supplied.
    pub auth_method: Option<Arc<String>>,
    /// Authentication data bytes, if supplied.
    pub auth_data: Option<Bytes>,
}
227
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnectProperties {
    /// Builds `ConnectProperties` from unstructured fuzzer input, drawing one
    /// field at a time in the order listed below.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let request_response_info = u.arbitrary()?;
        let request_problem_info = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // Drawn as an optional byte vector, then converted to `Bytes`.
        let auth_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);

        Ok(ConnectProperties {
            session_expiry_interval,
            receive_max,
            max_packet_size,
            topic_alias_max,
            request_response_info,
            request_problem_info,
            user_properties,
            auth_method,
            auth_data,
        })
    }
}
244
impl ConnectProperties {
    /// Decodes the CONNECT property section from `reader`.
    ///
    /// Starts from `ConnectProperties::default()` and lets the
    /// `decode_properties!` macro (imported from the parent module) fill it in.
    /// `packet_type` is forwarded to the macro.
    ///
    /// NOTE(review): `user_properties` is not named in the list below;
    /// presumably the macro handles user properties implicitly. Confirm
    /// against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = ConnectProperties::default();
        // The identifiers after `reader` name the properties accepted for this packet.
        decode_properties!(
            packet_type,
            properties,
            reader,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumPacketSize,
            TopicAliasMaximum,
            RequestResponseInformation,
            RequestProblemInformation,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(properties)
    }
}
267
impl Encodable for ConnectProperties {
    /// Writes the CONNECT property section to `writer` via `encode_properties!`.
    ///
    /// The property list passed here is the same as in `encode_len` and in
    /// `ConnectProperties::decode_async`.
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(
            self,
            writer,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumPacketSize,
            TopicAliasMaximum,
            RequestResponseInformation,
            RequestProblemInformation,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(())
    }

    /// Returns the encoded size of the property section.
    ///
    /// `len` starts at zero and is updated by `encode_properties_len!`.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumPacketSize,
            TopicAliasMaximum,
            RequestResponseInformation,
            RequestProblemInformation,
            AuthenticationMethod,
            AuthenticationData,
        );
        len
    }
}
302
/// Will message attached to a CONNECT packet.
///
/// `qos` and `retain` are not part of the will's own encoding; they travel in
/// the connect flags byte of the enclosing `Connect`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LastWill {
    /// QoS of the will, taken from bits 3-4 of the connect flags.
    pub qos: QoS,
    /// Retain flag of the will, taken from bit 5 of the connect flags.
    pub retain: bool,
    /// Topic name of the will message.
    pub topic_name: TopicName,
    /// Payload of the will message.
    pub payload: Bytes,
    /// Will properties, encoded ahead of the topic name.
    pub properties: WillProperties,
}
312
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for LastWill {
    /// Builds a `LastWill` from unstructured fuzzer input.
    ///
    /// The draw order (qos, retain, properties, topic name, payload) differs
    /// from the struct's declaration order and is kept as is.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let qos = u.arbitrary()?;
        let retain = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let topic_name = u.arbitrary()?;
        // Drawn as a byte vector, then converted to `Bytes`.
        let payload = Bytes::from(Vec::<u8>::arbitrary(u)?);

        Ok(LastWill {
            qos,
            retain,
            properties,
            topic_name,
            payload,
        })
    }
}
325
326impl LastWill {
327 pub fn new(qos: QoS, topic_name: TopicName, payload: Bytes) -> Self {
328 LastWill {
329 qos,
330 retain: false,
331 topic_name,
332 payload,
333 properties: WillProperties::default(),
334 }
335 }
336
337 pub async fn decode_async<T: AsyncRead + Unpin>(
338 reader: &mut T,
339 qos: QoS,
340 retain: bool,
341 ) -> Result<Self, ErrorV5> {
342 let properties = WillProperties::decode_async(reader).await?;
343 let topic_name = TopicName::try_from(read_string(reader).await?)?;
344 let payload = read_bytes(reader).await?;
345 if properties.payload_is_utf8 == Some(true) && from_utf8(&payload).is_err() {
346 return Err(ErrorV5::InvalidPayloadFormat);
347 }
348 Ok(LastWill {
349 qos,
350 retain,
351 properties,
352 topic_name,
353 payload: Bytes::from(payload),
354 })
355 }
356}
357
358impl Encodable for LastWill {
359 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
360 self.properties.encode(writer)?;
361 write_bytes(writer, self.topic_name.as_bytes())?;
362 write_bytes(writer, self.payload.as_ref())?;
363 Ok(())
364 }
365
366 fn encode_len(&self) -> usize {
367 let mut len = self.properties.encode_len();
368 len += 4;
369 len += self.topic_name.len();
370 len += self.payload.len();
371 len
372 }
373}
374
/// Properties attached to a will message.
///
/// Every property is optional; `Default` yields a value with nothing set.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WillProperties {
    /// Will delay interval, if supplied.
    pub delay_interval: Option<u32>,
    /// Payload format indicator. `LastWill::decode_async` validates the
    /// payload as UTF-8 only when this is `Some(true)`.
    pub payload_is_utf8: Option<bool>,
    /// Message expiry interval, if supplied.
    pub message_expiry_interval: Option<u32>,
    /// Content type string, if supplied.
    pub content_type: Option<Arc<String>>,
    /// Response topic, if supplied.
    pub response_topic: Option<TopicName>,
    /// Correlation data bytes, if supplied.
    pub correlation_data: Option<Bytes>,
    /// User properties; empty when none were supplied.
    pub user_properties: Vec<UserProperty>,
}
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for WillProperties {
    /// Builds `WillProperties` from unstructured fuzzer input, drawing one
    /// field at a time in the order listed below.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let delay_interval = u.arbitrary()?;
        let payload_is_utf8 = u.arbitrary()?;
        let message_expiry_interval = u.arbitrary()?;
        let content_type = u.arbitrary()?;
        let response_topic = u.arbitrary()?;
        // Drawn as an optional byte vector, then converted to `Bytes`.
        let correlation_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);
        let user_properties = u.arbitrary()?;

        Ok(WillProperties {
            delay_interval,
            payload_is_utf8,
            message_expiry_interval,
            content_type,
            response_topic,
            correlation_data,
            user_properties,
        })
    }
}
400
impl WillProperties {
    /// Decodes the will property section from `reader`.
    ///
    /// Starts from `WillProperties::default()` and lets `decode_properties!`
    /// fill it in.
    ///
    /// NOTE(review): unlike the other property decoders in this file, the
    /// first macro argument is the bare token `LastWill`, not a `PacketType`
    /// value. How the macro interprets it is not visible here; confirm against
    /// the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(reader: &mut T) -> Result<Self, ErrorV5> {
        let mut properties = WillProperties::default();
        // The identifiers after `reader` name the properties accepted for a will.
        decode_properties!(
            LastWill,
            properties,
            reader,
            WillDelayInterval,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            ContentType,
            ResponseTopic,
            CorrelationData,
        );
        Ok(properties)
    }
}
418
impl Encodable for WillProperties {
    /// Writes the will property section to `writer` via `encode_properties!`.
    ///
    /// The property list passed here is the same as in `encode_len` and in
    /// `WillProperties::decode_async`.
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(
            self,
            writer,
            WillDelayInterval,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            ContentType,
            ResponseTopic,
            CorrelationData,
        );
        Ok(())
    }

    /// Returns the encoded size of the will property section.
    ///
    /// `len` starts at zero and is updated by `encode_properties_len!`.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            WillDelayInterval,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            ContentType,
            ResponseTopic,
            CorrelationData,
        );
        len
    }
}
449
/// Body of an MQTT v5 CONNACK packet.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Connack {
    /// Session-present flag; encoded as the first byte (`0` or `1`).
    pub session_present: bool,
    /// Connect reason code; encoded as the second byte.
    pub reason_code: ConnectReasonCode,
    /// CONNACK properties, encoded after the two fixed bytes.
    pub properties: ConnackProperties,
}
458
459impl Connack {
460 pub fn new(session_present: bool, reason_code: ConnectReasonCode) -> Self {
461 Connack {
462 session_present,
463 reason_code,
464 properties: ConnackProperties::default(),
465 }
466 }
467 pub async fn decode_async<T: AsyncRead + Unpin>(
468 reader: &mut T,
469 header: Header,
470 ) -> Result<Self, ErrorV5> {
471 let mut payload = [0u8; 2];
472 reader
473 .read_exact(&mut payload)
474 .await
475 .map_err(|err| Error::IoError(err.kind(), err.to_string()))?;
476 let session_present = match payload[0] {
477 0 => false,
478 1 => true,
479 _ => return Err(Error::InvalidConnackFlags(payload[0]).into()),
480 };
481 let reason_code = ConnectReasonCode::from_u8(payload[1])
482 .ok_or(ErrorV5::InvalidReasonCode(header.typ, payload[1]))?;
483 let properties = ConnackProperties::decode_async(reader, header.typ).await?;
484 Ok(Connack {
485 session_present,
486 reason_code,
487 properties,
488 })
489 }
490}
491
492impl Encodable for Connack {
493 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
494 write_u8(writer, u8::from(self.session_present))?;
495 write_u8(writer, self.reason_code as u8)?;
496 self.properties.encode(writer)?;
497 Ok(())
498 }
499
500 fn encode_len(&self) -> usize {
501 2 + self.properties.encode_len()
502 }
503}
504
/// Reason code carried in a CONNACK packet.
///
/// Each variant's discriminant is its one-byte wire value, so `code as u8`
/// yields the encoded byte and [`ConnectReasonCode::from_u8`] is the inverse.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum ConnectReasonCode {
    Success = 0x00,
    UnspecifiedError = 0x80,
    MalformedPacket = 0x81,
    ProtocolError = 0x82,
    ImplementationSpecificError = 0x83,
    UnsupportedProtocolVersion = 0x84,
    ClientIdentifierNotValid = 0x85,
    BadUserNameOrPassword = 0x86,
    NotAuthorized = 0x87,
    ServerUnavailable = 0x88,
    ServerBusy = 0x89,
    Banned = 0x8A,
    BadAuthMethod = 0x8C,
    TopicNameInvalid = 0x90,
    PacketTooLarge = 0x95,
    QuotaExceeded = 0x97,
    PayloadFormatInvalid = 0x99,
    RetainNotSupported = 0x9A,
    QoSNotSupported = 0x9B,
    UseAnotherServer = 0x9C,
    ServerMoved = 0x9D,
    ConnectionRateExceeded = 0x9F,
}

impl ConnectReasonCode {
    /// Maps a wire byte to its reason code, or `None` for an unknown value.
    pub fn from_u8(value: u8) -> Option<ConnectReasonCode> {
        match value {
            0x00 => Some(Self::Success),
            0x80 => Some(Self::UnspecifiedError),
            0x81 => Some(Self::MalformedPacket),
            0x82 => Some(Self::ProtocolError),
            0x83 => Some(Self::ImplementationSpecificError),
            0x84 => Some(Self::UnsupportedProtocolVersion),
            0x85 => Some(Self::ClientIdentifierNotValid),
            0x86 => Some(Self::BadUserNameOrPassword),
            0x87 => Some(Self::NotAuthorized),
            0x88 => Some(Self::ServerUnavailable),
            0x89 => Some(Self::ServerBusy),
            0x8A => Some(Self::Banned),
            0x8C => Some(Self::BadAuthMethod),
            0x90 => Some(Self::TopicNameInvalid),
            0x95 => Some(Self::PacketTooLarge),
            0x97 => Some(Self::QuotaExceeded),
            0x99 => Some(Self::PayloadFormatInvalid),
            0x9A => Some(Self::RetainNotSupported),
            0x9B => Some(Self::QoSNotSupported),
            0x9C => Some(Self::UseAnotherServer),
            0x9D => Some(Self::ServerMoved),
            0x9F => Some(Self::ConnectionRateExceeded),
            _ => None,
        }
    }
}
589
/// Properties carried by a CONNACK packet.
///
/// Every property is optional; `Default` yields a value with nothing set.
/// The mapping between these fields and the wire-level property identifiers
/// is done by the property macros, which are defined outside this file.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnackProperties {
    /// Session expiry interval, if supplied.
    pub session_expiry_interval: Option<u32>,
    /// Receive maximum, if supplied.
    pub receive_max: Option<u16>,
    /// Maximum QoS, if supplied.
    pub max_qos: Option<QoS>,
    /// Retain-available flag, if supplied.
    pub retain_available: Option<bool>,
    /// Maximum packet size, if supplied.
    pub max_packet_size: Option<u32>,
    /// Assigned client identifier, if supplied.
    pub assigned_client_id: Option<Arc<String>>,
    /// Topic alias maximum, if supplied.
    pub topic_alias_max: Option<u16>,
    /// Reason string, if supplied.
    pub reason_string: Option<Arc<String>>,
    /// User properties; empty when none were supplied.
    pub user_properties: Vec<UserProperty>,
    /// Wildcard-subscription-available flag, if supplied.
    pub wildcard_subscription_available: Option<bool>,
    /// Subscription-identifier-available flag, if supplied.
    pub subscription_id_available: Option<bool>,
    /// Shared-subscription-available flag, if supplied.
    pub shared_subscription_available: Option<bool>,
    /// Server keep-alive, if supplied.
    pub server_keep_alive: Option<u16>,
    /// Response information string, if supplied.
    pub response_info: Option<Arc<String>>,
    /// Server reference string, if supplied.
    pub server_reference: Option<Arc<String>>,
    /// Authentication method name, if supplied.
    pub auth_method: Option<Arc<String>>,
    /// Authentication data bytes, if supplied.
    pub auth_data: Option<Bytes>,
}
611
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for ConnackProperties {
    /// Builds `ConnackProperties` from unstructured fuzzer input, drawing one
    /// field at a time in the order listed below.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let session_expiry_interval = u.arbitrary()?;
        let receive_max = u.arbitrary()?;
        let max_qos = u.arbitrary()?;
        let retain_available = u.arbitrary()?;
        let max_packet_size = u.arbitrary()?;
        let assigned_client_id = u.arbitrary()?;
        let topic_alias_max = u.arbitrary()?;
        let reason_string = u.arbitrary()?;
        let user_properties = u.arbitrary()?;
        let wildcard_subscription_available = u.arbitrary()?;
        let subscription_id_available = u.arbitrary()?;
        let shared_subscription_available = u.arbitrary()?;
        let server_keep_alive = u.arbitrary()?;
        let response_info = u.arbitrary()?;
        let server_reference = u.arbitrary()?;
        let auth_method = u.arbitrary()?;
        // Drawn as an optional byte vector, then converted to `Bytes`.
        let auth_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);

        Ok(ConnackProperties {
            session_expiry_interval,
            receive_max,
            max_qos,
            retain_available,
            max_packet_size,
            assigned_client_id,
            topic_alias_max,
            reason_string,
            user_properties,
            wildcard_subscription_available,
            subscription_id_available,
            shared_subscription_available,
            server_keep_alive,
            response_info,
            server_reference,
            auth_method,
            auth_data,
        })
    }
}
636
impl ConnackProperties {
    /// Decodes the CONNACK property section from `reader`.
    ///
    /// Starts from `ConnackProperties::default()` and lets the
    /// `decode_properties!` macro (imported from the parent module) fill it in.
    /// `packet_type` is forwarded to the macro.
    ///
    /// NOTE(review): `user_properties` is not named in the list below;
    /// presumably the macro handles user properties implicitly. Confirm
    /// against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = ConnackProperties::default();
        // The identifiers after `reader` name the properties accepted for this packet.
        decode_properties!(
            packet_type,
            properties,
            reader,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumQoS,
            RetainAvailable,
            MaximumPacketSize,
            AssignedClientIdentifier,
            TopicAliasMaximum,
            ReasonString,
            WildcardSubscriptionAvailable,
            SubscriptionIdentifierAvailable,
            SharedSubscriptionAvailable,
            ServerKeepAlive,
            ResponseInformation,
            ServerReference,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(properties)
    }
}
667
impl Encodable for ConnackProperties {
    /// Writes the CONNACK property section to `writer` via `encode_properties!`.
    ///
    /// The property list passed here is the same as in `encode_len` and in
    /// `ConnackProperties::decode_async`.
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(
            self,
            writer,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumQoS,
            RetainAvailable,
            MaximumPacketSize,
            AssignedClientIdentifier,
            TopicAliasMaximum,
            ReasonString,
            WildcardSubscriptionAvailable,
            SubscriptionIdentifierAvailable,
            SharedSubscriptionAvailable,
            ServerKeepAlive,
            ResponseInformation,
            ServerReference,
            AuthenticationMethod,
            AuthenticationData,
        );
        Ok(())
    }

    /// Returns the encoded size of the property section.
    ///
    /// `len` starts at zero and is updated by `encode_properties_len!`.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            SessionExpiryInterval,
            ReceiveMaximum,
            MaximumQoS,
            RetainAvailable,
            MaximumPacketSize,
            AssignedClientIdentifier,
            TopicAliasMaximum,
            ReasonString,
            WildcardSubscriptionAvailable,
            SubscriptionIdentifierAvailable,
            SharedSubscriptionAvailable,
            ServerKeepAlive,
            ResponseInformation,
            ServerReference,
            AuthenticationMethod,
            AuthenticationData,
        );
        len
    }
}
718
/// Body of an MQTT v5 DISCONNECT packet.
///
/// The encoded form is shortened when possible: nothing is written for a
/// normal disconnect without properties, and only the reason byte is written
/// for any other reason code without properties.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Disconnect {
    /// Why the connection is being closed.
    pub reason_code: DisconnectReasonCode,
    /// DISCONNECT properties.
    pub properties: DisconnectProperties,
}
726
727impl Disconnect {
728 pub fn new(reason_code: DisconnectReasonCode) -> Self {
729 Disconnect {
730 reason_code,
731 properties: DisconnectProperties::default(),
732 }
733 }
734
735 pub fn new_normal() -> Self {
736 Self::new(DisconnectReasonCode::NormalDisconnect)
737 }
738
739 pub async fn decode_async<T: AsyncRead + Unpin>(
740 reader: &mut T,
741 header: Header,
742 ) -> Result<Self, ErrorV5> {
743 let (reason_code, properties) = if header.remaining_len == 0 {
744 (DisconnectReasonCode::NormalDisconnect, Default::default())
745 } else if header.remaining_len == 1 {
746 let reason_byte = read_u8(reader).await?;
747 let reason_code = DisconnectReasonCode::from_u8(reason_byte)
748 .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
749 (reason_code, Default::default())
750 } else {
751 let reason_byte = read_u8(reader).await?;
752 let reason_code = DisconnectReasonCode::from_u8(reason_byte)
753 .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
754 let properties = DisconnectProperties::decode_async(reader, header.typ).await?;
755 (reason_code, properties)
756 };
757 Ok(Disconnect {
758 reason_code,
759 properties,
760 })
761 }
762}
763
764impl Encodable for Disconnect {
765 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
766 if self.properties == DisconnectProperties::default() {
767 if self.reason_code != DisconnectReasonCode::NormalDisconnect {
768 write_u8(writer, self.reason_code as u8)?;
769 }
770 } else {
771 write_u8(writer, self.reason_code as u8)?;
772 self.properties.encode(writer)?;
773 }
774 Ok(())
775 }
776
777 fn encode_len(&self) -> usize {
778 if self.properties == DisconnectProperties::default() {
779 if self.reason_code == DisconnectReasonCode::NormalDisconnect {
780 0
781 } else {
782 1
783 }
784 } else {
785 1 + self.properties.encode_len()
786 }
787 }
788}
789
/// Reason code carried in a DISCONNECT packet.
///
/// Each variant's discriminant is its one-byte wire value, so `code as u8`
/// yields the encoded byte and [`DisconnectReasonCode::from_u8`] is the inverse.
///
/// NOTE(review): `UserAnotherServer` (0x9C) is probably a misspelling of
/// "UseAnotherServer" (compare `ConnectReasonCode::UseAnotherServer`). The
/// name is public API, so it is left unchanged.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum DisconnectReasonCode {
    NormalDisconnect = 0x00,
    DisconnectWithWillMessage = 0x04,
    UnspecifiedError = 0x80,
    MalformedPacket = 0x81,
    ProtocolError = 0x82,
    ImplementationSpecificError = 0x83,
    NotAuthorized = 0x87,
    ServerBusy = 0x89,
    ServerShuttingDown = 0x8B,
    KeepAliveTimeout = 0x8D,
    SessionTakenOver = 0x8E,
    TopicFilterInvalid = 0x8F,
    TopicNameInvalid = 0x90,
    ReceiveMaximumExceeded = 0x93,
    TopicAliasInvalid = 0x94,
    PacketTooLarge = 0x95,
    MessageRateTooHigh = 0x96,
    QuotaExceeded = 0x97,
    AdministrativeAction = 0x98,
    PayloadFormatInvalid = 0x99,
    RetainNotSupported = 0x9A,
    QoSNotSupported = 0x9B,
    UserAnotherServer = 0x9C,
    ServerMoved = 0x9D,
    SharedSubscriptionNotSupported = 0x9E,
    ConnectionRateExceeded = 0x9F,
    MaximumConnectTime = 0xA0,
    SubscriptionIdentifiersNotSupported = 0xA1,
    WildcardSubscriptionsNotSupported = 0xA2,
}

impl DisconnectReasonCode {
    /// Maps a wire byte to its reason code, or `None` for an unknown value.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::NormalDisconnect),
            0x04 => Some(Self::DisconnectWithWillMessage),
            0x80 => Some(Self::UnspecifiedError),
            0x81 => Some(Self::MalformedPacket),
            0x82 => Some(Self::ProtocolError),
            0x83 => Some(Self::ImplementationSpecificError),
            0x87 => Some(Self::NotAuthorized),
            0x89 => Some(Self::ServerBusy),
            0x8B => Some(Self::ServerShuttingDown),
            0x8D => Some(Self::KeepAliveTimeout),
            0x8E => Some(Self::SessionTakenOver),
            0x8F => Some(Self::TopicFilterInvalid),
            0x90 => Some(Self::TopicNameInvalid),
            0x93 => Some(Self::ReceiveMaximumExceeded),
            0x94 => Some(Self::TopicAliasInvalid),
            0x95 => Some(Self::PacketTooLarge),
            0x96 => Some(Self::MessageRateTooHigh),
            0x97 => Some(Self::QuotaExceeded),
            0x98 => Some(Self::AdministrativeAction),
            0x99 => Some(Self::PayloadFormatInvalid),
            0x9A => Some(Self::RetainNotSupported),
            0x9B => Some(Self::QoSNotSupported),
            0x9C => Some(Self::UserAnotherServer),
            0x9D => Some(Self::ServerMoved),
            0x9E => Some(Self::SharedSubscriptionNotSupported),
            0x9F => Some(Self::ConnectionRateExceeded),
            0xA0 => Some(Self::MaximumConnectTime),
            0xA1 => Some(Self::SubscriptionIdentifiersNotSupported),
            0xA2 => Some(Self::WildcardSubscriptionsNotSupported),
            _ => None,
        }
    }
}
898
/// Properties carried by a DISCONNECT packet.
///
/// Every property is optional; `Default` yields a value with nothing set,
/// which `Disconnect`'s encoder treats as "no property section".
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct DisconnectProperties {
    /// Session expiry interval, if supplied.
    pub session_expiry_interval: Option<u32>,
    /// Reason string, if supplied.
    pub reason_string: Option<Arc<String>>,
    /// User properties; empty when none were supplied.
    pub user_properties: Vec<UserProperty>,
    /// Server reference string, if supplied.
    pub server_reference: Option<Arc<String>>,
}
908
impl DisconnectProperties {
    /// Decodes the DISCONNECT property section from `reader`.
    ///
    /// Starts from `DisconnectProperties::default()` and lets the
    /// `decode_properties!` macro (imported from the parent module) fill it in.
    /// `packet_type` is forwarded to the macro.
    ///
    /// NOTE(review): `user_properties` is not named in the list below;
    /// presumably the macro handles user properties implicitly. Confirm
    /// against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = DisconnectProperties::default();
        // The identifiers after `reader` name the properties accepted for this packet.
        decode_properties!(
            packet_type,
            properties,
            reader,
            SessionExpiryInterval,
            ReasonString,
            ServerReference,
        );
        Ok(properties)
    }
}
926
impl Encodable for DisconnectProperties {
    /// Writes the DISCONNECT property section to `writer` via `encode_properties!`.
    ///
    /// The property list passed here is the same as in `encode_len` and in
    /// `DisconnectProperties::decode_async`.
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(
            self,
            writer,
            SessionExpiryInterval,
            ReasonString,
            ServerReference,
        );
        Ok(())
    }

    /// Returns the encoded size of the property section.
    ///
    /// `len` starts at zero and is updated by `encode_properties_len!`.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            SessionExpiryInterval,
            ReasonString,
            ServerReference,
        );
        len
    }
}
951
/// Body of an MQTT v5 AUTH packet.
///
/// A `Success` reason code with default properties is encoded as an empty body.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Auth {
    /// Authentication reason code.
    pub reason_code: AuthReasonCode,
    /// AUTH properties.
    pub properties: AuthProperties,
}
959
960impl Auth {
961 pub fn new(reason_code: AuthReasonCode) -> Self {
962 Auth {
963 reason_code,
964 properties: AuthProperties::default(),
965 }
966 }
967
968 pub fn new_success() -> Self {
969 Self::new(AuthReasonCode::Success)
970 }
971
972 pub async fn decode_async<T: AsyncRead + Unpin>(
973 reader: &mut T,
974 header: Header,
975 ) -> Result<Self, ErrorV5> {
976 let auth = if header.remaining_len == 0 {
977 Auth {
978 reason_code: AuthReasonCode::Success,
979 properties: AuthProperties::default(),
980 }
981 } else {
982 let reason_byte = read_u8(reader).await?;
983 let reason_code = AuthReasonCode::from_u8(reason_byte)
984 .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
985 let properties = AuthProperties::decode_async(reader, header.typ).await?;
986 Auth {
987 reason_code,
988 properties,
989 }
990 };
991 Ok(auth)
992 }
993}
994
995impl Encodable for Auth {
996 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
997 if self.reason_code != AuthReasonCode::Success
998 || self.properties != AuthProperties::default()
999 {
1000 write_u8(writer, self.reason_code as u8)?;
1001 self.properties.encode(writer)?;
1002 }
1003 Ok(())
1004 }
1005
1006 fn encode_len(&self) -> usize {
1007 if self.reason_code == AuthReasonCode::Success
1008 && self.properties == AuthProperties::default()
1009 {
1010 0
1011 } else {
1012 1 + self.properties.encode_len()
1013 }
1014 }
1015}
1016
/// Reason code carried in an AUTH packet.
///
/// Each variant's discriminant is its one-byte wire value, so `code as u8`
/// yields the encoded byte and [`AuthReasonCode::from_u8`] is the inverse.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum AuthReasonCode {
    Success = 0x00,
    ContinueAuthentication = 0x18,
    ReAuthentication = 0x19,
}

impl AuthReasonCode {
    /// Maps a wire byte to its reason code, or `None` for an unknown value.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::Success),
            0x18 => Some(Self::ContinueAuthentication),
            0x19 => Some(Self::ReAuthentication),
            _ => None,
        }
    }
}
1044
/// Properties carried by an AUTH packet.
///
/// Every property is optional; `Default` yields a value with nothing set,
/// which `Auth`'s encoder uses to decide whether the body can be omitted.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AuthProperties {
    /// Authentication method name, if supplied.
    pub auth_method: Option<Arc<String>>,
    /// Authentication data bytes, if supplied.
    pub auth_data: Option<Bytes>,
    /// Reason string, if supplied.
    pub reason_string: Option<Arc<String>>,
    /// User properties; empty when none were supplied.
    pub user_properties: Vec<UserProperty>,
}
1053
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for AuthProperties {
    /// Builds `AuthProperties` from unstructured fuzzer input, drawing one
    /// field at a time in the order listed below.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let auth_method = u.arbitrary()?;
        // Drawn as an optional byte vector, then converted to `Bytes`.
        let auth_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);
        let reason_string = u.arbitrary()?;
        let user_properties = u.arbitrary()?;

        Ok(AuthProperties {
            auth_method,
            auth_data,
            reason_string,
            user_properties,
        })
    }
}
1065
impl AuthProperties {
    /// Decodes the AUTH property section from `reader`.
    ///
    /// Starts from `AuthProperties::default()` and lets the
    /// `decode_properties!` macro (imported from the parent module) fill it in.
    /// `packet_type` is forwarded to the macro.
    ///
    /// NOTE(review): `user_properties` is not named in the list below;
    /// presumably the macro handles user properties implicitly. Confirm
    /// against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = AuthProperties::default();
        // The identifiers after `reader` name the properties accepted for this packet.
        decode_properties!(
            packet_type,
            properties,
            reader,
            AuthenticationMethod,
            AuthenticationData,
            ReasonString,
        );
        Ok(properties)
    }
}
1083
impl Encodable for AuthProperties {
    /// Writes the AUTH property section to `writer` via `encode_properties!`.
    ///
    /// The property list passed here is the same as in `encode_len` and in
    /// `AuthProperties::decode_async`.
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(
            self,
            writer,
            AuthenticationMethod,
            AuthenticationData,
            ReasonString,
        );
        Ok(())
    }

    /// Returns the encoded size of the property section.
    ///
    /// `len` starts at zero and is updated by `encode_properties_len!`.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            AuthenticationMethod,
            AuthenticationData,
            ReasonString,
        );
        len
    }
}