mqute_codec/protocol/v4/
unsubscribe.rs

1//! # Unsubscribe Packet V4
2//!
3//! This module defines the `Unsubscribe` packet, which is used in the MQTT protocol to request
4//! the removal of one or more topic filters from a subscription. The `Unsubscribe` packet
5//! includes a packet ID and a list of topic filters.
6
7use crate::Error;
8use crate::codec::util::decode_word;
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::{FixedHeader, Flags, PacketType, QoS, TopicFilters};
11use bytes::{BufMut, BytesMut};
12
13/// Represents an MQTT `Unsubscribe` packet.
14///
15/// # Example
16///
17/// ```rust
18/// use mqute_codec::protocol::TopicFilters;
19/// use mqute_codec::protocol::v4::Unsubscribe;
20///
21/// let unsubscribe = Unsubscribe::new(1234, vec!["topic1", "topic2"]);
22///
23/// let filters = TopicFilters::new(vec!["topic1", "topic2"]);
24///
25/// assert_eq!(unsubscribe.packet_id(), 1234u16);
26/// assert_eq!(unsubscribe.filters(), filters);
27/// ```
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct Unsubscribe {
30    /// The packet ID for the `Unsubscribe` packet.
31    packet_id: u16,
32
33    /// The list of topic filters to unsubscribe from.
34    filters: TopicFilters,
35}
36
37impl Unsubscribe {
38    /// Creates a new `Unsubscribe` packet.
39    ///
40    /// # Panics
41    ///
42    /// Panics if:
43    /// - `packet_id` is zero.
44    /// - The topic filters are invalid according to MQTT topic naming rules.
45    pub fn new<T: IntoIterator<Item: Into<String>>>(packet_id: u16, filters: T) -> Self {
46        if packet_id == 0 {
47            panic!("Packet id is zero");
48        }
49
50        Unsubscribe {
51            packet_id,
52            filters: TopicFilters::new(filters),
53        }
54    }
55
56    /// Returns the packet ID of the `Unsubscribe` packet.
57    pub fn packet_id(&self) -> u16 {
58        self.packet_id
59    }
60
61    /// Returns the list of topic filters to unsubscribe from.
62    pub fn filters(&self) -> TopicFilters {
63        self.filters.clone()
64    }
65}
66
67impl Decode for Unsubscribe {
68    /// Decodes an `Unsubscribe` packet from a raw MQTT packet.
69    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
70        if packet.header.packet_type() != PacketType::Unsubscribe
71            || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
72        {
73            return Err(Error::MalformedPacket);
74        }
75
76        let packet_id = decode_word(&mut packet.payload)?;
77        let filters = TopicFilters::decode(&mut packet.payload)?;
78
79        Ok(Unsubscribe::new(packet_id, filters))
80    }
81}
82
83impl Encode for Unsubscribe {
84    /// Encodes the `Unsubscribe` packet into a byte buffer.
85    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
86        let header = FixedHeader::with_flags(
87            PacketType::Unsubscribe,
88            Flags::new(QoS::AtLeastOnce),
89            self.payload_len(),
90        );
91        header.encode(buf)?;
92
93        // Encode the packet id
94        buf.put_u16(self.packet_id);
95        self.filters.encode(buf);
96        Ok(())
97    }
98
99    fn payload_len(&self) -> usize {
100        2 + self.filters.encoded_len()
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use bytes::BytesMut;
    use tokio_util::codec::Decoder;

    /// Wire image of an `Unsubscribe` packet with packet ID 0x1234 and the two
    /// topic filters "hello world!" and "test".
    #[rustfmt::skip]
    fn packet_data() -> &'static [u8] {
        &[
            // Fixed header
            (PacketType::Unsubscribe as u8) << 4 | 0b0010, // Packet type
            0x16,                                          // Remaining len
            // Packet ID
            0x12, 0x34,
            // Topic #1: length, then text
            0x00, 0x0c,
            b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
            // Topic #2: length, then text
            0x00, 0x04,
            b't', b'e', b's', b't',
        ]
    }

    /// Decoding the fixture yields the packet it was built from.
    #[test]
    fn unsubscribe_decode() {
        let mut input = BytesMut::new();
        input.extend_from_slice(packet_data());

        let mut codec = PacketCodec::new(None, None);
        let raw = codec.decode(&mut input).unwrap().unwrap();
        let decoded = Unsubscribe::decode(raw).unwrap();

        let expected = Unsubscribe::new(0x1234, vec!["hello world!", "test"]);
        assert_eq!(decoded, expected);
    }

    /// Encoding the packet reproduces the fixture byte for byte.
    #[test]
    fn unsubscribe_encode() {
        let mut encoded = BytesMut::new();
        Unsubscribe::new(0x1234, vec!["hello world!", "test"])
            .encode(&mut encoded)
            .unwrap();

        assert_eq!(encoded, Vec::from(packet_data()));
    }

    /// Constructing a packet with ID zero must panic.
    #[test]
    #[should_panic]
    fn unsubscribe_construct() {
        let no_filters: Vec<String> = Vec::new();
        Unsubscribe::new(0, no_filters);
    }
}