1use super::property::{property_decode, property_encode, property_len, Property, PropertyFrame};
8use crate::codec::util::{decode_byte, decode_variable_integer, encode_variable_integer};
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::util::len_bytes;
11use crate::protocol::{common, FixedHeader, Flags, PacketType, QoS};
12use crate::Error;
13use bytes::{Buf, Bytes, BytesMut};
14
/// Property set attached to a PUBLISH packet header.
///
/// Every field corresponds to one `Property` identifier handled by the
/// `PropertyFrame` implementation for this type. `Option` fields hold at
/// most one value; `Vec` fields collect any number of values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PublishProperties {
    /// Value of the `Property::PayloadFormatIndicator` property, if set.
    pub payload_format_indicator: Option<u8>,
    /// Value of the `Property::MessageExpiryInterval` property, if set.
    pub message_expiry_interval: Option<u32>,
    /// Value of the `Property::TopicAlias` property, if set.
    pub topic_alias: Option<u16>,
    /// Value of the `Property::ResponseTopic` property, if set.
    pub response_topic: Option<String>,
    /// Value of the `Property::CorrelationData` property, if set.
    pub correlation_data: Option<Bytes>,
    /// Key/value pairs carried as `Property::UserProp` entries.
    pub user_properties: Vec<(String, String)>,
    /// Values carried as `Property::SubscriptionIdentifier` entries.
    pub subscription_id: Vec<u32>,
    /// Value of the `Property::ContentType` property, if set.
    pub content_type: Option<String>,
}
62
63impl PropertyFrame for PublishProperties {
64 fn encoded_len(&self) -> usize {
66 let mut len = 0usize;
67
68 len += property_len!(&self.payload_format_indicator);
69 len += property_len!(&self.message_expiry_interval);
70 len += property_len!(&self.topic_alias);
71 len += property_len!(&self.response_topic);
72 len += property_len!(&self.correlation_data);
73 len += property_len!(&self.user_properties);
74 len += property_len!(&self.subscription_id);
75 len += property_len!(&self.content_type);
76
77 len
78 }
79
80 fn encode(&self, buf: &mut BytesMut) {
82 property_encode!(
83 &self.payload_format_indicator,
84 Property::PayloadFormatIndicator,
85 buf
86 );
87 property_encode!(
88 &self.message_expiry_interval,
89 Property::MessageExpiryInterval,
90 buf
91 );
92 property_encode!(&self.topic_alias, Property::TopicAlias, buf);
93 property_encode!(&self.response_topic, Property::ResponseTopic, buf);
94 property_encode!(&self.correlation_data, Property::CorrelationData, buf);
95 property_encode!(&self.user_properties, Property::UserProp, buf);
96 property_encode!(&self.subscription_id, Property::SubscriptionIdentifier, buf);
97 property_encode!(&self.content_type, Property::ContentType, buf);
98 }
99
100 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
101 where
102 Self: Sized,
103 {
104 if buf.is_empty() {
105 return Ok(None);
106 }
107
108 let mut payload_format_indicator: Option<u8> = None;
109 let mut message_expiry_interval: Option<u32> = None;
110 let mut topic_alias: Option<u16> = None;
111 let mut response_topic: Option<String> = None;
112 let mut correlation_data: Option<Bytes> = None;
113 let mut user_properties: Vec<(String, String)> = Vec::new();
114 let mut subscription_id: Vec<u32> = Vec::new();
115 let mut content_type: Option<String> = None;
116
117 while buf.has_remaining() {
118 let property: Property = decode_byte(buf)?.try_into()?;
119 match property {
120 Property::PayloadFormatIndicator => {
121 property_decode!(&mut payload_format_indicator, buf);
122 }
123 Property::MessageExpiryInterval => {
124 property_decode!(&mut message_expiry_interval, buf);
125 }
126 Property::TopicAlias => {
127 property_decode!(&mut topic_alias, buf);
128 }
129 Property::ResponseTopic => {
130 property_decode!(&mut response_topic, buf);
131 }
132 Property::CorrelationData => {
133 property_decode!(&mut correlation_data, buf);
134 }
135 Property::UserProp => {
136 property_decode!(&mut user_properties, buf);
137 }
138 Property::SubscriptionIdentifier => {
139 property_decode!(&mut subscription_id, buf);
140 }
141 Property::ContentType => {
142 property_decode!(&mut content_type, buf);
143 }
144 _ => return Err(Error::PropertyMismatch),
145 }
146 }
147
148 Ok(Some(PublishProperties {
149 payload_format_indicator,
150 message_expiry_interval,
151 topic_alias,
152 response_topic,
153 correlation_data,
154 user_properties,
155 subscription_id,
156 content_type,
157 }))
158 }
159}
160
/// Variable header of a PUBLISH packet: the shared `common::PublishHeader`
/// plus an optional property set.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PublishHeader {
    /// Shared header part, constructed from the topic and the packet id.
    pub(crate) inner: common::PublishHeader,
    /// Optional properties; `None` is encoded as a property length of zero.
    pub(crate) properties: Option<PublishProperties>,
}
169
170impl PublishHeader {
171 pub(crate) fn new<T: Into<String>>(
173 topic: T,
174 packet_id: u16,
175 properties: Option<PublishProperties>,
176 ) -> Self {
177 PublishHeader {
178 inner: common::PublishHeader::new(topic, packet_id),
179 properties,
180 }
181 }
182
183 pub(crate) fn encoded_len(&self, qos: QoS) -> usize {
185 let properties_len = self
186 .properties
187 .as_ref()
188 .map(|properties| properties.encoded_len())
189 .unwrap_or(0);
190
191 self.inner.encoded_len(qos) + len_bytes(properties_len) + properties_len
192 }
193
194 pub(crate) fn encode(&self, buf: &mut BytesMut, qos: QoS) -> Result<(), Error> {
196 self.inner.encode(buf, qos);
197
198 let properties_len = self
199 .properties
200 .as_ref()
201 .map(|properties| properties.encoded_len())
202 .unwrap_or(0) as u32;
203
204 encode_variable_integer(buf, properties_len)?;
205
206 if let Some(properties) = self.properties.as_ref() {
207 properties.encode(buf);
208 }
209
210 Ok(())
211 }
212
213 pub(crate) fn decode(payload: &mut Bytes, qos: QoS) -> Result<Self, Error> {
215 let inner = common::PublishHeader::decode(payload, qos)?;
216
217 let properties_len = decode_variable_integer(payload)? as usize;
218 if payload.len() < properties_len + len_bytes(properties_len) {
219 return Err(Error::MalformedPacket);
220 }
221
222 payload.advance(len_bytes(properties_len));
223 let mut properties_buf = payload.split_to(properties_len);
224 let properties = PublishProperties::decode(&mut properties_buf)?;
225
226 Ok(PublishHeader { inner, properties })
227 }
228}
229
/// A PUBLISH packet: variable header, application payload and the flags
/// stored in the fixed header.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Publish {
    /// Topic, packet id and optional properties.
    header: PublishHeader,
    /// Application payload; on decode, the bytes left after the header.
    payload: Bytes,
    /// Fixed-header flags; `flags.qos` determines whether a packet id is
    /// reported by `packet_id`.
    flags: Flags,
}
270
271impl Publish {
272 pub fn new<T: Into<String>>(
277 topic: T,
278 packet_id: u16,
279 properties: Option<PublishProperties>,
280 payload: Bytes,
281 flags: Flags,
282 ) -> Self {
283 if flags.qos != QoS::AtMostOnce && packet_id == 0 {
284 panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
285 }
286
287 Publish {
288 header: PublishHeader::new(topic, packet_id, properties),
289 payload,
290 flags,
291 }
292 }
293
294 pub fn flags(&self) -> Flags {
296 self.flags
297 }
298
299 pub fn packet_id(&self) -> Option<u16> {
301 if self.flags.qos != QoS::AtMostOnce {
302 Some(self.header.inner.packet_id)
303 } else {
304 None
305 }
306 }
307
308 pub fn topic(&self) -> String {
310 self.header.inner.topic.clone()
311 }
312
313 pub fn properties(&self) -> Option<PublishProperties> {
315 self.header.properties.clone()
316 }
317
318 pub fn payload(&self) -> Bytes {
320 self.payload.clone()
321 }
322}
323
324impl Decode for Publish {
325 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
327 if packet.header.packet_type() != PacketType::Publish {
328 return Err(Error::MalformedPacket);
329 }
330
331 let flags = packet.header.flags();
332 let header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
333 let packet = Publish {
334 header,
335 payload: packet.payload,
336 flags,
337 };
338 Ok(packet)
339 }
340}
341
342impl Encode for Publish {
343 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
345 let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
346 header.encode(buf)?;
347 self.header.encode(buf, self.flags.qos)?;
348 buf.extend_from_slice(&self.payload);
349 Ok(())
350 }
351
352 fn payload_len(&self) -> usize {
354 self.header.encoded_len(self.flags.qos) + self.payload.len()
355 }
356}