mqute_codec/protocol/v5/
publish.rs

1//! # Publish Packet - MQTT v5
2//!
3//! This module implements the MQTT v5 `Publish` packet, which is used to transport
4//! application messages between clients and servers. The packet supports all three
5//! QoS levels and includes extensive message properties for enhanced functionality.
6
7use super::property::{Property, PropertyFrame, property_decode, property_encode, property_len};
8use crate::Error;
9use crate::codec::util::{decode_byte, decode_variable_integer, encode_variable_integer};
10use crate::codec::{Decode, Encode, RawPacket};
11use crate::protocol::util::len_bytes;
12use crate::protocol::{FixedHeader, Flags, PacketType, QoS, common};
13use bytes::{Buf, Bytes, BytesMut};
14use std::time::Duration;
15
16/// Represents properties of a `Publish` packet in MQTT v5
17///
18/// These properties provide extended message metadata including:
19/// - Message formatting hints
20/// - Expiration timing
21/// - Topic aliasing
22/// - Response routing
23/// - Correlation data
24/// - Subscription identifiers
25/// - Content type information
26///
27/// # Example
28///
29/// ```rust
30/// use bytes::Bytes;
31/// use std::time::Duration;
32/// use mqute_codec::protocol::v5::PublishProperties;
33///
34/// let properties = PublishProperties {
35///     payload_format_indicator: Some(1), // UTF-8 payload
36///     message_expiry_interval: Some(Duration::from_secs(3600)), // Expires in 1 hour
37///     topic_alias: Some(5), // Use topic alias ID 5
38///     response_topic: Some("response/topic".into()),
39///     correlation_data: Some(Bytes::from("correlation-id")),
40///     user_properties: vec![("priority".into(), "high".into())],
41///     subscription_id: vec![42], // Subscription ID
42///     content_type: Some("application/json".into()),
43/// };
44/// ```
45#[derive(Debug, Default, Clone, PartialEq, Eq)]
46pub struct PublishProperties {
47    /// Indicates payload format (0=bytes, 1=UTF-8)
48    pub payload_format_indicator: Option<u8>,
49    /// Message lifetime in seconds
50    pub message_expiry_interval: Option<Duration>,
51    /// Topic alias for message routing
52    pub topic_alias: Option<u16>,
53    /// Topic for response messages
54    pub response_topic: Option<String>,
55    /// Correlation data for request/response
56    pub correlation_data: Option<Bytes>,
57    /// User-defined key-value properties
58    pub user_properties: Vec<(String, String)>,
59    /// Subscription identifiers
60    pub subscription_id: Vec<u32>,
61    /// Content type descriptor
62    pub content_type: Option<String>,
63}
64
65impl PropertyFrame for PublishProperties {
66    /// Calculates the encoded length of all properties
67    fn encoded_len(&self) -> usize {
68        let mut len = 0usize;
69
70        len += property_len!(&self.payload_format_indicator);
71        len += property_len!(&self.message_expiry_interval);
72        len += property_len!(&self.topic_alias);
73        len += property_len!(&self.response_topic);
74        len += property_len!(&self.correlation_data);
75        len += property_len!(&self.user_properties);
76        len += property_len!(&self.subscription_id);
77        len += property_len!(&self.content_type);
78
79        len
80    }
81
82    /// Encodes all properties into a byte buffer
83    fn encode(&self, buf: &mut BytesMut) {
84        property_encode!(
85            &self.payload_format_indicator,
86            Property::PayloadFormatIndicator,
87            buf
88        );
89        property_encode!(
90            &self.message_expiry_interval,
91            Property::MessageExpiryInterval,
92            buf
93        );
94        property_encode!(&self.topic_alias, Property::TopicAlias, buf);
95        property_encode!(&self.response_topic, Property::ResponseTopic, buf);
96        property_encode!(&self.correlation_data, Property::CorrelationData, buf);
97        property_encode!(&self.user_properties, Property::UserProp, buf);
98        property_encode!(&self.subscription_id, Property::SubscriptionIdentifier, buf);
99        property_encode!(&self.content_type, Property::ContentType, buf);
100    }
101
102    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
103    where
104        Self: Sized,
105    {
106        if buf.is_empty() {
107            return Ok(None);
108        }
109
110        let mut payload_format_indicator: Option<u8> = None;
111        let mut message_expiry_interval: Option<Duration> = None;
112        let mut topic_alias: Option<u16> = None;
113        let mut response_topic: Option<String> = None;
114        let mut correlation_data: Option<Bytes> = None;
115        let mut user_properties: Vec<(String, String)> = Vec::new();
116        let mut subscription_id: Vec<u32> = Vec::new();
117        let mut content_type: Option<String> = None;
118
119        while buf.has_remaining() {
120            let property: Property = decode_byte(buf)?.try_into()?;
121            match property {
122                Property::PayloadFormatIndicator => {
123                    property_decode!(&mut payload_format_indicator, buf);
124                }
125                Property::MessageExpiryInterval => {
126                    property_decode!(&mut message_expiry_interval, buf);
127                }
128                Property::TopicAlias => {
129                    property_decode!(&mut topic_alias, buf);
130                }
131                Property::ResponseTopic => {
132                    property_decode!(&mut response_topic, buf);
133                }
134                Property::CorrelationData => {
135                    property_decode!(&mut correlation_data, buf);
136                }
137                Property::UserProp => {
138                    property_decode!(&mut user_properties, buf);
139                }
140                Property::SubscriptionIdentifier => {
141                    property_decode!(&mut subscription_id, buf);
142                }
143                Property::ContentType => {
144                    property_decode!(&mut content_type, buf);
145                }
146                _ => return Err(Error::PropertyMismatch),
147            }
148        }
149
150        Ok(Some(PublishProperties {
151            payload_format_indicator,
152            message_expiry_interval,
153            topic_alias,
154            response_topic,
155            correlation_data,
156            user_properties,
157            subscription_id,
158            content_type,
159        }))
160    }
161}
162
163/// Internal header structure for `Publish` packets
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub(crate) struct PublishHeader {
166    /// Common publish header fields
167    pub(crate) inner: common::PublishHeader,
168    /// MQTT v5 specific properties
169    pub(crate) properties: Option<PublishProperties>,
170}
171
172impl PublishHeader {
173    /// Creates a new `Publish` header
174    pub(crate) fn new<T: Into<String>>(
175        topic: T,
176        packet_id: u16,
177        properties: Option<PublishProperties>,
178    ) -> Self {
179        PublishHeader {
180            inner: common::PublishHeader::new(topic, packet_id),
181            properties,
182        }
183    }
184
185    /// Calculates the encoded length of the header
186    pub(crate) fn encoded_len(&self, qos: QoS) -> usize {
187        let properties_len = self
188            .properties
189            .as_ref()
190            .map(|properties| properties.encoded_len())
191            .unwrap_or(0);
192
193        self.inner.encoded_len(qos) + len_bytes(properties_len) + properties_len
194    }
195
196    /// Encodes the header into a byte buffer
197    pub(crate) fn encode(&self, buf: &mut BytesMut, qos: QoS) -> Result<(), Error> {
198        self.inner.encode(buf, qos);
199
200        let properties_len = self
201            .properties
202            .as_ref()
203            .map(|properties| properties.encoded_len())
204            .unwrap_or(0) as u32;
205
206        encode_variable_integer(buf, properties_len)?;
207
208        if let Some(properties) = self.properties.as_ref() {
209            properties.encode(buf);
210        }
211
212        Ok(())
213    }
214
215    /// Decodes a header from byte payload
216    pub(crate) fn decode(payload: &mut Bytes, qos: QoS) -> Result<Self, Error> {
217        let inner = common::PublishHeader::decode(payload, qos)?;
218
219        let properties_len = decode_variable_integer(payload)? as usize;
220        if payload.len() < properties_len + len_bytes(properties_len) {
221            return Err(Error::MalformedPacket);
222        }
223
224        payload.advance(len_bytes(properties_len));
225        let mut properties_buf = payload.split_to(properties_len);
226        let properties = PublishProperties::decode(&mut properties_buf)?;
227
228        Ok(PublishHeader { inner, properties })
229    }
230}
231
232/// Represents an MQTT v5 `Publish` packet
233///
234/// This is the primary structure for message publication in MQTT, supporting:
235/// - All QoS levels (0, 1, and 2)
236/// - Retained messages
237/// - Duplicate delivery detection
238/// - Extensive message properties (v5 only)
239///
240/// # Example
241///
242/// ```rust
243/// use bytes::Bytes;
244/// use mqute_codec::protocol::{Flags, QoS};
245/// use mqute_codec::protocol::v5::{Publish, PublishProperties};
246///
247/// // Create a QoS 1 message with properties
248/// let properties = PublishProperties {
249///     content_type: Some("text/plain".into()),
250///     ..Default::default()
251/// };
252/// let publish = Publish::new(
253///     "test/topic",
254///     1234,
255///     Some(properties.clone()),
256///     Bytes::from("message payload"),
257///     Flags::new(QoS::AtLeastOnce)
258/// );
259///
260/// assert_eq!(publish.flags(), Flags::new(QoS::AtLeastOnce));
261///
262/// assert_eq!(publish.packet_id(), Some(1234u16));
263///
264/// assert_eq!(publish.properties(), Some(properties));
265/// ```
266#[derive(Debug, Clone, PartialEq, Eq)]
267pub struct Publish {
268    header: PublishHeader,
269    payload: Bytes,
270    flags: Flags,
271}
272
273impl Publish {
274    /// Creates a new `Publish` packet
275    ///
276    /// # Panics
277    ///
278    /// Panics if:
279    /// - QoS > 0 but packet_id is 0.
280    /// - The topic name is invalid according to MQTT topic naming rules.
281    pub fn new<T: Into<String>>(
282        topic: T,
283        packet_id: u16,
284        properties: Option<PublishProperties>,
285        payload: Bytes,
286        flags: Flags,
287    ) -> Self {
288        if flags.qos != QoS::AtMostOnce && packet_id == 0 {
289            panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
290        }
291
292        Publish {
293            header: PublishHeader::new(topic, packet_id, properties),
294            payload,
295            flags,
296        }
297    }
298
299    /// Returns the packet flags
300    pub fn flags(&self) -> Flags {
301        self.flags
302    }
303
304    /// Returns the packet identifier (if QoS > 0)
305    pub fn packet_id(&self) -> Option<u16> {
306        if self.flags.qos != QoS::AtMostOnce {
307            Some(self.header.inner.packet_id)
308        } else {
309            None
310        }
311    }
312
313    /// Returns the message topic
314    pub fn topic(&self) -> String {
315        self.header.inner.topic.clone()
316    }
317
318    /// Returns a copy of the properties (if any)
319    pub fn properties(&self) -> Option<PublishProperties> {
320        self.header.properties.clone()
321    }
322
323    /// Returns the message payload
324    pub fn payload(&self) -> Bytes {
325        self.payload.clone()
326    }
327}
328
329impl Decode for Publish {
330    /// Decodes a `Publish` packet from raw bytes
331    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
332        if packet.header.packet_type() != PacketType::Publish {
333            return Err(Error::MalformedPacket);
334        }
335
336        let flags = packet.header.flags();
337        let header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
338        let packet = Publish {
339            header,
340            payload: packet.payload,
341            flags,
342        };
343        Ok(packet)
344    }
345}
346
347impl Encode for Publish {
348    /// Encodes the `Publish` packet into a byte buffer
349    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
350        let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
351        header.encode(buf)?;
352        self.header.encode(buf, self.flags.qos)?;
353        buf.extend_from_slice(&self.payload);
354        Ok(())
355    }
356
357    /// Calculates the total packet length
358    fn payload_len(&self) -> usize {
359        self.header.encoded_len(self.flags.qos) + self.payload.len()
360    }
361}