mqute_codec/protocol/v4/
publish.rs

//! # Publish Packet V4
//!
//! This module defines the `Publish` packet, which the MQTT protocol uses to carry
//! messages from a client to a server or from a server to a client. A `Publish` packet
//! includes a topic, a payload, and flags for QoS, retain, and duplicate delivery.
6
7use crate::codec::{Decode, Encode, RawPacket};
8use crate::protocol::common::PublishHeader;
9use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
10use crate::Error;
11use bytes::{Bytes, BytesMut};
12
13/// Represents an MQTT `Publish` packet.
14///
15/// # Example
16///
17/// ```rust
18/// use mqute_codec::protocol::v4::Publish;
19/// use mqute_codec::protocol::{QoS, Flags};
20/// use bytes::Bytes;
21///
22/// let flags = Flags::new(QoS::AtLeastOnce);
23/// let publish = Publish::new("topic", 1234, Bytes::from("message"), flags);
24///
25/// assert_eq!(publish.flags(), flags);
26/// assert_eq!(publish.topic(), "topic");
27/// assert_eq!(publish.packet_id(), Some(1234));
28/// assert_eq!(publish.payload(), Bytes::from("message"));
29/// ```
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct Publish {
32    /// The header of the `Publish` packet, containing the topic and packet ID (for QoS > 0).
33    header: PublishHeader,
34
35    /// The payload of the `Publish` packet, containing the message data.
36    payload: Bytes,
37
38    /// The flags for the `Publish` packet, including QoS, retain, and duplicate delivery.
39    flags: Flags,
40}
41
42impl Publish {
43    /// Creates a new `Publish` packet.
44    ///
45    /// # Panics
46    ///
47    /// Panics if:
48    /// - `packet_id` is zero for QoS > 0.
49    /// - The topic name is invalid according to MQTT topic naming rules.
50    pub fn new<T: Into<String>>(topic: T, packet_id: u16, payload: Bytes, flags: Flags) -> Self {
51        if flags.qos != QoS::AtMostOnce && packet_id == 0 {
52            panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
53        }
54
55        Publish {
56            header: PublishHeader::new(topic, packet_id),
57            payload,
58            flags,
59        }
60    }
61
62    /// Returns the flags for the `Publish` packet.
63    ///
64    /// The flags include QoS, retain, and duplicate delivery settings.
65    pub fn flags(&self) -> Flags {
66        self.flags
67    }
68
69    /// Returns the topic of the `Publish` packet.
70    pub fn topic(&self) -> String {
71        self.header.topic.clone()
72    }
73
74    /// Returns the packet ID of the `Publish` packet.
75    ///
76    /// For QoS 0, this method returns `None` because no packet ID is used.
77    /// For QoS 1 and 2, it returns the packet ID as `Some(u16)`.
78    pub fn packet_id(&self) -> Option<u16> {
79        if self.flags.qos != QoS::AtMostOnce {
80            Some(self.header.packet_id)
81        } else {
82            None
83        }
84    }
85
86    /// Returns the payload of the `Publish` packet.
87    pub fn payload(&self) -> Bytes {
88        self.payload.clone()
89    }
90}
91
92impl Decode for Publish {
93    /// Decodes a `Publish` packet from a raw MQTT packet.
94    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
95        if packet.header.packet_type() != PacketType::Publish {
96            return Err(Error::MalformedPacket);
97        }
98
99        let flags = packet.header.flags();
100
101        let publish_header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
102
103        let packet = Publish {
104            header: publish_header,
105            payload: packet.payload,
106            flags,
107        };
108        Ok(packet)
109    }
110}
111
112impl Encode for Publish {
113    /// Encodes the `Publish` packet into a byte buffer.
114    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
115        let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
116        header.encode(buf)?;
117        self.header.encode(buf, self.flags.qos);
118
119        // Append message
120        buf.extend_from_slice(&self.payload);
121        Ok(())
122    }
123
124    /// Returns the length of the `Publish` packet payload.
125    ///
126    /// The payload length includes the length of the Publish header and the message payload.
127    fn payload_len(&self) -> usize {
128        self.header.encoded_len(self.flags.qos) + self.payload.len()
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use crate::protocol::QoS;
    use bytes::BytesMut;
    use tokio_util::codec::Decoder;

    /// Message bytes carried by the sample packet.
    const MESSAGE: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];

    /// Wire image of the sample packet: `ExactlyOnce` publish to `/test`, id `0x1234`.
    fn wire_image() -> Vec<u8> {
        vec![
            // Packet type in the high nibble, flags in the low nibble.
            (PacketType::Publish as u8) << 4 | 0b0000_0100,
            // Remaining length: 2 + 5 (topic) + 2 (packet id) + 4 (message).
            0x0d,
            // Topic length prefix followed by "/test".
            0x00,
            0x05,
            b'/',
            b't',
            b'e',
            b's',
            b't',
            // Packet identifier.
            0x12,
            0x34,
            // Message.
            0xde,
            0xad,
            0xbe,
            0xef,
        ]
    }

    /// Packet value that corresponds to `wire_image()`.
    fn sample_packet() -> Publish {
        Publish::new(
            "/test",
            0x1234,
            Bytes::copy_from_slice(&MESSAGE),
            Flags::new(QoS::ExactlyOnce),
        )
    }

    #[test]
    fn publish_decode() {
        let mut codec = PacketCodec::new(None, None);

        let mut stream = BytesMut::new();
        stream.extend_from_slice(&wire_image());

        let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
        let decoded = Publish::decode(raw_packet).unwrap();

        assert_eq!(decoded, sample_packet());
    }

    #[test]
    fn publish_encode() {
        let mut stream = BytesMut::new();
        sample_packet().encode(&mut stream).unwrap();

        assert_eq!(stream, wire_image());
    }
}