mqute_codec/protocol/v4/
subscribe.rs1use crate::codec::util::{decode_byte, decode_string, decode_word, encode_string};
8use crate::codec::{Decode, Encode, RawPacket};
9use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
10use crate::Error;
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::borrow::Borrow;
13
14#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct TopicQosFilter {
28 pub topic: String,
30
31 pub qos: QoS,
33}
34
35impl TopicQosFilter {
36 pub fn new<T: Into<String>>(topic: T, qos: QoS) -> Self {
38 Self {
39 topic: topic.into(),
40 qos,
41 }
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct TopicQosFilters(Vec<TopicQosFilter>);
62
63#[allow(clippy::len_without_is_empty)]
64impl TopicQosFilters {
65 pub fn new<T: IntoIterator<Item = TopicQosFilter>>(filters: T) -> Self {
71 let values: Vec<TopicQosFilter> = filters.into_iter().collect();
72
73 if values.is_empty() {
74 panic!("At least one topic filter is required");
75 }
76
77 TopicQosFilters(values)
78 }
79
80 pub fn len(&self) -> usize {
82 self.0.len()
83 }
84
85 pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
86 let mut filters = Vec::with_capacity(1);
87
88 while payload.has_remaining() {
89 let filter = decode_string(payload)?;
90 let flags = decode_byte(payload)?;
91
92 if flags & 0b1111_1100 > 0 {
94 return Err(Error::MalformedPacket);
95 }
96
97 filters.push(TopicQosFilter::new(filter, flags.try_into()?));
98 }
99
100 if filters.is_empty() {
101 return Err(Error::NoTopic);
102 }
103
104 Ok(TopicQosFilters(filters))
105 }
106
107 pub(crate) fn encode(&self, buf: &mut BytesMut) {
108 self.0.iter().for_each(|f| {
109 encode_string(buf, &f.topic);
110 buf.put_u8(f.qos.into());
111 });
112 }
113
114 pub(crate) fn encoded_len(&self) -> usize {
115 self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
116 }
117}
118
119impl AsRef<Vec<TopicQosFilter>> for TopicQosFilters {
120 #[inline]
121 fn as_ref(&self) -> &Vec<TopicQosFilter> {
122 self.0.as_ref()
123 }
124}
125
126impl Borrow<Vec<TopicQosFilter>> for TopicQosFilters {
127 fn borrow(&self) -> &Vec<TopicQosFilter> {
128 self.0.as_ref()
129 }
130}
131
132impl IntoIterator for TopicQosFilters {
133 type Item = TopicQosFilter;
134 type IntoIter = std::vec::IntoIter<TopicQosFilter>;
135
136 fn into_iter(self) -> Self::IntoIter {
137 self.0.into_iter()
138 }
139}
140
141impl FromIterator<TopicQosFilter> for TopicQosFilters {
142 fn from_iter<T: IntoIterator<Item = TopicQosFilter>>(iter: T) -> Self {
143 TopicQosFilters(Vec::from_iter(iter))
144 }
145}
146
147impl From<TopicQosFilters> for Vec<TopicQosFilter> {
148 #[inline]
149 fn from(value: TopicQosFilters) -> Self {
150 value.0
151 }
152}
153
154impl From<Vec<TopicQosFilter>> for TopicQosFilters {
155 #[inline]
156 fn from(value: Vec<TopicQosFilter>) -> Self {
157 TopicQosFilters(value)
158 }
159}
160
161#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct Subscribe {
178 packet_id: u16,
180
181 filters: TopicQosFilters,
183}
184
185impl Subscribe {
186 pub fn new<T: IntoIterator<Item = TopicQosFilter>>(packet_id: u16, filters: T) -> Self {
192 if packet_id == 0 {
193 panic!("Packet id is zero");
194 }
195
196 let filters = filters.into_iter().collect();
197 Subscribe { packet_id, filters }
198 }
199
200 pub fn packet_id(&self) -> u16 {
202 self.packet_id
203 }
204
205 pub fn filters(&self) -> TopicQosFilters {
207 self.filters.clone()
208 }
209}
210
211impl Decode for Subscribe {
212 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
214 if packet.header.packet_type() != PacketType::Subscribe
216 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
217 {
218 return Err(Error::MalformedPacket);
219 }
220
221 let packet_id = decode_word(&mut packet.payload)?;
222 let filters = TopicQosFilters::decode(&mut packet.payload)?;
223
224 Ok(Subscribe::new(packet_id, filters))
225 }
226}
227
228impl Encode for Subscribe {
229 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
231 let header = FixedHeader::with_flags(
232 PacketType::Subscribe,
233 Flags::new(QoS::AtLeastOnce),
234 self.payload_len(),
235 );
236 header.encode(buf)?;
237 buf.put_u16(self.packet_id);
238 self.filters.encode(buf);
239
240 Ok(())
241 }
242
243 fn payload_len(&self) -> usize {
245 2 + self.filters.encoded_len()
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use super::*;
253 use crate::codec::PacketCodec;
254 use crate::protocol::QoS;
255 use bytes::BytesMut;
256 use tokio_util::codec::Decoder;
257
258 #[test]
259 fn subscribe_decode() {
260 let mut codec = PacketCodec::new(None, None);
261
262 let data = &[
263 (PacketType::Subscribe as u8) << 4 | 0b0010, 0x0c, 0x12,
266 0x34,
267 0x00,
268 0x02,
269 b'/',
270 b'a',
271 0x00,
272 0x00,
273 0x02,
274 b'/',
275 b'b',
276 0x02,
277 ];
278
279 let mut stream = BytesMut::new();
280
281 stream.extend_from_slice(&data[..]);
282
283 let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
284 let packet = Subscribe::decode(raw_packet).unwrap();
285
286 assert_eq!(
287 packet,
288 Subscribe::new(
289 0x1234,
290 vec![
291 TopicQosFilter::new("/a", QoS::AtMostOnce),
292 TopicQosFilter::new("/b", QoS::ExactlyOnce)
293 ]
294 )
295 );
296 }
297
298 #[test]
299 fn subscribe_encode() {
300 let packet = Subscribe::new(
301 0x1234,
302 vec![
303 TopicQosFilter::new("/a", QoS::AtMostOnce),
304 TopicQosFilter::new("/b", QoS::ExactlyOnce),
305 ],
306 );
307
308 let mut stream = BytesMut::new();
309 packet.encode(&mut stream).unwrap();
310 assert_eq!(
311 stream,
312 vec![
313 (PacketType::Subscribe as u8) << 4 | 0b0010, 0x0c, 0x12,
316 0x34,
317 0x00,
318 0x02,
319 b'/',
320 b'a',
321 0x00,
322 0x00,
323 0x02,
324 b'/',
325 b'b',
326 0x02,
327 ]
328 );
329 }
330}