mqute_codec/protocol/v5/
publish.rs

1//! # Publish Packet - MQTT v5
2//!
3//! This module implements the MQTT v5 `Publish` packet, which is used to transport
4//! application messages between clients and servers. The packet supports all three
5//! QoS levels and includes extensive message properties for enhanced functionality.
6
7use super::property::{property_decode, property_encode, property_len, Property, PropertyFrame};
8use crate::codec::util::{decode_byte, decode_variable_integer, encode_variable_integer};
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::util::len_bytes;
11use crate::protocol::{common, FixedHeader, Flags, PacketType, QoS};
12use crate::Error;
13use bytes::{Buf, Bytes, BytesMut};
14
15/// Represents properties of a `Publish` packet in MQTT v5
16///
17/// These properties provide extended message metadata including:
18/// - Message formatting hints
19/// - Expiration timing
20/// - Topic aliasing
21/// - Response routing
22/// - Correlation data
23/// - Subscription identifiers
24/// - Content type information
25///
26/// # Example
27///
28/// ```rust
29/// use bytes::Bytes;
30/// use mqute_codec::protocol::v5::PublishProperties;
31///
32/// let properties = PublishProperties {
33///     payload_format_indicator: Some(1), // UTF-8 payload
34///     message_expiry_interval: Some(3600), // Expires in 1 hour
35///     topic_alias: Some(5), // Use topic alias ID 5
36///     response_topic: Some("response/topic".into()),
37///     correlation_data: Some(Bytes::from("correlation-id")),
38///     user_properties: vec![("priority".into(), "high".into())],
39///     subscription_id: vec![42], // Subscription ID
40///     content_type: Some("application/json".into()),
41/// };
42/// ```
43#[derive(Debug, Default, Clone, PartialEq, Eq)]
44pub struct PublishProperties {
45    /// Indicates payload format (0=bytes, 1=UTF-8)
46    pub payload_format_indicator: Option<u8>,
47    /// Message lifetime in seconds
48    pub message_expiry_interval: Option<u32>,
49    /// Topic alias for message routing
50    pub topic_alias: Option<u16>,
51    /// Topic for response messages
52    pub response_topic: Option<String>,
53    /// Correlation data for request/response
54    pub correlation_data: Option<Bytes>,
55    /// User-defined key-value properties
56    pub user_properties: Vec<(String, String)>,
57    /// Subscription identifiers
58    pub subscription_id: Vec<u32>,
59    /// Content type descriptor
60    pub content_type: Option<String>,
61}
62
63impl PropertyFrame for PublishProperties {
64    /// Calculates the encoded length of all properties
65    fn encoded_len(&self) -> usize {
66        let mut len = 0usize;
67
68        len += property_len!(&self.payload_format_indicator);
69        len += property_len!(&self.message_expiry_interval);
70        len += property_len!(&self.topic_alias);
71        len += property_len!(&self.response_topic);
72        len += property_len!(&self.correlation_data);
73        len += property_len!(&self.user_properties);
74        len += property_len!(&self.subscription_id);
75        len += property_len!(&self.content_type);
76
77        len
78    }
79
80    /// Encodes all properties into a byte buffer
81    fn encode(&self, buf: &mut BytesMut) {
82        property_encode!(
83            &self.payload_format_indicator,
84            Property::PayloadFormatIndicator,
85            buf
86        );
87        property_encode!(
88            &self.message_expiry_interval,
89            Property::MessageExpiryInterval,
90            buf
91        );
92        property_encode!(&self.topic_alias, Property::TopicAlias, buf);
93        property_encode!(&self.response_topic, Property::ResponseTopic, buf);
94        property_encode!(&self.correlation_data, Property::CorrelationData, buf);
95        property_encode!(&self.user_properties, Property::UserProp, buf);
96        property_encode!(&self.subscription_id, Property::SubscriptionIdentifier, buf);
97        property_encode!(&self.content_type, Property::ContentType, buf);
98    }
99
100    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
101    where
102        Self: Sized,
103    {
104        if buf.is_empty() {
105            return Ok(None);
106        }
107
108        let mut payload_format_indicator: Option<u8> = None;
109        let mut message_expiry_interval: Option<u32> = None;
110        let mut topic_alias: Option<u16> = None;
111        let mut response_topic: Option<String> = None;
112        let mut correlation_data: Option<Bytes> = None;
113        let mut user_properties: Vec<(String, String)> = Vec::new();
114        let mut subscription_id: Vec<u32> = Vec::new();
115        let mut content_type: Option<String> = None;
116
117        while buf.has_remaining() {
118            let property: Property = decode_byte(buf)?.try_into()?;
119            match property {
120                Property::PayloadFormatIndicator => {
121                    property_decode!(&mut payload_format_indicator, buf);
122                }
123                Property::MessageExpiryInterval => {
124                    property_decode!(&mut message_expiry_interval, buf);
125                }
126                Property::TopicAlias => {
127                    property_decode!(&mut topic_alias, buf);
128                }
129                Property::ResponseTopic => {
130                    property_decode!(&mut response_topic, buf);
131                }
132                Property::CorrelationData => {
133                    property_decode!(&mut correlation_data, buf);
134                }
135                Property::UserProp => {
136                    property_decode!(&mut user_properties, buf);
137                }
138                Property::SubscriptionIdentifier => {
139                    property_decode!(&mut subscription_id, buf);
140                }
141                Property::ContentType => {
142                    property_decode!(&mut content_type, buf);
143                }
144                _ => return Err(Error::PropertyMismatch),
145            }
146        }
147
148        Ok(Some(PublishProperties {
149            payload_format_indicator,
150            message_expiry_interval,
151            topic_alias,
152            response_topic,
153            correlation_data,
154            user_properties,
155            subscription_id,
156            content_type,
157        }))
158    }
159}
160
161/// Internal header structure for `Publish` packets
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub(crate) struct PublishHeader {
164    /// Common publish header fields
165    pub(crate) inner: common::PublishHeader,
166    /// MQTT v5 specific properties
167    pub(crate) properties: Option<PublishProperties>,
168}
169
170impl PublishHeader {
171    /// Creates a new `Publish` header
172    pub(crate) fn new<T: Into<String>>(
173        topic: T,
174        packet_id: u16,
175        properties: Option<PublishProperties>,
176    ) -> Self {
177        PublishHeader {
178            inner: common::PublishHeader::new(topic, packet_id),
179            properties,
180        }
181    }
182
183    /// Calculates the encoded length of the header
184    pub(crate) fn encoded_len(&self, qos: QoS) -> usize {
185        let properties_len = self
186            .properties
187            .as_ref()
188            .map(|properties| properties.encoded_len())
189            .unwrap_or(0);
190
191        self.inner.encoded_len(qos) + len_bytes(properties_len) + properties_len
192    }
193
194    /// Encodes the header into a byte buffer
195    pub(crate) fn encode(&self, buf: &mut BytesMut, qos: QoS) -> Result<(), Error> {
196        self.inner.encode(buf, qos);
197
198        let properties_len = self
199            .properties
200            .as_ref()
201            .map(|properties| properties.encoded_len())
202            .unwrap_or(0) as u32;
203
204        encode_variable_integer(buf, properties_len)?;
205
206        if let Some(properties) = self.properties.as_ref() {
207            properties.encode(buf);
208        }
209
210        Ok(())
211    }
212
213    /// Decodes a header from byte payload
214    pub(crate) fn decode(payload: &mut Bytes, qos: QoS) -> Result<Self, Error> {
215        let inner = common::PublishHeader::decode(payload, qos)?;
216
217        let properties_len = decode_variable_integer(payload)? as usize;
218        if payload.len() < properties_len + len_bytes(properties_len) {
219            return Err(Error::MalformedPacket);
220        }
221
222        payload.advance(len_bytes(properties_len));
223        let mut properties_buf = payload.split_to(properties_len);
224        let properties = PublishProperties::decode(&mut properties_buf)?;
225
226        Ok(PublishHeader { inner, properties })
227    }
228}
229
230/// Represents an MQTT v5 `Publish` packet
231///
232/// This is the primary structure for message publication in MQTT, supporting:
233/// - All QoS levels (0, 1, and 2)
234/// - Retained messages
235/// - Duplicate delivery detection
236/// - Extensive message properties (v5 only)
237///
238/// # Example
239///
240/// ```rust
241/// use bytes::Bytes;
242/// use mqute_codec::protocol::{Flags, QoS};
243/// use mqute_codec::protocol::v5::{Publish, PublishProperties};
244///
245/// // Create a QoS 1 message with properties
246/// let properties = PublishProperties {
247///     content_type: Some("text/plain".into()),
248///     ..Default::default()
249/// };
250/// let publish = Publish::new(
251///     "test/topic",
252///     1234,
253///     Some(properties.clone()),
254///     Bytes::from("message payload"),
255///     Flags::new(QoS::AtLeastOnce)
256/// );
257///
258/// assert_eq!(publish.flags(), Flags::new(QoS::AtLeastOnce));
259///
260/// assert_eq!(publish.packet_id(), Some(1234u16));
261///
262/// assert_eq!(publish.properties(), Some(properties));
263/// ```
264#[derive(Debug, Clone, PartialEq, Eq)]
265pub struct Publish {
266    header: PublishHeader,
267    payload: Bytes,
268    flags: Flags,
269}
270
271impl Publish {
272    /// Creates a new `Publish` packet
273    ///
274    /// # Panics
275    /// - If QoS > 0 but packet_id is 0
276    pub fn new<T: Into<String>>(
277        topic: T,
278        packet_id: u16,
279        properties: Option<PublishProperties>,
280        payload: Bytes,
281        flags: Flags,
282    ) -> Self {
283        if flags.qos != QoS::AtMostOnce && packet_id == 0 {
284            panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
285        }
286
287        Publish {
288            header: PublishHeader::new(topic, packet_id, properties),
289            payload,
290            flags,
291        }
292    }
293
294    /// Returns the packet flags
295    pub fn flags(&self) -> Flags {
296        self.flags
297    }
298
299    /// Returns the packet identifier (if QoS > 0)
300    pub fn packet_id(&self) -> Option<u16> {
301        if self.flags.qos != QoS::AtMostOnce {
302            Some(self.header.inner.packet_id)
303        } else {
304            None
305        }
306    }
307
308    /// Returns the message topic
309    pub fn topic(&self) -> String {
310        self.header.inner.topic.clone()
311    }
312
313    /// Returns a copy of the properties (if any)
314    pub fn properties(&self) -> Option<PublishProperties> {
315        self.header.properties.clone()
316    }
317
318    /// Returns the message payload
319    pub fn payload(&self) -> Bytes {
320        self.payload.clone()
321    }
322}
323
324impl Decode for Publish {
325    /// Decodes a `Publish` packet from raw bytes
326    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
327        if packet.header.packet_type() != PacketType::Publish {
328            return Err(Error::MalformedPacket);
329        }
330
331        let flags = packet.header.flags();
332        let header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
333        let packet = Publish {
334            header,
335            payload: packet.payload,
336            flags,
337        };
338        Ok(packet)
339    }
340}
341
342impl Encode for Publish {
343    /// Encodes the `Publish` packet into a byte buffer
344    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
345        let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
346        header.encode(buf)?;
347        self.header.encode(buf, self.flags.qos)?;
348        buf.extend_from_slice(&self.payload);
349        Ok(())
350    }
351
352    /// Calculates the total packet length
353    fn payload_len(&self) -> usize {
354        self.header.encoded_len(self.flags.qos) + self.payload.len()
355    }
356}