1use crate::{
2 topic::{Topic, TopicFilter, TopicParseError},
3 SHARED_SUBSCRIPTION_PREFIX,
4};
5use bytes::{BufMut, Bytes, BytesMut};
6use num_enum::TryFromPrimitive;
7use properties::*;
8
/// Failures that can be reported while decoding a packet.
///
/// Most variants are unit variants whose name identifies the field or value
/// that was rejected. The remaining variants carry the underlying error.
#[derive(Debug)]
pub enum DecodeError {
    InvalidPacketType,
    InvalidProtocolVersion,
    InvalidRemainingLength,
    PacketTooLarge,
    InvalidUtf8,
    InvalidQoS,
    InvalidRetainHandling,
    InvalidConnectReason,
    InvalidDisconnectReason,
    InvalidPublishAckReason,
    InvalidPublishReceivedReason,
    InvalidPublishReleaseReason,
    InvalidPublishCompleteReason,
    InvalidSubscribeAckReason,
    InvalidUnsubscribeAckReason,
    InvalidAuthenticateReason,
    InvalidPropertyId,
    InvalidPropertyForPacket,
    /// Carries the parse error for a topic name.
    InvalidTopic(TopicParseError),
    /// Carries the parse error for a topic filter.
    InvalidTopicFilter(TopicParseError),
    /// Wraps an underlying `std::io::Error` (see the `From` impl in this file).
    Io(std::io::Error),
    // NOTE(review): presumably mirrors `EncodeError::BadTransport`; no
    // conversion producing it is visible in this part of the file.
    BadTransport,
}
34
/// Failures that can be reported while encoding a packet.
#[derive(Debug)]
pub enum EncodeError {
    /// Produced when a `websocket_codec::Error` is converted into this type.
    BadTransport,
    /// Wraps an underlying `std::io::Error`.
    Io(std::io::Error),
}
40
41impl From<websocket_codec::Error> for EncodeError {
42 fn from(_err: websocket_codec::Error) -> EncodeError {
43 EncodeError::BadTransport
44 }
45}
46
/// Protocol-level failures, as opposed to pure decoding failures.
#[derive(Debug)]
pub enum ProtocolError {
    /// A packet could not be decoded; carries the decoding failure.
    MalformedPacket(DecodeError),
    // NOTE(review): presumably raised when no CONNECT packet arrives within
    // the allowed time — confirm against the connection handling code.
    ConnectTimedOut,
    // NOTE(review): presumably raised when the first packet received on a
    // connection is something other than CONNECT — confirm against the caller.
    FirstPacketNotConnect,
}
53
/// Protocol revision, stored as a `u8` discriminant and convertible from a
/// raw `u8` through the derived `TryFromPrimitive` implementation.
///
/// NOTE(review): the discriminants look like the MQTT protocol level byte
/// (3.1.1 -> 4, 5.0 -> 5) — confirm against the specification.
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
pub enum ProtocolVersion {
    V311 = 4,
    V500 = 5,
}
60
/// A `u32` that is written to the wire using a variable number of bytes
/// (see `encode_to_bytes`).
///
/// The size calculation for this type only handles values up to
/// `268_435_455`; larger values hit an `unreachable!()` there.
#[derive(Debug, Clone, PartialEq)]
pub struct VariableByteInt(pub u32);
63
64impl VariableByteInt {
65 pub fn encode_to_bytes(&self, bytes: &mut BytesMut) {
66 let mut x = self.0;
67
68 loop {
69 let mut encoded_byte: u8 = (x % 128) as u8;
70 x /= 128;
71
72 if x > 0 {
73 encoded_byte |= 128;
74 }
75
76 bytes.put_u8(encoded_byte);
77
78 if x == 0 {
79 break;
80 }
81 }
82 }
83
84 pub fn calculate_size(&self, protocol_version: ProtocolVersion) -> u32 {
85 self.calc_size(protocol_version)
86 }
87}
88
89impl From<std::io::Error> for DecodeError {
90 fn from(err: std::io::Error) -> Self {
91 DecodeError::Io(err)
92 }
93}
94
95impl From<std::io::Error> for EncodeError {
96 fn from(err: std::io::Error) -> Self {
97 EncodeError::Io(err)
98 }
99}
100
/// Crate-private helper: the size, in bytes, that a value contributes to a
/// packet for the given protocol version.
trait PacketSize {
    /// Returns the contribution of `self` in bytes. Most implementations in
    /// this file ignore `protocol_version` or merely forward it.
    fn calc_size(&self, protocol_version: ProtocolVersion) -> u32;
}
104
/// Implemented by packet types that carry properties.
pub trait PropertySize {
    /// Returns the combined size, in bytes, of all properties present on
    /// `self`. The implementations in this file sum `calc_size` over each
    /// property field and do not include any length prefix.
    fn property_size(&self, protocol_version: ProtocolVersion) -> u32;
}
108
/// Implemented by values that can append their encoded form to a buffer.
pub trait Encode {
    /// Appends the encoding of `self` to `bytes`. The method is infallible.
    fn encode(&self, bytes: &mut BytesMut);
}
112
113impl<T: Encode> Encode for Option<T> {
114 fn encode(&self, bytes: &mut BytesMut) {
115 if let Some(data) = self {
116 data.encode(bytes);
117 }
118 }
119}
120
121impl Encode for Vec<UserProperty> {
122 fn encode(&self, bytes: &mut BytesMut) {
123 for property in self {
124 property.encode(bytes);
125 }
126 }
127}
128
129impl PacketSize for u16 {
130 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
131 2
132 }
133}
134
135impl PacketSize for VariableByteInt {
136 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
137 match self.0 {
138 0..=127 => 1,
139 128..=16_383 => 2,
140 16384..=2_097_151 => 3,
141 2_097_152..=268_435_455 => 4,
142 _ => unreachable!(),
143 }
144 }
145}
146
147impl PacketSize for String {
148 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
149 2 + self.len() as u32
150 }
151}
152
153impl PacketSize for &str {
154 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
155 2 + self.len() as u32
156 }
157}
158
159impl PacketSize for &[u8] {
160 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
161 2 + self.len() as u32
162 }
163}
164
165impl PacketSize for Bytes {
166 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
167 2 + self.len() as u32
168 }
169}
170
171impl PacketSize for Vec<UserProperty> {
172 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
173 self.iter().map(|x| x.calc_size(protocol_version)).sum()
174 }
175}
176
177impl PacketSize for Vec<SubscriptionTopic> {
178 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
179 self.iter().map(|x| x.calc_size(protocol_version)).sum()
180 }
181}
182
183impl PacketSize for Vec<String> {
184 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
185 self.iter().map(|x| x.calc_size(protocol_version)).sum()
186 }
187}
188
189impl<T: PacketSize> PacketSize for Option<T> {
190 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
191 match self {
192 Some(p) => p.calc_size(protocol_version),
193 None => 0,
194 }
195 }
196}
197
198impl PacketSize for Topic {
199 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
200 self.topic_name().calc_size(protocol_version)
201 }
202}
203
204impl PacketSize for TopicFilter {
205 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
206 match self {
207 TopicFilter::Concrete { filter, .. } | TopicFilter::Wildcard { filter, .. } => {
208 filter.calc_size(protocol_version)
209 },
210 TopicFilter::SharedConcrete { group_name, filter, .. }
211 | TopicFilter::SharedWildcard { group_name, filter, .. } => {
212 (2 + SHARED_SUBSCRIPTION_PREFIX.len() + group_name.len() + 1 + filter.len()) as u32
213 },
214 }
215 }
216}
217
218impl PacketSize for Vec<TopicFilter> {
219 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
220 self.iter().map(|x| x.calc_size(protocol_version)).sum()
221 }
222}
223
/// Packet kinds, each with its numeric `u8` code (1 through 15).
///
/// A raw `u8` can be converted with the derived `TryFromPrimitive`
/// implementation; values without a variant (such as 0) are rejected.
#[repr(u8)]
#[derive(Debug, TryFromPrimitive)]
pub enum PacketType {
    Connect = 1,
    ConnectAck = 2,
    Publish = 3,
    PublishAck = 4,
    PublishReceived = 5,
    PublishRelease = 6,
    PublishComplete = 7,
    Subscribe = 8,
    SubscribeAck = 9,
    Unsubscribe = 10,
    UnsubscribeAck = 11,
    PingRequest = 12,
    PingResponse = 13,
    Disconnect = 14,
    Authenticate = 15,
}
243
/// Quality-of-service level, stored as a `u8` discriminant (0, 1 or 2) and
/// convertible from a raw `u8` via the derived `TryFromPrimitive`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, TryFromPrimitive)]
pub enum QoS {
    AtMostOnce = 0,
    AtLeastOnce = 1,
    ExactlyOnce = 2,
}
251
/// Retain-handling option of a subscription, stored as a `u8` discriminant
/// (0, 1 or 2) and convertible from a raw `u8` via `TryFromPrimitive`.
///
/// NOTE(review): the variant names suggest they control whether retained
/// messages are sent at subscribe time — confirm against the broker logic.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, TryFromPrimitive)]
pub enum RetainHandling {
    SendAtSubscribeTime = 0,
    SendAtSubscribeTimeIfNonexistent = 1,
    DoNotSend = 2,
}
259
260pub mod properties {
261 use super::{PacketSize, QoS, VariableByteInt};
262 use crate::types::ProtocolVersion;
263 use bytes::Bytes;
264 use num_enum::TryFromPrimitive;
265
266 #[derive(Debug, Clone, PartialEq)]
271 pub struct PayloadFormatIndicator(pub u8);
272 impl PacketSize for PayloadFormatIndicator {
273 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
274 1 + 1
275 }
276 }
277
278 #[derive(Debug, Clone, PartialEq)]
279 pub struct MessageExpiryInterval(pub u32);
280 impl PacketSize for MessageExpiryInterval {
281 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
282 1 + 4
283 }
284 }
285
286 #[derive(Debug, Clone, PartialEq)]
287 pub struct ContentType(pub String);
288 impl PacketSize for ContentType {
289 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
290 1 + self.0.calc_size(protocol_version)
291 }
292 }
293
294 #[derive(Debug, Clone, PartialEq)]
295 pub struct ResponseTopic(pub String);
296 impl PacketSize for ResponseTopic {
297 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
298 1 + self.0.calc_size(protocol_version)
299 }
300 }
301
302 #[derive(Debug, Clone, PartialEq)]
303 pub struct CorrelationData(pub Bytes);
304 impl PacketSize for CorrelationData {
305 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
306 1 + self.0.calc_size(protocol_version)
307 }
308 }
309
310 #[derive(Debug, Clone, PartialEq)]
311 pub struct SubscriptionIdentifier(pub VariableByteInt);
312 impl PacketSize for SubscriptionIdentifier {
313 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
314 1 + self.0.calc_size(protocol_version)
315 }
316 }
317
318 #[derive(Debug, Clone, PartialEq)]
319 pub struct SessionExpiryInterval(pub u32);
320 impl PacketSize for SessionExpiryInterval {
321 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
322 1 + 4
323 }
324 }
325
326 #[derive(Debug, Clone, PartialEq)]
327 pub struct AssignedClientIdentifier(pub String);
328 impl PacketSize for AssignedClientIdentifier {
329 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
330 1 + self.0.calc_size(protocol_version)
331 }
332 }
333
334 #[derive(Debug, Clone, PartialEq)]
335 pub struct ServerKeepAlive(pub u16);
336 impl PacketSize for ServerKeepAlive {
337 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
338 1 + 2
339 }
340 }
341
342 #[derive(Debug, Clone, PartialEq)]
343 pub struct AuthenticationMethod(pub String);
344 impl PacketSize for AuthenticationMethod {
345 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
346 1 + self.0.calc_size(protocol_version)
347 }
348 }
349
350 #[derive(Debug, Clone, PartialEq)]
351 pub struct AuthenticationData(pub Bytes);
352 impl PacketSize for AuthenticationData {
353 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
354 1 + self.0.calc_size(protocol_version)
355 }
356 }
357
358 #[derive(Debug, Clone, PartialEq)]
359 pub struct RequestProblemInformation(pub u8);
360 impl PacketSize for RequestProblemInformation {
361 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
362 1 + 1
363 }
364 }
365
366 #[derive(Debug, Clone, PartialEq)]
367 pub struct WillDelayInterval(pub u32);
368 impl PacketSize for WillDelayInterval {
369 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
370 1 + 4
371 }
372 }
373
374 #[derive(Debug, Clone, PartialEq)]
375 pub struct RequestResponseInformation(pub u8);
376 impl PacketSize for RequestResponseInformation {
377 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
378 1 + 1
379 }
380 }
381
382 #[derive(Debug, Clone, PartialEq)]
383 pub struct ResponseInformation(pub String);
384 impl PacketSize for ResponseInformation {
385 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
386 1 + self.0.calc_size(protocol_version)
387 }
388 }
389
390 #[derive(Debug, Clone, PartialEq)]
391 pub struct ServerReference(pub String);
392 impl PacketSize for ServerReference {
393 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
394 1 + self.0.calc_size(protocol_version)
395 }
396 }
397
398 #[derive(Debug, Clone, PartialEq)]
399 pub struct ReasonString(pub String);
400 impl PacketSize for ReasonString {
401 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
402 1 + self.0.calc_size(protocol_version)
403 }
404 }
405
406 #[derive(Debug, Clone, PartialEq)]
407 pub struct ReceiveMaximum(pub u16);
408 impl PacketSize for ReceiveMaximum {
409 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
410 1 + 2
411 }
412 }
413
414 #[derive(Debug, Clone, PartialEq)]
415 pub struct TopicAliasMaximum(pub u16);
416 impl PacketSize for TopicAliasMaximum {
417 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
418 1 + 2
419 }
420 }
421
422 #[derive(Debug, Clone, PartialEq)]
423 pub struct TopicAlias(pub u16);
424 impl PacketSize for TopicAlias {
425 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
426 1 + 2
427 }
428 }
429
430 #[derive(Debug, Clone, PartialEq)]
431 pub struct MaximumQos(pub QoS);
432 impl PacketSize for MaximumQos {
433 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
434 1 + 1
435 }
436 }
437
438 #[derive(Debug, Clone, PartialEq)]
439 pub struct RetainAvailable(pub u8);
440 impl PacketSize for RetainAvailable {
441 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
442 1 + 1
443 }
444 }
445
446 #[derive(Debug, Clone, PartialEq)]
447 pub struct UserProperty(pub String, pub String);
448 impl PacketSize for UserProperty {
449 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
450 1 + self.0.calc_size(protocol_version) + self.1.calc_size(protocol_version)
451 }
452 }
453
454 #[derive(Debug, Clone, PartialEq)]
455 pub struct MaximumPacketSize(pub u32);
456 impl PacketSize for MaximumPacketSize {
457 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
458 1 + 4
459 }
460 }
461
462 #[derive(Debug, Clone, PartialEq)]
463 pub struct WildcardSubscriptionAvailable(pub u8);
464 impl PacketSize for WildcardSubscriptionAvailable {
465 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
466 1 + 1
467 }
468 }
469
470 #[derive(Debug, Clone, PartialEq)]
471 pub struct SubscriptionIdentifierAvailable(pub u8);
472 impl PacketSize for SubscriptionIdentifierAvailable {
473 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
474 1 + 1
475 }
476 }
477
478 #[derive(Debug, Clone, PartialEq)]
479 pub struct SharedSubscriptionAvailable(pub u8);
480 impl PacketSize for SharedSubscriptionAvailable {
481 fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
482 1 + 1
483 }
484 }
485
486 #[repr(u32)]
487 #[derive(Debug, PartialEq, TryFromPrimitive)]
488 pub enum PropertyType {
489 PayloadFormatIndicator = 1,
490 MessageExpiryInterval = 2,
491 ContentType = 3,
492 ResponseTopic = 8,
493 CorrelationData = 9,
494 SubscriptionIdentifier = 11,
495 SessionExpiryInterval = 17,
496 AssignedClientIdentifier = 18,
497 ServerKeepAlive = 19,
498 AuthenticationMethod = 21,
499 AuthenticationData = 22,
500 RequestProblemInformation = 23,
501 WillDelayInterval = 24,
502 RequestResponseInformation = 25,
503 ResponseInformation = 26,
504 ServerReference = 28,
505 ReasonString = 31,
506 ReceiveMaximum = 33,
507 TopicAliasMaximum = 34,
508 TopicAlias = 35,
509 MaximumQos = 36,
510 RetainAvailable = 37,
511 UserProperty = 38,
512 MaximumPacketSize = 39,
513 WildcardSubscriptionAvailable = 40,
514 SubscriptionIdentifierAvailable = 41,
515 SharedSubscriptionAvailable = 42,
516 }
517
518 #[derive(Debug, Clone, PartialEq)]
519 pub enum Property {
520 PayloadFormatIndicator(PayloadFormatIndicator),
521 MessageExpiryInterval(MessageExpiryInterval),
522 ContentType(ContentType),
523 ResponseTopic(ResponseTopic),
524 CorrelationData(CorrelationData),
525 SubscriptionIdentifier(SubscriptionIdentifier),
526 SessionExpiryInterval(SessionExpiryInterval),
527 AssignedClientIdentifier(AssignedClientIdentifier),
528 ServerKeepAlive(ServerKeepAlive),
529 AuthenticationMethod(AuthenticationMethod),
530 AuthenticationData(AuthenticationData),
531 RequestProblemInformation(RequestProblemInformation),
532 WillDelayInterval(WillDelayInterval),
533 RequestResponseInformation(RequestResponseInformation),
534 ResponseInformation(ResponseInformation),
535 ServerReference(ServerReference),
536 ReasonString(ReasonString),
537 ReceiveMaximum(ReceiveMaximum),
538 TopicAliasMaximum(TopicAliasMaximum),
539 TopicAlias(TopicAlias),
540 MaximumQos(MaximumQos),
541 RetainAvailable(RetainAvailable),
542 UserProperty(UserProperty),
543 MaximumPacketSize(MaximumPacketSize),
544 WildcardSubscriptionAvailable(WildcardSubscriptionAvailable),
545 SubscriptionIdentifierAvailable(SubscriptionIdentifierAvailable),
546 SharedSubscriptionAvailable(SharedSubscriptionAvailable),
547 }
548
549 impl Property {
550 pub fn property_type(&self) -> PropertyType {
551 match self {
552 Property::PayloadFormatIndicator(_) => PropertyType::PayloadFormatIndicator,
553 Property::MessageExpiryInterval(_) => PropertyType::MessageExpiryInterval,
554 Property::ContentType(_) => PropertyType::ContentType,
555 Property::ResponseTopic(_) => PropertyType::ResponseTopic,
556 Property::CorrelationData(_) => PropertyType::CorrelationData,
557 Property::SubscriptionIdentifier(_) => PropertyType::SubscriptionIdentifier,
558 Property::SessionExpiryInterval(_) => PropertyType::SessionExpiryInterval,
559 Property::AssignedClientIdentifier(_) => PropertyType::AssignedClientIdentifier,
560 Property::ServerKeepAlive(_) => PropertyType::ServerKeepAlive,
561 Property::AuthenticationMethod(_) => PropertyType::AuthenticationMethod,
562 Property::AuthenticationData(_) => PropertyType::AuthenticationData,
563 Property::RequestProblemInformation(_) => PropertyType::RequestProblemInformation,
564 Property::WillDelayInterval(_) => PropertyType::WillDelayInterval,
565 Property::RequestResponseInformation(_) => PropertyType::RequestResponseInformation,
566 Property::ResponseInformation(_) => PropertyType::ResponseInformation,
567 Property::ServerReference(_) => PropertyType::ServerReference,
568 Property::ReasonString(_) => PropertyType::ReasonString,
569 Property::ReceiveMaximum(_) => PropertyType::ReceiveMaximum,
570 Property::TopicAliasMaximum(_) => PropertyType::TopicAliasMaximum,
571 Property::TopicAlias(_) => PropertyType::TopicAlias,
572 Property::MaximumQos(_) => PropertyType::MaximumQos,
573 Property::RetainAvailable(_) => PropertyType::RetainAvailable,
574 Property::UserProperty(_) => PropertyType::UserProperty,
575 Property::MaximumPacketSize(_) => PropertyType::MaximumPacketSize,
576 Property::WildcardSubscriptionAvailable(_) => {
577 PropertyType::WildcardSubscriptionAvailable
578 },
579 Property::SubscriptionIdentifierAvailable(_) => {
580 PropertyType::SubscriptionIdentifierAvailable
581 },
582 Property::SharedSubscriptionAvailable(_) => {
583 PropertyType::SharedSubscriptionAvailable
584 },
585 }
586 }
587 }
588}
589
/// Reason codes used by `ConnectAckPacket::reason_code`.
///
/// Stored as a `u8` discriminant; a raw byte can be converted with the
/// derived `TryFromPrimitive`, which rejects values not listed here.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum ConnectReason {
    Success = 0,
    UnspecifiedError = 128,
    MalformedPacket = 129,
    ProtocolError = 130,
    ImplementationSpecificError = 131,
    UnsupportedProtocolVersion = 132,
    ClientIdentifierNotValid = 133,
    BadUserNameOrPassword = 134,
    NotAuthorized = 135,
    ServerUnavailable = 136,
    ServerBusy = 137,
    Banned = 138,
    BadAuthenticationMethod = 140,
    TopicNameInvalid = 144,
    PacketTooLarge = 149,
    QuotaExceeded = 151,
    PayloadFormatInvalid = 153,
    RetainNotSupported = 154,
    QosNotSupported = 155,
    UseAnotherServer = 156,
    ServerMoved = 157,
    ConnectionRateExceeded = 159,
}
616
/// Reason codes used by `PublishAckPacket::reason_code`.
///
/// Stored as a `u8` discriminant; a raw byte can be converted with the
/// derived `TryFromPrimitive`, which rejects values not listed here.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum PublishAckReason {
    Success = 0,
    NoMatchingSubscribers = 16,
    UnspecifiedError = 128,
    ImplementationSpecificError = 131,
    NotAuthorized = 135,
    TopicNameInvalid = 144,
    PacketIdentifierInUse = 145,
    QuotaExceeded = 151,
    PayloadFormatInvalid = 153,
}
630
/// Reason codes used by `PublishReceivedPacket::reason_code`.
///
/// The variants and values are the same as those of `PublishAckReason`.
/// A raw byte can be converted with the derived `TryFromPrimitive`.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum PublishReceivedReason {
    Success = 0,
    NoMatchingSubscribers = 16,
    UnspecifiedError = 128,
    ImplementationSpecificError = 131,
    NotAuthorized = 135,
    TopicNameInvalid = 144,
    PacketIdentifierInUse = 145,
    QuotaExceeded = 151,
    PayloadFormatInvalid = 153,
}
644
/// Reason codes used by `PublishReleasePacket::reason_code`.
///
/// A raw byte can be converted with the derived `TryFromPrimitive`.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum PublishReleaseReason {
    Success = 0,
    PacketIdentifierNotFound = 146,
}
651
/// Reason codes used by `PublishCompletePacket::reason_code`.
///
/// A raw byte can be converted with the derived `TryFromPrimitive`.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum PublishCompleteReason {
    Success = 0,
    PacketIdentifierNotFound = 146,
}
658
/// Reason codes carried in `SubscribeAckPacket::reason_codes`.
///
/// The three success values (0, 1, 2) name the granted QoS level.
/// A raw byte can be converted with the derived `TryFromPrimitive`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, TryFromPrimitive)]
pub enum SubscribeAckReason {
    GrantedQoSZero = 0,
    GrantedQoSOne = 1,
    GrantedQoSTwo = 2,
    UnspecifiedError = 128,
    ImplementationSpecificError = 131,
    NotAuthorized = 135,
    TopicFilterInvalid = 143,
    PacketIdentifierInUse = 145,
    QuotaExceeded = 151,
    SharedSubscriptionsNotSupported = 158,
    SubscriptionIdentifiersNotSupported = 161,
    WildcardSubscriptionsNotSupported = 162,
}
675
/// Reason codes carried in `UnsubscribeAckPacket::reason_codes`.
///
/// A raw byte can be converted with the derived `TryFromPrimitive`.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum UnsubscribeAckReason {
    Success = 0,
    NoSubscriptionExisted = 17,
    UnspecifiedError = 128,
    ImplementationSpecificError = 131,
    NotAuthorized = 135,
    TopicFilterInvalid = 143,
    PacketIdentifierInUse = 145,
}
687
/// Reason codes used by `DisconnectPacket::reason_code`.
///
/// Stored as a `u8` discriminant; a raw byte can be converted with the
/// derived `TryFromPrimitive`, which rejects values not listed here.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum DisconnectReason {
    NormalDisconnection = 0,
    DisconnectWithWillMessage = 4,
    UnspecifiedError = 128,
    MalformedPacket = 129,
    ProtocolError = 130,
    ImplementationSpecificError = 131,
    NotAuthorized = 135,
    ServerBusy = 137,
    ServerShuttingDown = 139,
    KeepAliveTimeout = 141,
    SessionTakenOver = 142,
    TopicFilterInvalid = 143,
    TopicNameInvalid = 144,
    ReceiveMaximumExceeded = 147,
    TopicAliasInvalid = 148,
    PacketTooLarge = 149,
    MessageRateTooHigh = 150,
    QuotaExceeded = 151,
    AdministrativeAction = 152,
    PayloadFormatInvalid = 153,
    RetainNotSupported = 154,
    QosNotSupported = 155,
    UseAnotherServer = 156,
    ServerMoved = 157,
    SharedSubscriptionNotAvailable = 158,
    ConnectionRateExceeded = 159,
    MaximumConnectTime = 160,
    SubscriptionIdentifiersNotAvailable = 161,
    WildcardSubscriptionsNotAvailable = 162,
}
721
/// Reason codes for authentication exchanges.
///
/// NOTE(review): presumably used by the AUTH packet (`PacketType::Authenticate`);
/// the packet struct is not visible in this part of the file.
/// A raw byte can be converted with the derived `TryFromPrimitive`.
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, TryFromPrimitive)]
pub enum AuthenticateReason {
    Success = 0,
    ContinueAuthentication = 24,
    ReAuthenticate = 25,
}
729
/// The will message carried by `ConnectPacket::will`.
#[derive(Debug, PartialEq)]
pub struct FinalWill {
    pub topic: String,
    pub payload: Bytes,
    pub qos: QoS,
    pub should_retain: bool,

