mqute_codec/protocol/v5/
subscribe.rs

1//! # Subscribe Packet - MQTT v5
2//!
3//! This module implements the MQTT v5 `Subscribe` packet, which is sent by clients to
4//! request subscription to one or more topics. The packet includes detailed subscription
5//! options and properties for each topic filter.
6
7use crate::Error;
8use crate::codec::util::{
9    decode_byte, decode_string, decode_variable_integer, encode_string, encode_variable_integer,
10};
11use crate::codec::{Decode, Encode, RawPacket};
12use crate::protocol::util::len_bytes;
13use crate::protocol::v5::property::{
14    Property, PropertyFrame, property_decode, property_encode, property_len,
15};
16use crate::protocol::v5::util::id_header;
17use crate::protocol::{FixedHeader, Flags, PacketType, QoS, util};
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use std::borrow::Borrow;
20use std::ops::{Index, IndexMut};
21
/// Properties specific to `Subscribe` packets
///
/// In MQTT v5, `Subscribe` packets can include:
/// - Subscription Identifier (a numeric identifier associated with the
///   subscription; the specification allows values from 1 to 268,435,455)
/// - User Properties (key-value pairs for extended metadata)
///
/// `SubscribeProperties::default()` yields a value with no subscription
/// identifier and no user properties.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v5::SubscribeProperties;
///
/// let properties = SubscribeProperties {
///     subscription_id: Some(42),  // Identifier attached to this subscription
///     user_properties: vec![("client".into(), "rust".into())],
/// };
/// ```
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SubscribeProperties {
    /// Identifier associated with the subscription; `None` when absent
    pub subscription_id: Option<u32>,
    /// User-defined key-value properties
    pub user_properties: Vec<(String, String)>,
}
45
46impl PropertyFrame for SubscribeProperties {
47    /// Calculates the encoded length of the properties
48    fn encoded_len(&self) -> usize {
49        let mut len = 0usize;
50
51        if let Some(value) = self.subscription_id {
52            len += 1 + len_bytes(value as usize);
53        }
54        len += property_len!(&self.user_properties);
55
56        len
57    }
58
59    /// Encodes the properties into a byte buffer
60    fn encode(&self, buf: &mut BytesMut) {
61        if let Some(value) = self.subscription_id {
62            buf.put_u8(Property::SubscriptionIdentifier.into());
63            encode_variable_integer(buf, value).expect("");
64        }
65
66        property_encode!(&self.user_properties, Property::UserProp, buf);
67    }
68
69    /// Decodes properties from a byte buffer
70    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
71    where
72        Self: Sized,
73    {
74        if buf.is_empty() {
75            return Ok(None);
76        }
77
78        let mut subscription_id: Option<u32> = None;
79        let mut user_properties: Vec<(String, String)> = Vec::new();
80
81        while buf.has_remaining() {
82            let property: Property = decode_byte(buf)?.try_into()?;
83            match property {
84                Property::SubscriptionIdentifier => {
85                    if subscription_id.is_some() {
86                        return Err(Error::ProtocolError);
87                    }
88                    subscription_id = Some(decode_variable_integer(buf)? as u32);
89                }
90                Property::UserProp => {
91                    property_decode!(&mut user_properties, buf);
92                }
93                _ => return Err(Error::PropertyMismatch),
94            }
95        }
96
97        Ok(Some(SubscribeProperties {
98            subscription_id,
99            user_properties,
100        }))
101    }
102}
103
/// Controls how retained messages are handled for subscriptions
///
/// The explicit discriminants are the numeric values used when converting to
/// and from `u8` (see the `TryFrom<u8>` and `From<RetainHandling>`
/// implementations in this module).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum RetainHandling {
    /// Send retained messages at the time of subscribe (default)
    Send = 0,
    /// Send retained messages only if subscription is new
    SendForNewSub = 1,
    /// Never send retained messages
    DoNotSend = 2,
}
114
115impl TryFrom<u8> for RetainHandling {
116    type Error = Error;
117
118    fn try_from(value: u8) -> Result<Self, Self::Error> {
119        match value {
120            0 => Ok(RetainHandling::Send),
121            1 => Ok(RetainHandling::SendForNewSub),
122            2 => Ok(RetainHandling::DoNotSend),
123            n => Err(Error::InvalidRetainHandling(n)),
124        }
125    }
126}
127
128impl From<RetainHandling> for u8 {
129    fn from(value: RetainHandling) -> Self {
130        value as u8
131    }
132}
133
/// Represents a single topic filter with subscription options
///
/// All fields are public, so a value built with a struct literal bypasses the
/// topic validation performed by `TopicOptionFilter::new`.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v5::{TopicOptionFilter, RetainHandling};
/// use mqute_codec::protocol::QoS;
///
/// let filter = TopicOptionFilter::new("topic1", QoS::AtLeastOnce, false, true, RetainHandling::DoNotSend);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicOptionFilter {
    /// The topic filter to subscribe to
    pub topic: String,
    /// Requested QoS level
    pub qos: QoS,
    /// If true, messages published by this client won't be received
    pub no_local: bool,
    /// If true, retain flag on published messages is kept as-is
    pub retain_as_published: bool,
    /// Controls how retained messages are handled
    pub retain_handling: RetainHandling,
}
157
158impl TopicOptionFilter {
159    /// Creates a new topic filter with options
160    ///
161    /// # Panics
162    ///
163    /// Panics if the iterator is empty, as at least one topic filter is required.
164    pub fn new<S: Into<String>>(
165        topic: S,
166        qos: QoS,
167        no_local: bool,
168        retain_as_published: bool,
169        retain_handling: RetainHandling,
170    ) -> Self {
171        let topic = topic.into();
172
173        if !util::is_valid_topic_filter(&topic) {
174            panic!("Invalid topic filter: '{}'", topic);
175        }
176
177        TopicOptionFilter {
178            topic,
179            qos,
180            no_local,
181            retain_as_published,
182            retain_handling,
183        }
184    }
185}
186
/// Collection of topic filters for a subscription
///
/// A thin wrapper around `Vec<TopicOptionFilter>`. `TopicOptionFilters::new`
/// refuses to build an empty collection.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v5::{TopicOptionFilters, TopicOptionFilter, RetainHandling};
/// use mqute_codec::protocol::QoS;
///
/// let filters = vec![
///     TopicOptionFilter::new("topic1", QoS::AtLeastOnce, false, true, RetainHandling::DoNotSend),
///     TopicOptionFilter::new("topic2", QoS::ExactlyOnce, true, true, RetainHandling::SendForNewSub),
/// ];
/// let topic_filters = TopicOptionFilters::new(filters);
/// assert_eq!(topic_filters.len(), 2);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicOptionFilters(Vec<TopicOptionFilter>);
204
205#[allow(clippy::len_without_is_empty)]
206impl TopicOptionFilters {
207    /// Creates a new collection of topic filters
208    ///
209    /// # Panics
210    ///
211    /// Panics if:
212    /// - No filters are provided.
213    /// - The topic filters are invalid according to MQTT topic naming rules.
214    pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(filters: T) -> Self {
215        let values: Vec<TopicOptionFilter> = filters.into_iter().collect();
216
217        if values.is_empty() {
218            panic!("At least one topic filter is required");
219        }
220
221        TopicOptionFilters(values)
222    }
223
224    /// Returns the number of topic filters in the collection.
225    pub fn len(&self) -> usize {
226        self.0.len()
227    }
228
229    /// Decodes topic filters from payload
230    pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
231        let mut filters = Vec::with_capacity(1);
232
233        while payload.has_remaining() {
234            let topic = decode_string(payload)?;
235
236            if !util::is_valid_topic_filter(&topic) {
237                return Err(Error::InvalidTopicFilter(topic));
238            }
239
240            let flags = decode_byte(payload)?;
241
242            // The upper 2 bits of the requested option byte must be zero
243            if flags & 0b1100_0000 > 0 {
244                return Err(Error::MalformedPacket);
245            }
246
247            let qos = (flags & 0x03).try_into()?;
248            let no_local = flags & 0x04 != 0;
249            let retain_as_published = flags & 0x08 != 0;
250            let retain_handling = ((flags >> 4) & 0x03).try_into()?;
251
252            filters.push(TopicOptionFilter::new(
253                topic,
254                qos,
255                no_local,
256                retain_as_published,
257                retain_handling,
258            ));
259        }
260
261        if filters.is_empty() {
262            return Err(Error::NoTopic);
263        }
264
265        Ok(TopicOptionFilters(filters))
266    }
267
268    /// Encodes topic filters into buffer
269    pub(crate) fn encode(&self, buf: &mut BytesMut) {
270        self.0.iter().for_each(|f| {
271            let qos: u8 = f.qos.into();
272            let retain_handling: u8 = f.retain_handling.into();
273
274            let options: u8 = retain_handling << 4
275                | (f.retain_as_published as u8) << 3
276                | (f.no_local as u8) << 2
277                | qos;
278
279            encode_string(buf, &f.topic);
280            buf.put_u8(options);
281        });
282    }
283
284    pub(crate) fn encoded_len(&self) -> usize {
285        self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
286    }
287}
288
289// Various trait implementations for TopicOptionFilters
290impl AsRef<Vec<TopicOptionFilter>> for TopicOptionFilters {
291    #[inline]
292    fn as_ref(&self) -> &Vec<TopicOptionFilter> {
293        &self.0
294    }
295}
296
297impl Borrow<Vec<TopicOptionFilter>> for TopicOptionFilters {
298    fn borrow(&self) -> &Vec<TopicOptionFilter> {
299        &self.0
300    }
301}
302
303impl IntoIterator for TopicOptionFilters {
304    type Item = TopicOptionFilter;
305    type IntoIter = std::vec::IntoIter<TopicOptionFilter>;
306
307    fn into_iter(self) -> Self::IntoIter {
308        self.0.into_iter()
309    }
310}
311
312impl FromIterator<TopicOptionFilter> for TopicOptionFilters {
313    fn from_iter<T: IntoIterator<Item = TopicOptionFilter>>(iter: T) -> Self {
314        TopicOptionFilters(Vec::from_iter(iter))
315    }
316}
317
318impl From<TopicOptionFilters> for Vec<TopicOptionFilter> {
319    #[inline]
320    fn from(value: TopicOptionFilters) -> Self {
321        value.0
322    }
323}
324
325impl From<Vec<TopicOptionFilter>> for TopicOptionFilters {
326    #[inline]
327    fn from(value: Vec<TopicOptionFilter>) -> Self {
328        TopicOptionFilters(value)
329    }
330}
331
332impl Index<usize> for TopicOptionFilters {
333    type Output = TopicOptionFilter;
334
335    fn index(&self, index: usize) -> &Self::Output {
336        self.0.index(index)
337    }
338}
339
340impl IndexMut<usize> for TopicOptionFilters {
341    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
342        self.0.index_mut(index)
343    }
344}
345
// Internal header structure for `Subscribe` packets.
// The generated `SubscribeHeader` is used in this module through its
// `packet_id` and `properties` fields and its `new`, `encode`, `encoded_len`
// and `decode` functions.
// NOTE(review): the exact definition comes from the `id_header!` macro, which
// is declared elsewhere — confirm there before relying on further details.
id_header!(SubscribeHeader, SubscribeProperties);
348
/// Represents an MQTT v5 `Subscribe` packet
///
/// Used to request subscription to one or more topics with various options:
/// - QoS levels
/// - Retain handling preferences
/// - Local message filtering
///
/// A packet consists of a header (packet identifier plus optional
/// `SubscribeProperties`) and a non-empty collection of topic filters.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v5::{Subscribe, TopicOptionFilter, RetainHandling};
/// use mqute_codec::protocol::QoS;
///
/// let subscribe = Subscribe::new(
///     1234,
///     None,
///     vec![
///         TopicOptionFilter::new(
///             "sensors/temperature",
///             QoS::AtLeastOnce,
///             false,
///             true,
///             RetainHandling::Send
///         ),
///         TopicOptionFilter::new(
///             "control/#",
///             QoS::ExactlyOnce,
///             true,
///             false,
///             RetainHandling::SendForNewSub
///         )
///     ]
/// );
///
/// let filters = subscribe.filters();
/// assert_eq!(filters[0],
///            TopicOptionFilter::new(
///                             "sensors/temperature",
///                             QoS::AtLeastOnce,
///                             false,
///                             true,
///                             RetainHandling::Send
///                         ));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    // Packet identifier and optional properties
    header: SubscribeHeader,
    // Topic filters requested by this packet
    filters: TopicOptionFilters,
}
398
399impl Subscribe {
400    /// Creates a new `Subscribe` packet
401    pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(
402        packet_id: u16,
403        properties: Option<SubscribeProperties>,
404        filters: T,
405    ) -> Self {
406        let header = SubscribeHeader::new(packet_id, properties);
407        let filters = TopicOptionFilters::new(filters);
408
409        Subscribe { header, filters }
410    }
411
412    /// Returns the packet identifier
413    pub fn packet_id(&self) -> u16 {
414        self.header.packet_id
415    }
416
417    /// Returns the subscription properties
418    pub fn properties(&self) -> Option<SubscribeProperties> {
419        self.header.properties.clone()
420    }
421
422    /// Returns the collection of topic filters
423    pub fn filters(&self) -> TopicOptionFilters {
424        self.filters.clone()
425    }
426}
427
428impl Encode for Subscribe {
429    /// Encodes the `Subscribe` packet into a byte buffer
430    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
431        let header = FixedHeader::with_flags(
432            PacketType::Subscribe,
433            Flags::new(QoS::AtLeastOnce),
434            self.payload_len(),
435        );
436        header.encode(buf)?;
437
438        self.header.encode(buf)?;
439        self.filters.encode(buf);
440
441        Ok(())
442    }
443
444    /// Calculates the total packet length
445    fn payload_len(&self) -> usize {
446        self.header.encoded_len() + self.filters.encoded_len()
447    }
448}
449
450impl Decode for Subscribe {
451    /// Decodes a `Subscribe` packet from raw bytes
452    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
453        // Validate header flags
454        if packet.header.packet_type() != PacketType::Subscribe
455            || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
456        {
457            return Err(Error::MalformedPacket);
458        }
459
460        let header = SubscribeHeader::decode(&mut packet.payload)?;
461        let filters = TopicOptionFilters::decode(&mut packet.payload)?;
462
463        Ok(Subscribe::new(header.packet_id, header.properties, filters))
464    }
465}