ntex_mqtt/v5/codec/packet/
subscribe.rs

1use std::num::{NonZeroU16, NonZeroU32};
2
3use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
4
5use super::ack_props;
6use crate::error::{DecodeError, EncodeError};
7use crate::types::QoS;
8use crate::utils::{self, Decode, Encode, write_variable_length};
9use crate::v5::codec::{UserProperties, UserProperty, encode::*, property_type as pt};
10
/// Represents SUBSCRIBE packet
///
/// The wire layout handled by this module is: packet identifier, property
/// section, then the list of topic filters with their options.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Subscribe {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// Subscription Identifier; `None` when the property is not present.
    pub id: Option<NonZeroU32>,
    /// User properties, in the order they appear in the property section.
    pub user_properties: UserProperties,
    /// The list of Topic Filters, each paired with its subscription options,
    /// to which the Client wants to subscribe.
    pub topic_filters: Vec<(ByteString, SubscriptionOptions)>,
}
22
/// Options attached to a single topic filter of a SUBSCRIBE packet.
///
/// All four fields are packed into one byte on the wire.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct SubscriptionOptions {
    /// Requested QoS; stored in bits 0-1 of the options byte.
    pub qos: QoS,
    /// No Local flag; bit 2 of the options byte.
    pub no_local: bool,
    /// Retain As Published flag; bit 3 of the options byte.
    pub retain_as_published: bool,
    /// Retain Handling option; bits 4-5 of the options byte.
    pub retain_handling: RetainHandling,
}
30
31impl Default for SubscriptionOptions {
32    fn default() -> Self {
33        Self {
34            qos: QoS::AtMostOnce,
35            no_local: false,
36            retain_as_published: false,
37            retain_handling: RetainHandling::AtSubscribe,
38        }
39    }
40}
41
prim_enum! {
    /// Retain Handling option of a subscription (bits 4-5 of the options byte)
    pub enum RetainHandling {
        // The numeric values are the on-wire values of the two-bit field.
        // NOTE(review): variant meanings are assumed to follow the MQTT 5
        // "Retain Handling" option (0: send retained messages at subscribe,
        // 1: only if the subscription is new, 2: do not send) - confirm
        // against the specification.
        AtSubscribe = 0,
        AtSubscribeNew = 1,
        NoAtSubscribe = 2
    }
}
49
/// Represents SUBACK packet
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SubscribeAck {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// User properties attached to the acknowledgement.
    pub properties: UserProperties,
    /// Optional reason string attached to the acknowledgement.
    pub reason_string: Option<ByteString>,
    /// Reason codes, one per payload byte; each corresponds to a Topic Filter
    /// in the SUBSCRIBE Packet being acknowledged.
    pub status: Vec<SubscribeAckReason>,
}
59
/// Represents UNSUBSCRIBE packet
///
/// The wire layout handled by this module is: packet identifier, property
/// section (user properties only), then the list of topic filters.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Unsubscribe {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// User properties, in the order they appear in the property section.
    pub user_properties: UserProperties,
    /// The list of Topic Filters that the Client wishes to unsubscribe from.
    pub topic_filters: Vec<ByteString>,
}
69
/// Represents UNSUBACK packet
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnsubscribeAck {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// User properties attached to the acknowledgement.
    pub properties: UserProperties,
    /// Optional reason string attached to the acknowledgement.
    pub reason_string: Option<ByteString>,
    /// Reason codes, one per payload byte.
    pub status: Vec<UnsubscribeAckReason>,
}
79
prim_enum! {
    /// SUBACK reason codes
    pub enum SubscribeAckReason {
        // Each value is the single byte used for the reason code on the wire.
        // Values 0-2 report the granted QoS.
        GrantedQos0 = 0,
        GrantedQos1 = 1,
        GrantedQos2 = 2,
        // Values from 128 upwards report why the subscription was refused.
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145,
        QuotaExceeded = 151,
        SharedSubscriptionNotSupported = 158,
        SubscriptionIdentifiersNotSupported = 161,
        WildcardSubscriptionsNotSupported = 162
    }
}
97
prim_enum! {
    /// UNSUBACK reason codes
    pub enum UnsubscribeAckReason {
        // Each value is the single byte used for the reason code on the wire.
        Success = 0,
        NoSubscriptionExisted = 17,
        // Values from 128 upwards report a failure.
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145
    }
}
110
111impl Subscribe {
112    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
113        let packet_id = NonZeroU16::decode(src)?;
114        let prop_src = &mut utils::take_properties(src)?;
115        let mut sub_id = None;
116        let mut user_properties = Vec::new();
117        while prop_src.has_remaining() {
118            let prop_id = prop_src.get_u8();
119            match prop_id {
120                pt::SUB_ID => {
121                    ensure!(sub_id.is_none(), DecodeError::MalformedPacket); // can't appear twice
122                    let val = utils::decode_variable_length_cursor(prop_src)?;
123                    sub_id = Some(NonZeroU32::new(val).ok_or(DecodeError::MalformedPacket)?);
124                }
125                pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
126                _ => return Err(DecodeError::MalformedPacket),
127            }
128        }
129
130        let mut topic_filters = Vec::new();
131        while src.has_remaining() {
132            let topic = ByteString::decode(src)?;
133            let opts = SubscriptionOptions::decode(src)?;
134            topic_filters.push((topic, opts));
135        }
136
137        Ok(Self { packet_id, id: sub_id, user_properties, topic_filters })
138    }
139}
140
141impl SubscribeAck {
142    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
143        let packet_id = NonZeroU16::decode(src)?;
144        let (properties, reason_string) = ack_props::decode(src)?;
145        let mut status = Vec::with_capacity(src.remaining());
146        for code in src.as_ref().iter().copied() {
147            status.push(code.try_into()?);
148        }
149        Ok(Self { packet_id, properties, reason_string, status })
150    }
151}
152
153impl Unsubscribe {
154    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
155        let packet_id = NonZeroU16::decode(src)?;
156
157        let prop_src = &mut utils::take_properties(src)?;
158        let mut user_properties = Vec::new();
159        while prop_src.has_remaining() {
160            let prop_id = prop_src.get_u8();
161            match prop_id {
162                pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
163                _ => return Err(DecodeError::MalformedPacket),
164            }
165        }
166
167        let mut topic_filters = Vec::new();
168        while src.remaining() > 0 {
169            topic_filters.push(ByteString::decode(src)?);
170        }
171
172        Ok(Self { packet_id, user_properties, topic_filters })
173    }
174}
175
176impl UnsubscribeAck {
177    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
178        let packet_id = NonZeroU16::decode(src)?;
179        let (properties, reason_string) = ack_props::decode(src)?;
180        let mut status = Vec::with_capacity(src.remaining());
181        for code in src.as_ref().iter().copied() {
182            status.push(code.try_into()?);
183        }
184        Ok(Self { packet_id, properties, reason_string, status })
185    }
186}
187
188impl EncodeLtd for Subscribe {
189    fn encoded_size(&self, _limit: u32) -> usize {
190        let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize) as usize) // +1 to account for property type byte
191            + self.user_properties.encoded_size();
192        let payload_len = self
193            .topic_filters
194            .iter()
195            .fold(0, |acc, (filter, _opts)| acc + filter.encoded_size() + 1);
196        self.packet_id.encoded_size() + var_int_len(prop_len) as usize + prop_len + payload_len
197    }
198
199    fn encode(&self, buf: &mut BytesMut, _: u32) -> Result<(), EncodeError> {
200        self.packet_id.encode(buf)?;
201
202        // encode properties
203        let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize))
204            + self.user_properties.encoded_size() as u32; // safe: size was already checked against maximum
205        utils::write_variable_length(prop_len, buf);
206
207        if let Some(id) = self.id {
208            buf.put_u8(pt::SUB_ID);
209            write_variable_length(id.get(), buf);
210        }
211
212        self.user_properties.encode(buf)?;
213
214        // payload
215        for (filter, opts) in self.topic_filters.iter() {
216            filter.encode(buf)?;
217            opts.encode(buf)?;
218        }
219
220        Ok(())
221    }
222}
223
224impl Decode for SubscriptionOptions {
225    fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
226        ensure!(src.has_remaining(), DecodeError::InvalidLength);
227        let val = src.get_u8();
228        let qos = (val & 0b0000_0011).try_into()?;
229        let retain_handling = ((val & 0b0011_0000) >> 4).try_into()?;
230        Ok(SubscriptionOptions {
231            qos,
232            no_local: val & 0b0000_0100 != 0,
233            retain_as_published: val & 0b0000_1000 != 0,
234            retain_handling,
235        })
236    }
237}
238
239impl Encode for SubscriptionOptions {
240    fn encoded_size(&self) -> usize {
241        1
242    }
243    fn encode(&self, buf: &mut BytesMut) -> Result<(), EncodeError> {
244        buf.put_u8(
245            u8::from(self.qos)
246                | ((self.no_local as u8) << 2)
247                | ((self.retain_as_published as u8) << 3)
248                | (u8::from(self.retain_handling) << 4),
249        );
250        Ok(())
251    }
252}
253
254impl EncodeLtd for SubscribeAck {
255    fn encoded_size(&self, limit: u32) -> usize {
256        let len = self.status.len();
257        if len > (u32::MAX - 2) as usize {
258            return usize::MAX; // bail to avoid overflow
259        }
260
261        2 + ack_props::encoded_size(
262            &self.properties,
263            &self.reason_string,
264            limit - 2 - len as u32,
265        ) + len
266    }
267
268    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
269        self.packet_id.encode(buf)?;
270        let len = self.status.len() as u32; // safe: max size checked already
271        ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
272        for &reason in self.status.iter() {
273            buf.put_u8(reason.into());
274        }
275        Ok(())
276    }
277}
278
279impl EncodeLtd for Unsubscribe {
280    fn encoded_size(&self, _limit: u32) -> usize {
281        let prop_len = self.user_properties.encoded_size();
282        2 + var_int_len(prop_len) as usize
283            + prop_len
284            + self.topic_filters.iter().fold(0, |acc, filter| acc + 2 + filter.len())
285    }
286
287    fn encode(&self, buf: &mut BytesMut, _size: u32) -> Result<(), EncodeError> {
288        self.packet_id.encode(buf)?;
289
290        // properties
291        let prop_len = self.user_properties.encoded_size();
292        utils::write_variable_length(prop_len as u32, buf); // safe: max size check is done already
293        self.user_properties.encode(buf)?;
294
295        // payload
296        for filter in self.topic_filters.iter() {
297            filter.encode(buf)?;
298        }
299        Ok(())
300    }
301}
302
303impl EncodeLtd for UnsubscribeAck {
304    // todo: almost identical to SUBACK
305    fn encoded_size(&self, limit: u32) -> usize {
306        let len = self.status.len();
307        2 + len
308            + ack_props::encoded_size(
309                &self.properties,
310                &self.reason_string,
311                reduce_limit(limit, 2 + len),
312            )
313    }
314
315    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
316        self.packet_id.encode(buf)?;
317        let len = self.status.len() as u32;
318
319        ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
320        for &reason in self.status.iter() {
321            buf.put_u8(reason.into());
322        }
323        Ok(())
324    }
325}
326
// Round-trip tests: every packet is encoded and then decoded again, and the
// result must equal the original value.
#[cfg(test)]
mod tests {
    use ntex_codec::{Decoder, Encoder};

