mqute_codec/protocol/v4/
publish.rs1use crate::codec::{Decode, Encode, RawPacket};
8use crate::protocol::common::PublishHeader;
9use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
10use crate::Error;
11use bytes::{Bytes, BytesMut};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct Publish {
32 header: PublishHeader,
34
35 payload: Bytes,
37
38 flags: Flags,
40}
41
42impl Publish {
43 pub fn new<T: Into<String>>(topic: T, packet_id: u16, payload: Bytes, flags: Flags) -> Self {
49 if flags.qos != QoS::AtMostOnce && packet_id == 0 {
50 panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
51 }
52
53 Publish {
54 header: PublishHeader::new(topic, packet_id),
55 payload,
56 flags,
57 }
58 }
59
60 pub fn flags(&self) -> Flags {
64 self.flags
65 }
66
67 pub fn topic(&self) -> String {
69 self.header.topic.clone()
70 }
71
72 pub fn packet_id(&self) -> Option<u16> {
77 if self.flags.qos != QoS::AtMostOnce {
78 Some(self.header.packet_id)
79 } else {
80 None
81 }
82 }
83
84 pub fn payload(&self) -> Bytes {
86 self.payload.clone()
87 }
88}
89
90impl Decode for Publish {
91 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
93 if packet.header.packet_type() != PacketType::Publish {
94 return Err(Error::MalformedPacket);
95 }
96
97 let flags = packet.header.flags();
98
99 let publish_header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
100
101 let packet = Publish {
102 header: publish_header,
103 payload: packet.payload,
104 flags,
105 };
106 Ok(packet)
107 }
108}
109
110impl Encode for Publish {
111 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
113 let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
114 header.encode(buf)?;
115 self.header.encode(buf, self.flags.qos);
116
117 buf.extend_from_slice(&self.payload);
119 Ok(())
120 }
121
122 fn payload_len(&self) -> usize {
126 self.header.encoded_len(self.flags.qos) + self.payload.len()
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use super::*;
133 use crate::codec::PacketCodec;
134 use crate::protocol::QoS;
135 use bytes::BytesMut;
136 use tokio_util::codec::Decoder;
137
138 #[test]
139 fn publish_decode() {
140 let mut codec = PacketCodec::new(None, None);
141
142 let payload: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];
143 let data = &[
144 (PacketType::Publish as u8) << 4 | 0b0000_0100, 0x0d, 0x00,
147 0x05,
148 b'/',
149 b't',
150 b'e',
151 b's',
152 b't',
153 0x12,
154 0x34,
155 0xde,
156 0xad,
157 0xbe,
158 0xef,
159 ];
160
161 let mut stream = BytesMut::new();
162
163 stream.extend_from_slice(&data[..]);
164
165 let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
166 let packet = Publish::decode(raw_packet).unwrap();
167
168 assert_eq!(
169 packet,
170 Publish::new(
171 "/test",
172 0x1234,
173 Bytes::copy_from_slice(&payload),
174 Flags::new(QoS::ExactlyOnce)
175 )
176 );
177 }
178
179 #[test]
180 fn publish_encode() {
181 let payload: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];
182 let packet = Publish::new(
183 "/test",
184 0x1234,
185 Bytes::copy_from_slice(&payload),
186 Flags::new(QoS::ExactlyOnce),
187 );
188
189 let mut stream = BytesMut::new();
190 packet.encode(&mut stream).unwrap();
191 assert_eq!(
192 stream,
193 vec![
194 (PacketType::Publish as u8) << 4 | 0b0000_0100, 0x0d, 0x00,
197 0x05,
198 b'/',
199 b't',
200 b'e',
201 b's',
202 b't',
203 0x12,
204 0x34,
205 0xde,
206 0xad,
207 0xbe,
208 0xef,
209 ]
210 );
211 }
212}