mqute_codec/protocol/v4/
unsubscribe.rs

//! # Unsubscribe Packet V4
//!
//! This module defines the `Unsubscribe` packet, which an MQTT client sends to request
//! the removal of one or more topic filters from its subscriptions. The `Unsubscribe` packet
//! carries a packet ID and a list of topic filters.
6
7use crate::Error;
8use crate::codec::util::decode_word;
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::{FixedHeader, Flags, PacketType, QoS, TopicFilters, traits};
11use bytes::{BufMut, BytesMut};
12
13/// Represents an MQTT `Unsubscribe` packet.
14///
15/// # Example
16///
17/// ```rust
18/// use mqute_codec::protocol::TopicFilters;
19/// use mqute_codec::protocol::v4::Unsubscribe;
20///
21/// let unsubscribe = Unsubscribe::new(1234, vec!["topic1", "topic2"]);
22///
23/// let filters = TopicFilters::new(vec!["topic1", "topic2"]);
24///
25/// assert_eq!(unsubscribe.packet_id(), 1234u16);
26/// assert_eq!(unsubscribe.filters(), filters);
27/// ```
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct Unsubscribe {
30    /// The packet ID for the `Unsubscribe` packet.
31    packet_id: u16,
32
33    /// The list of topic filters to unsubscribe from.
34    filters: TopicFilters,
35}
36
37impl Unsubscribe {
38    /// Creates a new `Unsubscribe` packet.
39    ///
40    /// # Panics
41    ///
42    /// Panics if:
43    /// - `packet_id` is zero.
44    /// - The topic filters are invalid according to MQTT topic naming rules.
45    pub fn new<T: IntoIterator<Item: Into<String>>>(packet_id: u16, filters: T) -> Self {
46        if packet_id == 0 {
47            panic!("Packet id is zero");
48        }
49
50        Unsubscribe {
51            packet_id,
52            filters: TopicFilters::new(filters),
53        }
54    }
55
56    /// Returns the packet ID of the `Unsubscribe` packet.
57    pub fn packet_id(&self) -> u16 {
58        self.packet_id
59    }
60
61    /// Returns the list of topic filters to unsubscribe from.
62    pub fn filters(&self) -> TopicFilters {
63        self.filters.clone()
64    }
65}
66
67impl Decode for Unsubscribe {
68    /// Decodes an `Unsubscribe` packet from a raw MQTT packet.
69    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
70        if packet.header.packet_type() != PacketType::Unsubscribe
71            || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
72        {
73            return Err(Error::MalformedPacket);
74        }
75
76        let packet_id = decode_word(&mut packet.payload)?;
77        let filters = TopicFilters::decode(&mut packet.payload)?;
78
79        Ok(Unsubscribe::new(packet_id, filters))
80    }
81}
82
83impl Encode for Unsubscribe {
84    /// Encodes the `Unsubscribe` packet into a byte buffer.
85    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
86        let header = FixedHeader::with_flags(
87            PacketType::Unsubscribe,
88            Flags::new(QoS::AtLeastOnce),
89            self.payload_len(),
90        );
91        header.encode(buf)?;
92
93        // Encode the packet id
94        buf.put_u16(self.packet_id);
95        self.filters.encode(buf);
96        Ok(())
97    }
98
99    fn payload_len(&self) -> usize {
100        2 + self.filters.encoded_len()
101    }
102}
103
// Marks this type as an implementor of `traits::Unsubscribe`. The impl body is empty,
// so no trait items are defined or overridden here.
impl traits::Unsubscribe for Unsubscribe {}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use bytes::BytesMut;
    use tokio_util::codec::Decoder;

    /// Wire image of an `Unsubscribe` packet with ID `0x1234` and the
    /// filters `"hello world!"` and `"test"`.
    #[rustfmt::skip]
    fn packet_data() -> &'static [u8] {
        &[
            // Fixed header: packet type in the high nibble, flags in the low nibble.
            ((PacketType::Unsubscribe as u8) << 4) | 0b0010,
            0x16, // Remaining length
            // Packet ID
            0x12, 0x34,
            // Topic #1: length, then bytes
            0x00, 0x0c,
            b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
            // Topic #2: length, then bytes
            0x00, 0x04,
            b't', b'e', b's', b't',
        ]
    }

    /// The packet that `packet_data` is expected to represent.
    fn expected_packet() -> Unsubscribe {
        Unsubscribe::new(0x1234, vec!["hello world!", "test"])
    }

    #[test]
    fn unsubscribe_decode() {
        let mut codec = PacketCodec::new(None, None);
        let mut input = BytesMut::from(packet_data());

        let raw = codec.decode(&mut input).unwrap().unwrap();
        let decoded = Unsubscribe::decode(raw).unwrap();

        assert_eq!(decoded, expected_packet());
    }

    #[test]
    fn unsubscribe_encode() {
        let mut output = BytesMut::new();
        expected_packet().encode(&mut output).unwrap();

        assert_eq!(&output[..], packet_data());
    }

    #[test]
    #[should_panic]
    fn unsubscribe_construct() {
        Unsubscribe::new(0, Vec::<String>::new());
    }
}