    use super::super::super::{Codec, Decoded, Packet};
    use super::*;

    // Extracts the packet from a decoder result; panics on any other variant.
    fn packet(res: Decoded) -> Packet {
        match res {
            Decoded::Packet(pkt, _) => pkt,
            _ => panic!(),
        }
    }

    // SUBSCRIBE and UNSUBSCRIBE bodies: the encoded length must match
    // `encoded_size` and decoding must give back the same value.
    #[test]
    fn test_sub() {
        let pkt = Subscribe {
            packet_id: 12.try_into().unwrap(),
            id: Some(10.try_into().unwrap()),
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec![("test".into(), SubscriptionOptions::default())],
        };

        let size = pkt.encoded_size(99999);
        let mut buf = BytesMut::with_capacity(size);
        pkt.encode(&mut buf, size as u32).unwrap();
        assert_eq!(buf.len(), size);
        assert_eq!(pkt, Subscribe::decode(&mut buf.freeze()).unwrap());

        let pkt = Unsubscribe {
            packet_id: 12.try_into().unwrap(),
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec!["test".into()],
        };

        let size = pkt.encoded_size(99999);
        let mut buf = BytesMut::with_capacity(size);
        pkt.encode(&mut buf, size as u32).unwrap();
        assert_eq!(buf.len(), size);
        assert_eq!(pkt, Unsubscribe::decode(&mut buf.freeze()).unwrap());
    }

