mqute_codec/protocol/v4/
subscribe.rs1use crate::Error;
8use crate::codec::util::{decode_byte, decode_string, decode_word, encode_string};
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::{FixedHeader, Flags, PacketType, QoS, util};
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::borrow::Borrow;
13
14#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct TopicQosFilter {
28 pub topic: String,
30
31 pub qos: QoS,
33}
34
35impl TopicQosFilter {
36 pub fn new<T: Into<String>>(topic: T, qos: QoS) -> Self {
42 let topic = topic.into();
43
44 if !util::is_valid_topic_filter(&topic) {
45 panic!("Invalid topic filter: '{}'", topic);
46 }
47
48 Self { topic, qos }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct TopicQosFilters(Vec<TopicQosFilter>);
69
70#[allow(clippy::len_without_is_empty)]
71impl TopicQosFilters {
72 pub fn new<T: IntoIterator<Item = TopicQosFilter>>(filters: T) -> Self {
80 let values: Vec<TopicQosFilter> = filters.into_iter().collect();
81
82 if values.is_empty() {
83 panic!("At least one topic filter is required");
84 }
85
86 TopicQosFilters(values)
87 }
88
89 pub fn len(&self) -> usize {
91 self.0.len()
92 }
93
94 pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
95 let mut filters = Vec::with_capacity(1);
96
97 while payload.has_remaining() {
98 let filter = decode_string(payload)?;
99
100 if !util::is_valid_topic_filter(&filter) {
101 return Err(Error::InvalidTopicFilter(filter));
102 }
103
104 let flags = decode_byte(payload)?;
105
106 if flags & 0b1111_1100 > 0 {
108 return Err(Error::MalformedPacket);
109 }
110
111 filters.push(TopicQosFilter::new(filter, flags.try_into()?));
112 }
113
114 if filters.is_empty() {
115 return Err(Error::NoTopic);
116 }
117
118 Ok(TopicQosFilters(filters))
119 }
120
121 pub(crate) fn encode(&self, buf: &mut BytesMut) {
122 self.0.iter().for_each(|f| {
123 encode_string(buf, &f.topic);
124 buf.put_u8(f.qos.into());
125 });
126 }
127
128 pub(crate) fn encoded_len(&self) -> usize {
129 self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
130 }
131}
132
133impl AsRef<Vec<TopicQosFilter>> for TopicQosFilters {
134 #[inline]
135 fn as_ref(&self) -> &Vec<TopicQosFilter> {
136 self.0.as_ref()
137 }
138}
139
140impl Borrow<Vec<TopicQosFilter>> for TopicQosFilters {
141 fn borrow(&self) -> &Vec<TopicQosFilter> {
142 self.0.as_ref()
143 }
144}
145
146impl IntoIterator for TopicQosFilters {
147 type Item = TopicQosFilter;
148 type IntoIter = std::vec::IntoIter<TopicQosFilter>;
149
150 fn into_iter(self) -> Self::IntoIter {
151 self.0.into_iter()
152 }
153}
154
155impl FromIterator<TopicQosFilter> for TopicQosFilters {
156 fn from_iter<T: IntoIterator<Item = TopicQosFilter>>(iter: T) -> Self {
157 TopicQosFilters(Vec::from_iter(iter))
158 }
159}
160
161impl From<TopicQosFilters> for Vec<TopicQosFilter> {
162 #[inline]
163 fn from(value: TopicQosFilters) -> Self {
164 value.0
165 }
166}
167
168impl From<Vec<TopicQosFilter>> for TopicQosFilters {
169 #[inline]
170 fn from(value: Vec<TopicQosFilter>) -> Self {
171 TopicQosFilters(value)
172 }
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct Subscribe {
192 packet_id: u16,
194
195 filters: TopicQosFilters,
197}
198
199impl Subscribe {
200 pub fn new<T: IntoIterator<Item = TopicQosFilter>>(packet_id: u16, filters: T) -> Self {
206 if packet_id == 0 {
207 panic!("Packet id is zero");
208 }
209
210 let filters = filters.into_iter().collect();
211 Subscribe { packet_id, filters }
212 }
213
214 pub fn packet_id(&self) -> u16 {
216 self.packet_id
217 }
218
219 pub fn filters(&self) -> TopicQosFilters {
221 self.filters.clone()
222 }
223}
224
225impl Decode for Subscribe {
226 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
228 if packet.header.packet_type() != PacketType::Subscribe
230 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
231 {
232 return Err(Error::MalformedPacket);
233 }
234
235 let packet_id = decode_word(&mut packet.payload)?;
236 let filters = TopicQosFilters::decode(&mut packet.payload)?;
237
238 Ok(Subscribe::new(packet_id, filters))
239 }
240}
241
242impl Encode for Subscribe {
243 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
245 let header = FixedHeader::with_flags(
246 PacketType::Subscribe,
247 Flags::new(QoS::AtLeastOnce),
248 self.payload_len(),
249 );
250 header.encode(buf)?;
251 buf.put_u16(self.packet_id);
252 self.filters.encode(buf);
253
254 Ok(())
255 }
256
257 fn payload_len(&self) -> usize {
259 2 + self.filters.encoded_len()
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267 use crate::codec::PacketCodec;
268 use crate::protocol::QoS;
269 use bytes::BytesMut;
270 use tokio_util::codec::Decoder;
271
272 #[test]
273 fn subscribe_decode() {
274 let mut codec = PacketCodec::new(None, None);
275
276 let data = &[
277 (PacketType::Subscribe as u8) << 4 | 0b0010, 0x0c, 0x12,
280 0x34,
281 0x00,
282 0x02,
283 b'/',
284 b'a',
285 0x00,
286 0x00,
287 0x02,
288 b'/',
289 b'b',
290 0x02,
291 ];
292
293 let mut stream = BytesMut::new();
294
295 stream.extend_from_slice(&data[..]);
296
297 let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
298 let packet = Subscribe::decode(raw_packet).unwrap();
299
300 assert_eq!(
301 packet,
302 Subscribe::new(
303 0x1234,
304 vec![
305 TopicQosFilter::new("/a", QoS::AtMostOnce),
306 TopicQosFilter::new("/b", QoS::ExactlyOnce)
307 ]
308 )
309 );
310 }
311
312 #[test]
313 fn subscribe_encode() {
314 let packet = Subscribe::new(
315 0x1234,
316 vec![
317 TopicQosFilter::new("/a", QoS::AtMostOnce),
318 TopicQosFilter::new("/b", QoS::ExactlyOnce),
319 ],
320 );
321
322 let mut stream = BytesMut::new();
323 packet.encode(&mut stream).unwrap();
324 assert_eq!(
325 stream,
326 vec![
327 (PacketType::Subscribe as u8) << 4 | 0b0010, 0x0c, 0x12,
330 0x34,
331 0x00,
332 0x02,
333 b'/',
334 b'a',
335 0x00,
336 0x00,
337 0x02,
338 b'/',
339 b'b',
340 0x02,
341 ]
342 );
343 }
344}