hebo_codec/v5/
subscribe.rs

1// Copyright (c) 2021 Xu Shaohua <shaohua@biofan.org>. All rights reserved.
2// Use of this source is governed by Apache-2.0 License that can be found
3// in the LICENSE file.
4
5use std::convert::TryFrom;
6
7use super::{
8    property::check_multiple_subscription_identifiers, property::check_property_type_list,
9    Properties, PropertyType,
10};
11use crate::{
12    ByteArray, DecodeError, DecodePacket, EncodeError, EncodePacket, FixedHeader, Packet, PacketId,
13    PacketType, QoS, SubTopic, VarIntError,
14};
15
16#[repr(u8)]
17#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
18pub enum RetainHandling {
19    /// 0 = Send retained messages at the time of the subscribe.
20    #[default]
21    Send = 0,
22
23    /// 1 = Send retained messages at subscribe only if the subscription does not currently exist.
24    SendFirst = 1,
25
26    /// 2 = Do not send retained messages at the time of the subscribe.
27    NoSend = 2,
28}
29
30impl TryFrom<u8> for RetainHandling {
31    type Error = DecodeError;
32
33    fn try_from(v: u8) -> Result<Self, Self::Error> {
34        match v {
35            0 => Ok(Self::Send),
36            1 => Ok(Self::SendFirst),
37            2 => Ok(Self::NoSend),
38            _ => Err(DecodeError::OtherErrors),
39        }
40    }
41}
42
43/// Topic/QoS pair.
44#[allow(clippy::module_name_repetitions)]
45#[derive(Clone, Debug, Default, PartialEq, Eq)]
46pub struct SubscribeTopic {
47    /// Subscribed `topic` contains wildcard characters to match interested topics with patterns.
48    topic: SubTopic,
49
50    /// Bits 0 and 1 of the Subscription Options represent Maximum QoS field.
51    ///
52    /// This gives the maximum QoS level at which the Server can send Application Messages
53    /// to the Client. It is a Protocol Error if the Maximum QoS field has the value 3.
54    qos: QoS,
55
56    /// Bit 2 of the Subscription Options represents the No Local option.
57    ///
58    /// If the value is 1, Application Messages MUST NOT be forwarded to a connection
59    /// with a ClientID equal to the ClientID of the publishing connection [MQTT-3.8.3-3].
60    ///
61    /// It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription [MQTT-3.8.3-4].
62    no_local: bool,
63
64    /// Bit 3 of the Subscription Options represents the Retain As Published option.
65    ///
66    /// If 1, Application Messages forwarded using this subscription keep the RETAIN flag
67    /// they were published with. If 0, Application Messages forwarded using this subscription
68    /// have the RETAIN flag set to 0. Retained messages sent when the subscription
69    /// is established have the RETAIN flag set to 1.
70    retain_as_published: bool,
71
72    /// Bits 4 and 5 of the Subscription Options represent the Retain Handling option.
73    ///
74    /// This option specifies whether retained messages are sent when the subscription
75    /// is established. This does not affect the sending of retained messages
76    /// at any point after the subscribe. If there are no retained messages
77    /// matching the Topic Filter, all of these values act the same. The values are:
78    ///
79    /// - 0 = Send retained messages at the time of the subscribe
80    /// - 1 = Send retained messages at subscribe only if the subscription does not currently exist
81    /// - 2 = Do not send retained messages at the time of the subscribe
82    ///
83    /// It is a Protocol Error to send a Retain Handling value of 3.
84    retain_handling: RetainHandling,
85}
86
87impl SubscribeTopic {
88    /// Create a new subscribe topic.
89    ///
90    /// # Errors
91    ///
92    /// Returns error if `topic` is invalid.
93    pub fn new(topic: &str, qos: QoS) -> Result<Self, EncodeError> {
94        let topic = SubTopic::new(topic)?;
95        Ok(Self {
96            topic,
97            qos,
98            ..Self::default()
99        })
100    }
101
102    /// Update topic pattern.
103    ///
104    /// # Errors
105    ///
106    /// Returns error if `topic` is invalid.
107    pub fn set_topic(&mut self, topic: &str) -> Result<&mut Self, EncodeError> {
108        self.topic = SubTopic::new(topic)?;
109        Ok(self)
110    }
111
112    /// Get current topic pattern.
113    #[must_use]
114    pub fn topic(&self) -> &str {
115        self.topic.as_ref()
116    }
117
118    /// Update `qos` value.
119    pub fn set_qos(&mut self, qos: QoS) -> &mut Self {
120        self.qos = qos;
121        self
122    }
123
124    /// Get current `QoS`.
125    #[must_use]
126    pub const fn qos(&self) -> QoS {
127        self.qos
128    }
129
130    /// Set `no_local` flag.
131    pub fn set_no_local(&mut self, no_local: bool) -> &mut Self {
132        self.no_local = no_local;
133        self
134    }
135
136    /// Get `no_local` flag.
137    #[must_use]
138    pub const fn no_local(&self) -> bool {
139        self.no_local
140    }
141
142    /// Update `retain_as_published` flag.
143    pub fn set_retain_as_published(&mut self, retain_as_published: bool) -> &mut Self {
144        self.retain_as_published = retain_as_published;
145        self
146    }
147
148    /// Get `retain_as_published` flag.
149    #[must_use]
150    pub const fn retain_as_published(&self) -> bool {
151        self.retain_as_published
152    }
153
154    /// Update `retain_handling` flag.
155    pub fn set_retain_handling(&mut self, retain_handling: RetainHandling) -> &mut Self {
156        self.retain_handling = retain_handling;
157        self
158    }
159
160    /// Get `retain_handling` flag.
161    #[must_use]
162    pub const fn retain_handling(&self) -> RetainHandling {
163        self.retain_handling
164    }
165
166    pub fn bytes(&self) -> usize {
167        1 + self.topic.bytes()
168    }
169}
170
171impl EncodePacket for SubscribeTopic {
172    fn encode(&self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
173        self.topic.encode(buf)?;
174        let mut flag: u8 = 0b0000_0011 & (self.qos as u8);
175        if self.no_local {
176            flag |= 0b0000_0100;
177        }
178        if self.retain_as_published {
179            flag |= 0b0000_1000;
180        }
181        flag |= 0b0011_0000 & (self.retain_handling as u8);
182        buf.push(flag);
183
184        Ok(self.bytes())
185    }
186}
187
188impl DecodePacket for SubscribeTopic {
189    fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
190        let topic = SubTopic::decode(ba)?;
191
192        let flag = ba.read_byte()?;
193        // Bits 0 and 1 of the Subscription Options represent Maximum QoS field.
194        // This gives the maximum QoS level at which the Server can send
195        // Application Messages to the Client. It is a Protocol Error if
196        // the Maximum QoS field has the value 3.
197        let qos = QoS::try_from(flag & 0b0000_0011)?;
198
199        let no_local = (flag & 0b0000_0100) == 0b0000_0100;
200        let retain_as_published = (flag & 0b0000_1000) == 0b0000_1000;
201        let retain_handling = RetainHandling::try_from(flag & 0b0011_0000)?;
202
203        // Bits 6 and 7 of the Subscription Options byte are reserved for future use.
204        // The Server MUST treat a SUBSCRIBE packet as malformed if any of Reserved bits
205        // in the Payload are non-zero [MQTT-3.8.3-5].
206        if flag & 0b1100_0000 != 0b0000_0000 {
207            return Err(DecodeError::OtherErrors);
208        }
209
210        Ok(Self {
211            topic,
212            qos,
213            no_local,
214            retain_as_published,
215            retain_handling,
216        })
217    }
218}
219
220/// Subscribe packet is sent from the Client to the Server to subscribe one or more topics.
221/// This packet also specifies the maximum `QoS` with which the Server can send Application
222/// message to the Client.
223///
224/// Basic struct of this packet:
225///
226/// ```txt
227/// +----------------------------+
228/// | Fixed header               |
229/// |                            |
230/// +----------------------------+
231/// | Packet Id                  |
232/// |                            |
233/// +----------------------------+
234/// | Properties ...             |
235/// +----------------------------+
236/// | Topic 0 length             |
237/// |                            |
238/// +----------------------------+
239/// | Topic 0 ...                |
240/// +----------------------------+
241/// | Topic 0 QoS                |
242/// +----------------------------+
243/// | Topic 1 length             |
244/// |                            |
245/// +----------------------------+
246/// | Topic 1 ...                |
247/// +----------------------------+
248/// | Tpoic 1 QoS                |
249/// +----------------------------+
250/// | ...                        |
251/// +----------------------------+
252/// ```
253///
254/// Each topic name is followed by associated `QoS` flag.
255///
256/// If a Server receives a Subscribe packet containing a Topic Filter that is identical
257/// to an existing Subscription's Topic Filter then it must completely replace existing
258/// Subscription with a new Subscription. The Topic Filter in the new Subscription will
259/// be identical to the previous Subscription, also `QoS` may be different. Any existing
260/// retained message will be re-sent to the new Subscrption.
261#[allow(clippy::module_name_repetitions)]
262#[derive(Debug, Default, Clone, PartialEq, Eq)]
263pub struct SubscribePacket {
264    /// `packet_id` is used by the Server to reply SubscribeAckPacket to the client.
265    packet_id: PacketId,
266
267    properties: Properties,
268
269    /// A list of topic the Client subscribes to.
270    topics: Vec<SubscribeTopic>,
271}
272
273/// Properties available in subscribe packet.
274pub const SUBSCRIBE_PROPERTIES: &[PropertyType] = &[
275    // The Subscription Identifier can have the value of 1 to 268,435,455.
276    // It is a Protocol Error if the Subscription Identifier has a value of 0.
277    // It is a Protocol Error to include the Subscription Identifier more than once.
278    //
279    // The Subscription Identifier is associated with any subscription created or
280    // modified as the result of this SUBSCRIBE packet. If there is a Subscription Identifier,
281    // it is stored with the subscription. If this property is not specified,
282    // then the absence of a Subscription Identifier is stored with the subscription.
283    PropertyType::SubscriptionIdentifier,
284    PropertyType::UserProperty,
285];
286
287impl SubscribePacket {
288    /// Create a new subscribe packet.
289    ///
290    /// # Errors
291    ///
292    /// Returns error if `topic` pattern is invalid.
293    pub fn new(topic: &str, qos: QoS, packet_id: PacketId) -> Result<Self, EncodeError> {
294        let topic = SubscribeTopic::new(topic, qos)?;
295        Ok(Self {
296            packet_id,
297            properties: Properties::new(),
298            topics: vec![topic],
299        })
300    }
301
302    /// Update packet id.
303    pub fn set_packet_id(&mut self, packet_id: PacketId) -> &mut Self {
304        self.packet_id = packet_id;
305        self
306    }
307
308    /// Get current packet id.
309    #[must_use]
310    pub const fn packet_id(&self) -> PacketId {
311        self.packet_id
312    }
313
314    /// Get a mutable reference to property list.
315    pub fn properties_mut(&mut self) -> &mut Properties {
316        &mut self.properties
317    }
318
319    /// Get a reference to property list.
320    #[must_use]
321    pub const fn properties(&self) -> &Properties {
322        &self.properties
323    }
324
325    /// Update topic patterns.
326    pub fn set_topics(&mut self, topics: &[SubscribeTopic]) -> &mut Self {
327        self.topics.clear();
328        self.topics.extend_from_slice(topics);
329        self
330    }
331
332    /// Get a reference to topic patterns.
333    #[must_use]
334    pub fn topics(&self) -> &[SubscribeTopic] {
335        &self.topics
336    }
337
338    /// Get a mutable reference to topic patterns.
339    pub fn mut_topics(&mut self) -> &mut Vec<SubscribeTopic> {
340        &mut self.topics
341    }
342
343    fn get_fixed_header(&self) -> Result<FixedHeader, VarIntError> {
344        let mut remaining_length = PacketId::bytes();
345        for topic in &self.topics {
346            remaining_length += topic.bytes();
347        }
348
349        FixedHeader::new(PacketType::Subscribe, remaining_length)
350    }
351}
352
353impl DecodePacket for SubscribePacket {
354    fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
355        let fixed_header = FixedHeader::decode(ba)?;
356        if fixed_header.packet_type() != PacketType::Subscribe {
357            return Err(DecodeError::InvalidPacketType);
358        }
359
360        let packet_id = PacketId::decode(ba)?;
361        if packet_id.value() == 0 {
362            // SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (in cases where QoS > 0) Control Packets
363            // MUST contain a non-zero 16-bit Packet Identifier. [MQTT-2.3.1-1]
364            return Err(DecodeError::InvalidPacketId);
365        }
366
367        let properties = Properties::decode(ba)?;
368        if let Err(property_type) =
369            check_property_type_list(properties.props(), SUBSCRIBE_PROPERTIES)
370        {
371            log::error!(
372                "v5/SubscribePacket: property type {:?} cannot be used in properties!",
373                property_type
374            );
375            return Err(DecodeError::InvalidPropertyType);
376        }
377        if let Err(property_type) = check_multiple_subscription_identifiers(properties.props()) {
378            log::error!(
379                "v5/SubscribePacket: property type {:?} cannot be used in properties!",
380                property_type
381            );
382            return Err(DecodeError::InvalidPropertyType);
383        }
384
385        let mut remaining_length = PacketId::bytes() + properties.bytes();
386        let mut topics = Vec::new();
387
388        // Parse topic/qos list.
389        while remaining_length < fixed_header.remaining_length() {
390            let topic = SubscribeTopic::decode(ba)?;
391            remaining_length += topic.bytes();
392            topics.push(topic);
393        }
394
395        // The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair.
396        // A SUBSCRIBE packet with no payload is a protocol violation [MQTT-3.8.3-3].
397        if topics.is_empty() {
398            return Err(DecodeError::EmptyTopicFilter);
399        }
400
401        Ok(Self {
402            packet_id,
403            properties,
404            topics,
405        })
406    }
407}
408
409impl EncodePacket for SubscribePacket {
410    fn encode(&self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
411        let old_len = buf.len();
412
413        let fixed_header = self.get_fixed_header()?;
414        fixed_header.encode(buf)?;
415
416        // Variable header
417        self.packet_id.encode(buf)?;
418
419        // Payload
420        for topic in &self.topics {
421            topic.encode(buf)?;
422        }
423
424        Ok(buf.len() - old_len)
425    }
426}
427
428impl Packet for SubscribePacket {
429    fn packet_type(&self) -> PacketType {
430        PacketType::Subscribe
431    }
432
433    fn bytes(&self) -> Result<usize, VarIntError> {
434        let fixed_header = self.get_fixed_header()?;
435        Ok(fixed_header.bytes() + fixed_header.remaining_length())
436    }
437}