    // Properties: exactly the fields summed by `property_size`.
    pub will_delay_interval: Option<WillDelayInterval>,
    pub payload_format_indicator: Option<PayloadFormatIndicator>,
    pub message_expiry_interval: Option<MessageExpiryInterval>,
    pub content_type: Option<ContentType>,
    pub response_topic: Option<ResponseTopic>,
    pub correlation_data: Option<CorrelationData>,
    pub user_properties: Vec<UserProperty>,
}
747
748impl PacketSize for FinalWill {
749 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
750 let mut size = 0;
751
752 size += self.topic.calc_size(protocol_version);
753 size += self.payload.calc_size(protocol_version);
754
755 let property_size = self.property_size(protocol_version);
756 size += property_size + VariableByteInt(property_size).calc_size(protocol_version);
757
758 size
759 }
760}
761
762impl PropertySize for FinalWill {
763 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
764 let mut property_size = 0;
765 property_size += self.will_delay_interval.calc_size(protocol_version);
766 property_size += self.payload_format_indicator.calc_size(protocol_version);
767 property_size += self.message_expiry_interval.calc_size(protocol_version);
768 property_size += self.content_type.calc_size(protocol_version);
769 property_size += self.response_topic.calc_size(protocol_version);
770 property_size += self.correlation_data.calc_size(protocol_version);
771 property_size += self.user_properties.calc_size(protocol_version);
772
773 property_size
774 }
775}
776
/// One entry of `SubscribePacket::subscription_topics`: a topic filter
/// together with its subscription options.
#[derive(Debug, PartialEq)]
pub struct SubscriptionTopic {
    pub topic_filter: TopicFilter,
    pub maximum_qos: QoS,
    pub no_local: bool,
    pub retain_as_published: bool,
    pub retain_handling: RetainHandling,
}
785
786impl PacketSize for SubscriptionTopic {
787 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
788 self.topic_filter.calc_size(protocol_version) + 1
789 }
790}
791
/// A CONNECT packet (`PacketType::Connect`).
#[derive(Debug, PartialEq)]
pub struct ConnectPacket {
    pub protocol_name: String,
    pub protocol_version: ProtocolVersion,
    pub clean_start: bool,
    pub keep_alive: u16,

