mqute_codec/protocol/v4/
publish.rs

//! # Publish Packet V4
//!
//! This module defines the `Publish` packet, which is used in the MQTT protocol to send
//! messages from a client to a server or from a server to a client. The `Publish` packet
//! includes a topic, payload, and flags for QoS, retain, and duplicate delivery.
6
7use crate::codec::{Decode, Encode, RawPacket};
8use crate::protocol::common::PublishHeader;
9use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
10use crate::Error;
11use bytes::{Bytes, BytesMut};
12
13/// Represents an MQTT `Publish` packet.
14///
15/// # Example
16///
17/// ```rust
18/// use mqute_codec::protocol::v4::Publish;
19/// use mqute_codec::protocol::{QoS, Flags};
20/// use bytes::Bytes;
21///
22/// let flags = Flags::new(QoS::AtLeastOnce);
23/// let publish = Publish::new("topic", 1234, Bytes::from("message"), flags);
24///
25/// assert_eq!(publish.flags(), flags);
26/// assert_eq!(publish.topic(), "topic");
27/// assert_eq!(publish.packet_id(), Some(1234));
28/// assert_eq!(publish.payload(), Bytes::from("message"));
29/// ```
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct Publish {
32    /// The header of the `Publish` packet, containing the topic and packet ID (for QoS > 0).
33    header: PublishHeader,
34
35    /// The payload of the `Publish` packet, containing the message data.
36    payload: Bytes,
37
38    /// The flags for the `Publish` packet, including QoS, retain, and duplicate delivery.
39    flags: Flags,
40}
41
42impl Publish {
43    /// Creates a new `Publish` packet.
44    ///
45    /// # Panics
46    ///
47    /// Panics if `packet_id` is zero for QoS > 0.
48    pub fn new<T: Into<String>>(topic: T, packet_id: u16, payload: Bytes, flags: Flags) -> Self {
49        if flags.qos != QoS::AtMostOnce && packet_id == 0 {
50            panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
51        }
52
53        Publish {
54            header: PublishHeader::new(topic, packet_id),
55            payload,
56            flags,
57        }
58    }
59
60    /// Returns the flags for the `Publish` packet.
61    ///
62    /// The flags include QoS, retain, and duplicate delivery settings.
63    pub fn flags(&self) -> Flags {
64        self.flags
65    }
66
67    /// Returns the topic of the `Publish` packet.
68    pub fn topic(&self) -> String {
69        self.header.topic.clone()
70    }
71
72    /// Returns the packet ID of the `Publish` packet.
73    ///
74    /// For QoS 0, this method returns `None` because no packet ID is used.
75    /// For QoS 1 and 2, it returns the packet ID as `Some(u16)`.
76    pub fn packet_id(&self) -> Option<u16> {
77        if self.flags.qos != QoS::AtMostOnce {
78            Some(self.header.packet_id)
79        } else {
80            None
81        }
82    }
83
84    /// Returns the payload of the `Publish` packet.
85    pub fn payload(&self) -> Bytes {
86        self.payload.clone()
87    }
88}
89
90impl Decode for Publish {
91    /// Decodes a `Publish` packet from a raw MQTT packet.
92    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
93        if packet.header.packet_type() != PacketType::Publish {
94            return Err(Error::MalformedPacket);
95        }
96
97        let flags = packet.header.flags();
98
99        let publish_header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
100
101        let packet = Publish {
102            header: publish_header,
103            payload: packet.payload,
104            flags,
105        };
106        Ok(packet)
107    }
108}
109
110impl Encode for Publish {
111    /// Encodes the `Publish` packet into a byte buffer.
112    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
113        let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
114        header.encode(buf)?;
115        self.header.encode(buf, self.flags.qos);
116
117        // Append message
118        buf.extend_from_slice(&self.payload);
119        Ok(())
120    }
121
122    /// Returns the length of the `Publish` packet payload.
123    ///
124    /// The payload length includes the length of the Publish header and the message payload.
125    fn payload_len(&self) -> usize {
126        self.header.encoded_len(self.flags.qos) + self.payload.len()
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use crate::protocol::QoS;
    use bytes::BytesMut;
    use tokio_util::codec::Decoder;

    /// Message body used by both tests.
    const MESSAGE: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];

    /// Wire image of the reference packet: topic `/test`, packet id `0x1234`,
    /// payload `MESSAGE`, flag bits `0b0100`.
    fn wire_bytes() -> Vec<u8> {
        let mut bytes = vec![
            (PacketType::Publish as u8) << 4 | 0b0000_0100, // Packet type
            0x0d,                                           // Remaining len
            0x00,                                           // Topic length (high byte)
            0x05,                                           // Topic length (low byte)
        ];
        bytes.extend_from_slice(b"/test");
        bytes.extend_from_slice(&[0x12, 0x34]); // Packet id
        bytes.extend_from_slice(&MESSAGE);
        bytes
    }

    /// The `Publish` value that corresponds to `wire_bytes()`.
    fn reference_packet() -> Publish {
        Publish::new(
            "/test",
            0x1234,
            Bytes::copy_from_slice(&MESSAGE),
            Flags::new(QoS::ExactlyOnce),
        )
    }

    #[test]
    fn publish_decode() {
        let mut codec = PacketCodec::new(None, None);
        let mut stream = BytesMut::from(&wire_bytes()[..]);

        // First frame the bytes into a raw packet, then decode it as `Publish`.
        let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
        let decoded = Publish::decode(raw_packet).unwrap();

        assert_eq!(decoded, reference_packet());
    }

    #[test]
    fn publish_encode() {
        let mut stream = BytesMut::new();
        reference_packet().encode(&mut stream).unwrap();

        assert_eq!(stream, wire_bytes());
    }
}