1use super::property::{Property, PropertyFrame, property_decode, property_encode, property_len};
8use crate::Error;
9use crate::codec::util::{decode_byte, decode_variable_integer, encode_variable_integer};
10use crate::codec::{Decode, Encode, RawPacket};
11use crate::protocol::util::len_bytes;
12use crate::protocol::{FixedHeader, Flags, PacketType, QoS, common};
13use bytes::{Buf, Bytes, BytesMut};
14use std::time::Duration;
15
16#[derive(Debug, Default, Clone, PartialEq, Eq)]
46pub struct PublishProperties {
47 pub payload_format_indicator: Option<u8>,
49 pub message_expiry_interval: Option<Duration>,
51 pub topic_alias: Option<u16>,
53 pub response_topic: Option<String>,
55 pub correlation_data: Option<Bytes>,
57 pub user_properties: Vec<(String, String)>,
59 pub subscription_id: Vec<u32>,
61 pub content_type: Option<String>,
63}
64
65impl PropertyFrame for PublishProperties {
66 fn encoded_len(&self) -> usize {
68 let mut len = 0usize;
69
70 len += property_len!(&self.payload_format_indicator);
71 len += property_len!(&self.message_expiry_interval);
72 len += property_len!(&self.topic_alias);
73 len += property_len!(&self.response_topic);
74 len += property_len!(&self.correlation_data);
75 len += property_len!(&self.user_properties);
76 len += property_len!(&self.subscription_id);
77 len += property_len!(&self.content_type);
78
79 len
80 }
81
82 fn encode(&self, buf: &mut BytesMut) {
84 property_encode!(
85 &self.payload_format_indicator,
86 Property::PayloadFormatIndicator,
87 buf
88 );
89 property_encode!(
90 &self.message_expiry_interval,
91 Property::MessageExpiryInterval,
92 buf
93 );
94 property_encode!(&self.topic_alias, Property::TopicAlias, buf);
95 property_encode!(&self.response_topic, Property::ResponseTopic, buf);
96 property_encode!(&self.correlation_data, Property::CorrelationData, buf);
97 property_encode!(&self.user_properties, Property::UserProp, buf);
98 property_encode!(&self.subscription_id, Property::SubscriptionIdentifier, buf);
99 property_encode!(&self.content_type, Property::ContentType, buf);
100 }
101
102 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
103 where
104 Self: Sized,
105 {
106 if buf.is_empty() {
107 return Ok(None);
108 }
109
110 let mut payload_format_indicator: Option<u8> = None;
111 let mut message_expiry_interval: Option<Duration> = None;
112 let mut topic_alias: Option<u16> = None;
113 let mut response_topic: Option<String> = None;
114 let mut correlation_data: Option<Bytes> = None;
115 let mut user_properties: Vec<(String, String)> = Vec::new();
116 let mut subscription_id: Vec<u32> = Vec::new();
117 let mut content_type: Option<String> = None;
118
119 while buf.has_remaining() {
120 let property: Property = decode_byte(buf)?.try_into()?;
121 match property {
122 Property::PayloadFormatIndicator => {
123 property_decode!(&mut payload_format_indicator, buf);
124 }
125 Property::MessageExpiryInterval => {
126 property_decode!(&mut message_expiry_interval, buf);
127 }
128 Property::TopicAlias => {
129 property_decode!(&mut topic_alias, buf);
130 }
131 Property::ResponseTopic => {
132 property_decode!(&mut response_topic, buf);
133 }
134 Property::CorrelationData => {
135 property_decode!(&mut correlation_data, buf);
136 }
137 Property::UserProp => {
138 property_decode!(&mut user_properties, buf);
139 }
140 Property::SubscriptionIdentifier => {
141 property_decode!(&mut subscription_id, buf);
142 }
143 Property::ContentType => {
144 property_decode!(&mut content_type, buf);
145 }
146 _ => return Err(Error::PropertyMismatch),
147 }
148 }
149
150 Ok(Some(PublishProperties {
151 payload_format_indicator,
152 message_expiry_interval,
153 topic_alias,
154 response_topic,
155 correlation_data,
156 user_properties,
157 subscription_id,
158 content_type,
159 }))
160 }
161}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
165pub(crate) struct PublishHeader {
166 pub(crate) inner: common::PublishHeader,
168 pub(crate) properties: Option<PublishProperties>,
170}
171
172impl PublishHeader {
173 pub(crate) fn new<T: Into<String>>(
175 topic: T,
176 packet_id: u16,
177 properties: Option<PublishProperties>,
178 ) -> Self {
179 PublishHeader {
180 inner: common::PublishHeader::new(topic, packet_id),
181 properties,
182 }
183 }
184
185 pub(crate) fn encoded_len(&self, qos: QoS) -> usize {
187 let properties_len = self
188 .properties
189 .as_ref()
190 .map(|properties| properties.encoded_len())
191 .unwrap_or(0);
192
193 self.inner.encoded_len(qos) + len_bytes(properties_len) + properties_len
194 }
195
196 pub(crate) fn encode(&self, buf: &mut BytesMut, qos: QoS) -> Result<(), Error> {
198 self.inner.encode(buf, qos);
199
200 let properties_len = self
201 .properties
202 .as_ref()
203 .map(|properties| properties.encoded_len())
204 .unwrap_or(0) as u32;
205
206 encode_variable_integer(buf, properties_len)?;
207
208 if let Some(properties) = self.properties.as_ref() {
209 properties.encode(buf);
210 }
211
212 Ok(())
213 }
214
215 pub(crate) fn decode(payload: &mut Bytes, qos: QoS) -> Result<Self, Error> {
217 let inner = common::PublishHeader::decode(payload, qos)?;
218
219 let properties_len = decode_variable_integer(payload)? as usize;
220 if payload.len() < properties_len + len_bytes(properties_len) {
221 return Err(Error::MalformedPacket);
222 }
223
224 payload.advance(len_bytes(properties_len));
225 let mut properties_buf = payload.split_to(properties_len);
226 let properties = PublishProperties::decode(&mut properties_buf)?;
227
228 Ok(PublishHeader { inner, properties })
229 }
230}
231
232#[derive(Debug, Clone, PartialEq, Eq)]
267pub struct Publish {
268 header: PublishHeader,
269 payload: Bytes,
270 flags: Flags,
271}
272
273impl Publish {
274 pub fn new<T: Into<String>>(
282 topic: T,
283 packet_id: u16,
284 properties: Option<PublishProperties>,
285 payload: Bytes,
286 flags: Flags,
287 ) -> Self {
288 if flags.qos != QoS::AtMostOnce && packet_id == 0 {
289 panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
290 }
291
292 Publish {
293 header: PublishHeader::new(topic, packet_id, properties),
294 payload,
295 flags,
296 }
297 }
298
299 pub fn flags(&self) -> Flags {
301 self.flags
302 }
303
304 pub fn packet_id(&self) -> Option<u16> {
306 if self.flags.qos != QoS::AtMostOnce {
307 Some(self.header.inner.packet_id)
308 } else {
309 None
310 }
311 }
312
313 pub fn topic(&self) -> String {
315 self.header.inner.topic.clone()
316 }
317
318 pub fn properties(&self) -> Option<PublishProperties> {
320 self.header.properties.clone()
321 }
322
323 pub fn payload(&self) -> Bytes {
325 self.payload.clone()
326 }
327}
328
329impl Decode for Publish {
330 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
332 if packet.header.packet_type() != PacketType::Publish {
333 return Err(Error::MalformedPacket);
334 }
335
336 let flags = packet.header.flags();
337 let header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
338 let packet = Publish {
339 header,
340 payload: packet.payload,
341 flags,
342 };
343 Ok(packet)
344 }
345}
346
347impl Encode for Publish {
348 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
350 let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
351 header.encode(buf)?;
352 self.header.encode(buf, self.flags.qos)?;
353 buf.extend_from_slice(&self.payload);
354 Ok(())
355 }
356
357 fn payload_len(&self) -> usize {
359 self.header.encoded_len(self.flags.qos) + self.payload.len()
360 }
361}