    // Properties: exactly the fields summed by `property_size`.
    pub session_expiry_interval: Option<SessionExpiryInterval>,
    pub receive_maximum: Option<ReceiveMaximum>,
    pub maximum_packet_size: Option<MaximumPacketSize>,
    pub topic_alias_maximum: Option<TopicAliasMaximum>,
    pub request_response_information: Option<RequestResponseInformation>,
    pub request_problem_information: Option<RequestProblemInformation>,
    pub user_properties: Vec<UserProperty>,
    pub authentication_method: Option<AuthenticationMethod>,
    pub authentication_data: Option<AuthenticationData>,

    // Remaining fields; not counted by `property_size`.
    pub client_id: String,
    pub will: Option<FinalWill>,
    pub user_name: Option<String>,
    pub password: Option<String>,
}
818
819impl PropertySize for ConnectPacket {
820 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
821 let mut property_size = 0;
822 property_size += self.session_expiry_interval.calc_size(protocol_version);
823 property_size += self.receive_maximum.calc_size(protocol_version);
824 property_size += self.maximum_packet_size.calc_size(protocol_version);
825 property_size += self.topic_alias_maximum.calc_size(protocol_version);
826 property_size += self.request_response_information.calc_size(protocol_version);
827 property_size += self.request_problem_information.calc_size(protocol_version);
828 property_size += self.user_properties.calc_size(protocol_version);
829 property_size += self.authentication_method.calc_size(protocol_version);
830 property_size += self.authentication_data.calc_size(protocol_version);
831
832 property_size
833 }
834}
835
/// A CONNACK packet (`PacketType::ConnectAck`).
#[derive(Debug, PartialEq)]
pub struct ConnectAckPacket {
    pub session_present: bool,
    pub reason_code: ConnectReason,

