mqtt_proto/v5/
subscribe.rs

1use std::convert::TryFrom;
2use std::io;
3use std::sync::Arc;
4
5use tokio::io::AsyncRead;
6
7use super::{
8    decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
9    PropertyId, PropertyValue, UserProperty, VarByteInt,
10};
11use crate::{
12    decode_var_int, read_string, read_u16, read_u8, write_bytes, write_u16, write_u8, Encodable,
13    Error, Pid, QoS, TopicFilter,
14};
15
16/// Body type for SUBSCRIBE packet.
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
18#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
19pub struct Subscribe {
20    pub pid: Pid,
21    pub properties: SubscribeProperties,
22    pub topics: Vec<(TopicFilter, SubscriptionOptions)>,
23}
24
25impl Subscribe {
26    pub fn new(pid: Pid, topics: Vec<(TopicFilter, SubscriptionOptions)>) -> Self {
27        Subscribe {
28            pid,
29            properties: SubscribeProperties::default(),
30            topics,
31        }
32    }
33
34    pub async fn decode_async<T: AsyncRead + Unpin>(
35        reader: &mut T,
36        header: Header,
37    ) -> Result<Self, ErrorV5> {
38        let mut remaining_len = header.remaining_len as usize;
39        let pid = Pid::try_from(read_u16(reader).await?)?;
40        let properties = SubscribeProperties::decode_async(reader, header.typ).await?;
41        remaining_len = remaining_len
42            .checked_sub(2 + properties.encode_len())
43            .ok_or(Error::InvalidRemainingLength)?;
44        if remaining_len == 0 {
45            return Err(Error::EmptySubscription.into());
46        }
47        let mut topics = Vec::new();
48        while remaining_len > 0 {
49            let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
50            let options = {
51                let opt_byte = read_u8(reader).await?;
52                if opt_byte & 0b11000000 > 0 {
53                    return Err(ErrorV5::InvalidSubscriptionOption(opt_byte));
54                }
55                let max_qos = QoS::from_u8(opt_byte & 0b11)
56                    .map_err(|_| ErrorV5::InvalidSubscriptionOption(opt_byte))?;
57                let no_local = opt_byte & 0b100 == 0b100;
58                let retain_as_published = opt_byte & 0b1000 == 0b1000;
59                let retain_handling = RetainHandling::from_u8((opt_byte & 0b110000) >> 4)
60                    .ok_or(ErrorV5::InvalidSubscriptionOption(opt_byte))?;
61                SubscriptionOptions {
62                    max_qos,
63                    no_local,
64                    retain_as_published,
65                    retain_handling,
66                }
67            };
68            remaining_len = remaining_len
69                .checked_sub(3 + topic_filter.len())
70                .ok_or(Error::InvalidRemainingLength)?;
71            topics.push((topic_filter, options));
72        }
73        Ok(Subscribe {
74            pid,
75            properties,
76            topics,
77        })
78    }
79}
80
81impl Encodable for Subscribe {
82    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
83        write_u16(writer, self.pid.value())?;
84        self.properties.encode(writer)?;
85        for (topic_filter, options) in &self.topics {
86            write_bytes(writer, topic_filter.as_bytes())?;
87            write_u8(writer, options.to_u8())?;
88        }
89        Ok(())
90    }
91
92    fn encode_len(&self) -> usize {
93        2 + self.properties.encode_len()
94            + self
95                .topics
96                .iter()
97                .map(|(filter, _)| 3 + filter.len())
98                .sum::<usize>()
99    }
100}
101
102/// Property list for SUBSCRIBE packet.
103#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
104#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
105pub struct SubscribeProperties {
106    pub subscription_id: Option<VarByteInt>,
107    pub user_properties: Vec<UserProperty>,
108}
109
110impl SubscribeProperties {
111    pub async fn decode_async<T: AsyncRead + Unpin>(
112        reader: &mut T,
113        packet_type: PacketType,
114    ) -> Result<Self, ErrorV5> {
115        let mut properties = SubscribeProperties::default();
116        decode_properties!(packet_type, properties, reader, SubscriptionIdentifier,);
117        Ok(properties)
118    }
119}
120
121impl Encodable for SubscribeProperties {
122    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
123        encode_properties!(self, writer, SubscriptionIdentifier,);
124        Ok(())
125    }
126    fn encode_len(&self) -> usize {
127        let mut len = 0;
128        encode_properties_len!(self, len, SubscriptionIdentifier,);
129        len
130    }
131}
132
133/// Subscription options.
134#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
135#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
136pub struct SubscriptionOptions {
137    pub max_qos: QoS,
138    pub no_local: bool,
139    pub retain_as_published: bool,
140    pub retain_handling: RetainHandling,
141}
142
143impl SubscriptionOptions {
144    pub fn new(max_qos: QoS) -> Self {
145        SubscriptionOptions {
146            max_qos,
147            no_local: false,
148            retain_as_published: true,
149            retain_handling: RetainHandling::SendAtSubscribe,
150        }
151    }
152
153    pub fn to_u8(&self) -> u8 {
154        let mut byte = self.max_qos as u8;
155        if self.no_local {
156            byte |= 0b100;
157        }
158        if self.retain_as_published {
159            byte |= 0b1000;
160        }
161        byte |= (self.retain_handling as u8) << 4;
162        byte
163    }
164}
165
166/// Retain handling type.
167#[repr(u8)]
168#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
169#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
170pub enum RetainHandling {
171    SendAtSubscribe = 0,
172    SendAtSubscribeIfNotExist = 1,
173    DoNotSend = 2,
174}
175
176impl RetainHandling {
177    pub fn from_u8(value: u8) -> Option<Self> {
178        let opt = match value {
179            0 => Self::SendAtSubscribe,
180            1 => Self::SendAtSubscribeIfNotExist,
181            2 => Self::DoNotSend,
182            _ => return None,
183        };
184        Some(opt)
185    }
186}
187
188/// Body type for SUBACK packet.
189#[derive(Debug, Clone, PartialEq, Eq)]
190#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
191pub struct Suback {
192    pub pid: Pid,
193    pub properties: SubackProperties,
194    pub topics: Vec<SubscribeReasonCode>,
195}
196
197impl Suback {
198    pub fn new(pid: Pid, topics: Vec<SubscribeReasonCode>) -> Self {
199        Suback {
200            pid,
201            properties: SubackProperties::default(),
202            topics,
203        }
204    }
205
206    pub async fn decode_async<T: AsyncRead + Unpin>(
207        reader: &mut T,
208        header: Header,
209    ) -> Result<Self, ErrorV5> {
210        let mut remaining_len = header.remaining_len as usize;
211        let pid = Pid::try_from(read_u16(reader).await?)?;
212        let properties = SubackProperties::decode_async(reader, header.typ).await?;
213        remaining_len = remaining_len
214            .checked_sub(2 + properties.encode_len())
215            .ok_or(Error::InvalidRemainingLength)?;
216        let mut topics = Vec::new();
217        while remaining_len > 0 {
218            let value = read_u8(reader).await?;
219            let code = SubscribeReasonCode::from_u8(value)
220                .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
221            topics.push(code);
222            remaining_len -= 1;
223        }
224        Ok(Suback {
225            pid,
226            properties,
227            topics,
228        })
229    }
230}
231
232impl Encodable for Suback {
233    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
234        write_u16(writer, self.pid.value())?;
235        self.properties.encode(writer)?;
236        for reason_code in &self.topics {
237            write_u8(writer, *reason_code as u8)?;
238        }
239        Ok(())
240    }
241
242    fn encode_len(&self) -> usize {
243        2 + self.properties.encode_len() + self.topics.len()
244    }
245}
246
247/// Property list for SUBACK packet.
248#[derive(Debug, Clone, PartialEq, Eq, Default)]
249#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
250pub struct SubackProperties {
251    pub reason_string: Option<Arc<String>>,
252    pub user_properties: Vec<UserProperty>,
253}
254
255impl SubackProperties {
256    pub async fn decode_async<T: AsyncRead + Unpin>(
257        reader: &mut T,
258        packet_type: PacketType,
259    ) -> Result<Self, ErrorV5> {
260        let mut properties = SubackProperties::default();
261        decode_properties!(packet_type, properties, reader, ReasonString,);
262        Ok(properties)
263    }
264}
265
266impl Encodable for SubackProperties {
267    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
268        encode_properties!(self, writer, ReasonString,);
269        Ok(())
270    }
271    fn encode_len(&self) -> usize {
272        let mut len = 0;
273        encode_properties_len!(self, len, ReasonString,);
274        len
275    }
276}
277
278/// Reason code for SUBACK packet.
279///
280/// | Dec |  Hex | Reason Code name                       | Description                                                                                                        |
281/// |-----|------|----------------------------------------|--------------------------------------------------------------------------------------------------------------------|
282/// |   0 | 0x00 | Granted QoS 0                          | The subscription is accepted and the maximum QoS sent will be QoS 0. This might be a lower QoS than was requested. |
283/// |   1 | 0x01 | Granted QoS 1                          | The subscription is accepted and the maximum QoS sent will be QoS 1. This might be a lower QoS than was requested. |
284/// |   2 | 0x02 | Granted QoS 2                          | The subscription is accepted and any received QoS will be sent to this subscription.                               |
285/// | 128 | 0x80 | Unspecified error                      | The subscription is not accepted and the Server either does not wish to reveal the reason                          |
286/// |     |      |                                        | or none of the other Reason Codes apply.                                                                           |
287/// | 131 | 0x83 | Implementation specific error          | The SUBSCRIBE is valid but the Server does not accept it.                                                          |
288/// | 135 | 0x87 | Not authorized                         | The Client is not authorized to make this subscription.                                                            |
289/// | 143 | 0x8F | Topic Filter invalid                   | The Topic Filter is correctly formed but is not allowed for this Client.                                           |
290/// | 145 | 0x91 | Packet Identifier in use               | The specified Packet Identifier is already in use.                                                                 |
291/// | 151 | 0x97 | Quota exceeded                         | An implementation or administrative imposed limit has been exceeded.                                               |
292/// | 158 | 0x9E | Shared Subscriptions not supported     | The Server does not support Shared Subscriptions for this Client.                                                  |
293/// | 161 | 0xA1 | Subscription Identifiers not supported | The Server does not support Subscription Identifiers; the subscription is not accepted.                            |
294/// | 162 | 0xA2 | Wildcard Subscriptions not supported   | The Server does not support Wildcard Subscriptions; the subscription is not accepted.                              |
295#[repr(u8)]
296#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
297#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
298pub enum SubscribeReasonCode {
299    GrantedQoS0 = 0x00,
300    GrantedQoS1 = 0x01,
301    GrantedQoS2 = 0x02,
302    UnspecifiedError = 0x80,
303    ImplementationSpecificError = 0x83,
304    NotAuthorized = 0x87,
305    TopicFilterInvalid = 0x8F,
306    PacketIdentifierInUse = 0x91,
307    QuotaExceeded = 0x97,
308    SharedSubscriptionNotSupported = 0x9E,
309    SubscriptionIdentifiersNotSupported = 0xA1,
310    WildcardSubscriptionsNotSupported = 0xA2,
311}
312
313impl SubscribeReasonCode {
314    pub fn from_u8(value: u8) -> Option<Self> {
315        let code = match value {
316            0x00 => Self::GrantedQoS0,
317            0x01 => Self::GrantedQoS1,
318            0x02 => Self::GrantedQoS2,
319            0x80 => Self::UnspecifiedError,
320            0x83 => Self::ImplementationSpecificError,
321            0x87 => Self::NotAuthorized,
322            0x8F => Self::TopicFilterInvalid,
323            0x91 => Self::PacketIdentifierInUse,
324            0x97 => Self::QuotaExceeded,
325            0x9E => Self::SharedSubscriptionNotSupported,
326            0xA1 => Self::SubscriptionIdentifiersNotSupported,
327            0xA2 => Self::WildcardSubscriptionsNotSupported,
328            _ => return None,
329        };
330        Some(code)
331    }
332}
333
334/// Body type for UNSUBSCRIBE packet.
335#[derive(Debug, Clone, PartialEq, Eq, Hash)]
336#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
337pub struct Unsubscribe {
338    pub pid: Pid,
339    pub properties: UnsubscribeProperties,
340    pub topics: Vec<TopicFilter>,
341}
342
343impl Unsubscribe {
344    pub fn new(pid: Pid, topics: Vec<TopicFilter>) -> Self {
345        Unsubscribe {
346            pid,
347            properties: Default::default(),
348            topics,
349        }
350    }
351
352    pub async fn decode_async<T: AsyncRead + Unpin>(
353        reader: &mut T,
354        header: Header,
355    ) -> Result<Self, ErrorV5> {
356        let mut remaining_len = header.remaining_len as usize;
357        let pid = Pid::try_from(read_u16(reader).await?)?;
358        let (property_len, property_len_bytes) = decode_var_int(reader).await?;
359        let mut properties = UnsubscribeProperties::default();
360        let mut len = 0;
361        while property_len as usize > len {
362            let property_id = PropertyId::from_u8(read_u8(reader).await?)?;
363            match property_id {
364                PropertyId::UserProperty => {
365                    let property = PropertyValue::decode_user_property(reader).await?;
366                    len += 1 + 4 + property.name.len() + property.value.len();
367                    properties.user_properties.push(property);
368                }
369                _ => return Err(ErrorV5::InvalidProperty(header.typ, property_id)),
370            }
371        }
372        if property_len as usize != len {
373            return Err(ErrorV5::InvalidPropertyLength(property_len));
374        }
375        remaining_len = remaining_len
376            .checked_sub(2 + property_len_bytes + len)
377            .ok_or(Error::InvalidRemainingLength)?;
378        if remaining_len == 0 {
379            return Err(Error::EmptySubscription.into());
380        }
381        let mut topics = Vec::new();
382        while remaining_len > 0 {
383            let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
384            remaining_len = remaining_len
385                .checked_sub(2 + topic_filter.len())
386                .ok_or(Error::InvalidRemainingLength)?;
387            topics.push(topic_filter);
388        }
389        Ok(Unsubscribe {
390            pid,
391            properties,
392            topics,
393        })
394    }
395}
396
397impl Encodable for Unsubscribe {
398    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
399        write_u16(writer, self.pid.value())?;
400        self.properties.encode(writer)?;
401        for topic_filter in &self.topics {
402            write_bytes(writer, topic_filter.as_bytes())?;
403        }
404        Ok(())
405    }
406
407    fn encode_len(&self) -> usize {
408        let mut len = 2;
409        len += self.properties.encode_len();
410        len += self
411            .topics
412            .iter()
413            .map(|topic_filter| 2 + topic_filter.len())
414            .sum::<usize>();
415        len
416    }
417}
418
419/// Property list for UNSUBSCRIBE packet.
420#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
421#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
422pub struct UnsubscribeProperties {
423    pub user_properties: Vec<UserProperty>,
424}
425
426impl UnsubscribeProperties {
427    pub async fn decode_async<T: AsyncRead + Unpin>(
428        reader: &mut T,
429        packet_type: PacketType,
430    ) -> Result<Self, ErrorV5> {
431        let mut properties = UnsubscribeProperties::default();
432        decode_properties!(packet_type, properties, reader,);
433        Ok(properties)
434    }
435}
436
437impl Encodable for UnsubscribeProperties {
438    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
439        encode_properties!(self, writer);
440        Ok(())
441    }
442    fn encode_len(&self) -> usize {
443        let mut len = 0;
444        encode_properties_len!(self, len);
445        len
446    }
447}
448
449impl From<Vec<UserProperty>> for UnsubscribeProperties {
450    fn from(user_properties: Vec<UserProperty>) -> UnsubscribeProperties {
451        UnsubscribeProperties { user_properties }
452    }
453}
454
455/// Body type for UNSUBACK packet.
456#[derive(Debug, Clone, PartialEq, Eq)]
457#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
458pub struct Unsuback {
459    pub pid: Pid,
460    pub properties: UnsubackProperties,
461    pub topics: Vec<UnsubscribeReasonCode>,
462}
463
464impl Unsuback {
465    pub fn new(pid: Pid, topics: Vec<UnsubscribeReasonCode>) -> Self {
466        Unsuback {
467            pid,
468            properties: UnsubackProperties::default(),
469            topics,
470        }
471    }
472
473    pub async fn decode_async<T: AsyncRead + Unpin>(
474        reader: &mut T,
475        header: Header,
476    ) -> Result<Self, ErrorV5> {
477        let mut remaining_len = header.remaining_len as usize;
478        let pid = Pid::try_from(read_u16(reader).await?)?;
479        let properties = UnsubackProperties::decode_async(reader, header.typ).await?;
480        remaining_len = remaining_len
481            .checked_sub(2 + properties.encode_len())
482            .ok_or(Error::InvalidRemainingLength)?;
483        let mut topics = Vec::new();
484        while remaining_len > 0 {
485            let value = read_u8(reader).await?;
486            let code = UnsubscribeReasonCode::from_u8(value)
487                .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
488            topics.push(code);
489            remaining_len -= 1;
490        }
491        Ok(Unsuback {
492            pid,
493            properties,
494            topics,
495        })
496    }
497}
498
499impl Encodable for Unsuback {
500    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
501        write_u16(writer, self.pid.value())?;
502        self.properties.encode(writer)?;
503        for reason_code in &self.topics {
504            write_u8(writer, *reason_code as u8)?;
505        }
506        Ok(())
507    }
508
509    fn encode_len(&self) -> usize {
510        2 + self.properties.encode_len() + self.topics.len()
511    }
512}
513
514/// Property list for UNSUBACK packet.
515#[derive(Debug, Clone, PartialEq, Eq, Default)]
516#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
517pub struct UnsubackProperties {
518    pub reason_string: Option<Arc<String>>,
519    pub user_properties: Vec<UserProperty>,
520}
521
522impl UnsubackProperties {
523    pub async fn decode_async<T: AsyncRead + Unpin>(
524        reader: &mut T,
525        packet_type: PacketType,
526    ) -> Result<Self, ErrorV5> {
527        let mut properties = UnsubackProperties::default();
528        decode_properties!(packet_type, properties, reader, ReasonString,);
529        Ok(properties)
530    }
531}
532
533impl Encodable for UnsubackProperties {
534    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
535        encode_properties!(self, writer, ReasonString,);
536        Ok(())
537    }
538    fn encode_len(&self) -> usize {
539        let mut len = 0;
540        encode_properties_len!(self, len, ReasonString,);
541        len
542    }
543}
544
545/// Reason code for UNSUBACK packet.
546///
547/// | Dec |  Hex | Reason Code name              | Description                                                                                   |
548/// |-----|------|-------------------------------|-----------------------------------------------------------------------------------------------|
549/// |   0 | 0x00 | Success                       | The subscription is deleted.                                                                  |
550/// |  17 | 0x11 | No subscription existed       | No matching Topic Filter is being used by the Client.                                         |
551/// | 128 | 0x80 | Unspecified error             | The unsubscribe could not be completed and                                                    |
552/// |     |      |                               | the Server either does not wish to reveal the reason or none of the other Reason Codes apply. |
553/// | 131 | 0x83 | Implementation specific error | The UNSUBSCRIBE is valid but the Server does not accept it.                                   |
554/// | 135 | 0x87 | Not authorized                | The Client is not authorized to unsubscribe.                                                  |
555/// | 143 | 0x8F | Topic Filter invalid          | The Topic Filter is correctly formed but is not allowed for this Client.                      |
556/// | 145 | 0x91 | Packet Identifier in use      | The specified Packet Identifier is already in use.                                            |
557#[repr(u8)]
558#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
559#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
560pub enum UnsubscribeReasonCode {
561    Success = 0x00,
562    NoSubscriptionExisted = 0x11,
563    UnspecifiedError = 0x80,
564    ImplementationSpecificError = 0x83,
565    NotAuthorized = 0x87,
566    TopicFilterInvalid = 0x8F,
567    PacketIdentifierInUse = 0x91,
568}
569
570impl UnsubscribeReasonCode {
571    pub fn from_u8(value: u8) -> Option<Self> {
572        let code = match value {
573            0x00 => Self::Success,
574            0x11 => Self::NoSubscriptionExisted,
575            0x80 => Self::UnspecifiedError,
576            0x83 => Self::ImplementationSpecificError,
577            0x87 => Self::NotAuthorized,
578            0x8F => Self::TopicFilterInvalid,
579            0x91 => Self::PacketIdentifierInUse,
580            _ => return None,
581        };
582        Some(code)
583    }
584}