mqute_codec/protocol/v4/
publish.rs

1//! # Publish Packet V4
2//!
3//! This module defines the `Publish` packet, which is used in the MQTT protocol to send
4//! messages from a client to a server or from a server to a client. The `Publish` packet
5//! includes a topic, payload, and flags for QoS, retain, and duplicate delivery.
6
7use crate::Error;
8use crate::codec::{Decode, Encode, RawPacket};
9use crate::protocol::common::PublishHeader;
10use crate::protocol::{FixedHeader, Flags, PacketType, QoS, traits};
11use bytes::{Bytes, BytesMut};
12
13/// Represents an MQTT `Publish` packet.
14///
15/// # Example
16///
17/// ```rust
18/// use mqute_codec::protocol::v4::Publish;
19/// use mqute_codec::protocol::{QoS, Flags};
20/// use bytes::Bytes;
21///
22/// let flags = Flags::new(QoS::AtLeastOnce);
23/// let publish = Publish::new("topic", 1234, Bytes::from("message"), flags);
24///
25/// assert_eq!(publish.flags(), flags);
26/// assert_eq!(publish.topic(), "topic");
27/// assert_eq!(publish.packet_id(), Some(1234));
28/// assert_eq!(publish.payload(), Bytes::from("message"));
29/// ```
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct Publish {
32    /// The header of the `Publish` packet, containing the topic and packet ID (for QoS > 0).
33    header: PublishHeader,
34
35    /// The payload of the `Publish` packet, containing the message data.
36    payload: Bytes,
37
38    /// The flags for the `Publish` packet, including QoS, retain, and duplicate delivery.
39    flags: Flags,
40}
41
42impl Publish {
43    /// Creates a new `Publish` packet.
44    ///
45    /// # Panics
46    ///
47    /// Panics if:
48    /// - `packet_id` is zero for QoS > 0.
49    /// - The topic name is invalid according to MQTT topic naming rules.
50    pub fn new<T: Into<String>>(topic: T, packet_id: u16, payload: Bytes, flags: Flags) -> Self {
51        if flags.qos != QoS::AtMostOnce && packet_id == 0 {
52            panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
53        }
54
55        Publish {
56            header: PublishHeader::new(topic, packet_id),
57            payload,
58            flags,
59        }
60    }
61
62    /// Returns the flags for the `Publish` packet.
63    ///
64    /// The flags include QoS, retain, and duplicate delivery settings.
65    pub fn flags(&self) -> Flags {
66        self.flags
67    }
68
69    /// Returns the topic of the `Publish` packet.
70    pub fn topic(&self) -> String {
71        self.header.topic.clone()
72    }
73
74    /// Returns the packet ID of the `Publish` packet.
75    ///
76    /// For QoS 0, this method returns `None` because no packet ID is used.
77    /// For QoS 1 and 2, it returns the packet ID as `Some(u16)`.
78    pub fn packet_id(&self) -> Option<u16> {
79        if self.flags.qos != QoS::AtMostOnce {
80            Some(self.header.packet_id)
81        } else {
82            None
83        }
84    }
85
86    /// Returns the payload of the `Publish` packet.
87    pub fn payload(&self) -> Bytes {
88        self.payload.clone()
89    }
90}
91
92impl Decode for Publish {
93    /// Decodes a `Publish` packet from a raw MQTT packet.
94    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
95        if packet.header.packet_type() != PacketType::Publish {
96            return Err(Error::MalformedPacket);
97        }
98
99        let flags = packet.header.flags();
100
101        let publish_header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
102
103        let packet = Publish {
104            header: publish_header,
105            payload: packet.payload,
106            flags,
107        };
108        Ok(packet)
109    }
110}
111
112impl Encode for Publish {
113    /// Encodes the `Publish` packet into a byte buffer.
114    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
115        let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
116        header.encode(buf)?;
117        self.header.encode(buf, self.flags.qos);
118
119        // Append message
120        buf.extend_from_slice(&self.payload);
121        Ok(())
122    }
123
124    /// Returns the length of the `Publish` packet payload.
125    ///
126    /// The payload length includes the length of the Publish header and the message payload.
127    fn payload_len(&self) -> usize {
128        self.header.encoded_len(self.flags.qos) + self.payload.len()
129    }
130}
131
132impl traits::Publish for Publish {}
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137    use crate::codec::PacketCodec;
138    use crate::protocol::QoS;
139    use bytes::BytesMut;
140    use tokio_util::codec::Decoder;
141
142    #[test]
143    fn publish_decode() {
144        let mut codec = PacketCodec::new(None, None);
145
146        let payload: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];
147        let data = &[
148            (PacketType::Publish as u8) << 4 | 0b0000_0100, // Packet type
149            0x0d,                                           // Remaining len
150            0x00,
151            0x05,
152            b'/',
153            b't',
154            b'e',
155            b's',
156            b't',
157            0x12,
158            0x34,
159            0xde,
160            0xad,
161            0xbe,
162            0xef,
163        ];
164
165        let mut stream = BytesMut::new();
166
167        stream.extend_from_slice(&data[..]);
168
169        let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
170        let packet = Publish::decode(raw_packet).unwrap();
171
172        assert_eq!(
173            packet,
174            Publish::new(
175                "/test",
176                0x1234,
177                Bytes::copy_from_slice(&payload),
178                Flags::new(QoS::ExactlyOnce)
179            )
180        );
181    }
182
183    #[test]
184    fn publish_encode() {
185        let payload: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];
186        let packet = Publish::new(
187            "/test",
188            0x1234,
189            Bytes::copy_from_slice(&payload),
190            Flags::new(QoS::ExactlyOnce),
191        );
192
193        let mut stream = BytesMut::new();
194        packet.encode(&mut stream).unwrap();
195        assert_eq!(
196            stream,
197            vec![
198                (PacketType::Publish as u8) << 4 | 0b0000_0100, // Packet type
199                0x0d,                                           // Remaining len
200                0x00,
201                0x05,
202                b'/',
203                b't',
204                b'e',
205                b's',
206                b't',
207                0x12,
208                0x34,
209                0xde,
210                0xad,
211                0xbe,
212                0xef,
213            ]
214        );
215    }
216}