mqute_codec/protocol/v5/
subscribe.rs

1//! # Subscribe Packet - MQTT v5
2//!
3//! This module implements the MQTT v5 `Subscribe` packet, which is sent by clients to
4//! request subscription to one or more topics. The packet includes detailed subscription
5//! options and properties for each topic filter.
6
7use crate::codec::util::{
8    decode_byte, decode_string, decode_variable_integer, encode_string, encode_variable_integer,
9};
10use crate::codec::{Decode, Encode, RawPacket};
11use crate::protocol::util::len_bytes;
12use crate::protocol::v5::property::{
13    property_decode, property_encode, property_len, Property, PropertyFrame,
14};
15use crate::protocol::v5::util::id_header;
16use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
17use crate::Error;
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use std::borrow::Borrow;
20use std::ops::{Index, IndexMut};
21
22/// Properties specific to `Subscribe` packets
23///
24/// In MQTT v5, `Subscribe` packets can include:
25/// - Subscription Identifier (for shared subscriptions)
26/// - User Properties (key-value pairs for extended metadata)
27///
28/// # Example
29///
30/// ```rust
31/// use mqute_codec::protocol::v5::SubscribeProperties;
32///
33/// let properties = SubscribeProperties {
34///     subscription_id: Some(42),  // Shared subscription ID
35///     user_properties: vec![("client".into(), "rust".into())],
36/// };
37/// ```
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct SubscribeProperties {
40    /// Identifier for shared subscriptions
41    pub subscription_id: Option<u32>,
42    /// User-defined key-value properties
43    pub user_properties: Vec<(String, String)>,
44}
45
46impl PropertyFrame for SubscribeProperties {
47    /// Calculates the encoded length of the properties
48    fn encoded_len(&self) -> usize {
49        let mut len = 0usize;
50
51        if let Some(value) = self.subscription_id {
52            len += 1 + len_bytes(value as usize);
53        }
54        len += property_len!(&self.user_properties);
55
56        len
57    }
58
59    /// Encodes the properties into a byte buffer
60    fn encode(&self, buf: &mut BytesMut) {
61        if let Some(value) = self.subscription_id {
62            buf.put_u8(Property::SubscriptionIdentifier.into());
63            encode_variable_integer(buf, value).expect("");
64        }
65
66        property_encode!(&self.user_properties, Property::UserProp, buf);
67    }
68
69    /// Decodes properties from a byte buffer
70    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
71    where
72        Self: Sized,
73    {
74        if buf.is_empty() {
75            return Ok(None);
76        }
77
78        let mut subscription_id: Option<u32> = None;
79        let mut user_properties: Vec<(String, String)> = Vec::new();
80
81        while buf.has_remaining() {
82            let property: Property = decode_byte(buf)?.try_into()?;
83            match property {
84                Property::SubscriptionIdentifier => {
85                    if subscription_id.is_some() {
86                        return Err(Error::ProtocolError);
87                    }
88                    subscription_id = Some(decode_variable_integer(buf)? as u32);
89                }
90                Property::UserProp => {
91                    property_decode!(&mut user_properties, buf);
92                }
93                _ => return Err(Error::PropertyMismatch),
94            }
95        }
96
97        Ok(Some(SubscribeProperties {
98            subscription_id,
99            user_properties,
100        }))
101    }
102}
103
104/// Controls how retained messages are handled for subscriptions
105#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
106pub enum RetainHandling {
107    /// Send retained messages at the time of subscribe (default)
108    Send = 0,
109    /// Send retained messages only if subscription is new
110    SendForNewSub = 1,
111    /// Never send retained messages
112    DoNotSend = 2,
113}
114
115impl TryFrom<u8> for RetainHandling {
116    type Error = Error;
117
118    fn try_from(value: u8) -> Result<Self, Self::Error> {
119        match value {
120            0 => Ok(RetainHandling::Send),
121            1 => Ok(RetainHandling::SendForNewSub),
122            2 => Ok(RetainHandling::DoNotSend),
123            n => Err(Error::InvalidRetainHandling(n)),
124        }
125    }
126}
127
128impl From<RetainHandling> for u8 {
129    fn from(value: RetainHandling) -> Self {
130        value as u8
131    }
132}
133
134/// Represents a single topic filter with subscription options
135///
136/// # Example
137///
138/// ```rust
139/// use mqute_codec::protocol::v5::{TopicOptionFilter, RetainHandling};
140/// use mqute_codec::protocol::QoS;
141///
142/// let filter = TopicOptionFilter::new("topic1", QoS::AtLeastOnce, false, true, RetainHandling::DoNotSend);
143/// ```
144#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct TopicOptionFilter {
146    /// The topic filter to subscribe to
147    pub topic: String,
148    /// Requested QoS level
149    pub qos: QoS,
150    /// If true, messages published by this client won't be received
151    pub no_local: bool,
152    /// If true, retain flag on published messages is kept as-is
153    pub retain_as_published: bool,
154    /// Controls how retained messages are handled
155    pub retain_handling: RetainHandling,
156}
157
158impl TopicOptionFilter {
159    /// Creates a new topic filter with options
160    pub fn new<S: Into<String>>(
161        topic: S,
162        qos: QoS,
163        no_local: bool,
164        retain_as_published: bool,
165        retain_handling: RetainHandling,
166    ) -> Self {
167        TopicOptionFilter {
168            topic: topic.into(),
169            qos,
170            no_local,
171            retain_as_published,
172            retain_handling,
173        }
174    }
175}
176
177/// Collection of topic filters for a subscription
178///
179/// # Example
180///
181/// ```rust
182/// use mqute_codec::protocol::v5::{Subscribe, TopicOptionFilters, TopicOptionFilter, RetainHandling};
183/// use mqute_codec::protocol::QoS;
184///
185/// let filters = vec![
186///     TopicOptionFilter::new("topic1", QoS::AtLeastOnce, false, true, RetainHandling::DoNotSend),
187///     TopicOptionFilter::new("topic2", QoS::ExactlyOnce, true, true, RetainHandling::SendForNewSub),
188/// ];
189/// let topic_filters = TopicOptionFilters::new(filters);
190/// assert_eq!(topic_filters.len(), 2);
191/// ```
192#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct TopicOptionFilters(Vec<TopicOptionFilter>);
194
195#[allow(clippy::len_without_is_empty)]
196impl TopicOptionFilters {
197    /// Creates a new collection of topic filters
198    ///
199    /// # Panics
200    /// If no filters are provided
201    pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(filters: T) -> Self {
202        let values: Vec<TopicOptionFilter> = filters.into_iter().collect();
203
204        if values.is_empty() {
205            panic!("At least one topic filter is required");
206        }
207
208        TopicOptionFilters(values)
209    }
210
211    /// Returns the number of topic filters in the collection.
212    pub fn len(&self) -> usize {
213        self.0.len()
214    }
215
216    /// Decodes topic filters from payload
217    pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
218        let mut filters = Vec::with_capacity(1);
219
220        while payload.has_remaining() {
221            let topic = decode_string(payload)?;
222            let flags = decode_byte(payload)?;
223
224            // The upper 2 bits of the requested option byte must be zero
225            if flags & 0b1100_0000 > 0 {
226                return Err(Error::MalformedPacket);
227            }
228
229            let qos = (flags & 0x03).try_into()?;
230            let no_local = flags & 0x04 != 0;
231            let retain_as_published = flags & 0x08 != 0;
232            let retain_handling = ((flags >> 4) & 0x03).try_into()?;
233
234            filters.push(TopicOptionFilter::new(
235                topic,
236                qos,
237                no_local,
238                retain_as_published,
239                retain_handling,
240            ));
241        }
242
243        if filters.is_empty() {
244            return Err(Error::NoTopic);
245        }
246
247        Ok(TopicOptionFilters(filters))
248    }
249
250    /// Encodes topic filters into buffer
251    pub(crate) fn encode(&self, buf: &mut BytesMut) {
252        self.0.iter().for_each(|f| {
253            let qos: u8 = f.qos.into();
254            let retain_handling: u8 = f.retain_handling.into();
255
256            let options: u8 = retain_handling << 4
257                | (f.retain_as_published as u8) << 3
258                | (f.no_local as u8) << 2
259                | qos;
260
261            encode_string(buf, &f.topic);
262            buf.put_u8(options);
263        });
264    }
265
266    pub(crate) fn encoded_len(&self) -> usize {
267        self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
268    }
269}
270
271// Various trait implementations for TopicOptionFilters
272impl AsRef<Vec<TopicOptionFilter>> for TopicOptionFilters {
273    #[inline]
274    fn as_ref(&self) -> &Vec<TopicOptionFilter> {
275        &self.0
276    }
277}
278
279impl Borrow<Vec<TopicOptionFilter>> for TopicOptionFilters {
280    fn borrow(&self) -> &Vec<TopicOptionFilter> {
281        &self.0
282    }
283}
284
285impl IntoIterator for TopicOptionFilters {
286    type Item = TopicOptionFilter;
287    type IntoIter = std::vec::IntoIter<TopicOptionFilter>;
288
289    fn into_iter(self) -> Self::IntoIter {
290        self.0.into_iter()
291    }
292}
293
294impl FromIterator<TopicOptionFilter> for TopicOptionFilters {
295    fn from_iter<T: IntoIterator<Item = TopicOptionFilter>>(iter: T) -> Self {
296        TopicOptionFilters(Vec::from_iter(iter))
297    }
298}
299
300impl From<TopicOptionFilters> for Vec<TopicOptionFilter> {
301    #[inline]
302    fn from(value: TopicOptionFilters) -> Self {
303        value.0
304    }
305}
306
307impl From<Vec<TopicOptionFilter>> for TopicOptionFilters {
308    #[inline]
309    fn from(value: Vec<TopicOptionFilter>) -> Self {
310        TopicOptionFilters(value)
311    }
312}
313
314impl Index<usize> for TopicOptionFilters {
315    type Output = TopicOptionFilter;
316
317    fn index(&self, index: usize) -> &Self::Output {
318        self.0.index(index)
319    }
320}
321
322impl IndexMut<usize> for TopicOptionFilters {
323    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
324        self.0.index_mut(index)
325    }
326}
327
328// Internal header structure for `Subscribe` packets
329id_header!(SubscribeHeader, SubscribeProperties);
330
331/// Represents an MQTT v5 `Subscribe` packet
332///
333/// Used to request subscription to one or more topics with various options:
334/// - QoS levels
335/// - Retain handling preferences
336/// - Local message filtering
337///
338/// # Example
339///
340/// ```rust
341/// use mqute_codec::protocol::v5::{Subscribe, TopicOptionFilter, RetainHandling};
342/// use mqute_codec::protocol::QoS;
343///
344/// let subscribe = Subscribe::new(
345///     1234,
346///     None,
347///     vec![
348///         TopicOptionFilter::new(
349///             "sensors/temperature",
350///             QoS::AtLeastOnce,
351///             false,
352///             true,
353///             RetainHandling::Send
354///         ),
355///         TopicOptionFilter::new(
356///             "control/#",
357///             QoS::ExactlyOnce,
358///             true,
359///             false,
360///             RetainHandling::SendForNewSub
361///         )
362///     ]
363/// );
364///
365/// let filters = subscribe.filters();
366/// assert_eq!(filters[0],
367///            TopicOptionFilter::new(
368///                             "sensors/temperature",
369///                             QoS::AtLeastOnce,
370///                             false,
371///                             true,
372///                             RetainHandling::Send
373///                         ));
374/// ```
375#[derive(Debug, Clone, PartialEq, Eq)]
376pub struct Subscribe {
377    header: SubscribeHeader,
378    filters: TopicOptionFilters,
379}
380
381impl Subscribe {
382    /// Creates a new `Subscribe` packet
383    pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(
384        packet_id: u16,
385        properties: Option<SubscribeProperties>,
386        filters: T,
387    ) -> Self {
388        let header = SubscribeHeader::new(packet_id, properties);
389        let filters = TopicOptionFilters::new(filters);
390
391        Subscribe { header, filters }
392    }
393
394    /// Returns the packet identifier
395    pub fn packet_id(&self) -> u16 {
396        self.header.packet_id
397    }
398
399    /// Returns the subscription properties
400    pub fn properties(&self) -> Option<SubscribeProperties> {
401        self.header.properties.clone()
402    }
403
404    /// Returns the collection of topic filters
405    pub fn filters(&self) -> TopicOptionFilters {
406        self.filters.clone()
407    }
408}
409
410impl Encode for Subscribe {
411    /// Encodes the `Subscribe` packet into a byte buffer
412    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
413        let header = FixedHeader::with_flags(
414            PacketType::Subscribe,
415            Flags::new(QoS::AtLeastOnce),
416            self.payload_len(),
417        );
418        header.encode(buf)?;
419
420        self.header.encode(buf)?;
421        self.filters.encode(buf);
422
423        Ok(())
424    }
425
426    /// Calculates the total packet length
427    fn payload_len(&self) -> usize {
428        self.header.encoded_len() + self.filters.encoded_len()
429    }
430}
431
432impl Decode for Subscribe {
433    /// Decodes a `Subscribe` packet from raw bytes
434    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
435        // Validate header flags
436        if packet.header.packet_type() != PacketType::Subscribe
437            || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
438        {
439            return Err(Error::MalformedPacket);
440        }
441
442        let header = SubscribeHeader::decode(&mut packet.payload)?;
443        let filters = TopicOptionFilters::decode(&mut packet.payload)?;
444
445        Ok(Subscribe::new(header.packet_id, header.properties, filters))
446    }
447}