mqtt_proto/v5/
subscribe.rs

1use core::convert::TryFrom;
2
3use alloc::string::String;
4use alloc::sync::Arc;
5use alloc::vec::Vec;
6
7use crate::{
8    decode_var_int, read_string, read_u16, read_u8, write_bytes, write_u16, write_u8, AsyncRead,
9    Encodable, Error, Pid, QoS, SyncWrite, TopicFilter,
10};
11
12use super::{
13    decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
14    PropertyId, PropertyValue, UserProperty, VarByteInt,
15};
16
17/// Body type for SUBSCRIBE packet.
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20pub struct Subscribe {
21    pub pid: Pid,
22    pub properties: SubscribeProperties,
23    pub topics: Vec<(TopicFilter, SubscriptionOptions)>,
24}
25
26impl Subscribe {
27    pub fn new(pid: Pid, topics: Vec<(TopicFilter, SubscriptionOptions)>) -> Self {
28        Subscribe {
29            pid,
30            properties: SubscribeProperties::default(),
31            topics,
32        }
33    }
34
35    pub async fn decode_async<T: AsyncRead + Unpin>(
36        reader: &mut T,
37        header: Header,
38    ) -> Result<Self, ErrorV5> {
39        let mut remaining_len = header.remaining_len as usize;
40        let pid = Pid::try_from(read_u16(reader).await?)?;
41        let properties = SubscribeProperties::decode_async(reader, header.typ).await?;
42        remaining_len = remaining_len
43            .checked_sub(2 + properties.encode_len())
44            .ok_or(Error::InvalidRemainingLength)?;
45        if remaining_len == 0 {
46            return Err(Error::EmptySubscription.into());
47        }
48        let mut topics = Vec::new();
49        while remaining_len > 0 {
50            let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
51            let options = {
52                let opt_byte = read_u8(reader).await?;
53                if opt_byte & 0b11000000 > 0 {
54                    return Err(ErrorV5::InvalidSubscriptionOption(opt_byte));
55                }
56                let max_qos = QoS::from_u8(opt_byte & 0b11)
57                    .map_err(|_| ErrorV5::InvalidSubscriptionOption(opt_byte))?;
58                let no_local = opt_byte & 0b100 == 0b100;
59                let retain_as_published = opt_byte & 0b1000 == 0b1000;
60                let retain_handling = RetainHandling::from_u8((opt_byte & 0b110000) >> 4)
61                    .ok_or(ErrorV5::InvalidSubscriptionOption(opt_byte))?;
62                SubscriptionOptions {
63                    max_qos,
64                    no_local,
65                    retain_as_published,
66                    retain_handling,
67                }
68            };
69            remaining_len = remaining_len
70                .checked_sub(3 + topic_filter.len())
71                .ok_or(Error::InvalidRemainingLength)?;
72            topics.push((topic_filter, options));
73        }
74        Ok(Subscribe {
75            pid,
76            properties,
77            topics,
78        })
79    }
80}
81
82impl Encodable for Subscribe {
83    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
84        write_u16(writer, self.pid.value())?;
85        self.properties.encode(writer)?;
86        for (topic_filter, options) in &self.topics {
87            write_bytes(writer, topic_filter.as_bytes())?;
88            write_u8(writer, options.to_u8())?;
89        }
90        Ok(())
91    }
92
93    fn encode_len(&self) -> usize {
94        2 + self.properties.encode_len()
95            + self
96                .topics
97                .iter()
98                .map(|(filter, _)| 3 + filter.len())
99                .sum::<usize>()
100    }
101}
102
103/// Property list for SUBSCRIBE packet.
104#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
105#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
106pub struct SubscribeProperties {
107    pub subscription_id: Option<VarByteInt>,
108    pub user_properties: Vec<UserProperty>,
109}
110
111impl SubscribeProperties {
112    pub async fn decode_async<T: AsyncRead + Unpin>(
113        reader: &mut T,
114        packet_type: PacketType,
115    ) -> Result<Self, ErrorV5> {
116        let mut properties = SubscribeProperties::default();
117        decode_properties!(packet_type, properties, reader, SubscriptionIdentifier,);
118        Ok(properties)
119    }
120}
121
122impl Encodable for SubscribeProperties {
123    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
124        encode_properties!(self, writer, SubscriptionIdentifier,);
125        Ok(())
126    }
127    fn encode_len(&self) -> usize {
128        let mut len = 0;
129        encode_properties_len!(self, len, SubscriptionIdentifier,);
130        len
131    }
132}
133
134/// Subscription options.
135#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
136#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
137pub struct SubscriptionOptions {
138    pub max_qos: QoS,
139    pub no_local: bool,
140    pub retain_as_published: bool,
141    pub retain_handling: RetainHandling,
142}
143
144impl SubscriptionOptions {
145    pub fn new(max_qos: QoS) -> Self {
146        SubscriptionOptions {
147            max_qos,
148            no_local: false,
149            retain_as_published: true,
150            retain_handling: RetainHandling::SendAtSubscribe,
151        }
152    }
153
154    pub fn to_u8(&self) -> u8 {
155        let mut byte = self.max_qos as u8;
156        if self.no_local {
157            byte |= 0b100;
158        }
159        if self.retain_as_published {
160            byte |= 0b1000;
161        }
162        byte |= (self.retain_handling as u8) << 4;
163        byte
164    }
165}
166
167/// Retain handling type.
168#[repr(u8)]
169#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
170#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
171pub enum RetainHandling {
172    SendAtSubscribe = 0,
173    SendAtSubscribeIfNotExist = 1,
174    DoNotSend = 2,
175}
176
177impl RetainHandling {
178    pub fn from_u8(value: u8) -> Option<Self> {
179        let opt = match value {
180            0 => Self::SendAtSubscribe,
181            1 => Self::SendAtSubscribeIfNotExist,
182            2 => Self::DoNotSend,
183            _ => return None,
184        };
185        Some(opt)
186    }
187}
188
189/// Body type for SUBACK packet.
190#[derive(Debug, Clone, PartialEq, Eq)]
191#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
192pub struct Suback {
193    pub pid: Pid,
194    pub properties: SubackProperties,
195    pub topics: Vec<SubscribeReasonCode>,
196}
197
198impl Suback {
199    pub fn new(pid: Pid, topics: Vec<SubscribeReasonCode>) -> Self {
200        Suback {
201            pid,
202            properties: SubackProperties::default(),
203            topics,
204        }
205    }
206
207    pub async fn decode_async<T: AsyncRead + Unpin>(
208        reader: &mut T,
209        header: Header,
210    ) -> Result<Self, ErrorV5> {
211        let mut remaining_len = header.remaining_len as usize;
212        let pid = Pid::try_from(read_u16(reader).await?)?;
213        let properties = SubackProperties::decode_async(reader, header.typ).await?;
214        remaining_len = remaining_len
215            .checked_sub(2 + properties.encode_len())
216            .ok_or(Error::InvalidRemainingLength)?;
217        let mut topics = Vec::new();
218        while remaining_len > 0 {
219            let value = read_u8(reader).await?;
220            let code = SubscribeReasonCode::from_u8(value)
221                .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
222            topics.push(code);
223            remaining_len -= 1;
224        }
225        Ok(Suback {
226            pid,
227            properties,
228            topics,
229        })
230    }
231}
232
233impl Encodable for Suback {
234    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
235        write_u16(writer, self.pid.value())?;
236        self.properties.encode(writer)?;
237        for reason_code in &self.topics {
238            write_u8(writer, *reason_code as u8)?;
239        }
240        Ok(())
241    }
242
243    fn encode_len(&self) -> usize {
244        2 + self.properties.encode_len() + self.topics.len()
245    }
246}
247
248/// Property list for SUBACK packet.
249#[derive(Debug, Clone, PartialEq, Eq, Default)]
250#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
251pub struct SubackProperties {
252    pub reason_string: Option<Arc<String>>,
253    pub user_properties: Vec<UserProperty>,
254}
255
256impl SubackProperties {
257    pub async fn decode_async<T: AsyncRead + Unpin>(
258        reader: &mut T,
259        packet_type: PacketType,
260    ) -> Result<Self, ErrorV5> {
261        let mut properties = SubackProperties::default();
262        decode_properties!(packet_type, properties, reader, ReasonString,);
263        Ok(properties)
264    }
265}
266
267impl Encodable for SubackProperties {
268    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
269        encode_properties!(self, writer, ReasonString,);
270        Ok(())
271    }
272    fn encode_len(&self) -> usize {
273        let mut len = 0;
274        encode_properties_len!(self, len, ReasonString,);
275        len
276    }
277}
278
279/// Reason code for SUBACK packet.
280///
281/// | Dec |  Hex | Reason Code name                       | Description                                                                                                        |
282/// |-----|------|----------------------------------------|--------------------------------------------------------------------------------------------------------------------|
283/// |   0 | 0x00 | Granted QoS 0                          | The subscription is accepted and the maximum QoS sent will be QoS 0. This might be a lower QoS than was requested. |
284/// |   1 | 0x01 | Granted QoS 1                          | The subscription is accepted and the maximum QoS sent will be QoS 1. This might be a lower QoS than was requested. |
285/// |   2 | 0x02 | Granted QoS 2                          | The subscription is accepted and any received QoS will be sent to this subscription.                               |
286/// | 128 | 0x80 | Unspecified error                      | The subscription is not accepted and the Server either does not wish to reveal the reason                          |
287/// |     |      |                                        | or none of the other Reason Codes apply.                                                                           |
288/// | 131 | 0x83 | Implementation specific error          | The SUBSCRIBE is valid but the Server does not accept it.                                                          |
289/// | 135 | 0x87 | Not authorized                         | The Client is not authorized to make this subscription.                                                            |
290/// | 143 | 0x8F | Topic Filter invalid                   | The Topic Filter is correctly formed but is not allowed for this Client.                                           |
291/// | 145 | 0x91 | Packet Identifier in use               | The specified Packet Identifier is already in use.                                                                 |
292/// | 151 | 0x97 | Quota exceeded                         | An implementation or administrative imposed limit has been exceeded.                                               |
293/// | 158 | 0x9E | Shared Subscriptions not supported     | The Server does not support Shared Subscriptions for this Client.                                                  |
294/// | 161 | 0xA1 | Subscription Identifiers not supported | The Server does not support Subscription Identifiers; the subscription is not accepted.                            |
295/// | 162 | 0xA2 | Wildcard Subscriptions not supported   | The Server does not support Wildcard Subscriptions; the subscription is not accepted.                              |
296#[repr(u8)]
297#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
298#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
299pub enum SubscribeReasonCode {
300    GrantedQoS0 = 0x00,
301    GrantedQoS1 = 0x01,
302    GrantedQoS2 = 0x02,
303    UnspecifiedError = 0x80,
304    ImplementationSpecificError = 0x83,
305    NotAuthorized = 0x87,
306    TopicFilterInvalid = 0x8F,
307    PacketIdentifierInUse = 0x91,
308    QuotaExceeded = 0x97,
309    SharedSubscriptionNotSupported = 0x9E,
310    SubscriptionIdentifiersNotSupported = 0xA1,
311    WildcardSubscriptionsNotSupported = 0xA2,
312}
313
314impl SubscribeReasonCode {
315    pub fn from_u8(value: u8) -> Option<Self> {
316        let code = match value {
317            0x00 => Self::GrantedQoS0,
318            0x01 => Self::GrantedQoS1,
319            0x02 => Self::GrantedQoS2,
320            0x80 => Self::UnspecifiedError,
321            0x83 => Self::ImplementationSpecificError,
322            0x87 => Self::NotAuthorized,
323            0x8F => Self::TopicFilterInvalid,
324            0x91 => Self::PacketIdentifierInUse,
325            0x97 => Self::QuotaExceeded,
326            0x9E => Self::SharedSubscriptionNotSupported,
327            0xA1 => Self::SubscriptionIdentifiersNotSupported,
328            0xA2 => Self::WildcardSubscriptionsNotSupported,
329            _ => return None,
330        };
331        Some(code)
332    }
333}
334
335/// Body type for UNSUBSCRIBE packet.
336#[derive(Debug, Clone, PartialEq, Eq, Hash)]
337#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
338pub struct Unsubscribe {
339    pub pid: Pid,
340    pub properties: UnsubscribeProperties,
341    pub topics: Vec<TopicFilter>,
342}
343
344impl Unsubscribe {
345    pub fn new(pid: Pid, topics: Vec<TopicFilter>) -> Self {
346        Unsubscribe {
347            pid,
348            properties: Default::default(),
349            topics,
350        }
351    }
352
353    pub async fn decode_async<T: AsyncRead + Unpin>(
354        reader: &mut T,
355        header: Header,
356    ) -> Result<Self, ErrorV5> {
357        let mut remaining_len = header.remaining_len as usize;
358        let pid = Pid::try_from(read_u16(reader).await?)?;
359        let (property_len, property_len_bytes) = decode_var_int(reader).await?;
360        let mut properties = UnsubscribeProperties::default();
361        let mut len = 0;
362        while property_len as usize > len {
363            let property_id = PropertyId::from_u8(read_u8(reader).await?)?;
364            match property_id {
365                PropertyId::UserProperty => {
366                    let property = PropertyValue::decode_user_property(reader).await?;
367                    len += 1 + 4 + property.name.len() + property.value.len();
368                    properties.user_properties.push(property);
369                }
370                _ => return Err(ErrorV5::InvalidProperty(header.typ, property_id)),
371            }
372        }
373        if property_len as usize != len {
374            return Err(ErrorV5::InvalidPropertyLength(property_len));
375        }
376        remaining_len = remaining_len
377            .checked_sub(2 + property_len_bytes + len)
378            .ok_or(Error::InvalidRemainingLength)?;
379        if remaining_len == 0 {
380            return Err(Error::EmptySubscription.into());
381        }
382        let mut topics = Vec::new();
383        while remaining_len > 0 {
384            let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
385            remaining_len = remaining_len
386                .checked_sub(2 + topic_filter.len())
387                .ok_or(Error::InvalidRemainingLength)?;
388            topics.push(topic_filter);
389        }
390        Ok(Unsubscribe {
391            pid,
392            properties,
393            topics,
394        })
395    }
396}
397
398impl Encodable for Unsubscribe {
399    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
400        write_u16(writer, self.pid.value())?;
401        self.properties.encode(writer)?;
402        for topic_filter in &self.topics {
403            write_bytes(writer, topic_filter.as_bytes())?;
404        }
405        Ok(())
406    }
407
408    fn encode_len(&self) -> usize {
409        let mut len = 2;
410        len += self.properties.encode_len();
411        len += self
412            .topics
413            .iter()
414            .map(|topic_filter| 2 + topic_filter.len())
415            .sum::<usize>();
416        len
417    }
418}
419
420/// Property list for UNSUBSCRIBE packet.
421#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
422#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
423pub struct UnsubscribeProperties {
424    pub user_properties: Vec<UserProperty>,
425}
426
427impl UnsubscribeProperties {
428    pub async fn decode_async<T: AsyncRead + Unpin>(
429        reader: &mut T,
430        packet_type: PacketType,
431    ) -> Result<Self, ErrorV5> {
432        let mut properties = UnsubscribeProperties::default();
433        decode_properties!(packet_type, properties, reader,);
434        Ok(properties)
435    }
436}
437
438impl Encodable for UnsubscribeProperties {
439    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
440        encode_properties!(self, writer);
441        Ok(())
442    }
443    fn encode_len(&self) -> usize {
444        let mut len = 0;
445        encode_properties_len!(self, len);
446        len
447    }
448}
449
450impl From<Vec<UserProperty>> for UnsubscribeProperties {
451    fn from(user_properties: Vec<UserProperty>) -> UnsubscribeProperties {
452        UnsubscribeProperties { user_properties }
453    }
454}
455
456/// Body type for UNSUBACK packet.
457#[derive(Debug, Clone, PartialEq, Eq)]
458#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
459pub struct Unsuback {
460    pub pid: Pid,
461    pub properties: UnsubackProperties,
462    pub topics: Vec<UnsubscribeReasonCode>,
463}
464
465impl Unsuback {
466    pub fn new(pid: Pid, topics: Vec<UnsubscribeReasonCode>) -> Self {
467        Unsuback {
468            pid,
469            properties: UnsubackProperties::default(),
470            topics,
471        }
472    }
473
474    pub async fn decode_async<T: AsyncRead + Unpin>(
475        reader: &mut T,
476        header: Header,
477    ) -> Result<Self, ErrorV5> {
478        let mut remaining_len = header.remaining_len as usize;
479        let pid = Pid::try_from(read_u16(reader).await?)?;
480        let properties = UnsubackProperties::decode_async(reader, header.typ).await?;
481        remaining_len = remaining_len
482            .checked_sub(2 + properties.encode_len())
483            .ok_or(Error::InvalidRemainingLength)?;
484        let mut topics = Vec::new();
485        while remaining_len > 0 {
486            let value = read_u8(reader).await?;
487            let code = UnsubscribeReasonCode::from_u8(value)
488                .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
489            topics.push(code);
490            remaining_len -= 1;
491        }
492        Ok(Unsuback {
493            pid,
494            properties,
495            topics,
496        })
497    }
498}
499
500impl Encodable for Unsuback {
501    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
502        write_u16(writer, self.pid.value())?;
503        self.properties.encode(writer)?;
504        for reason_code in &self.topics {
505            write_u8(writer, *reason_code as u8)?;
506        }
507        Ok(())
508    }
509
510    fn encode_len(&self) -> usize {
511        2 + self.properties.encode_len() + self.topics.len()
512    }
513}
514
515/// Property list for UNSUBACK packet.
516#[derive(Debug, Clone, PartialEq, Eq, Default)]
517#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
518pub struct UnsubackProperties {
519    pub reason_string: Option<Arc<String>>,
520    pub user_properties: Vec<UserProperty>,
521}
522
523impl UnsubackProperties {
524    pub async fn decode_async<T: AsyncRead + Unpin>(
525        reader: &mut T,
526        packet_type: PacketType,
527    ) -> Result<Self, ErrorV5> {
528        let mut properties = UnsubackProperties::default();
529        decode_properties!(packet_type, properties, reader, ReasonString,);
530        Ok(properties)
531    }
532}
533
534impl Encodable for UnsubackProperties {
535    fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
536        encode_properties!(self, writer, ReasonString,);
537        Ok(())
538    }
539    fn encode_len(&self) -> usize {
540        let mut len = 0;
541        encode_properties_len!(self, len, ReasonString,);
542        len
543    }
544}
545
546/// Reason code for UNSUBACK packet.
547///
548/// | Dec |  Hex | Reason Code name              | Description                                                                                   |
549/// |-----|------|-------------------------------|-----------------------------------------------------------------------------------------------|
550/// |   0 | 0x00 | Success                       | The subscription is deleted.                                                                  |
551/// |  17 | 0x11 | No subscription existed       | No matching Topic Filter is being used by the Client.                                         |
552/// | 128 | 0x80 | Unspecified error             | The unsubscribe could not be completed and                                                    |
553/// |     |      |                               | the Server either does not wish to reveal the reason or none of the other Reason Codes apply. |
554/// | 131 | 0x83 | Implementation specific error | The UNSUBSCRIBE is valid but the Server does not accept it.                                   |
555/// | 135 | 0x87 | Not authorized                | The Client is not authorized to unsubscribe.                                                  |
556/// | 143 | 0x8F | Topic Filter invalid          | The Topic Filter is correctly formed but is not allowed for this Client.                      |
557/// | 145 | 0x91 | Packet Identifier in use      | The specified Packet Identifier is already in use.                                            |
558#[repr(u8)]
559#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
560#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
561pub enum UnsubscribeReasonCode {
562    Success = 0x00,
563    NoSubscriptionExisted = 0x11,
564    UnspecifiedError = 0x80,
565    ImplementationSpecificError = 0x83,
566    NotAuthorized = 0x87,
567    TopicFilterInvalid = 0x8F,
568    PacketIdentifierInUse = 0x91,
569}
570
571impl UnsubscribeReasonCode {
572    pub fn from_u8(value: u8) -> Option<Self> {
573        let code = match value {
574            0x00 => Self::Success,
575            0x11 => Self::NoSubscriptionExisted,
576            0x80 => Self::UnspecifiedError,
577            0x83 => Self::ImplementationSpecificError,
578            0x87 => Self::NotAuthorized,
579            0x8F => Self::TopicFilterInvalid,
580            0x91 => Self::PacketIdentifierInUse,
581            _ => return None,
582        };
583        Some(code)
584    }
585}