mqrstt/packets/publish/
properties.rs

1use bytes::BufMut;
2
3use crate::packets::VariableInteger;
4
5use crate::packets::mqtt_trait::{MqttRead, MqttWrite, WireLength};
6use crate::packets::{
7    error::{DeserializeError, SerializeError},
8    PacketType, PropertyType,
9};
10
11crate::packets::macros::define_properties!(
12    /// Publish Properties
13    PublishProperties,
14    PayloadFormatIndicator,
15    MessageExpiryInterval,
16    ContentType,
17    ResponseTopic,
18    CorrelationData,
19    ListSubscriptionIdentifier,
20    TopicAlias,
21    UserProperty
22);
23
24impl MqttRead for PublishProperties {
25    fn read(buf: &mut bytes::Bytes) -> Result<Self, crate::packets::error::DeserializeError> {
26        let (len, _) = VariableInteger::read_variable_integer(buf).map_err(DeserializeError::from)?;
27
28        if len == 0 {
29            return Ok(Self::default());
30        } else if buf.len() < len {
31            return Err(DeserializeError::InsufficientData(std::any::type_name::<Self>(), buf.len(), len));
32        }
33
34        let mut property_data = buf.split_to(len);
35
36        let mut properties = Self::default();
37
38        loop {
39            match PropertyType::try_from(u8::read(&mut property_data)?)? {
40                PropertyType::PayloadFormatIndicator => {
41                    if properties.payload_format_indicator.is_some() {
42                        return Err(DeserializeError::DuplicateProperty(PropertyType::PayloadFormatIndicator));
43                    }
44                    properties.payload_format_indicator = Some(u8::read(&mut property_data)?);
45                }
46                PropertyType::MessageExpiryInterval => {
47                    if properties.message_expiry_interval.is_some() {
48                        return Err(DeserializeError::DuplicateProperty(PropertyType::MessageExpiryInterval));
49                    }
50                    properties.message_expiry_interval = Some(u32::read(&mut property_data)?);
51                }
52                PropertyType::TopicAlias => {
53                    if properties.topic_alias.is_some() {
54                        return Err(DeserializeError::DuplicateProperty(PropertyType::TopicAlias));
55                    }
56                    properties.topic_alias = Some(u16::read(&mut property_data)?);
57                }
58                PropertyType::ResponseTopic => {
59                    if properties.response_topic.is_some() {
60                        return Err(DeserializeError::DuplicateProperty(PropertyType::ResponseTopic));
61                    }
62                    properties.response_topic = Some(Box::<str>::read(&mut property_data)?);
63                }
64                PropertyType::CorrelationData => {
65                    if properties.correlation_data.is_some() {
66                        return Err(DeserializeError::DuplicateProperty(PropertyType::CorrelationData));
67                    }
68                    properties.correlation_data = Some(Vec::<u8>::read(&mut property_data)?);
69                }
70                PropertyType::SubscriptionIdentifier => {
71                    properties.subscription_identifiers.push(VariableInteger::read_variable_integer(&mut property_data)?.0);
72                }
73                PropertyType::UserProperty => properties.user_properties.push((Box::<str>::read(&mut property_data)?, Box::<str>::read(&mut property_data)?)),
74                PropertyType::ContentType => {
75                    if properties.content_type.is_some() {
76                        return Err(DeserializeError::DuplicateProperty(PropertyType::ContentType));
77                    }
78                    properties.content_type = Some(Box::<str>::read(&mut property_data)?);
79                }
80                t => return Err(DeserializeError::UnexpectedProperty(t, PacketType::Publish)),
81            }
82            if property_data.is_empty() {
83                break;
84            }
85        }
86
87        Ok(properties)
88    }
89}
90
91impl MqttWrite for PublishProperties {
92    fn write(&self, buf: &mut bytes::BytesMut) -> Result<(), SerializeError> {
93        self.wire_len().write_variable_integer(buf)?;
94
95        if let Some(payload_format_indicator) = self.payload_format_indicator {
96            buf.put_u8(PropertyType::PayloadFormatIndicator.into());
97            buf.put_u8(payload_format_indicator);
98        }
99        if let Some(message_expiry_interval) = self.message_expiry_interval {
100            buf.put_u8(PropertyType::MessageExpiryInterval.into());
101            buf.put_u32(message_expiry_interval);
102        }
103        if let Some(topic_alias) = self.topic_alias {
104            buf.put_u8(PropertyType::TopicAlias.into());
105            buf.put_u16(topic_alias);
106        }
107        if let Some(response_topic) = &self.response_topic {
108            buf.put_u8(PropertyType::ResponseTopic.into());
109            response_topic.as_ref().write(buf)?;
110        }
111        if let Some(correlation_data) = &self.correlation_data {
112            buf.put_u8(PropertyType::CorrelationData.into());
113            correlation_data.write(buf)?;
114        }
115        for sub_id in &self.subscription_identifiers {
116            buf.put_u8(PropertyType::SubscriptionIdentifier.into());
117            sub_id.write_variable_integer(buf)?;
118        }
119        for (key, val) in &self.user_properties {
120            buf.put_u8(PropertyType::UserProperty.into());
121            key.write(buf)?;
122            val.write(buf)?;
123        }
124        if let Some(content_type) = &self.content_type {
125            buf.put_u8(PropertyType::ContentType.into());
126            content_type.write(buf)?;
127        }
128
129        Ok(())
130    }
131}