    // Properties: exactly the fields summed by `property_size`.
    pub session_expiry_interval: Option<SessionExpiryInterval>,
    pub receive_maximum: Option<ReceiveMaximum>,
    pub maximum_qos: Option<MaximumQos>,
    pub retain_available: Option<RetainAvailable>,
    pub maximum_packet_size: Option<MaximumPacketSize>,
    pub assigned_client_identifier: Option<AssignedClientIdentifier>,
    pub topic_alias_maximum: Option<TopicAliasMaximum>,
    pub reason_string: Option<ReasonString>,
    pub user_properties: Vec<UserProperty>,
    pub wildcard_subscription_available: Option<WildcardSubscriptionAvailable>,
    pub subscription_identifiers_available: Option<SubscriptionIdentifierAvailable>,
    pub shared_subscription_available: Option<SharedSubscriptionAvailable>,
    pub server_keep_alive: Option<ServerKeepAlive>,
    pub response_information: Option<ResponseInformation>,
    pub server_reference: Option<ServerReference>,
    pub authentication_method: Option<AuthenticationMethod>,
    pub authentication_data: Option<AuthenticationData>,
}
861
862impl PropertySize for ConnectAckPacket {
863 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
864 let mut property_size = 0;
865 property_size += self.session_expiry_interval.calc_size(protocol_version);
866 property_size += self.receive_maximum.calc_size(protocol_version);
867 property_size += self.maximum_qos.calc_size(protocol_version);
868 property_size += self.retain_available.calc_size(protocol_version);
869 property_size += self.maximum_packet_size.calc_size(protocol_version);
870 property_size += self.assigned_client_identifier.calc_size(protocol_version);
871 property_size += self.topic_alias_maximum.calc_size(protocol_version);
872 property_size += self.reason_string.calc_size(protocol_version);
873 property_size += self.user_properties.calc_size(protocol_version);
874 property_size += self.wildcard_subscription_available.calc_size(protocol_version);
875 property_size += self.subscription_identifiers_available.calc_size(protocol_version);
876 property_size += self.shared_subscription_available.calc_size(protocol_version);
877 property_size += self.server_keep_alive.calc_size(protocol_version);
878 property_size += self.response_information.calc_size(protocol_version);
879 property_size += self.server_reference.calc_size(protocol_version);
880 property_size += self.authentication_method.calc_size(protocol_version);
881 property_size += self.authentication_data.calc_size(protocol_version);
882
883 property_size
884 }
885}
886
/// A PUBLISH packet (`PacketType::Publish`).
#[derive(Debug, Clone, PartialEq)]
pub struct PublishPacket {
    pub is_duplicate: bool,
    pub qos: QoS,
    pub retain: bool,

