ntex_mqtt/v5/codec/packet/
publish.rs1use std::{fmt, num::NonZeroU16, num::NonZeroU32};
2
3use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
4
5use crate::error::{DecodeError, EncodeError};
6use crate::types::{packet_type, QoS};
7use crate::utils::{self, write_variable_length, Decode, Encode, Property};
8use crate::v5::codec::{encode::*, property_type as pt, UserProperties};
9
10#[derive(Debug, PartialEq, Eq, Clone)]
12pub struct Publish {
13 pub dup: bool,
15 pub retain: bool,
16 pub qos: QoS,
18 pub packet_id: Option<NonZeroU16>,
20 pub topic: ByteString,
21 pub payload_size: u32,
22 pub properties: PublishProperties,
23}
24
25#[derive(Debug, PartialEq, Eq, Clone, Default)]
26pub struct PublishProperties {
27 pub topic_alias: Option<NonZeroU16>,
28 pub correlation_data: Option<Bytes>,
29 pub message_expiry_interval: Option<NonZeroU32>,
30 pub content_type: Option<ByteString>,
31 pub user_properties: UserProperties,
32 pub is_utf8_payload: bool,
33 pub response_topic: Option<ByteString>,
34 pub subscription_ids: Vec<NonZeroU32>,
35}
36
37impl Publish {
38 pub(crate) fn decode(
39 src: &mut Bytes,
40 packet_flags: u8,
41 payload_size: u32,
42 ) -> Result<Self, DecodeError> {
43 let topic = ByteString::decode(src)?;
44 let qos = QoS::try_from((packet_flags & 0b0110) >> 1)?;
45 let packet_id = if qos == QoS::AtMostOnce {
46 None
47 } else {
48 Some(NonZeroU16::decode(src)?) };
50 let properties = parse_publish_properties(src)?;
51
52 Ok(Self {
53 qos,
54 topic,
55 packet_id,
56 properties,
57 payload_size,
58 dup: (packet_flags & 0b1000) == 0b1000,
59 retain: (packet_flags & 0b0001) == 0b0001,
60 })
61 }
62
63 pub(crate) fn packet_header_size(
64 src: &BytesMut,
65 packet_flags: u8,
66 ) -> Result<Option<u32>, DecodeError> {
67 if src.remaining() < 2 {
68 return Ok(None);
69 }
70
71 let mut len = u16::from_be_bytes([src[0], src[1]]) as u32 + 2;
73
74 let qos = QoS::try_from((packet_flags & 0b0110) >> 1)?;
76 if qos != QoS::AtMostOnce {
77 len += 2; }
79 if src.remaining() < len as usize {
80 return Ok(None);
81 }
82
83 if let Some((prop_len, pos)) = utils::decode_variable_length(&src[len as usize..])? {
85 Ok(Some(len + prop_len + pos as u32))
86 } else {
87 Ok(None)
88 }
89 }
90}
91
92fn parse_publish_properties(src: &mut Bytes) -> Result<PublishProperties, DecodeError> {
93 let prop_src = &mut utils::take_properties(src)?;
94
95 let mut message_expiry_interval = None;
96 let mut topic_alias = None;
97 let mut content_type = None;
98 let mut correlation_data = None;
99 let mut subscription_ids = Vec::new();
100 let mut response_topic = None;
101 let mut is_utf8_payload = None;
102 let mut user_props = Vec::new();
103
104 while prop_src.has_remaining() {
105 match prop_src.get_u8() {
106 pt::UTF8_PAYLOAD => is_utf8_payload.read_value(prop_src)?,
107 pt::MSG_EXPIRY_INT => message_expiry_interval.read_value(prop_src)?,
108 pt::CONTENT_TYPE => content_type.read_value(prop_src)?,
109 pt::RESP_TOPIC => response_topic.read_value(prop_src)?,
110 pt::CORR_DATA => correlation_data.read_value(prop_src)?,
111 pt::SUB_ID => {
112 let id = utils::decode_variable_length_cursor(prop_src)?;
113 subscription_ids.push(NonZeroU32::new(id).ok_or(DecodeError::MalformedPacket)?);
114 }
115 pt::TOPIC_ALIAS => topic_alias.read_value(prop_src)?,
116 pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
117 _ => return Err(DecodeError::MalformedPacket),
118 }
119 }
120
121 Ok(PublishProperties {
122 message_expiry_interval,
123 topic_alias,
124 content_type,
125 correlation_data,
126 subscription_ids,
127 response_topic,
128 is_utf8_payload: is_utf8_payload.unwrap_or(false),
129 user_properties: user_props,
130 })
131}
132
133impl EncodeLtd for Publish {
134 fn encoded_size(&self, _limit: u32) -> usize {
135 let packet_id_size = if self.qos == QoS::AtMostOnce { 0 } else { 2 };
136 self.topic.encoded_size()
137 + packet_id_size
138 + self.properties.encoded_size(_limit)
139 + self.payload_size as usize
140 }
141
142 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
143 buf.put_u8(
145 packet_type::PUBLISH_START
146 | (u8::from(self.qos) << 1)
147 | ((self.dup as u8) << 3)
148 | (self.retain as u8),
149 );
150 utils::write_variable_length(size, buf);
151
152 let start_len = buf.len();
154
155 self.topic.encode(buf)?;
156 if self.qos == QoS::AtMostOnce {
157 if self.packet_id.is_some() {
158 return Err(EncodeError::MalformedPacket); }
160 } else {
161 self.packet_id.ok_or(EncodeError::PacketIdRequired)?.encode(buf)?;
162 }
163 self.properties
164 .encode(buf, size - (buf.len() - start_len + self.payload_size as usize) as u32)?;
165
166 Ok(())
167 }
168}
169
170impl EncodeLtd for PublishProperties {
171 fn encoded_size(&self, _limit: u32) -> usize {
172 let prop_len = encoded_property_size(&self.topic_alias)
173 + encoded_property_size(&self.correlation_data)
174 + encoded_property_size(&self.message_expiry_interval)
175 + encoded_property_size(&self.content_type)
176 + encoded_property_size_default(&self.is_utf8_payload, false)
177 + encoded_property_size(&self.response_topic)
178 + self
179 .subscription_ids
180 .iter()
181 .fold(0, |acc, id| acc + 1 + var_int_len(id.get() as usize) as usize)
182 + self.user_properties.encoded_size();
183 prop_len + var_int_len(prop_len) as usize
184 }
185
186 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
187 let prop_len = var_int_len_from_size(size);
188 utils::write_variable_length(prop_len, buf);
189 encode_property(&self.topic_alias, pt::TOPIC_ALIAS, buf)?;
190 encode_property(&self.correlation_data, pt::CORR_DATA, buf)?;
191 encode_property(&self.message_expiry_interval, pt::MSG_EXPIRY_INT, buf)?;
192 encode_property(&self.content_type, pt::CONTENT_TYPE, buf)?;
193 encode_property_default(&self.is_utf8_payload, false, pt::UTF8_PAYLOAD, buf)?;
194 encode_property(&self.response_topic, pt::RESP_TOPIC, buf)?;
195 for sub_id in self.subscription_ids.iter() {
196 buf.put_u8(pt::SUB_ID);
197 write_variable_length(sub_id.get(), buf);
198 }
199 self.user_properties.encode(buf)
200 }
201}