1use crate::mqtt::packet::escape_binary_json_string;
2use crate::mqtt::packet::mqtt_binary::MqttBinary;
3use crate::mqtt::packet::mqtt_string::MqttString;
4use crate::mqtt::packet::DecodeResult;
5use crate::mqtt::packet::VariableByteInteger;
6use crate::mqtt::result_code::MqttError;
7use num_enum::TryFromPrimitive;
8use serde::ser::SerializeStruct;
9use serde::ser::Serializer;
10use serde::{Deserialize, Serialize};
11use std::convert::TryFrom;
12use std::fmt;
36use std::io::IoSlice;
37
38#[derive(Deserialize, PartialEq, Eq, Copy, Clone, TryFromPrimitive)]
62#[repr(u8)]
63pub enum PropertyId {
64 PayloadFormatIndicator = 1,
66 MessageExpiryInterval = 2,
68 ContentType = 3,
70 ResponseTopic = 8,
72 CorrelationData = 9,
74 SubscriptionIdentifier = 11,
76 SessionExpiryInterval = 17,
78 AssignedClientIdentifier = 18,
80 ServerKeepAlive = 19,
82 AuthenticationMethod = 21,
84 AuthenticationData = 22,
86 RequestProblemInformation = 23,
88 WillDelayInterval = 24,
90 RequestResponseInformation = 25,
92 ResponseInformation = 26,
94 ServerReference = 28,
96 ReasonString = 31,
98 ReceiveMaximum = 33,
100 TopicAliasMaximum = 34,
102 TopicAlias = 35,
104 MaximumQos = 36,
106 RetainAvailable = 37,
108 UserProperty = 38,
110 MaximumPacketSize = 39,
112 WildcardSubscriptionAvailable = 40,
114 SubscriptionIdentifierAvailable = 41,
116 SharedSubscriptionAvailable = 42,
118}
119
120impl PropertyId {
121 pub fn as_u8(self) -> u8 {
134 self as u8
135 }
136
137 pub fn as_str(&self) -> &'static str {
151 match self {
152 PropertyId::PayloadFormatIndicator => "payload_format_indicator",
153 PropertyId::MessageExpiryInterval => "message_expiry_interval",
154 PropertyId::ContentType => "content_type",
155 PropertyId::ResponseTopic => "response_topic",
156 PropertyId::CorrelationData => "correlation_data",
157 PropertyId::SubscriptionIdentifier => "subscription_identifier",
158 PropertyId::SessionExpiryInterval => "session_expiry_interval",
159 PropertyId::AssignedClientIdentifier => "assigned_client_identifier",
160 PropertyId::ServerKeepAlive => "server_keep_alive",
161 PropertyId::AuthenticationMethod => "authentication_method",
162 PropertyId::AuthenticationData => "authentication_data",
163 PropertyId::RequestProblemInformation => "request_problem_information",
164 PropertyId::WillDelayInterval => "will_delay_interval",
165 PropertyId::RequestResponseInformation => "request_response_information",
166 PropertyId::ResponseInformation => "response_information",
167 PropertyId::ServerReference => "server_reference",
168 PropertyId::ReasonString => "reason_string",
169 PropertyId::ReceiveMaximum => "receive_maximum",
170 PropertyId::TopicAliasMaximum => "topic_alias_maximum",
171 PropertyId::TopicAlias => "topic_alias",
172 PropertyId::MaximumQos => "maximum_qos",
173 PropertyId::RetainAvailable => "retain_available",
174 PropertyId::UserProperty => "user_property",
175 PropertyId::MaximumPacketSize => "maximum_packet_size",
176 PropertyId::WildcardSubscriptionAvailable => "wildcard_subscription_available",
177 PropertyId::SubscriptionIdentifierAvailable => "subscription_identifier_available",
178 PropertyId::SharedSubscriptionAvailable => "shared_subscription_available",
179 }
180 }
181}
182
183impl Serialize for PropertyId {
184 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
185 where
186 S: Serializer,
187 {
188 serializer.serialize_str(self.as_str())
189 }
190}
191
192impl fmt::Display for PropertyId {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 match serde_json::to_string(self) {
195 Ok(json) => write!(f, "{json}"),
196 Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
197 }
198 }
199}
200
201impl fmt::Debug for PropertyId {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 fmt::Display::fmt(self, f)
204 }
205}
206
207#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TryFromPrimitive)]
226#[repr(u8)]
227pub enum PayloadFormat {
228 Binary = 0,
230 String = 1,
232}
233impl fmt::Display for PayloadFormat {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 let s = match self {
236 PayloadFormat::Binary => "binary",
237 PayloadFormat::String => "string",
238 };
239 write!(f, "{s}")
240 }
241}
242
243pub trait PropertySize {
248 fn size(&self) -> usize;
252}
253
254impl PropertySize for u8 {
256 fn size(&self) -> usize {
257 1
258 }
259}
260
261impl PropertySize for u16 {
263 fn size(&self) -> usize {
264 2
265 }
266}
267
268impl PropertySize for u32 {
270 fn size(&self) -> usize {
271 4
272 }
273}
274impl PropertySize for String {
276 fn size(&self) -> usize {
277 2 + self.len()
278 }
279}
280
281impl PropertySize for Vec<u8> {
283 fn size(&self) -> usize {
284 2 + self.len()
285 }
286}
287
288impl PropertySize for VariableByteInteger {
291 fn size(&self) -> usize {
292 match self.to_u32() {
293 0..=0x7F => 1,
294 0x80..=0x3FFF => 2,
295 0x4000..=0x1F_FFFF => 3,
296 _ => 4,
297 }
298 }
299}
300
301macro_rules! mqtt_property_common {
302 ($name:ident, $id:expr, $ty:ty) => {
303 #[derive(Debug, PartialEq, Eq, Clone)]
304 pub struct $name {
305 id_bytes: [u8; 1],
306 value: $ty,
307 }
308
309 impl $name {
310 pub fn id(&self) -> PropertyId {
311 $id
312 }
313 }
314
315 impl From<$name> for Property {
316 fn from(v: $name) -> Self {
317 Property::$name(v)
318 }
319 }
320 };
321}
322
323macro_rules! mqtt_property_binary {
324 ($name:ident, $id:expr) => {
325 mqtt_property_common!($name, $id, MqttBinary);
326
327 impl serde::Serialize for $name {
328 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
329 where
330 S: serde::Serializer,
331 {
332 let escaped = escape_binary_json_string(self.val());
333
334 let mut state = serializer.serialize_struct(stringify!($name), 2)?;
335 state.serialize_field("id", &($id as u8))?;
336 state.serialize_field("val", &escaped)?;
337 state.end()
338 }
339 }
340
341 impl $name {
342 pub fn new<T>(v: T) -> Result<Self, MqttError>
343 where
344 T: AsRef<[u8]>,
345 {
346 let binary = MqttBinary::new(v)?;
347
348 Ok(Self {
349 id_bytes: [$id as u8],
350 value: binary,
351 })
352 }
353
354 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
355 let (mqtt_binary, consumed) = MqttBinary::decode(bytes)?;
356 Ok((
357 Self {
358 id_bytes: [$id as u8],
359 value: mqtt_binary,
360 },
361 consumed,
362 ))
363 }
364
365 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
366 let mut result = vec![IoSlice::new(&self.id_bytes)];
367 let mut binary_bufs = self.value.to_buffers();
368 result.append(&mut binary_bufs);
369 result
370 }
371
372 pub fn val(&self) -> &[u8] {
373 self.value.as_slice()
374 }
375
376 pub fn size(&self) -> usize {
377 1 + self.value.size() }
379 }
380
381 impl fmt::Display for $name {
382 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383 match escape_binary_json_string(self.val()) {
384 Some(escaped) => write!(
385 f,
386 "{{\"id\": \"{}\", \"value\": \"{}\"}}",
387 self.id(),
388 escaped
389 ),
390 None => write!(
391 f,
392 "{{\"id\": \"{}\", \"value\": \"{:?}\"}}",
393 self.id(),
394 self.val()
395 ),
396 }
397 }
398 }
399 };
400}
401
402macro_rules! mqtt_property_string {
403 ($name:ident, $id:expr) => {
404 mqtt_property_common!($name, $id, MqttString);
405
406 impl serde::Serialize for $name {
407 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
408 where
409 S: serde::Serializer,
410 {
411 let mut s = serializer.serialize_struct(stringify!($name), 2)?;
412 s.serialize_field("id", &($id as u8))?;
413 s.serialize_field("val", self.val())?;
414 s.end()
415 }
416 }
417
418 impl $name {
419 pub fn new<T>(s: T) -> Result<Self, MqttError>
420 where
421 T: AsRef<str>,
422 {
423 let value = MqttString::new(s)?;
424
425 Ok(Self {
426 id_bytes: [$id as u8],
427 value,
428 })
429 }
430
431 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
432 let (mqtt_string, consumed) = MqttString::decode(bytes)?;
433 Ok((
434 Self {
435 id_bytes: [$id as u8],
436 value: mqtt_string,
437 },
438 consumed,
439 ))
440 }
441
442 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
443 let mut result = vec![IoSlice::new(&self.id_bytes)];
444 let mut string_bufs = self.value.to_buffers();
445 result.append(&mut string_bufs);
446 result
447 }
448
449 pub fn val(&self) -> &str {
450 self.value.as_str()
451 }
452
453 pub fn size(&self) -> usize {
454 1 + self.value.size() }
456 }
457
458 impl fmt::Display for $name {
459 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460 write!(
461 f,
462 "{{\"id\": \"{}\", \"value\": \"{}\"}}",
463 self.id(),
464 self.val()
465 )
466 }
467 }
468 };
469}
470
471macro_rules! mqtt_property_string_pair {
472 ($name:ident, $id:expr) => {
473 mqtt_property_common!($name, $id, (MqttString, MqttString));
474
475 impl serde::Serialize for $name {
476 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
477 where
478 S: serde::Serializer,
479 {
480 let mut s = serializer.serialize_struct(stringify!($name), 3)?;
481 s.serialize_field("id", &($id as u8))?;
482 s.serialize_field("key", self.key())?;
483 s.serialize_field("val", self.val())?;
484 s.end()
485 }
486 }
487
488 impl $name {
489 pub fn new<K, V>(key: K, val: V) -> Result<Self, MqttError>
490 where
491 K: AsRef<str>,
492 V: AsRef<str>,
493 {
494 let key_mqtt = MqttString::new(key)?;
495 let val_mqtt = MqttString::new(val)?;
496
497 Ok(Self {
498 id_bytes: [$id as u8],
499 value: (key_mqtt, val_mqtt),
500 })
501 }
502
503 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
504 let (key, key_consumed) = MqttString::decode(bytes)?;
505 let (val, val_consumed) = MqttString::decode(&bytes[key_consumed..])?;
506
507 Ok((
508 Self {
509 id_bytes: [$id as u8],
510 value: (key, val),
511 },
512 key_consumed + val_consumed,
513 ))
514 }
515
516 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
517 let mut result = vec![IoSlice::new(&self.id_bytes)];
518 let mut key_bufs = self.value.0.to_buffers();
519 let mut val_bufs = self.value.1.to_buffers();
520
521 result.append(&mut key_bufs);
522 result.append(&mut val_bufs);
523 result
524 }
525
526 pub fn key(&self) -> &str {
527 self.value.0.as_str()
528 }
529
530 pub fn val(&self) -> &str {
531 self.value.1.as_str()
532 }
533
534 pub fn size(&self) -> usize {
535 1 + self.value.0.size() + self.value.1.size() }
537 }
538
539 impl fmt::Display for $name {
540 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
541 write!(
542 f,
543 "{{\"id\": \"{}\", \"key\": \"{}\", \"val\": \"{}\"}}",
544 self.id(),
545 self.key(),
546 self.val()
547 )
548 }
549 }
550 };
551}
552
553macro_rules! mqtt_property_u8_custom_new {
554 ($name:ident, $id:expr, $validator:expr) => {
555 mqtt_property_common!($name, $id, [u8; 1]);
556
557 impl serde::Serialize for $name {
558 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
559 where
560 S: serde::Serializer,
561 {
562 let mut s = serializer.serialize_struct(stringify!($name), 2)?;
563 s.serialize_field("id", &($id as u8))?;
564 s.serialize_field("val", &self.val())?;
565 s.end()
566 }
567 }
568
569 impl $name {
570 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
571 if bytes.len() < 1 {
572 return Err(MqttError::MalformedPacket);
573 }
574 if let Some(validator) = $validator {
575 validator(bytes[0])?;
576 }
577 Ok((
578 Self {
579 id_bytes: [$id as u8],
580 value: [bytes[0]],
581 },
582 1,
583 ))
584 }
585
586 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
587 vec![IoSlice::new(&self.id_bytes), IoSlice::new(&self.value)]
588 }
589
590 pub fn val(&self) -> u8 {
591 self.value[0]
592 }
593
594 pub fn size(&self) -> usize {
595 1 + self.value.len()
596 }
597 }
598
599 impl fmt::Display for $name {
600 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601 write!(
602 f,
603 "{{\"id\": \"{}\", \"value\": {}}}",
604 self.id(),
605 self.val()
606 )
607 }
608 }
609 };
610}
611
612macro_rules! mqtt_property_u8 {
613 ($name:ident, $id:expr, $validator:expr) => {
614 mqtt_property_u8_custom_new!($name, $id, $validator);
615
616 impl $name {
617 pub fn new(v: u8) -> Result<Self, MqttError> {
618 if let Some(validator) = $validator {
619 validator(v)?;
620 }
621 Ok(Self {
622 id_bytes: [$id as u8],
623 value: [v],
624 })
625 }
626 }
627 };
628}
629
630macro_rules! mqtt_property_u16 {
631 ($name:ident, $id:expr, $validator:expr) => {
632 mqtt_property_common!($name, $id, [u8; 2]);
633
634 impl serde::Serialize for $name {
635 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
636 where
637 S: serde::Serializer,
638 {
639 let mut s = serializer.serialize_struct(stringify!($name), 2)?;
640 s.serialize_field("id", &($id as u8))?;
641 s.serialize_field("val", &self.val())?;
642 s.end()
643 }
644 }
645
646 impl $name {
647 pub fn new(v: u16) -> Result<Self, MqttError> {
648 if let Some(validator) = $validator {
649 validator(v)?;
650 }
651 Ok(Self {
652 id_bytes: [$id as u8],
653 value: v.to_be_bytes(),
654 })
655 }
656
657 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
658 if bytes.len() < 2 {
659 return Err(MqttError::MalformedPacket);
660 }
661 let v = u16::from_be_bytes([bytes[0], bytes[1]]);
662 if let Some(validator) = $validator {
663 validator(v)?;
664 }
665 Ok((
666 Self {
667 id_bytes: [$id as u8],
668 value: bytes[..2].try_into().unwrap(),
669 },
670 2,
671 ))
672 }
673
674 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
675 vec![IoSlice::new(&self.id_bytes), IoSlice::new(&self.value)]
676 }
677
678 pub fn val(&self) -> u16 {
679 u16::from_be_bytes([self.value[0], self.value[1]])
680 }
681
682 pub fn size(&self) -> usize {
683 1 + self.value.len()
684 }
685 }
686
687 impl fmt::Display for $name {
688 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689 write!(
690 f,
691 "{{\"id\": \"{}\", \"value\": {}}}",
692 self.id(),
693 self.val()
694 )
695 }
696 }
697 };
698}
699
700macro_rules! mqtt_property_u32 {
701 ($name:ident, $id:expr, $validator:expr) => {
702 mqtt_property_common!($name, $id, [u8; 4]);
703
704 impl serde::Serialize for $name {
705 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
706 where
707 S: serde::Serializer,
708 {
709 let mut s = serializer.serialize_struct(stringify!($name), 2)?;
710 s.serialize_field("id", &($id as u8))?;
711 s.serialize_field("value", &self.val())?;
712 s.end()
713 }
714 }
715
716 impl $name {
717 pub fn new(v: u32) -> Result<Self, MqttError> {
718 if let Some(validator) = $validator {
719 validator(v)?;
720 }
721 Ok(Self {
722 id_bytes: [$id as u8],
723 value: v.to_be_bytes(),
724 })
725 }
726
727 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
728 if bytes.len() < 4 {
729 return Err(MqttError::MalformedPacket);
730 }
731 let v = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
732 if let Some(validator) = $validator {
733 validator(v)?;
734 }
735 Ok((
736 Self {
737 id_bytes: [$id as u8],
738 value: bytes[..4].try_into().unwrap(),
739 },
740 4,
741 ))
742 }
743
744 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
745 vec![IoSlice::new(&self.id_bytes), IoSlice::new(&self.value)]
746 }
747
748 pub fn val(&self) -> u32 {
749 u32::from_be_bytes([self.value[0], self.value[1], self.value[2], self.value[3]])
750 }
751
752 pub fn size(&self) -> usize {
753 1 + self.value.len()
754 }
755 }
756
757 impl fmt::Display for $name {
758 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
759 write!(
760 f,
761 "{{\"id\": \"{}\", \"value\": {}}}",
762 self.id(),
763 self.val()
764 )
765 }
766 }
767 };
768}
769
770macro_rules! mqtt_property_variable_integer {
771 ($name:ident, $id:expr, $validator:expr) => {
772 mqtt_property_common!($name, $id, VariableByteInteger);
773
774 impl serde::Serialize for $name {
775 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
776 where
777 S: serde::Serializer,
778 {
779 let mut s = serializer.serialize_struct(stringify!($name), 2)?;
780 s.serialize_field("id", &($id as u8))?;
781 s.serialize_field("val", &self.val())?;
782 s.end()
783 }
784 }
785
786 impl $name {
787 pub fn new(v: u32) -> Result<Self, MqttError> {
788 let vbi = VariableByteInteger::from_u32(v).ok_or(MqttError::ValueOutOfRange)?;
789 if let Some(validator) = $validator {
790 validator(v)?;
791 }
792 Ok(Self {
793 id_bytes: [$id as u8],
794 value: vbi,
795 })
796 }
797
798 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
799 match VariableByteInteger::decode_stream(bytes) {
800 DecodeResult::Ok(vbi, len) => {
801 if let Some(validator) = $validator {
802 validator(vbi.to_u32())?;
803 }
804 Ok((
805 Self {
806 id_bytes: [$id as u8],
807 value: vbi,
808 },
809 len,
810 ))
811 }
812 DecodeResult::Incomplete => Err(MqttError::InsufficientBytes),
813 DecodeResult::Err(_) => Err(MqttError::InsufficientBytes),
814 }
815 }
816
817 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
818 vec![
819 IoSlice::new(&self.id_bytes),
820 IoSlice::new(&self.value.as_bytes()),
821 ]
822 }
823
824 pub fn val(&self) -> u32 {
825 self.value.to_u32()
826 }
827
828 pub fn size(&self) -> usize {
829 1 + self.value.size()
830 }
831 }
832
833 impl fmt::Display for $name {
834 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
835 write!(
836 f,
837 "{{\"id\": \"{}\", \"value\": {}}}",
838 self.id(),
839 self.val()
840 )
841 }
842 }
843 };
844}
845
846type U16Validator = fn(u16) -> Result<(), MqttError>;
847type U32Validator = fn(u32) -> Result<(), MqttError>;
848
849mqtt_property_u8_custom_new!(
850 PayloadFormatIndicator,
851 PropertyId::PayloadFormatIndicator,
852 Some(|v| {
853 if v > 1 {
854 Err(MqttError::ProtocolError)
855 } else {
856 Ok(())
857 }
858 })
859);
860impl PayloadFormatIndicator {
861 pub fn new(v: PayloadFormat) -> Result<Self, MqttError> {
862 Ok(Self {
863 id_bytes: [PropertyId::PayloadFormatIndicator.as_u8(); 1],
864 value: [v as u8],
865 })
866 }
867}
868
869mqtt_property_u32!(
870 MessageExpiryInterval,
871 PropertyId::MessageExpiryInterval,
872 None::<U32Validator>
873);
874mqtt_property_string!(ContentType, PropertyId::ContentType);
875mqtt_property_string!(ResponseTopic, PropertyId::ResponseTopic);
876mqtt_property_binary!(CorrelationData, PropertyId::CorrelationData);
877mqtt_property_variable_integer!(
878 SubscriptionIdentifier,
879 PropertyId::SubscriptionIdentifier,
880 Some(|v| {
881 if v == 0 {
882 Err(MqttError::ProtocolError)
883 } else {
884 Ok(())
885 }
886 })
887);
888mqtt_property_u32!(
889 SessionExpiryInterval,
890 PropertyId::SessionExpiryInterval,
891 None::<U32Validator>
892);
893mqtt_property_string!(
894 AssignedClientIdentifier,
895 PropertyId::AssignedClientIdentifier
896);
897mqtt_property_u16!(
898 ServerKeepAlive,
899 PropertyId::ServerKeepAlive,
900 None::<U16Validator>
901);
902mqtt_property_string!(AuthenticationMethod, PropertyId::AuthenticationMethod);
903mqtt_property_binary!(AuthenticationData, PropertyId::AuthenticationData);
904mqtt_property_u8!(
905 RequestProblemInformation,
906 PropertyId::RequestProblemInformation,
907 Some(|v| {
908 if v > 1 {
909 Err(MqttError::ProtocolError)
910 } else {
911 Ok(())
912 }
913 })
914);
915mqtt_property_u32!(
916 WillDelayInterval,
917 PropertyId::WillDelayInterval,
918 None::<U32Validator>
919);
920mqtt_property_u8!(
921 RequestResponseInformation,
922 PropertyId::RequestResponseInformation,
923 Some(|v| {
924 if v > 1 {
925 Err(MqttError::ProtocolError)
926 } else {
927 Ok(())
928 }
929 })
930);
931mqtt_property_string!(ResponseInformation, PropertyId::ResponseInformation);
932mqtt_property_string!(ServerReference, PropertyId::ServerReference);
933mqtt_property_string!(ReasonString, PropertyId::ReasonString);
934mqtt_property_u16!(
935 ReceiveMaximum,
936 PropertyId::ReceiveMaximum,
937 Some(|v| {
938 if v == 0 {
939 Err(MqttError::ProtocolError)
940 } else {
941 Ok(())
942 }
943 })
944);
945mqtt_property_u16!(
946 TopicAliasMaximum,
947 PropertyId::TopicAliasMaximum,
948 None::<U16Validator>
949);
950mqtt_property_u16!(
951 TopicAlias,
952 PropertyId::TopicAlias,
953 Some(|v| {
954 if v == 0 {
955 Err(MqttError::ProtocolError)
956 } else {
957 Ok(())
958 }
959 })
960);
961mqtt_property_u8!(
962 MaximumQos,
963 PropertyId::MaximumQos,
964 Some(|v| {
965 if v > 1 {
966 Err(MqttError::ProtocolError)
967 } else {
968 Ok(())
969 }
970 })
971);
972mqtt_property_u8!(
973 RetainAvailable,
974 PropertyId::RetainAvailable,
975 Some(|v| {
976 if v > 1 {
977 Err(MqttError::ProtocolError)
978 } else {
979 Ok(())
980 }
981 })
982);
983mqtt_property_string_pair!(UserProperty, PropertyId::UserProperty);
984mqtt_property_u32!(
985 MaximumPacketSize,
986 PropertyId::MaximumPacketSize,
987 Some(|v| {
988 if v == 0 {
989 Err(MqttError::ProtocolError)
990 } else {
991 Ok(())
992 }
993 })
994);
995mqtt_property_u8!(
996 WildcardSubscriptionAvailable,
997 PropertyId::WildcardSubscriptionAvailable,
998 Some(|v| {
999 if v > 1 {
1000 Err(MqttError::ProtocolError)
1001 } else {
1002 Ok(())
1003 }
1004 })
1005);
1006mqtt_property_u8!(
1007 SubscriptionIdentifierAvailable,
1008 PropertyId::SubscriptionIdentifierAvailable,
1009 Some(|v| {
1010 if v > 1 {
1011 Err(MqttError::ProtocolError)
1012 } else {
1013 Ok(())
1014 }
1015 })
1016);
1017mqtt_property_u8!(
1018 SharedSubscriptionAvailable,
1019 PropertyId::SharedSubscriptionAvailable,
1020 Some(|v| {
1021 if v > 1 {
1022 Err(MqttError::ProtocolError)
1023 } else {
1024 Ok(())
1025 }
1026 })
1027);
1028
1029#[derive(Debug, Serialize, PartialEq, Eq, Clone)]
1057pub enum Property {
1058 PayloadFormatIndicator(PayloadFormatIndicator),
1059 MessageExpiryInterval(MessageExpiryInterval),
1060 ContentType(ContentType),
1061 ResponseTopic(ResponseTopic),
1062 CorrelationData(CorrelationData),
1063 SubscriptionIdentifier(SubscriptionIdentifier),
1064 SessionExpiryInterval(SessionExpiryInterval),
1065 AssignedClientIdentifier(AssignedClientIdentifier),
1066 ServerKeepAlive(ServerKeepAlive),
1067 AuthenticationMethod(AuthenticationMethod),
1068 AuthenticationData(AuthenticationData),
1069 RequestProblemInformation(RequestProblemInformation),
1070 WillDelayInterval(WillDelayInterval),
1071 RequestResponseInformation(RequestResponseInformation),
1072 ResponseInformation(ResponseInformation),
1073 ServerReference(ServerReference),
1074 ReasonString(ReasonString),
1075 ReceiveMaximum(ReceiveMaximum),
1076 TopicAliasMaximum(TopicAliasMaximum),
1077 TopicAlias(TopicAlias),
1078 MaximumQos(MaximumQos),
1079 RetainAvailable(RetainAvailable),
1080 UserProperty(UserProperty),
1081 MaximumPacketSize(MaximumPacketSize),
1082 WildcardSubscriptionAvailable(WildcardSubscriptionAvailable),
1083 SubscriptionIdentifierAvailable(SubscriptionIdentifierAvailable),
1084 SharedSubscriptionAvailable(SharedSubscriptionAvailable),
1085}
1086
1087impl fmt::Display for Property {
1088 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1089 match self {
1090 Property::PayloadFormatIndicator(p) => write!(f, "{p}"),
1091 Property::MessageExpiryInterval(p) => write!(f, "{p}"),
1092 Property::ContentType(p) => write!(f, "{p}"),
1093 Property::ResponseTopic(p) => write!(f, "{p}"),
1094 Property::CorrelationData(p) => write!(f, "{p}"),
1095 Property::SubscriptionIdentifier(p) => write!(f, "{p}"),
1096 Property::SessionExpiryInterval(p) => write!(f, "{p}"),
1097 Property::AssignedClientIdentifier(p) => write!(f, "{p}"),
1098 Property::ServerKeepAlive(p) => write!(f, "{p}"),
1099 Property::AuthenticationMethod(p) => write!(f, "{p}"),
1100 Property::AuthenticationData(p) => write!(f, "{p}"),
1101 Property::RequestProblemInformation(p) => write!(f, "{p}"),
1102 Property::WillDelayInterval(p) => write!(f, "{p}"),
1103 Property::RequestResponseInformation(p) => write!(f, "{p}"),
1104 Property::ResponseInformation(p) => write!(f, "{p}"),
1105 Property::ServerReference(p) => write!(f, "{p}"),
1106 Property::ReasonString(p) => write!(f, "{p}"),
1107 Property::ReceiveMaximum(p) => write!(f, "{p}"),
1108 Property::TopicAliasMaximum(p) => write!(f, "{p}"),
1109 Property::TopicAlias(p) => write!(f, "{p}"),
1110 Property::MaximumQos(p) => write!(f, "{p}"),
1111 Property::RetainAvailable(p) => write!(f, "{p}"),
1112 Property::UserProperty(p) => write!(f, "{p}"),
1113 Property::MaximumPacketSize(p) => write!(f, "{p}"),
1114 Property::WildcardSubscriptionAvailable(p) => write!(f, "{p}"),
1115 Property::SubscriptionIdentifierAvailable(p) => write!(f, "{p}"),
1116 Property::SharedSubscriptionAvailable(p) => write!(f, "{p}"),
1117 }
1118 }
1119}
1120
1121pub trait PropertyValueAccess {
1142 fn as_u8(&self) -> Option<u8>;
1147
1148 fn as_u16(&self) -> Option<u16>;
1153
1154 fn as_u32(&self) -> Option<u32>;
1159
1160 fn as_str(&self) -> Option<&str>;
1165
1166 fn as_bytes(&self) -> Option<&[u8]>;
1171
1172 fn as_key_value(&self) -> Option<(&str, &str)>;
1176}
1177
1178impl PropertyValueAccess for Property {
1179 fn as_u8(&self) -> Option<u8> {
1180 match self {
1181 Property::PayloadFormatIndicator(p) => Some(p.val()),
1183 Property::MaximumQos(p) => Some(p.val()),
1184 Property::RetainAvailable(p) => Some(p.val()),
1185 Property::RequestProblemInformation(p) => Some(p.val()),
1186 Property::RequestResponseInformation(p) => Some(p.val()),
1187 Property::WildcardSubscriptionAvailable(p) => Some(p.val()),
1188 Property::SubscriptionIdentifierAvailable(p) => Some(p.val()),
1189 Property::SharedSubscriptionAvailable(p) => Some(p.val()),
1190 _ => None,
1191 }
1192 }
1193
1194 fn as_u16(&self) -> Option<u16> {
1195 match self {
1196 Property::TopicAlias(p) => Some(p.val()),
1198 Property::ReceiveMaximum(p) => Some(p.val()),
1199 Property::TopicAliasMaximum(p) => Some(p.val()),
1200 Property::ServerKeepAlive(p) => Some(p.val()),
1201 _ => None,
1202 }
1203 }
1204
1205 fn as_u32(&self) -> Option<u32> {
1206 match self {
1207 Property::MessageExpiryInterval(p) => Some(p.val()),
1209 Property::SessionExpiryInterval(p) => Some(p.val()),
1210 Property::WillDelayInterval(p) => Some(p.val()),
1211 Property::MaximumPacketSize(p) => Some(p.val()),
1212 Property::SubscriptionIdentifier(p) => Some(p.val()),
1213 _ => None,
1214 }
1215 }
1216
1217 fn as_str(&self) -> Option<&str> {
1218 match self {
1219 Property::ContentType(p) => Some(p.val()),
1221 Property::ResponseTopic(p) => Some(p.val()),
1222 Property::AssignedClientIdentifier(p) => Some(p.val()),
1223 Property::AuthenticationMethod(p) => Some(p.val()),
1224 Property::ResponseInformation(p) => Some(p.val()),
1225 Property::ServerReference(p) => Some(p.val()),
1226 Property::ReasonString(p) => Some(p.val()),
1227 _ => None,
1228 }
1229 }
1230
1231 fn as_bytes(&self) -> Option<&[u8]> {
1232 match self {
1233 Property::CorrelationData(p) => Some(p.val()),
1235 Property::AuthenticationData(p) => Some(p.val()),
1236 _ => None,
1237 }
1238 }
1239
1240 fn as_key_value(&self) -> Option<(&str, &str)> {
1241 match self {
1242 Property::UserProperty(p) => Some((p.key(), p.val())),
1244 _ => None,
1245 }
1246 }
1247}
1248
1249impl Property {
1250 pub fn id(&self) -> PropertyId {
1267 match self {
1268 Property::PayloadFormatIndicator(p) => p.id(),
1269 Property::MessageExpiryInterval(p) => p.id(),
1270 Property::ContentType(p) => p.id(),
1271 Property::ResponseTopic(p) => p.id(),
1272 Property::CorrelationData(p) => p.id(),
1273 Property::SubscriptionIdentifier(p) => p.id(),
1274 Property::SessionExpiryInterval(p) => p.id(),
1275 Property::AssignedClientIdentifier(p) => p.id(),
1276 Property::ServerKeepAlive(p) => p.id(),
1277 Property::AuthenticationMethod(p) => p.id(),
1278 Property::AuthenticationData(p) => p.id(),
1279 Property::RequestProblemInformation(p) => p.id(),
1280 Property::WillDelayInterval(p) => p.id(),
1281 Property::RequestResponseInformation(p) => p.id(),
1282 Property::ResponseInformation(p) => p.id(),
1283 Property::ServerReference(p) => p.id(),
1284 Property::ReasonString(p) => p.id(),
1285 Property::ReceiveMaximum(p) => p.id(),
1286 Property::TopicAliasMaximum(p) => p.id(),
1287 Property::TopicAlias(p) => p.id(),
1288 Property::MaximumQos(p) => p.id(),
1289 Property::RetainAvailable(p) => p.id(),
1290 Property::UserProperty(p) => p.id(),
1291 Property::MaximumPacketSize(p) => p.id(),
1292 Property::WildcardSubscriptionAvailable(p) => p.id(),
1293 Property::SubscriptionIdentifierAvailable(p) => p.id(),
1294 Property::SharedSubscriptionAvailable(p) => p.id(),
1295 }
1296 }
1297
1298 pub fn size(&self) -> usize {
1315 match self {
1316 Property::PayloadFormatIndicator(p) => p.size(),
1317 Property::MessageExpiryInterval(p) => p.size(),
1318 Property::ContentType(p) => p.size(),
1319 Property::ResponseTopic(p) => p.size(),
1320 Property::CorrelationData(p) => p.size(),
1321 Property::SubscriptionIdentifier(p) => p.size(),
1322 Property::SessionExpiryInterval(p) => p.size(),
1323 Property::AssignedClientIdentifier(p) => p.size(),
1324 Property::ServerKeepAlive(p) => p.size(),
1325 Property::AuthenticationMethod(p) => p.size(),
1326 Property::AuthenticationData(p) => p.size(),
1327 Property::RequestProblemInformation(p) => p.size(),
1328 Property::WillDelayInterval(p) => p.size(),
1329 Property::RequestResponseInformation(p) => p.size(),
1330 Property::ResponseInformation(p) => p.size(),
1331 Property::ServerReference(p) => p.size(),
1332 Property::ReasonString(p) => p.size(),
1333 Property::ReceiveMaximum(p) => p.size(),
1334 Property::TopicAliasMaximum(p) => p.size(),
1335 Property::TopicAlias(p) => p.size(),
1336 Property::MaximumQos(p) => p.size(),
1337 Property::RetainAvailable(p) => p.size(),
1338 Property::UserProperty(p) => p.size(),
1339 Property::MaximumPacketSize(p) => p.size(),
1340 Property::WildcardSubscriptionAvailable(p) => p.size(),
1341 Property::SubscriptionIdentifierAvailable(p) => p.size(),
1342 Property::SharedSubscriptionAvailable(p) => p.size(),
1343 }
1344 }
1345
1346 pub fn to_buffers(&self) -> Vec<IoSlice<'_>> {
1365 match self {
1366 Property::PayloadFormatIndicator(p) => p.to_buffers(),
1367 Property::MessageExpiryInterval(p) => p.to_buffers(),
1368 Property::ContentType(p) => p.to_buffers(),
1369 Property::ResponseTopic(p) => p.to_buffers(),
1370 Property::CorrelationData(p) => p.to_buffers(),
1371 Property::SubscriptionIdentifier(p) => p.to_buffers(),
1372 Property::SessionExpiryInterval(p) => p.to_buffers(),
1373 Property::AssignedClientIdentifier(p) => p.to_buffers(),
1374 Property::ServerKeepAlive(p) => p.to_buffers(),
1375 Property::AuthenticationMethod(p) => p.to_buffers(),
1376 Property::AuthenticationData(p) => p.to_buffers(),
1377 Property::RequestProblemInformation(p) => p.to_buffers(),
1378 Property::WillDelayInterval(p) => p.to_buffers(),
1379 Property::RequestResponseInformation(p) => p.to_buffers(),
1380 Property::ResponseInformation(p) => p.to_buffers(),
1381 Property::ServerReference(p) => p.to_buffers(),
1382 Property::ReasonString(p) => p.to_buffers(),
1383 Property::ReceiveMaximum(p) => p.to_buffers(),
1384 Property::TopicAliasMaximum(p) => p.to_buffers(),
1385 Property::TopicAlias(p) => p.to_buffers(),
1386 Property::MaximumQos(p) => p.to_buffers(),
1387 Property::RetainAvailable(p) => p.to_buffers(),
1388 Property::UserProperty(p) => p.to_buffers(),
1389 Property::MaximumPacketSize(p) => p.to_buffers(),
1390 Property::WildcardSubscriptionAvailable(p) => p.to_buffers(),
1391 Property::SubscriptionIdentifierAvailable(p) => p.to_buffers(),
1392 Property::SharedSubscriptionAvailable(p) => p.to_buffers(),
1393 }
1394 }
1395
1396 pub fn parse(bytes: &[u8]) -> Result<(Self, usize), MqttError> {
1429 if bytes.is_empty() {
1430 return Err(MqttError::MalformedPacket);
1431 }
1432
1433 let id = PropertyId::try_from(bytes[0]).map_err(|_| MqttError::MalformedPacket)?;
1434
1435 let (prop, len) = match id {
1436 PropertyId::PayloadFormatIndicator => {
1437 let (p, l) = PayloadFormatIndicator::parse(&bytes[1..])?;
1438 (Self::PayloadFormatIndicator(p), l + 1)
1439 }
1440 PropertyId::MessageExpiryInterval => {
1441 let (p, l) = MessageExpiryInterval::parse(&bytes[1..])?;
1442 (Self::MessageExpiryInterval(p), l + 1)
1443 }
1444 PropertyId::ContentType => {
1445 let (p, l) = ContentType::parse(&bytes[1..])?;
1446 (Self::ContentType(p), l + 1)
1447 }
1448 PropertyId::ResponseTopic => {
1449 let (p, l) = ResponseTopic::parse(&bytes[1..])?;
1450 (Self::ResponseTopic(p), l + 1)
1451 }
1452 PropertyId::CorrelationData => {
1453 let (p, l) = CorrelationData::parse(&bytes[1..])?;
1454 (Self::CorrelationData(p), l + 1)
1455 }
1456 PropertyId::SubscriptionIdentifier => {
1457 let (p, l) = SubscriptionIdentifier::parse(&bytes[1..])?;
1458 (Self::SubscriptionIdentifier(p), l + 1)
1459 }
1460 PropertyId::SessionExpiryInterval => {
1461 let (p, l) = SessionExpiryInterval::parse(&bytes[1..])?;
1462 (Self::SessionExpiryInterval(p), l + 1)
1463 }
1464 PropertyId::AssignedClientIdentifier => {
1465 let (p, l) = AssignedClientIdentifier::parse(&bytes[1..])?;
1466 (Self::AssignedClientIdentifier(p), l + 1)
1467 }
1468 PropertyId::ServerKeepAlive => {
1469 let (p, l) = ServerKeepAlive::parse(&bytes[1..])?;
1470 (Self::ServerKeepAlive(p), l + 1)
1471 }
1472 PropertyId::AuthenticationMethod => {
1473 let (p, l) = AuthenticationMethod::parse(&bytes[1..])?;
1474 (Self::AuthenticationMethod(p), l + 1)
1475 }
1476 PropertyId::AuthenticationData => {
1477 let (p, l) = AuthenticationData::parse(&bytes[1..])?;
1478 (Self::AuthenticationData(p), l + 1)
1479 }
1480 PropertyId::RequestProblemInformation => {
1481 let (p, l) = RequestProblemInformation::parse(&bytes[1..])?;
1482 (Self::RequestProblemInformation(p), l + 1)
1483 }
1484 PropertyId::WillDelayInterval => {
1485 let (p, l) = WillDelayInterval::parse(&bytes[1..])?;
1486 (Self::WillDelayInterval(p), l + 1)
1487 }
1488 PropertyId::RequestResponseInformation => {
1489 let (p, l) = RequestResponseInformation::parse(&bytes[1..])?;
1490 (Self::RequestResponseInformation(p), l + 1)
1491 }
1492 PropertyId::ResponseInformation => {
1493 let (p, l) = ResponseInformation::parse(&bytes[1..])?;
1494 (Self::ResponseInformation(p), l + 1)
1495 }
1496 PropertyId::ServerReference => {
1497 let (p, l) = ServerReference::parse(&bytes[1..])?;
1498 (Self::ServerReference(p), l + 1)
1499 }
1500 PropertyId::ReasonString => {
1501 let (p, l) = ReasonString::parse(&bytes[1..])?;
1502 (Self::ReasonString(p), l + 1)
1503 }
1504 PropertyId::ReceiveMaximum => {
1505 let (p, l) = ReceiveMaximum::parse(&bytes[1..])?;
1506 (Self::ReceiveMaximum(p), l + 1)
1507 }
1508 PropertyId::TopicAliasMaximum => {
1509 let (p, l) = TopicAliasMaximum::parse(&bytes[1..])?;
1510 (Self::TopicAliasMaximum(p), l + 1)
1511 }
1512 PropertyId::TopicAlias => {
1513 let (p, l) = TopicAlias::parse(&bytes[1..])?;
1514 (Self::TopicAlias(p), l + 1)
1515 }
1516 PropertyId::MaximumQos => {
1517 let (p, l) = MaximumQos::parse(&bytes[1..])?;
1518 (Self::MaximumQos(p), l + 1)
1519 }
1520 PropertyId::RetainAvailable => {
1521 let (p, l) = RetainAvailable::parse(&bytes[1..])?;
1522 (Self::RetainAvailable(p), l + 1)
1523 }
1524 PropertyId::UserProperty => {
1525 let (p, l) = UserProperty::parse(&bytes[1..])?;
1526 (Self::UserProperty(p), l + 1)
1527 }
1528 PropertyId::MaximumPacketSize => {
1529 let (p, l) = MaximumPacketSize::parse(&bytes[1..])?;
1530 (Self::MaximumPacketSize(p), l + 1)
1531 }
1532 PropertyId::WildcardSubscriptionAvailable => {
1533 let (p, l) = WildcardSubscriptionAvailable::parse(&bytes[1..])?;
1534 (Self::WildcardSubscriptionAvailable(p), l + 1)
1535 }
1536 PropertyId::SubscriptionIdentifierAvailable => {
1537 let (p, l) = SubscriptionIdentifierAvailable::parse(&bytes[1..])?;
1538 (Self::SubscriptionIdentifierAvailable(p), l + 1)
1539 }
1540 PropertyId::SharedSubscriptionAvailable => {
1541 let (p, l) = SharedSubscriptionAvailable::parse(&bytes[1..])?;
1542 (Self::SharedSubscriptionAvailable(p), l + 1)
1543 }
1544 };
1545
1546 Ok((prop, len))
1547 }
1548}
1549
1550pub type Properties = Vec<Property>;
1572
1573pub trait PropertiesToBuffers {
1578 fn to_buffers(&self) -> Vec<IoSlice<'_>>;
1583}
1584
1585impl PropertiesToBuffers for Properties {
1589 fn to_buffers(&self) -> Vec<IoSlice<'_>> {
1590 let mut result = Vec::new();
1591
1592 for prop in self {
1593 result.append(&mut prop.to_buffers());
1594 }
1595
1596 result
1597 }
1598}
1599
1600pub trait PropertiesSize {
1605 fn size(&self) -> usize;
1609}
1610
1611impl PropertiesSize for Properties {
1615 fn size(&self) -> usize {
1616 self.iter().map(|prop| prop.size()).sum()
1617 }
1618}
1619
1620pub trait PropertiesParse {
1625 fn parse(data: &[u8]) -> Result<(Self, usize), MqttError>
1639 where
1640 Self: Sized;
1641}
1642
1643impl PropertiesParse for Properties {
1649 fn parse(data: &[u8]) -> Result<(Self, usize), MqttError> {
1650 if data.is_empty() {
1651 return Err(MqttError::MalformedPacket);
1652 }
1653
1654 let (prop_len, consumed) = match VariableByteInteger::decode_stream(data) {
1655 DecodeResult::Ok(vbi, cons) => (vbi, cons),
1656 _ => return Err(MqttError::MalformedPacket),
1657 };
1658
1659 let mut cursor = consumed;
1660 let mut props = Properties::new();
1661
1662 if prop_len.to_u32() == 0 {
1663 return Ok((props, cursor));
1664 }
1665
1666 let props_end = cursor + prop_len.to_u32() as usize;
1667 if props_end > data.len() {
1668 return Err(MqttError::MalformedPacket);
1669 }
1670
1671 while cursor < props_end {
1672 let (p, c) = Property::parse(&data[cursor..props_end])?;
1673 props.push(p);
1674 cursor += c;
1675 }
1676
1677 Ok((props, cursor))
1678 }
1679}