rmqtt_codec/v5/packet/
publish.rs1use std::{num::NonZeroU16, num::NonZeroU32};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use serde::{Deserialize, Serialize};
6
7use rmqtt_utils::timestamp_millis;
8
9use crate::error::{DecodeError, EncodeError};
10use crate::types::QoS;
11use crate::utils::{self, write_variable_length, Decode, Encode, Property};
12use crate::v5::{encode::*, property_type as pt, UserProperties};
13
14pub(crate) type Publish = crate::types::Publish;
15
16#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
17pub struct PublishProperties {
18 pub topic_alias: Option<NonZeroU16>,
19 pub correlation_data: Option<Bytes>,
20 pub message_expiry_interval: Option<NonZeroU32>,
21 pub content_type: Option<ByteString>,
22 pub user_properties: UserProperties,
23 pub is_utf8_payload: bool,
24 pub response_topic: Option<ByteString>,
25 pub subscription_ids: Vec<NonZeroU32>,
26}
27
28impl Publish {
29 pub(crate) fn decode(mut src: Bytes, packet_flags: u8) -> Result<Self, DecodeError> {
30 let topic = ByteString::decode(&mut src)?;
31 let qos = QoS::try_from((packet_flags & 0b0110) >> 1)?;
32 let packet_id = if qos == QoS::AtMostOnce {
33 None
34 } else {
35 Some(NonZeroU16::decode(&mut src)?) };
37
38 let properties = parse_publish_properties(&mut src)?;
39 let payload = src;
40
41 Ok(Self {
42 dup: (packet_flags & 0b1000) == 0b1000,
43 qos,
44 retain: (packet_flags & 0b0001) == 0b0001,
45 topic,
46 packet_id,
47 payload,
48 properties: Some(properties),
49 delay_interval: None,
50 create_time: Some(timestamp_millis()),
51 })
52 }
53}
54
55impl std::convert::From<UserProperties> for PublishProperties {
56 fn from(props: UserProperties) -> Self {
57 PublishProperties { user_properties: props, ..Default::default() }
58 }
59}
60
61fn parse_publish_properties(src: &mut Bytes) -> Result<PublishProperties, DecodeError> {
62 let prop_src = &mut utils::take_properties(src)?;
63
64 let mut message_expiry_interval = None;
65 let mut topic_alias = None;
66 let mut content_type = None;
67 let mut correlation_data = None;
68 let mut subscription_ids = Vec::new();
69 let mut response_topic = None;
70 let mut is_utf8_payload = None;
71 let mut user_props = Vec::new();
72
73 while prop_src.has_remaining() {
74 match prop_src.get_u8() {
75 pt::UTF8_PAYLOAD => is_utf8_payload.read_value(prop_src)?,
76 pt::MSG_EXPIRY_INT => message_expiry_interval.read_value(prop_src)?,
77 pt::CONTENT_TYPE => content_type.read_value(prop_src)?,
78 pt::RESP_TOPIC => response_topic.read_value(prop_src)?,
79 pt::CORR_DATA => correlation_data.read_value(prop_src)?,
80 pt::SUB_ID => {
81 let id = utils::decode_variable_length_cursor(prop_src)?;
82 subscription_ids.push(NonZeroU32::new(id).ok_or(DecodeError::MalformedPacket)?);
83 }
84 pt::TOPIC_ALIAS => topic_alias.read_value(prop_src)?,
85 pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
86 _ => return Err(DecodeError::MalformedPacket),
87 }
88 }
89
90 Ok(PublishProperties {
91 message_expiry_interval,
92 topic_alias,
93 content_type,
94 correlation_data,
95 subscription_ids,
96 response_topic,
97 is_utf8_payload: is_utf8_payload.unwrap_or(false),
98 user_properties: user_props,
99 })
100}
101
102impl EncodeLtd for Publish {
103 fn encoded_size(&self, _limit: u32) -> usize {
104 let packet_id_size = if self.qos == QoS::AtMostOnce { 0 } else { 2 };
105 self.topic.encoded_size()
106 + packet_id_size
107 + self
108 .properties
109 .as_ref()
110 .map(|p| p.encoded_size(_limit))
111 .unwrap_or_else(|| PublishProperties::default().encoded_size(_limit))
112 + self.payload.len()
113 }
114
115 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
116 let start_len = buf.len();
117 self.topic.encode(buf)?;
118 if self.qos == QoS::AtMostOnce {
119 if self.packet_id.is_some() {
120 return Err(EncodeError::MalformedPacket); }
122 } else {
123 self.packet_id.ok_or(EncodeError::PacketIdRequired)?.encode(buf)?;
124 }
125 if let Some(prop) = &self.properties {
126 prop.encode(buf, size - (buf.len() - start_len + self.payload.len()) as u32)?;
127 } else {
128 PublishProperties::default()
129 .encode(buf, size - (buf.len() - start_len + self.payload.len()) as u32)?;
130 }
131 buf.put(self.payload.as_ref());
132 Ok(())
133 }
134}
135
136impl EncodeLtd for PublishProperties {
137 fn encoded_size(&self, _limit: u32) -> usize {
138 let prop_len = encoded_property_size(&self.topic_alias)
139 + encoded_property_size(&self.correlation_data)
140 + encoded_property_size(&self.message_expiry_interval)
141 + encoded_property_size(&self.content_type)
142 + encoded_property_size_default(&self.is_utf8_payload, false)
143 + encoded_property_size(&self.response_topic)
144 + self
145 .subscription_ids
146 .iter()
147 .fold(0, |acc, id| acc + 1 + var_int_len(id.get() as usize) as usize)
148 + self.user_properties.encoded_size();
149 prop_len + var_int_len(prop_len) as usize
150 }
151
152 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
153 let prop_len = var_int_len_from_size(size);
154 utils::write_variable_length(prop_len, buf);
155 encode_property(&self.topic_alias, pt::TOPIC_ALIAS, buf)?;
156 encode_property(&self.correlation_data, pt::CORR_DATA, buf)?;
157 encode_property(&self.message_expiry_interval, pt::MSG_EXPIRY_INT, buf)?;
158 encode_property(&self.content_type, pt::CONTENT_TYPE, buf)?;
159 encode_property_default(&self.is_utf8_payload, false, pt::UTF8_PAYLOAD, buf)?;
160 encode_property(&self.response_topic, pt::RESP_TOPIC, buf)?;
161 for sub_id in self.subscription_ids.iter() {
162 buf.put_u8(pt::SUB_ID);
163 write_variable_length(sub_id.get(), buf);
164 }
165 self.user_properties.encode(buf)
166 }
167}