    pub topic: Topic,
    // NOTE(review): presumably `None` for QoS 0 publishes — confirm against
    // the decoder.
    pub packet_id: Option<u16>,

    // Properties: exactly the fields summed by `property_size`.
    pub payload_format_indicator: Option<PayloadFormatIndicator>,
    pub message_expiry_interval: Option<MessageExpiryInterval>,
    pub topic_alias: Option<TopicAlias>,
    pub response_topic: Option<ResponseTopic>,
    pub correlation_data: Option<CorrelationData>,
    pub user_properties: Vec<UserProperty>,
    pub subscription_identifier: Option<SubscriptionIdentifier>,
    pub content_type: Option<ContentType>,

    pub payload: Bytes,
}
911
912impl PropertySize for PublishPacket {
913 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
914 let mut property_size = 0;
915 property_size += self.payload_format_indicator.calc_size(protocol_version);
916 property_size += self.message_expiry_interval.calc_size(protocol_version);
917 property_size += self.topic_alias.calc_size(protocol_version);
918 property_size += self.response_topic.calc_size(protocol_version);
919 property_size += self.correlation_data.calc_size(protocol_version);
920 property_size += self.user_properties.calc_size(protocol_version);
921 property_size += self.subscription_identifier.calc_size(protocol_version);
922 property_size += self.content_type.calc_size(protocol_version);
923
924 property_size
925 }
926}
927
/// A PUBACK packet (`PacketType::PublishAck`).
#[derive(Debug, PartialEq)]
pub struct PublishAckPacket {
    pub packet_id: u16,
    pub reason_code: PublishAckReason,