    // SUBSCRIBE without a subscription identifier, encoded and decoded
    // through the full `Codec` instead of the body-level functions.
    #[test]
    fn test_sub_pkt() {
        let pkt = Packet::Subscribe(Subscribe {
            packet_id: 12.try_into().unwrap(),
            id: None,
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec![("test".into(), SubscriptionOptions::default())],
        });
        let codec = Codec::new();

        let mut buf = BytesMut::new();
        codec.encode(pkt.clone().into(), &mut buf).unwrap();

        assert_eq!(pkt, packet(codec.decode(&mut buf).unwrap().unwrap()));
    }

    // SUBACK and UNSUBACK bodies, each in two shapes: reason string with no
    // reason codes, and user properties with a single reason code.
    #[test]
    fn test_sub_ack() {
        let ack = SubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: Vec::new(),
            reason_string: Some("some reason".into()),
            status: Vec::new(),
        };

        let size = ack.encoded_size(99999);
        let mut buf = BytesMut::with_capacity(size);
        ack.encode(&mut buf, size as u32).unwrap();
        assert_eq!(ack, SubscribeAck::decode(&mut buf.freeze()).unwrap());

        let ack = SubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
            reason_string: None,
            status: vec![SubscribeAckReason::GrantedQos0],
        };
        let size = ack.encoded_size(99999);
        let mut buf = BytesMut::with_capacity(size);
        ack.encode(&mut buf, size as u32).unwrap();
        assert_eq!(ack, SubscribeAck::decode(&mut buf.freeze()).unwrap());

        let ack = UnsubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: Vec::new(),
            reason_string: Some("some reason".into()),
            status: Vec::new(),
        };
        let mut buf = BytesMut::new();
        let size = ack.encoded_size(99999);
        ack.encode(&mut buf, size as u32).unwrap();
        assert_eq!(ack, UnsubscribeAck::decode(&mut buf.freeze()).unwrap());

        let ack = UnsubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
            reason_string: None,
            status: vec![UnsubscribeAckReason::Success],
        };
        let size = ack.encoded_size(99999);
        let mut buf = BytesMut::with_capacity(size);
        ack.encode(&mut buf, size as u32).unwrap();
        assert_eq!(ack, UnsubscribeAck::decode(&mut buf.freeze()).unwrap());
    }
}