mqute_codec/protocol/v4/
subscribe.rs1use crate::Error;
8use crate::codec::util::{decode_byte, decode_string, decode_word, encode_string};
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::{FixedHeader, Flags, PacketType, QoS, traits, util};
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::borrow::Borrow;
13
14#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct TopicQosFilter {
28 pub topic: String,
30
31 pub qos: QoS,
33}
34
35impl TopicQosFilter {
36 pub fn new<T: Into<String>>(topic: T, qos: QoS) -> Self {
42 let topic = topic.into();
43
44 if !util::is_valid_topic_filter(&topic) {
45 panic!("Invalid topic filter: '{}'", topic);
46 }
47
48 Self { topic, qos }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct TopicQosFilters(Vec<TopicQosFilter>);
69
70#[allow(clippy::len_without_is_empty)]
71impl TopicQosFilters {
72 pub fn new<T: IntoIterator<Item = TopicQosFilter>>(filters: T) -> Self {
80 let values: Vec<TopicQosFilter> = filters.into_iter().collect();
81
82 if values.is_empty() {
83 panic!("At least one topic filter is required");
84 }
85
86 TopicQosFilters(values)
87 }
88
89 pub fn len(&self) -> usize {
91 self.0.len()
92 }
93
94 pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
95 let mut filters = Vec::with_capacity(1);
96
97 while payload.has_remaining() {
98 let filter = decode_string(payload)?;
99
100 if !util::is_valid_topic_filter(&filter) {
101 return Err(Error::InvalidTopicFilter(filter));
102 }
103
104 let flags = decode_byte(payload)?;
105
106 if flags & 0b1111_1100 > 0 {
108 return Err(Error::MalformedPacket);
109 }
110
111 filters.push(TopicQosFilter::new(filter, flags.try_into()?));
112 }
113
114 if filters.is_empty() {
115 return Err(Error::NoTopic);
116 }
117
118 Ok(TopicQosFilters(filters))
119 }
120
121 pub(crate) fn encode(&self, buf: &mut BytesMut) {
122 self.0.iter().for_each(|f| {
123 encode_string(buf, &f.topic);
124 buf.put_u8(f.qos.into());
125 });
126 }
127
128 pub(crate) fn encoded_len(&self) -> usize {
129 self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
130 }
131}
132
133impl AsRef<Vec<TopicQosFilter>> for TopicQosFilters {
134 #[inline]
135 fn as_ref(&self) -> &Vec<TopicQosFilter> {
136 self.0.as_ref()
137 }
138}
139
140impl Borrow<Vec<TopicQosFilter>> for TopicQosFilters {
141 fn borrow(&self) -> &Vec<TopicQosFilter> {
142 self.0.as_ref()
143 }
144}
145
146impl IntoIterator for TopicQosFilters {
147 type Item = TopicQosFilter;
148 type IntoIter = std::vec::IntoIter<TopicQosFilter>;
149
150 fn into_iter(self) -> Self::IntoIter {
151 self.0.into_iter()
152 }
153}
154
155impl FromIterator<TopicQosFilter> for TopicQosFilters {
156 fn from_iter<T: IntoIterator<Item = TopicQosFilter>>(iter: T) -> Self {
157 TopicQosFilters(Vec::from_iter(iter))
158 }
159}
160
161impl From<TopicQosFilters> for Vec<TopicQosFilter> {
162 #[inline]
163 fn from(value: TopicQosFilters) -> Self {
164 value.0
165 }
166}
167
168impl From<Vec<TopicQosFilter>> for TopicQosFilters {
169 #[inline]
170 fn from(value: Vec<TopicQosFilter>) -> Self {
171 TopicQosFilters(value)
172 }
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct Subscribe {
192 packet_id: u16,
194
195 filters: TopicQosFilters,
197}
198
199impl Subscribe {
200 pub fn new<T: IntoIterator<Item = TopicQosFilter>>(packet_id: u16, filters: T) -> Self {
206 if packet_id == 0 {
207 panic!("Packet id is zero");
208 }
209
210 let filters = filters.into_iter().collect();
211 Subscribe { packet_id, filters }
212 }
213
214 pub fn packet_id(&self) -> u16 {
216 self.packet_id
217 }
218
219 pub fn filters(&self) -> TopicQosFilters {
221 self.filters.clone()
222 }
223}
224
225impl Decode for Subscribe {
226 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
228 if packet.header.packet_type() != PacketType::Subscribe
230 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
231 {
232 return Err(Error::MalformedPacket);
233 }
234
235 let packet_id = decode_word(&mut packet.payload)?;
236 let filters = TopicQosFilters::decode(&mut packet.payload)?;
237
238 Ok(Subscribe::new(packet_id, filters))
239 }
240}
241
242impl Encode for Subscribe {
243 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
245 let header = FixedHeader::with_flags(
246 PacketType::Subscribe,
247 Flags::new(QoS::AtLeastOnce),
248 self.payload_len(),
249 );
250 header.encode(buf)?;
251 buf.put_u16(self.packet_id);
252 self.filters.encode(buf);
253
254 Ok(())
255 }
256
257 fn payload_len(&self) -> usize {
259 2 + self.filters.encoded_len()
261 }
262}
263
264impl traits::Subscribe for Subscribe {}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use crate::protocol::QoS;
    use bytes::BytesMut;
    use tokio_util::codec::Decoder;

    /// Byte image shared by both tests: a SUBSCRIBE with packet id `0x1234`
    /// and the filters `"/a"` (QoS 0) and `"/b"` (QoS 2).
    fn wire_image() -> Vec<u8> {
        vec![
            // Fixed header byte: packet type in the high nibble, flags in the low one.
            (PacketType::Subscribe as u8) << 4 | 0b0010,
            // Length byte: 12 bytes follow.
            0x0c,
            // Packet identifier.
            0x12,
            0x34,
            // First filter: length-2 string "/a", then its QoS byte.
            0x00,
            0x02,
            b'/',
            b'a',
            0x00,
            // Second filter: length-2 string "/b", then its QoS byte.
            0x00,
            0x02,
            b'/',
            b'b',
            0x02,
        ]
    }

    /// The packet value that corresponds to `wire_image`.
    fn sample_packet() -> Subscribe {
        Subscribe::new(
            0x1234,
            vec![
                TopicQosFilter::new("/a", QoS::AtMostOnce),
                TopicQosFilter::new("/b", QoS::ExactlyOnce),
            ],
        )
    }

    #[test]
    fn subscribe_decode() {
        let mut codec = PacketCodec::new(None, None);

        let mut stream = BytesMut::new();
        stream.extend_from_slice(&wire_image());

        let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
        let packet = Subscribe::decode(raw_packet).unwrap();

        assert_eq!(packet, sample_packet());
    }

    #[test]
    fn subscribe_encode() {
        let packet = sample_packet();

        let mut stream = BytesMut::new();
        packet.encode(&mut stream).unwrap();

        assert_eq!(stream, wire_image());
    }
}