rmqtt_codec/v5/packet/
subscribe.rs

1use std::num::{NonZeroU16, NonZeroU32};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use serde::{Deserialize, Serialize};
6
7use super::ack_props;
8use crate::error::{DecodeError, EncodeError};
9use crate::types::QoS;
10use crate::utils::{self, write_variable_length, Decode, Encode};
11use crate::v5::{encode::*, property_type as pt, UserProperties, UserProperty};
12
/// Represents SUBSCRIBE packet
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Subscribe {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// Subscription Identifier; `None` when the packet carries no such property.
    pub id: Option<NonZeroU32>,
    /// User properties carried in the packet's property section.
    pub user_properties: UserProperties,
    /// the list of Topic Filters and QoS to which the Client wants to subscribe.
    pub topic_filters: Vec<(ByteString, SubscriptionOptions)>,
}
24
/// Per-topic-filter options of a SUBSCRIBE packet.
///
/// On the wire all four fields are packed into a single byte
/// (see the `Encode`/`Decode` impls for `SubscriptionOptions`).
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct SubscriptionOptions {
    /// QoS value, stored in bits 0-1 of the options byte.
    pub qos: QoS,
    /// No Local flag, stored in bit 2 of the options byte.
    pub no_local: bool,
    /// Retain As Published flag, stored in bit 3 of the options byte.
    pub retain_as_published: bool,
    /// Retain Handling option, stored in bits 4-5 of the options byte.
    pub retain_handling: RetainHandling,
}
32
33impl Default for SubscriptionOptions {
34    fn default() -> Self {
35        Self {
36            qos: QoS::AtMostOnce,
37            no_local: false,
38            retain_as_published: false,
39            retain_handling: RetainHandling::AtSubscribe,
40        }
41    }
42}
43
prim_enum! {
    /// Retain Handling option of a subscription.
    ///
    /// The numeric value of each variant is what gets packed into bits 4-5 of
    /// the Subscription Options byte.
    pub enum RetainHandling {
        AtSubscribe = 0,
        AtSubscribeNew = 1,
        NoAtSubscribe = 2
    }
}
51
52impl From<RetainHandling> for u8 {
53    fn from(v: RetainHandling) -> Self {
54        match v {
55            RetainHandling::AtSubscribe => 0,
56            RetainHandling::AtSubscribeNew => 1,
57            RetainHandling::NoAtSubscribe => 2,
58        }
59    }
60}
61
/// Represents SUBACK packet
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct SubscribeAck {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// User properties carried in the packet's property section.
    pub properties: UserProperties,
    /// Optional reason string carried in the packet's property section.
    pub reason_string: Option<ByteString>,
    /// Reason codes; each entry corresponds to a Topic Filter in the SUBSCRIBE Packet being acknowledged.
    pub status: Vec<SubscribeAckReason>,
}
71
/// Represents UNSUBSCRIBE packet
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Unsubscribe {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// User properties carried in the packet's property section.
    pub user_properties: UserProperties,
    /// the list of Topic Filters that the Client wishes to unsubscribe from.
    pub topic_filters: Vec<ByteString>,
}
81
/// Represents UNSUBACK packet
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct UnsubscribeAck {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// User properties carried in the packet's property section.
    pub properties: UserProperties,
    /// Optional reason string carried in the packet's property section.
    pub reason_string: Option<ByteString>,
    /// Reason codes, one byte each on the wire, in payload order.
    pub status: Vec<UnsubscribeAckReason>,
}
91
prim_enum! {
    /// SUBACK reason codes
    ///
    /// The numeric value of each variant is the single byte written to / read
    /// from the SUBACK payload.
    #[derive(Deserialize, Serialize)]
    pub enum SubscribeAckReason {
        // subscription accepted with the named QoS
        GrantedQos0 = 0,
        GrantedQos1 = 1,
        GrantedQos2 = 2,
        // NOTE(review): the remaining codes all name a rejection cause — confirm against the MQTT 5.0 SUBACK table
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145,
        QuotaExceeded = 151,
        SharedSubscriptionNotSupported = 158,
        SubscriptionIdentifiersNotSupported = 161,
        WildcardSubscriptionsNotSupported = 162
    }
}
110
111impl From<SubscribeAckReason> for u8 {
112    fn from(v: SubscribeAckReason) -> Self {
113        match v {
114            SubscribeAckReason::GrantedQos0 => 0,
115            SubscribeAckReason::GrantedQos1 => 1,
116            SubscribeAckReason::GrantedQos2 => 2,
117            SubscribeAckReason::UnspecifiedError => 128,
118            SubscribeAckReason::ImplementationSpecificError => 131,
119            SubscribeAckReason::NotAuthorized => 135,
120            SubscribeAckReason::TopicFilterInvalid => 143,
121            SubscribeAckReason::PacketIdentifierInUse => 145,
122            SubscribeAckReason::QuotaExceeded => 151,
123            SubscribeAckReason::SharedSubscriptionNotSupported => 158,
124            SubscribeAckReason::SubscriptionIdentifiersNotSupported => 161,
125            SubscribeAckReason::WildcardSubscriptionsNotSupported => 162,
126        }
127    }
128}
129
prim_enum! {
    /// UNSUBACK reason codes
    ///
    /// The numeric value of each variant is the single byte written to / read
    /// from the UNSUBACK payload.
    #[derive(Deserialize, Serialize)]
    pub enum UnsubscribeAckReason {
        Success = 0,
        NoSubscriptionExisted = 17,
        // NOTE(review): the remaining codes all name a failure cause — confirm against the MQTT 5.0 UNSUBACK table
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145
    }
}
143
144impl From<UnsubscribeAckReason> for u8 {
145    fn from(v: UnsubscribeAckReason) -> Self {
146        match v {
147            UnsubscribeAckReason::Success => 0,
148            UnsubscribeAckReason::NoSubscriptionExisted => 17,
149            UnsubscribeAckReason::UnspecifiedError => 128,
150            UnsubscribeAckReason::ImplementationSpecificError => 131,
151            UnsubscribeAckReason::NotAuthorized => 135,
152            UnsubscribeAckReason::TopicFilterInvalid => 143,
153            UnsubscribeAckReason::PacketIdentifierInUse => 145,
154        }
155    }
156}
157
158impl Subscribe {
159    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
160        let packet_id = NonZeroU16::decode(src)?;
161        let prop_src = &mut utils::take_properties(src)?;
162        let mut sub_id = None;
163        let mut user_properties = Vec::new();
164        while prop_src.has_remaining() {
165            let prop_id = prop_src.get_u8();
166            match prop_id {
167                pt::SUB_ID => {
168                    ensure!(sub_id.is_none(), DecodeError::MalformedPacket); // can't appear twice
169                    let val = utils::decode_variable_length_cursor(prop_src)?;
170                    sub_id = Some(NonZeroU32::new(val).ok_or(DecodeError::MalformedPacket)?);
171                }
172                pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
173                _ => return Err(DecodeError::MalformedPacket),
174            }
175        }
176
177        let mut topic_filters = Vec::new();
178        while src.has_remaining() {
179            let topic = ByteString::decode(src)?;
180            let opts = SubscriptionOptions::decode(src)?;
181            topic_filters.push((topic, opts));
182        }
183
184        Ok(Self { packet_id, id: sub_id, user_properties, topic_filters })
185    }
186}
187
188impl SubscribeAck {
189    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
190        let packet_id = NonZeroU16::decode(src)?;
191        let (properties, reason_string) = ack_props::decode(src)?;
192        let mut status = Vec::with_capacity(src.remaining());
193        for code in src.as_ref().iter().copied() {
194            status.push(code.try_into()?);
195        }
196        Ok(Self { packet_id, properties, reason_string, status })
197    }
198}
199
200impl Unsubscribe {
201    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
202        let packet_id = NonZeroU16::decode(src)?;
203
204        let prop_src = &mut utils::take_properties(src)?;
205        let mut user_properties = Vec::new();
206        while prop_src.has_remaining() {
207            let prop_id = prop_src.get_u8();
208            match prop_id {
209                pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
210                _ => return Err(DecodeError::MalformedPacket),
211            }
212        }
213
214        let mut topic_filters = Vec::new();
215        while src.remaining() > 0 {
216            topic_filters.push(ByteString::decode(src)?);
217        }
218
219        Ok(Self { packet_id, user_properties, topic_filters })
220    }
221}
222
223impl UnsubscribeAck {
224    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
225        let packet_id = NonZeroU16::decode(src)?;
226        let (properties, reason_string) = ack_props::decode(src)?;
227        let mut status = Vec::with_capacity(src.remaining());
228        for code in src.as_ref().iter().copied() {
229            status.push(code.try_into()?);
230        }
231        Ok(Self { packet_id, properties, reason_string, status })
232    }
233}
234
235impl EncodeLtd for Subscribe {
236    fn encoded_size(&self, _limit: u32) -> usize {
237        let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize) as usize) // +1 to account for property type byte
238            + self.user_properties.encoded_size();
239        let payload_len =
240            self.topic_filters.iter().fold(0, |acc, (filter, _opts)| acc + filter.encoded_size() + 1);
241        self.packet_id.encoded_size() + var_int_len(prop_len) as usize + prop_len + payload_len
242    }
243
244    fn encode(&self, buf: &mut BytesMut, _: u32) -> Result<(), EncodeError> {
245        self.packet_id.encode(buf)?;
246
247        // encode properties
248        let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize))
249            + self.user_properties.encoded_size() as u32; // safe: size was already checked against maximum
250        utils::write_variable_length(prop_len, buf);
251
252        if let Some(id) = self.id {
253            buf.put_u8(pt::SUB_ID);
254            write_variable_length(id.get(), buf);
255        }
256
257        self.user_properties.encode(buf)?;
258
259        // payload
260        for (filter, opts) in self.topic_filters.iter() {
261            filter.encode(buf)?;
262            opts.encode(buf)?;
263        }
264
265        Ok(())
266    }
267}
268
269impl Decode for SubscriptionOptions {
270    fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
271        ensure!(src.has_remaining(), DecodeError::InvalidLength);
272        let val = src.get_u8();
273        let qos = (val & 0b0000_0011).try_into()?;
274        let retain_handling = ((val & 0b0011_0000) >> 4).try_into()?;
275        Ok(SubscriptionOptions {
276            qos,
277            no_local: val & 0b0000_0100 != 0,
278            retain_as_published: val & 0b0000_1000 != 0,
279            retain_handling,
280        })
281    }
282}
283
284impl Encode for SubscriptionOptions {
285    fn encoded_size(&self) -> usize {
286        1
287    }
288    fn encode(&self, buf: &mut BytesMut) -> Result<(), EncodeError> {
289        buf.put_u8(
290            u8::from(self.qos)
291                | ((self.no_local as u8) << 2)
292                | ((self.retain_as_published as u8) << 3)
293                | (u8::from(self.retain_handling) << 4),
294        );
295        Ok(())
296    }
297}
298
299impl EncodeLtd for SubscribeAck {
300    fn encoded_size(&self, limit: u32) -> usize {
301        let len = self.status.len();
302        if len > (u32::MAX - 2) as usize {
303            return usize::MAX; // bail to avoid overflow
304        }
305
306        2 + ack_props::encoded_size(&self.properties, &self.reason_string, limit - 2 - len as u32) + len
307    }
308
309    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
310        self.packet_id.encode(buf)?;
311        let len = self.status.len() as u32; // safe: max size checked already
312        ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
313        for &reason in self.status.iter() {
314            buf.put_u8(reason.into());
315        }
316        Ok(())
317    }
318}
319
320impl EncodeLtd for Unsubscribe {
321    fn encoded_size(&self, _limit: u32) -> usize {
322        let prop_len = self.user_properties.encoded_size();
323        2 + var_int_len(prop_len) as usize
324            + prop_len
325            + self.topic_filters.iter().fold(0, |acc, filter| acc + 2 + filter.len())
326    }
327
328    fn encode(&self, buf: &mut BytesMut, _size: u32) -> Result<(), EncodeError> {
329        self.packet_id.encode(buf)?;
330
331        // properties
332        let prop_len = self.user_properties.encoded_size();
333        utils::write_variable_length(prop_len as u32, buf); // safe: max size check is done already
334        self.user_properties.encode(buf)?;
335
336        // payload
337        for filter in self.topic_filters.iter() {
338            filter.encode(buf)?;
339        }
340        Ok(())
341    }
342}
343
344impl EncodeLtd for UnsubscribeAck {
345    // todo: almost identical to SUBACK
346    fn encoded_size(&self, limit: u32) -> usize {
347        let len = self.status.len();
348        2 + len + ack_props::encoded_size(&self.properties, &self.reason_string, reduce_limit(limit, 2 + len))
349    }
350
351    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
352        self.packet_id.encode(buf)?;
353        let len = self.status.len() as u32;
354
355        ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
356        for &reason in self.status.iter() {
357            buf.put_u8(reason.into());
358        }
359        Ok(())
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::v5::{Codec, Packet};
    use tokio_util::codec::{Decoder, Encoder};

    /// Size limit passed to `encoded_size` by every test in this module.
    const SIZE_LIMIT: u32 = 99999;

    /// Encodes `pkt` into a buffer pre-sized by `encoded_size` and returns the
    /// buffer together with that reported size.
    fn encode_sized<T: EncodeLtd>(pkt: &T) -> (BytesMut, usize) {
        let size = pkt.encoded_size(SIZE_LIMIT);
        let mut buf = BytesMut::with_capacity(size);
        pkt.encode(&mut buf, size as u32).unwrap();
        (buf, size)
    }

    /// SUBSCRIBE and UNSUBSCRIBE round-trip, and `encoded_size` matches the
    /// number of bytes actually written.
    #[test]
    fn test_sub() {
        let sub = Subscribe {
            packet_id: NonZeroU16::new(12).unwrap(),
            id: NonZeroU32::new(10),
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec![("test".into(), SubscriptionOptions::default())],
        };
        let (buf, size) = encode_sized(&sub);
        assert_eq!(buf.len(), size);
        assert_eq!(sub, Subscribe::decode(&mut buf.freeze()).unwrap());

        let unsub = Unsubscribe {
            packet_id: NonZeroU16::new(12).unwrap(),
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec!["test".into()],
        };
        let (buf, size) = encode_sized(&unsub);
        assert_eq!(buf.len(), size);
        assert_eq!(unsub, Unsubscribe::decode(&mut buf.freeze()).unwrap());
    }

    /// A SUBSCRIBE packet survives a full trip through the codec.
    #[test]
    fn test_sub_pkt() {
        let subscribe = Subscribe {
            packet_id: NonZeroU16::new(12).unwrap(),
            id: None,
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec![("test".into(), SubscriptionOptions::default())],
        };
        let pkt = Packet::Subscribe(subscribe);

        let mut codec = Codec::default();
        let mut buf = BytesMut::new();
        codec.encode(pkt.clone(), &mut buf).unwrap();

        let (decoded, ..) = codec.decode(&mut buf).unwrap().unwrap();
        assert_eq!(pkt, decoded);
    }

    /// SUBACK and UNSUBACK round-trip, both with a reason string and no
    /// reason codes, and with user properties and one reason code.
    #[test]
    fn test_sub_ack() {
        let with_reason = SubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: Vec::new(),
            reason_string: Some("some reason".into()),
            status: Vec::new(),
        };
        let (buf, _) = encode_sized(&with_reason);
        assert_eq!(with_reason, SubscribeAck::decode(&mut buf.freeze()).unwrap());

        let with_props = SubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
            reason_string: None,
            status: vec![SubscribeAckReason::GrantedQos0],
        };
        let (buf, _) = encode_sized(&with_props);
        assert_eq!(with_props, SubscribeAck::decode(&mut buf.freeze()).unwrap());

        let with_reason = UnsubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: Vec::new(),
            reason_string: Some("some reason".into()),
            status: Vec::new(),
        };
        let (buf, _) = encode_sized(&with_reason);
        assert_eq!(with_reason, UnsubscribeAck::decode(&mut buf.freeze()).unwrap());

        let with_props = UnsubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
            reason_string: None,
            status: vec![UnsubscribeAckReason::Success],
        };
        let (buf, _) = encode_sized(&with_props);
        assert_eq!(with_props, UnsubscribeAck::decode(&mut buf.freeze()).unwrap());
    }
}