mqute_codec/protocol/v4/
publish.rs1use crate::codec::{Decode, Encode, RawPacket};
8use crate::protocol::common::PublishHeader;
9use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
10use crate::Error;
11use bytes::{Bytes, BytesMut};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct Publish {
32 header: PublishHeader,
34
35 payload: Bytes,
37
38 flags: Flags,
40}
41
42impl Publish {
43 pub fn new<T: Into<String>>(topic: T, packet_id: u16, payload: Bytes, flags: Flags) -> Self {
51 if flags.qos != QoS::AtMostOnce && packet_id == 0 {
52 panic!("Control packets must contain a non-zero packet identifier at QoS > 0");
53 }
54
55 Publish {
56 header: PublishHeader::new(topic, packet_id),
57 payload,
58 flags,
59 }
60 }
61
62 pub fn flags(&self) -> Flags {
66 self.flags
67 }
68
69 pub fn topic(&self) -> String {
71 self.header.topic.clone()
72 }
73
74 pub fn packet_id(&self) -> Option<u16> {
79 if self.flags.qos != QoS::AtMostOnce {
80 Some(self.header.packet_id)
81 } else {
82 None
83 }
84 }
85
86 pub fn payload(&self) -> Bytes {
88 self.payload.clone()
89 }
90}
91
92impl Decode for Publish {
93 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
95 if packet.header.packet_type() != PacketType::Publish {
96 return Err(Error::MalformedPacket);
97 }
98
99 let flags = packet.header.flags();
100
101 let publish_header = PublishHeader::decode(&mut packet.payload, flags.qos)?;
102
103 let packet = Publish {
104 header: publish_header,
105 payload: packet.payload,
106 flags,
107 };
108 Ok(packet)
109 }
110}
111
112impl Encode for Publish {
113 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
115 let header = FixedHeader::with_flags(PacketType::Publish, self.flags, self.payload_len());
116 header.encode(buf)?;
117 self.header.encode(buf, self.flags.qos);
118
119 buf.extend_from_slice(&self.payload);
121 Ok(())
122 }
123
124 fn payload_len(&self) -> usize {
128 self.header.encoded_len(self.flags.qos) + self.payload.len()
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use crate::codec::PacketCodec;
136 use crate::protocol::QoS;
137 use bytes::BytesMut;
138 use tokio_util::codec::Decoder;
139
140 #[test]
141 fn publish_decode() {
142 let mut codec = PacketCodec::new(None, None);
143
144 let payload: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];
145 let data = &[
146 (PacketType::Publish as u8) << 4 | 0b0000_0100, 0x0d, 0x00,
149 0x05,
150 b'/',
151 b't',
152 b'e',
153 b's',
154 b't',
155 0x12,
156 0x34,
157 0xde,
158 0xad,
159 0xbe,
160 0xef,
161 ];
162
163 let mut stream = BytesMut::new();
164
165 stream.extend_from_slice(&data[..]);
166
167 let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
168 let packet = Publish::decode(raw_packet).unwrap();
169
170 assert_eq!(
171 packet,
172 Publish::new(
173 "/test",
174 0x1234,
175 Bytes::copy_from_slice(&payload),
176 Flags::new(QoS::ExactlyOnce)
177 )
178 );
179 }
180
181 #[test]
182 fn publish_encode() {
183 let payload: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];
184 let packet = Publish::new(
185 "/test",
186 0x1234,
187 Bytes::copy_from_slice(&payload),
188 Flags::new(QoS::ExactlyOnce),
189 );
190
191 let mut stream = BytesMut::new();
192 packet.encode(&mut stream).unwrap();
193 assert_eq!(
194 stream,
195 vec![
196 (PacketType::Publish as u8) << 4 | 0b0000_0100, 0x0d, 0x00,
199 0x05,
200 b'/',
201 b't',
202 b'e',
203 b's',
204 b't',
205 0x12,
206 0x34,
207 0xde,
208 0xad,
209 0xbe,
210 0xef,
211 ]
212 );
213 }
214}