    // Properties: exactly the fields summed by `property_size`.
    pub reason_string: Option<ReasonString>,
    pub user_properties: Vec<UserProperty>,
}
938
939impl PropertySize for PublishAckPacket {
940 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
941 let mut property_size = 0;
942 property_size += self.reason_string.calc_size(protocol_version);
943 property_size += self.user_properties.calc_size(protocol_version);
944
945 property_size
946 }
947}
948
/// A PUBREC packet (`PacketType::PublishReceived`).
#[derive(Debug, PartialEq)]
pub struct PublishReceivedPacket {
    pub packet_id: u16,
    pub reason_code: PublishReceivedReason,

    // Properties: exactly the fields summed by `property_size`.
    pub reason_string: Option<ReasonString>,
    pub user_properties: Vec<UserProperty>,
}
959
960impl PropertySize for PublishReceivedPacket {
961 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
962 let mut property_size = 0;
963 property_size += self.reason_string.calc_size(protocol_version);
964 property_size += self.user_properties.calc_size(protocol_version);
965
966 property_size
967 }
968}
969
/// A PUBREL packet (`PacketType::PublishRelease`).
#[derive(Debug, PartialEq)]
pub struct PublishReleasePacket {
    pub packet_id: u16,
    pub reason_code: PublishReleaseReason,

    // Properties: exactly the fields summed by `property_size`.
    pub reason_string: Option<ReasonString>,
    pub user_properties: Vec<UserProperty>,
}
980
981impl PropertySize for PublishReleasePacket {
982 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
983 let mut property_size = 0;
984 property_size += self.reason_string.calc_size(protocol_version);
985 property_size += self.user_properties.calc_size(protocol_version);
986
987 property_size
988 }
989}
990
/// A PUBCOMP packet (`PacketType::PublishComplete`).
#[derive(Debug, PartialEq)]
pub struct PublishCompletePacket {
    pub packet_id: u16,
    pub reason_code: PublishCompleteReason,

    // Properties: exactly the fields summed by `property_size`.
    pub reason_string: Option<ReasonString>,
    pub user_properties: Vec<UserProperty>,
}
1001
1002impl PropertySize for PublishCompletePacket {
1003 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
1004 let mut property_size = 0;
1005 property_size += self.reason_string.calc_size(protocol_version);
1006 property_size += self.user_properties.calc_size(protocol_version);
1007
1008 property_size
1009 }
1010}
1011
/// A SUBSCRIBE packet (`PacketType::Subscribe`).
#[derive(Debug, PartialEq)]
pub struct SubscribePacket {
    pub packet_id: u16,

    // Properties: exactly the fields summed by `property_size`.
    pub subscription_identifier: Option<SubscriptionIdentifier>,
    pub user_properties: Vec<UserProperty>,

    /// The requested subscriptions; not counted by `property_size`.
    pub subscription_topics: Vec<SubscriptionTopic>,
}
1024
1025impl PropertySize for SubscribePacket {
1026 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
1027 let mut property_size = 0;
1028 property_size += self.subscription_identifier.calc_size(protocol_version);
1029 property_size += self.user_properties.calc_size(protocol_version);
1030
1031 property_size
1032 }
1033}
1034
/// A SUBACK packet (`PacketType::SubscribeAck`).
#[derive(Debug, PartialEq)]
pub struct SubscribeAckPacket {
    pub packet_id: u16,

