mqute_codec/protocol/v4/
unsubscribe.rs

1//! # Unsubscribe Packet V4
2//!
3//! This module defines the `Unsubscribe` packet, which is used in the MQTT protocol to request
4//! the removal of one or more topic filters from a subscription. The `Unsubscribe` packet
5//! includes a packet ID and a list of topic filters.
6
7use crate::codec::util::decode_word;
8use crate::codec::{Decode, Encode, RawPacket};
9use crate::protocol::{FixedHeader, Flags, PacketType, QoS, TopicFilters};
10use crate::Error;
11use bytes::{BufMut, BytesMut};
12
13/// Represents an MQTT `Unsubscribe` packet.
14///
15/// # Example
16///
17/// ```rust
18/// use mqute_codec::protocol::TopicFilters;
19/// use mqute_codec::protocol::v4::Unsubscribe;
20///
21/// let unsubscribe = Unsubscribe::new(1234, vec!["topic1", "topic2"]);
22///
23/// let filters = TopicFilters::new(vec!["topic1", "topic2"]);
24///
25/// assert_eq!(unsubscribe.packet_id(), 1234u16);
26/// assert_eq!(unsubscribe.filters(), filters);
27/// ```
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct Unsubscribe {
30    /// The packet ID for the `Unsubscribe` packet.
31    packet_id: u16,
32
33    /// The list of topic filters to unsubscribe from.
34    filters: TopicFilters,
35}
36
37impl Unsubscribe {
38    /// Creates a new `Unsubscribe` packet.
39    ///
40    /// # Panics
41    ///
42    /// Panics if `packet_id` is zero.
43    pub fn new<T: IntoIterator<Item: Into<String>>>(packet_id: u16, filters: T) -> Self {
44        if packet_id == 0 {
45            panic!("Packet id is zero");
46        }
47
48        Unsubscribe {
49            packet_id,
50            filters: TopicFilters::new(filters),
51        }
52    }
53
54    /// Returns the packet ID of the `Unsubscribe` packet.
55    pub fn packet_id(&self) -> u16 {
56        self.packet_id
57    }
58
59    /// Returns the list of topic filters to unsubscribe from.
60    pub fn filters(&self) -> TopicFilters {
61        self.filters.clone()
62    }
63}
64
65impl Decode for Unsubscribe {
66    /// Decodes an `Unsubscribe` packet from a raw MQTT packet.
67    fn decode(mut packet: RawPacket) -> Result<Self, Error> {
68        if packet.header.packet_type() != PacketType::Unsubscribe
69            || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
70        {
71            return Err(Error::MalformedPacket);
72        }
73
74        let packet_id = decode_word(&mut packet.payload)?;
75        let filters = TopicFilters::decode(&mut packet.payload)?;
76
77        Ok(Unsubscribe::new(packet_id, filters))
78    }
79}
80
81impl Encode for Unsubscribe {
82    /// Encodes the `Unsubscribe` packet into a byte buffer.
83    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
84        let header = FixedHeader::with_flags(
85            PacketType::Unsubscribe,
86            Flags::new(QoS::AtLeastOnce),
87            self.payload_len(),
88        );
89        header.encode(buf)?;
90
91        // Encode the packet id
92        buf.put_u16(self.packet_id);
93        self.filters.encode(buf);
94        Ok(())
95    }
96
97    fn payload_len(&self) -> usize {
98        2 + self.filters.encoded_len()
99    }
100}
101
102#[cfg(test)]
103mod tests {
104    use super::*;
105    use crate::codec::PacketCodec;
106    use bytes::BytesMut;
107    use tokio_util::codec::Decoder;
108
109    fn packet_data() -> &'static [u8] {
110        &[
111            (PacketType::Unsubscribe as u8) << 4 | 0b0010, // Packet type
112            0x16,                                          // Remaining len
113            0x12,                                          // Packet ID
114            0x34,                                          //
115            0x00,                                          // Topic #1 len
116            0x0c,                                          //
117            b'h',                                          // Topic message
118            b'e',
119            b'l',
120            b'l',
121            b'o',
122            b' ',
123            b'w',
124            b'o',
125            b'r',
126            b'l',
127            b'd',
128            b'!',
129            0x00, // Topic #2 len
130            0x04,
131            b't', // Topic message
132            b'e',
133            b's',
134            b't',
135        ]
136    }
137
138    #[test]
139    fn unsubscribe_decode() {
140        let mut codec = PacketCodec::new(None, None);
141
142        let mut stream = BytesMut::new();
143
144        stream.extend_from_slice(packet_data());
145
146        let raw_packet = codec.decode(&mut stream).unwrap().unwrap();
147        let packet = Unsubscribe::decode(raw_packet).unwrap();
148
149        assert_eq!(
150            packet,
151            Unsubscribe::new(0x1234, vec!["hello world!", "test"])
152        );
153    }
154
155    #[test]
156    fn unsubscribe_encode() {
157        let packet = Unsubscribe::new(0x1234, vec!["hello world!", "test"]);
158
159        let mut stream = BytesMut::new();
160        packet.encode(&mut stream).unwrap();
161        assert_eq!(stream, Vec::from(packet_data()));
162    }
163
164    #[test]
165    #[should_panic]
166    fn unsubscribe_construct() {
167        Unsubscribe::new(0, Vec::<String>::new());
168    }
169}