mqute_codec/protocol/v4/
subscribe.rs

1//! # Subscribe Packet V4
2//!
3//! This module defines the `Subscribe` packet and related structures (`TopicQosFilter` and
4//! `TopicQosFilters`) used in the MQTT protocol to handle subscription requests. The `Subscribe`
5//! packet contains a list of topic filters and their requested QoS levels.
6
7use crate::codec::util::{decode_byte, decode_string, decode_word, encode_string};
8use crate::codec::{Decode, Encode, RawPacket};
9use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
10use crate::Error;
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::borrow::Borrow;
13
14/// Represents a single topic filter and its requested QoS level.
15///
16/// # Example
17///
18/// ```rust
19/// use mqute_codec::protocol::v4::TopicQosFilter;
20/// use mqute_codec::protocol::QoS;
21///
22/// let filter = TopicQosFilter::new("topic1", QoS::AtLeastOnce);
23/// assert_eq!(filter.topic, "topic1");
24/// assert_eq!(filter.qos, QoS::AtLeastOnce);
25/// ```
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct TopicQosFilter {
28    /// The topic filter for the subscription.
29    pub topic: String,
30
31    /// The requested QoS level for the subscription.
32    pub qos: QoS,
33}
34
35impl TopicQosFilter {
36    /// Creates a new `TopicQosFilter` instance.
37    pub fn new<T: Into<String>>(topic: T, qos: QoS) -> Self {
38        Self {
39            topic: topic.into(),
40            qos,
41        }
42    }
43}
44
45/// Represents a collection of `TopicQosFilter` instances.
46///
47/// # Example
48///
49/// ```rust
50/// use mqute_codec::protocol::v4::{Subscribe, TopicQosFilters, TopicQosFilter};
51/// use mqute_codec::protocol::QoS;
52///
53/// let topic_filters = TopicQosFilters::new(vec![
54///         TopicQosFilter::new("topic1", QoS::AtLeastOnce),
55///         TopicQosFilter::new("topic2", QoS::ExactlyOnce),
56///     ]);
57///
58/// assert_eq!(topic_filters.len(), 2);
59/// ```
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct TopicQosFilters(Vec<TopicQosFilter>);
62
63#[allow(clippy::len_without_is_empty)]
64impl TopicQosFilters {
65    /// Creates a new `TopicQosFilters` instance from an iterator of `TopicQosFilter`.
66    ///
67    /// # Panics
68    ///
69    /// Panics if the iterator is empty, as at least one topic filter is required.
70    pub fn new<T: IntoIterator<Item = TopicQosFilter>>(filters: T) -> Self {
71        let values: Vec<TopicQosFilter> = filters.into_iter().collect();
72
73        if values.is_empty() {
74            panic!("At least one topic filter is required");
75        }
76
77        TopicQosFilters(values)
78    }
79
80    /// Returns the number of topic filters in the collection.
81    pub fn len(&self) -> usize {
82        self.0.len()
83    }
84
85    pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
86        let mut filters = Vec::with_capacity(1);
87
88        while payload.has_remaining() {
89            let filter = decode_string(payload)?;
90            let flags = decode_byte(payload)?;
91
92            // The upper 6 bits of the Requested QoS byte must be zero
93            if flags & 0b1111_1100 > 0 {
94                return Err(Error::MalformedPacket);
95            }
96
97            filters.push(TopicQosFilter::new(filter, flags.try_into()?));
98        }
99
100        if filters.is_empty() {
101            return Err(Error::NoTopic);
102        }
103
104        Ok(TopicQosFilters(filters))
105    }
106
107    pub(crate) fn encode(&self, buf: &mut BytesMut) {
108        self.0.iter().for_each(|f| {
109            encode_string(buf, &f.topic);
110            buf.put_u8(f.qos.into());
111        });
112    }
113
114    pub(crate) fn encoded_len(&self) -> usize {
115        self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
116    }
117}
118
119impl AsRef<Vec<TopicQosFilter>> for TopicQosFilters {
120    #[inline]
121    fn as_ref(&self) -> &Vec<TopicQosFilter> {
122        self.0.as_ref()
123    }
124}
125
126impl Borrow<Vec<TopicQosFilter>> for TopicQosFilters {
127    fn borrow(&self) -> &Vec<TopicQosFilter> {
128        self.0.as_ref()
129    }
130}
131
132impl IntoIterator for TopicQosFilters {
133    type Item = TopicQosFilter;
134    type IntoIter = std::vec::IntoIter<TopicQosFilter>;
135
136    fn into_iter(self) -> Self::IntoIter {
137        self.0.into_iter()
138    }
139}
140
141impl FromIterator<TopicQosFilter> for TopicQosFilters {
142    fn from_iter<T: IntoIterator<Item = TopicQosFilter>>(iter: T) -> Self {
143        TopicQosFilters(Vec::from_iter(iter))
144    }
145}
146
147impl From<TopicQosFilters> for Vec<TopicQosFilter> {
148    #[inline]
149    fn from(value: TopicQosFilters) -> Self {
150        value.0
151    }
152}
153
154impl From<Vec<TopicQosFilter>> for TopicQosFilters {
155    #[inline]
156    fn from(value: Vec<TopicQosFilter>) -> Self {
157        TopicQosFilters(value)
158    }
159}
160
161/// Represents an MQTT `Subscribe` packet.
162///
163/// # Example
164///
165/// ```rust
166/// use mqute_codec::protocol::v4::{Subscribe, TopicQosFilter};
167/// use mqute_codec::protocol::QoS;
168///
169/// let filters = vec![
170///     TopicQosFilter::new("topic1", QoS::AtLeastOnce),
171///     TopicQosFilter::new("topic2", QoS::ExactlyOnce),
172/// ];
173/// let subscribe = Subscribe::new(123, filters);
174/// assert_eq!(subscribe.packet_id(), 123);
175/// ```
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct Subscribe {
178    /// The packet ID for the `Subscribe` packet.
179    packet_id: u16,
180
181    /// The list of topic filters and their requested QoS levels.
182    filters: TopicQosFilters,
183}
184
185impl Subscribe {
186    /// Creates a new `Subscribe` packet.
187    ///
188    /// # Panics
189    ///
190    /// Panics if `packet_id` is zero.
191    pub fn new<T: IntoIterator<Item = TopicQosFilter>>(packet_id: u16, filters: T) -> Self {
192        if packet_id == 0 {
193            panic!("Packet id is zero");
194        }
195
196        let filters = filters.into_iter().collect();
197        Subscribe { packet_id, filters }
198    }
199
200    /// Returns the packet ID of the `Subscribe` packet.
201    pub fn packet_id(&self) -> u16 {
202        self.packet_id
203    }
204
205    /// Returns the list of topic filters and their requested QoS levels.
206    pub fn filters(&self) -> TopicQosFilters {
207        self.filters.clone()
208    }
209}
210
211impl Decode for Subscribe {
212    /// Decodes a `Subscribe` packet from a raw MQTT packet.
213    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
214        // Validate header flags
215        if packet.header.packet_type() != PacketType::Subscribe
216            || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
217        {
218            return Err(Error::MalformedPacket);
219        }
220
221        let packet_id = decode_word(&mut packet.payload)?;
222        let filters = TopicQosFilters::decode(&mut packet.payload)?;
223
224        Ok(Subscribe::new(packet_id, filters))
225    }
226}
227
228impl Encode for Subscribe {
229    /// Encodes the `Subscribe` packet into a byte buffer.
230    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
231        let header = FixedHeader::with_flags(
232            PacketType::Subscribe,
233            Flags::new(QoS::AtLeastOnce),
234            self.payload_len(),
235        );
236        header.encode(buf)?;
237        buf.put_u16(self.packet_id);
238        self.filters.encode(buf);
239
240        Ok(())
241    }
242
243    /// Returns the length of the `Subscribe` packet payload.
244    fn payload_len(&self) -> usize {
245        // Packet ID and filter list
246        2 + self.filters.encoded_len()
247    }
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use crate::protocol::QoS;
    use bytes::BytesMut;
    use tokio_util::codec::Decoder;

    /// Wire image of the reference packet: id `0x1234`, filter "/a" with
    /// QoS 0 and filter "/b" with QoS 2.
    fn reference_bytes() -> Vec<u8> {
        vec![
            (PacketType::Subscribe as u8) << 4 | 0b0010, // Packet type
            0x0c,                                        // Remaining len
            0x12,                                        // Packet id (high byte)
            0x34,                                        // Packet id (low byte)
            0x00,                                        // Topic length (high byte)
            0x02,                                        // Topic length (low byte)
            b'/',
            b'a',
            0x00, // Requested QoS
            0x00, // Topic length (high byte)
            0x02, // Topic length (low byte)
            b'/',
            b'b',
            0x02, // Requested QoS
        ]
    }

    /// In-memory counterpart of `reference_bytes`.
    fn reference_packet() -> Subscribe {
        Subscribe::new(
            0x1234,
            vec![
                TopicQosFilter::new("/a", QoS::AtMostOnce),
                TopicQosFilter::new("/b", QoS::ExactlyOnce),
            ],
        )
    }

    #[test]
    fn subscribe_decode() {
        let mut codec = PacketCodec::new(None, None);

        let mut stream = BytesMut::new();
        stream.extend_from_slice(&reference_bytes());

        let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
        let decoded = Subscribe::decode(raw_packet).unwrap();

        assert_eq!(decoded, reference_packet());
    }

    #[test]
    fn subscribe_encode() {
        let mut stream = BytesMut::new();
        reference_packet().encode(&mut stream).unwrap();

        assert_eq!(stream, reference_bytes());
    }
}