    // Properties: exactly the fields summed by `property_size`.
    pub reason_string: Option<ReasonString>,
    pub user_properties: Vec<UserProperty>,

    // NOTE(review): presumably one reason code per requested subscription,
    // in request order — confirm against the broker logic.
    pub reason_codes: Vec<SubscribeAckReason>,
}
1047
1048impl PropertySize for SubscribeAckPacket {
1049 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
1050 let mut property_size = 0;
1051 property_size += self.reason_string.calc_size(protocol_version);
1052 property_size += self.user_properties.calc_size(protocol_version);
1053
1054 property_size
1055 }
1056}
1057
/// Payload of the `Packet::Unsubscribe` variant.
///
/// Fields are grouped as: packet identifier, properties (the ones summed by
/// `PropertySize::property_size`), and the topic filters to unsubscribe from.
#[derive(Debug, PartialEq)]
pub struct UnsubscribePacket {
    /// Identifier of this unsubscribe request.
    pub packet_id: u16,

    /// User properties attached to the packet (may be empty).
    pub user_properties: Vec<UserProperty>,

    /// Topic filters listed in this packet.
    pub topic_filters: Vec<TopicFilter>,
}
1069
1070impl PropertySize for UnsubscribePacket {
1071 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
1072 let mut property_size = 0;
1073 property_size += self.user_properties.calc_size(protocol_version);
1074
1075 property_size
1076 }
1077}
1078
/// Payload of the `Packet::UnsubscribeAck` variant.
///
/// Fields are grouped as: packet identifier, properties (the ones summed by
/// `PropertySize::property_size`), and the list of reason codes.
#[derive(Debug, PartialEq)]
pub struct UnsubscribeAckPacket {
    /// Identifier of the unsubscribe request being acknowledged
    /// (presumably echoes `UnsubscribePacket::packet_id` — confirm against the caller).
    pub packet_id: u16,

    /// Optional reason string property.
    pub reason_string: Option<ReasonString>,
    /// User properties attached to the packet (may be empty).
    pub user_properties: Vec<UserProperty>,

    /// Reason codes carried by the acknowledgement; `Packet::calc_size`
    /// counts one byte per entry.
    pub reason_codes: Vec<UnsubscribeAckReason>,
}
1091
1092impl PropertySize for UnsubscribeAckPacket {
1093 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
1094 let mut property_size = 0;
1095 property_size += self.reason_string.calc_size(protocol_version);
1096 property_size += self.user_properties.calc_size(protocol_version);
1097
1098 property_size
1099 }
1100}
1101
/// Payload of the `Packet::Disconnect` variant.
///
/// `reason_code` is the fixed part of the packet; the remaining fields are the
/// properties that `PropertySize::property_size` accounts for.
#[derive(Debug, PartialEq)]
pub struct DisconnectPacket {
    /// Reason code given for the disconnect.
    pub reason_code: DisconnectReason,

    /// Optional session expiry interval property.
    pub session_expiry_interval: Option<SessionExpiryInterval>,
    /// Optional reason string property.
    pub reason_string: Option<ReasonString>,
    /// User properties attached to the packet (may be empty).
    pub user_properties: Vec<UserProperty>,
    /// Optional server reference property.
    pub server_reference: Option<ServerReference>,
}
1113
1114impl PropertySize for DisconnectPacket {
1115 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
1116 let mut property_size = 0;
1117 property_size += self.session_expiry_interval.calc_size(protocol_version);
1118 property_size += self.reason_string.calc_size(protocol_version);
1119 property_size += self.user_properties.calc_size(protocol_version);
1120 property_size += self.server_reference.calc_size(protocol_version);
1121
1122 property_size
1123 }
1124}
1125
/// Payload of the `Packet::Authenticate` variant.
///
/// `reason_code` is the fixed part of the packet; the remaining fields are the
/// properties that `PropertySize::property_size` accounts for.
#[derive(Debug, PartialEq)]
pub struct AuthenticatePacket {
    /// Reason code of this authentication exchange step.
    pub reason_code: AuthenticateReason,

