mqute_codec/protocol/v4/
subscribe.rs

1//! # Subscribe Packet V4
2//!
3//! This module defines the `Subscribe` packet and related structures (`TopicQosFilter` and
4//! `TopicQosFilters`) used in the MQTT protocol to handle subscription requests. The `Subscribe`
5//! packet contains a list of topic filters and their requested QoS levels.
6
7use crate::Error;
8use crate::codec::util::{decode_byte, decode_string, decode_word, encode_string};
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::{FixedHeader, Flags, PacketType, QoS, traits, util};
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::borrow::Borrow;
13
/// Represents a single topic filter and its requested QoS level.
///
/// One `TopicQosFilter` corresponds to one (topic filter, requested QoS) entry of a
/// `Subscribe` packet. Both fields are public, so the validity check performed by
/// [`TopicQosFilter::new`] is only guaranteed at construction time; later mutation of
/// `topic` is not re-checked by this type.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v4::TopicQosFilter;
/// use mqute_codec::protocol::QoS;
///
/// let filter = TopicQosFilter::new("topic1", QoS::AtLeastOnce);
/// assert_eq!(filter.topic, "topic1");
/// assert_eq!(filter.qos, QoS::AtLeastOnce);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicQosFilter {
    /// The topic filter for the subscription, stored as an owned string.
    pub topic: String,

    /// The requested QoS level for the subscription.
    pub qos: QoS,
}
34
35impl TopicQosFilter {
36    /// Creates a new `TopicQosFilter` instance with the specified topic and QoS level.
37    ///
38    /// # Panics
39    ///
40    /// Panics if the topic filter is invalid according to MQTT specification rules.
41    pub fn new<T: Into<String>>(topic: T, qos: QoS) -> Self {
42        let topic = topic.into();
43
44        if !util::is_valid_topic_filter(&topic) {
45            panic!("Invalid topic filter: '{}'", topic);
46        }
47
48        Self { topic, qos }
49    }
50}
51
/// Represents a collection of `TopicQosFilter` instances.
///
/// This is a newtype around `Vec<TopicQosFilter>`. `TopicQosFilters::new` and the
/// crate-internal decoder refuse to build an empty collection, whereas the
/// `From<Vec<TopicQosFilter>>` and `FromIterator` conversions wrap whatever they are
/// given without that check.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v4::{TopicQosFilters, TopicQosFilter};
/// use mqute_codec::protocol::QoS;
///
/// let topic_filters = TopicQosFilters::new(vec![
///         TopicQosFilter::new("topic1", QoS::AtLeastOnce),
///         TopicQosFilter::new("topic2", QoS::ExactlyOnce),
///     ]);
///
/// assert_eq!(topic_filters.len(), 2);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicQosFilters(Vec<TopicQosFilter>);
69
70#[allow(clippy::len_without_is_empty)]
71impl TopicQosFilters {
72    /// Creates a new `TopicQosFilters` instance from an iterator of `TopicQosFilter`.
73    ///
74    /// # Panics
75    ///
76    /// Panics if:
77    /// - The iterator is empty, as at least one topic filter is required.
78    /// - The topic filters are invalid according to MQTT topic naming rules.
79    pub fn new<T: IntoIterator<Item = TopicQosFilter>>(filters: T) -> Self {
80        let values: Vec<TopicQosFilter> = filters.into_iter().collect();
81
82        if values.is_empty() {
83            panic!("At least one topic filter is required");
84        }
85
86        TopicQosFilters(values)
87    }
88
89    /// Returns the number of topic filters in the collection.
90    pub fn len(&self) -> usize {
91        self.0.len()
92    }
93
94    pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
95        let mut filters = Vec::with_capacity(1);
96
97        while payload.has_remaining() {
98            let filter = decode_string(payload)?;
99
100            if !util::is_valid_topic_filter(&filter) {
101                return Err(Error::InvalidTopicFilter(filter));
102            }
103
104            let flags = decode_byte(payload)?;
105
106            // The upper 6 bits of the Requested QoS byte must be zero
107            if flags & 0b1111_1100 > 0 {
108                return Err(Error::MalformedPacket);
109            }
110
111            filters.push(TopicQosFilter::new(filter, flags.try_into()?));
112        }
113
114        if filters.is_empty() {
115            return Err(Error::NoTopic);
116        }
117
118        Ok(TopicQosFilters(filters))
119    }
120
121    pub(crate) fn encode(&self, buf: &mut BytesMut) {
122        self.0.iter().for_each(|f| {
123            encode_string(buf, &f.topic);
124            buf.put_u8(f.qos.into());
125        });
126    }
127
128    pub(crate) fn encoded_len(&self) -> usize {
129        self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
130    }
131}
132
133impl AsRef<Vec<TopicQosFilter>> for TopicQosFilters {
134    #[inline]
135    fn as_ref(&self) -> &Vec<TopicQosFilter> {
136        self.0.as_ref()
137    }
138}
139
140impl Borrow<Vec<TopicQosFilter>> for TopicQosFilters {
141    fn borrow(&self) -> &Vec<TopicQosFilter> {
142        self.0.as_ref()
143    }
144}
145
146impl IntoIterator for TopicQosFilters {
147    type Item = TopicQosFilter;
148    type IntoIter = std::vec::IntoIter<TopicQosFilter>;
149
150    fn into_iter(self) -> Self::IntoIter {
151        self.0.into_iter()
152    }
153}
154
155impl FromIterator<TopicQosFilter> for TopicQosFilters {
156    fn from_iter<T: IntoIterator<Item = TopicQosFilter>>(iter: T) -> Self {
157        TopicQosFilters(Vec::from_iter(iter))
158    }
159}
160
161impl From<TopicQosFilters> for Vec<TopicQosFilter> {
162    #[inline]
163    fn from(value: TopicQosFilters) -> Self {
164        value.0
165    }
166}
167
168impl From<Vec<TopicQosFilter>> for TopicQosFilters {
169    #[inline]
170    fn from(value: Vec<TopicQosFilter>) -> Self {
171        TopicQosFilters(value)
172    }
173}
174
/// Represents an MQTT `Subscribe` packet.
///
/// The packet carries a packet identifier and a list of topic filters with their
/// requested QoS levels. Both fields are private; they are read through
/// `Subscribe::packet_id` and `Subscribe::filters`.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v4::{Subscribe, TopicQosFilter};
/// use mqute_codec::protocol::QoS;
///
/// let filters = vec![
///     TopicQosFilter::new("topic1", QoS::AtLeastOnce),
///     TopicQosFilter::new("topic2", QoS::ExactlyOnce),
/// ];
/// let subscribe = Subscribe::new(123, filters);
/// assert_eq!(subscribe.packet_id(), 123);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// The packet ID for the `Subscribe` packet.
    packet_id: u16,

    /// The list of topic filters and their requested QoS levels.
    filters: TopicQosFilters,
}
198
199impl Subscribe {
200    /// Creates a new `Subscribe` packet.
201    ///
202    /// # Panics
203    ///
204    /// Panics if `packet_id` is zero.
205    pub fn new<T: IntoIterator<Item = TopicQosFilter>>(packet_id: u16, filters: T) -> Self {
206        if packet_id == 0 {
207            panic!("Packet id is zero");
208        }
209
210        let filters = filters.into_iter().collect();
211        Subscribe { packet_id, filters }
212    }
213
214    /// Returns the packet ID of the `Subscribe` packet.
215    pub fn packet_id(&self) -> u16 {
216        self.packet_id
217    }
218
219    /// Returns the list of topic filters and their requested QoS levels.
220    pub fn filters(&self) -> TopicQosFilters {
221        self.filters.clone()
222    }
223}
224
225impl Decode for Subscribe {
226    /// Decodes a `Subscribe` packet from a raw MQTT packet.
227    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
228        // Validate header flags
229        if packet.header.packet_type() != PacketType::Subscribe
230            || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
231        {
232            return Err(Error::MalformedPacket);
233        }
234
235        let packet_id = decode_word(&mut packet.payload)?;
236        let filters = TopicQosFilters::decode(&mut packet.payload)?;
237
238        Ok(Subscribe::new(packet_id, filters))
239    }
240}
241
242impl Encode for Subscribe {
243    /// Encodes the `Subscribe` packet into a byte buffer.
244    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
245        let header = FixedHeader::with_flags(
246            PacketType::Subscribe,
247            Flags::new(QoS::AtLeastOnce),
248            self.payload_len(),
249        );
250        header.encode(buf)?;
251        buf.put_u16(self.packet_id);
252        self.filters.encode(buf);
253
254        Ok(())
255    }
256
257    /// Returns the length of the `Subscribe` packet payload.
258    fn payload_len(&self) -> usize {
259        // Packet ID and filter list
260        2 + self.filters.encoded_len()
261    }
262}
263
// Opts `Subscribe` into `traits::Subscribe`; the impl body is empty, so nothing is
// defined or overridden here.
impl traits::Subscribe for Subscribe {}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use crate::protocol::QoS;
    use bytes::BytesMut;
    use tokio_util::codec::Decoder;

    /// Wire image used by both tests: packet id 0x1234 with the filters "/a" (QoS 0)
    /// and "/b" (QoS 2).
    fn wire_bytes() -> Vec<u8> {
        vec![
            (PacketType::Subscribe as u8) << 4 | 0b0010, // Packet type
            0x0c,                                        // Remaining len
            0x12,
            0x34,
            0x00,
            0x02,
            b'/',
            b'a',
            0x00,
            0x00,
            0x02,
            b'/',
            b'b',
            0x02,
        ]
    }

    /// In-memory packet that corresponds to `wire_bytes`.
    fn sample_packet() -> Subscribe {
        Subscribe::new(
            0x1234,
            vec![
                TopicQosFilter::new("/a", QoS::AtMostOnce),
                TopicQosFilter::new("/b", QoS::ExactlyOnce),
            ],
        )
    }

    #[test]
    fn subscribe_decode() {
        let mut codec = PacketCodec::new(None, None);

        let mut stream = BytesMut::new();
        stream.extend_from_slice(&wire_bytes());

        let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
        let decoded = Subscribe::decode(raw_packet).unwrap();

        assert_eq!(decoded, sample_packet());
    }

    #[test]
    fn subscribe_encode() {
        let mut stream = BytesMut::new();
        sample_packet().encode(&mut stream).unwrap();

        assert_eq!(stream, wire_bytes());
    }
}