mqrstt/packets/publish/
properties.rs1use bytes::BufMut;
2
3use crate::packets::VariableInteger;
4
5use crate::packets::mqtt_trait::{MqttRead, MqttWrite, WireLength};
6use crate::packets::{
7 error::{DeserializeError, SerializeError},
8 PacketType, PropertyType,
9};
10
11crate::packets::macros::define_properties!(
12 PublishProperties,
14 PayloadFormatIndicator,
15 MessageExpiryInterval,
16 ContentType,
17 ResponseTopic,
18 CorrelationData,
19 ListSubscriptionIdentifier,
20 TopicAlias,
21 UserProperty
22);
23
24impl MqttRead for PublishProperties {
25 fn read(buf: &mut bytes::Bytes) -> Result<Self, crate::packets::error::DeserializeError> {
26 let (len, _) = VariableInteger::read_variable_integer(buf).map_err(DeserializeError::from)?;
27
28 if len == 0 {
29 return Ok(Self::default());
30 } else if buf.len() < len {
31 return Err(DeserializeError::InsufficientData(std::any::type_name::<Self>(), buf.len(), len));
32 }
33
34 let mut property_data = buf.split_to(len);
35
36 let mut properties = Self::default();
37
38 loop {
39 match PropertyType::try_from(u8::read(&mut property_data)?)? {
40 PropertyType::PayloadFormatIndicator => {
41 if properties.payload_format_indicator.is_some() {
42 return Err(DeserializeError::DuplicateProperty(PropertyType::PayloadFormatIndicator));
43 }
44 properties.payload_format_indicator = Some(u8::read(&mut property_data)?);
45 }
46 PropertyType::MessageExpiryInterval => {
47 if properties.message_expiry_interval.is_some() {
48 return Err(DeserializeError::DuplicateProperty(PropertyType::MessageExpiryInterval));
49 }
50 properties.message_expiry_interval = Some(u32::read(&mut property_data)?);
51 }
52 PropertyType::TopicAlias => {
53 if properties.topic_alias.is_some() {
54 return Err(DeserializeError::DuplicateProperty(PropertyType::TopicAlias));
55 }
56 properties.topic_alias = Some(u16::read(&mut property_data)?);
57 }
58 PropertyType::ResponseTopic => {
59 if properties.response_topic.is_some() {
60 return Err(DeserializeError::DuplicateProperty(PropertyType::ResponseTopic));
61 }
62 properties.response_topic = Some(Box::<str>::read(&mut property_data)?);
63 }
64 PropertyType::CorrelationData => {
65 if properties.correlation_data.is_some() {
66 return Err(DeserializeError::DuplicateProperty(PropertyType::CorrelationData));
67 }
68 properties.correlation_data = Some(Vec::<u8>::read(&mut property_data)?);
69 }
70 PropertyType::SubscriptionIdentifier => {
71 properties.subscription_identifiers.push(VariableInteger::read_variable_integer(&mut property_data)?.0);
72 }
73 PropertyType::UserProperty => properties.user_properties.push((Box::<str>::read(&mut property_data)?, Box::<str>::read(&mut property_data)?)),
74 PropertyType::ContentType => {
75 if properties.content_type.is_some() {
76 return Err(DeserializeError::DuplicateProperty(PropertyType::ContentType));
77 }
78 properties.content_type = Some(Box::<str>::read(&mut property_data)?);
79 }
80 t => return Err(DeserializeError::UnexpectedProperty(t, PacketType::Publish)),
81 }
82 if property_data.is_empty() {
83 break;
84 }
85 }
86
87 Ok(properties)
88 }
89}
90
91impl MqttWrite for PublishProperties {
92 fn write(&self, buf: &mut bytes::BytesMut) -> Result<(), SerializeError> {
93 self.wire_len().write_variable_integer(buf)?;
94
95 if let Some(payload_format_indicator) = self.payload_format_indicator {
96 buf.put_u8(PropertyType::PayloadFormatIndicator.into());
97 buf.put_u8(payload_format_indicator);
98 }
99 if let Some(message_expiry_interval) = self.message_expiry_interval {
100 buf.put_u8(PropertyType::MessageExpiryInterval.into());
101 buf.put_u32(message_expiry_interval);
102 }
103 if let Some(topic_alias) = self.topic_alias {
104 buf.put_u8(PropertyType::TopicAlias.into());
105 buf.put_u16(topic_alias);
106 }
107 if let Some(response_topic) = &self.response_topic {
108 buf.put_u8(PropertyType::ResponseTopic.into());
109 response_topic.as_ref().write(buf)?;
110 }
111 if let Some(correlation_data) = &self.correlation_data {
112 buf.put_u8(PropertyType::CorrelationData.into());
113 correlation_data.write(buf)?;
114 }
115 for sub_id in &self.subscription_identifiers {
116 buf.put_u8(PropertyType::SubscriptionIdentifier.into());
117 sub_id.write_variable_integer(buf)?;
118 }
119 for (key, val) in &self.user_properties {
120 buf.put_u8(PropertyType::UserProperty.into());
121 key.write(buf)?;
122 val.write(buf)?;
123 }
124 if let Some(content_type) = &self.content_type {
125 buf.put_u8(PropertyType::ContentType.into());
126 content_type.write(buf)?;
127 }
128
129 Ok(())
130 }
131}