    /// Optional authentication method property.
    pub authentication_method: Option<AuthenticationMethod>,
    /// Optional authentication data property.
    pub authentication_data: Option<AuthenticationData>,
    /// Optional reason string property.
    pub reason_string: Option<ReasonString>,
    /// User properties attached to the packet (may be empty).
    pub user_properties: Vec<UserProperty>,
}
1137
1138impl PropertySize for AuthenticatePacket {
1139 fn property_size(&self, protocol_version: ProtocolVersion) -> u32 {
1140 let mut property_size = 0;
1141 property_size += self.authentication_method.calc_size(protocol_version);
1142 property_size += self.authentication_data.calc_size(protocol_version);
1143 property_size += self.reason_string.calc_size(protocol_version);
1144 property_size += self.user_properties.calc_size(protocol_version);
1145
1146 property_size
1147 }
1148}
1149
/// A single protocol packet, one variant per packet type.
///
/// Variants are declared in the same order as the numeric packet types
/// returned by `Packet::to_byte` (1 for `Connect` through 15 for
/// `Authenticate`). `PingRequest` and `PingResponse` carry no data.
// The variants differ considerably in size; the clippy lint suggesting boxing
// of the large ones is deliberately silenced here.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, PartialEq)]
pub enum Packet {
    Connect(ConnectPacket),
    ConnectAck(ConnectAckPacket),
    Publish(PublishPacket),
    PublishAck(PublishAckPacket),
    PublishReceived(PublishReceivedPacket),
    PublishRelease(PublishReleasePacket),
    PublishComplete(PublishCompletePacket),
    Subscribe(SubscribePacket),
    SubscribeAck(SubscribeAckPacket),
    Unsubscribe(UnsubscribePacket),
    UnsubscribeAck(UnsubscribeAckPacket),
    PingRequest,
    PingResponse,
    Disconnect(DisconnectPacket),
    Authenticate(AuthenticatePacket),
}
1169
1170impl Packet {
1171 pub fn to_byte(&self) -> u8 {
1172 match self {
1173 Packet::Connect(_) => 1,
1174 Packet::ConnectAck(_) => 2,
1175 Packet::Publish(_) => 3,
1176 Packet::PublishAck(_) => 4,
1177 Packet::PublishReceived(_) => 5,
1178 Packet::PublishRelease(_) => 6,
1179 Packet::PublishComplete(_) => 7,
1180 Packet::Subscribe(_) => 8,
1181 Packet::SubscribeAck(_) => 9,
1182 Packet::Unsubscribe(_) => 10,
1183 Packet::UnsubscribeAck(_) => 11,
1184 Packet::PingRequest => 12,
1185 Packet::PingResponse => 13,
1186 Packet::Disconnect(_) => 14,
1187 Packet::Authenticate(_) => 15,
1188 }
1189 }
1190
1191 pub fn fixed_header_flags(&self) -> u8 {
1192 match self {
1193 Packet::Connect(_)
1194 | Packet::ConnectAck(_)
1195 | Packet::PublishAck(_)
1196 | Packet::PublishReceived(_)
1197 | Packet::PublishComplete(_)
1198 | Packet::SubscribeAck(_)
1199 | Packet::UnsubscribeAck(_)
1200 | Packet::PingRequest
1201 | Packet::PingResponse
1202 | Packet::Disconnect(_)
1203 | Packet::Authenticate(_) => 0b0000_0000,
1204 Packet::PublishRelease(_) | Packet::Subscribe(_) | Packet::Unsubscribe(_) => {
1205 0b0000_0010
1206 },
1207 Packet::Publish(publish_packet) => {
1208 let mut flags: u8 = 0;
1209
1210 if publish_packet.is_duplicate {
1211 flags |= 0b0000_1000;
1212 }
1213
1214 let qos = publish_packet.qos as u8;
1215 let qos_bits = 0b0000_0110 & (qos << 1);
1216 flags |= qos_bits;
1217
1218 if publish_packet.retain {
1219 flags |= 0b0000_0001;
1220 }
1221
1222 flags
1223 },
1224 }
1225 }
1226
1227 pub fn calculate_size(&self, protocol_version: ProtocolVersion) -> u32 {
1228 self.calc_size(protocol_version)
1229 }
1230}
1231
1232impl PacketSize for Packet {
1233 fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
1234 match self {
1235 Packet::Connect(p) => {
1236 let mut size = p.protocol_name.calc_size(protocol_version);
1237
1238 size += 1 + 1 + 2;
1240
1241 if protocol_version == ProtocolVersion::V500 {
1242 let property_size = p.property_size(protocol_version);
1243 size +=
1244 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1245 }
1246
1247 size += p.client_id.calc_size(protocol_version);
1248 size += p.will.calc_size(protocol_version);
1249 size += p.user_name.calc_size(protocol_version);
1250 size += p.password.calc_size(protocol_version);
1251
1252 size
1253 },
1254 Packet::ConnectAck(p) => {
1255 let mut size = 1 + 1;
1257
1258 if protocol_version == ProtocolVersion::V500 {
1259 let property_size = p.property_size(protocol_version);
1260 size +=
1261 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1262 }
1263
1264 size
1265 },
1266 Packet::Publish(p) => {
1267 let mut size = p.topic.calc_size(protocol_version);
1268 size += p.packet_id.calc_size(protocol_version);
1269
1270 if protocol_version == ProtocolVersion::V500 {
1271 let property_size = p.property_size(protocol_version);
1272 size +=
1273 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1274 }
1275
1276 size += p.payload.len() as u32;
1278
1279 size
1280 },
1281 Packet::PublishAck(p) => {
1282 let mut size = 2;
1284
1285 if protocol_version == ProtocolVersion::V500 {
1286 let property_size = p.property_size(protocol_version);
1287 size += 1
1289 + property_size
1290 + VariableByteInt(property_size).calc_size(protocol_version);
1291 }
1292
1293 size
1294 },
1295 Packet::PublishReceived(p) => {
1296 let mut size = 2 + 1;
1298
1299 if protocol_version == ProtocolVersion::V500 {
1300 let property_size = p.property_size(protocol_version);
1301 size +=
1302 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1303 }
1304
1305 size
1306 },
1307 Packet::PublishRelease(p) => {
1308 let mut size = 2 + 1;
1310
1311 if protocol_version == ProtocolVersion::V500 {
1312 let property_size = p.property_size(protocol_version);
1313 size +=
1314 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1315 }
1316
1317 size
1318 },
1319 Packet::PublishComplete(p) => {
1320 let mut size = 2 + 1;
1322
1323 if protocol_version == ProtocolVersion::V500 {
1324 let property_size = p.property_size(protocol_version);
1325 size +=
1326 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1327 }
1328
1329 size
1330 },
1331 Packet::Subscribe(p) => {
1332 let mut size = 2;
1334
1335 if protocol_version == ProtocolVersion::V500 {
1336 let property_size = p.property_size(protocol_version);
1337 size +=
1338 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1339 }
1340
1341 size += p.subscription_topics.calc_size(protocol_version);
1342
1343 size
1344 },
1345 Packet::SubscribeAck(p) => {
1346 let mut size = 2;
1348
1349 if protocol_version == ProtocolVersion::V500 {
1350 let property_size = p.property_size(protocol_version);
1351 size +=
1352 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1353 }
1354
1355 size += p.reason_codes.len() as u32;
1356
1357 size
1358 },
1359 Packet::Unsubscribe(p) => {
1360 let mut size = 2;
1362
1363 if protocol_version == ProtocolVersion::V500 {
1364 let property_size = p.property_size(protocol_version);
1365 size +=
1366 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1367 }
1368
1369 size += p.topic_filters.calc_size(protocol_version);
1370
1371 size
1372 },
1373 Packet::UnsubscribeAck(p) => {
1374 let mut size = 2;
1376
1377 if protocol_version == ProtocolVersion::V500 {
1378 let property_size = p.property_size(protocol_version);
1379 size +=
1380 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1381 }
1382
1383 size += p.reason_codes.len() as u32;
1384
1385 size
1386 },
1387 Packet::PingRequest => 0,
1388 Packet::PingResponse => 0,
1389 Packet::Disconnect(p) => {
1390 let mut size = 1;
1392
1393 if protocol_version == ProtocolVersion::V500 {
1394 let property_size = p.property_size(protocol_version);
1395 size +=
1396 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1397 }
1398
1399 size
1400 },
1401 Packet::Authenticate(p) => {
1402 let mut size = 1;
1404
1405 if protocol_version == ProtocolVersion::V500 {
1406 let property_size = p.property_size(protocol_version);
1407 size +=
1408 property_size + VariableByteInt(property_size).calc_size(protocol_version);
1409 }
1410
1411 size
1412 },
1413 }
1414 }
1415}