// mqtt5_protocol/packet/unsubscribe.rs

1use crate::encoding::{decode_string, encode_string};
2use crate::error::{MqttError, Result};
3use crate::packet::{FixedHeader, MqttPacket, PacketType};
4use crate::protocol::v5::properties::Properties;
5use crate::types::ProtocolVersion;
6use bytes::{Buf, BufMut};
7
8/// MQTT UNSUBSCRIBE packet
9#[derive(Debug, Clone)]
10pub struct UnsubscribePacket {
11    /// Packet identifier
12    pub packet_id: u16,
13    /// Topic filters to unsubscribe from
14    pub filters: Vec<String>,
15    /// UNSUBSCRIBE properties (v5.0 only)
16    pub properties: Properties,
17    /// Protocol version (4 = v3.1.1, 5 = v5.0)
18    pub protocol_version: u8,
19}
20
21impl UnsubscribePacket {
22    /// Creates a new UNSUBSCRIBE packet (v5.0)
23    #[must_use]
24    pub fn new(packet_id: u16) -> Self {
25        Self {
26            packet_id,
27            filters: Vec::new(),
28            properties: Properties::new(),
29            protocol_version: 5,
30        }
31    }
32
33    /// Creates a new UNSUBSCRIBE packet for v3.1.1
34    #[must_use]
35    pub fn new_v311(packet_id: u16) -> Self {
36        Self {
37            packet_id,
38            filters: Vec::new(),
39            properties: Properties::new(),
40            protocol_version: 4,
41        }
42    }
43
44    /// Adds a topic filter to unsubscribe from
45    #[must_use]
46    pub fn add_filter(mut self, filter: impl Into<String>) -> Self {
47        self.filters.push(filter.into());
48        self
49    }
50
51    /// Adds a user property
52    #[must_use]
53    pub fn with_user_property(mut self, key: String, value: String) -> Self {
54        self.properties.add_user_property(key, value);
55        self
56    }
57}
58
59impl MqttPacket for UnsubscribePacket {
60    fn packet_type(&self) -> PacketType {
61        PacketType::Unsubscribe
62    }
63
64    fn flags(&self) -> u8 {
65        0x02 // UNSUBSCRIBE must have flags = 0x02
66    }
67
68    fn encode_body<B: BufMut>(&self, buf: &mut B) -> Result<()> {
69        buf.put_u16(self.packet_id);
70
71        if self.protocol_version == 5 {
72            self.properties.encode(buf)?;
73        }
74
75        if self.filters.is_empty() {
76            return Err(MqttError::MalformedPacket(
77                "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
78            ));
79        }
80
81        for filter in &self.filters {
82            encode_string(buf, filter)?;
83        }
84
85        Ok(())
86    }
87
88    fn decode_body<B: Buf>(buf: &mut B, fixed_header: &FixedHeader) -> Result<Self> {
89        Self::decode_body_with_version(buf, fixed_header, 5)
90    }
91}
92
93impl UnsubscribePacket {
94    /// Decodes the packet body with a specific protocol version
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if decoding fails
99    pub fn decode_body_with_version<B: Buf>(
100        buf: &mut B,
101        fixed_header: &FixedHeader,
102        protocol_version: u8,
103    ) -> Result<Self> {
104        ProtocolVersion::try_from(protocol_version)
105            .map_err(|()| MqttError::UnsupportedProtocolVersion)?;
106
107        if fixed_header.flags != 0x02 {
108            return Err(MqttError::MalformedPacket(format!(
109                "Invalid UNSUBSCRIBE flags: expected 0x02, got 0x{:02X}",
110                fixed_header.flags
111            )));
112        }
113
114        if buf.remaining() < 2 {
115            return Err(MqttError::MalformedPacket(
116                "UNSUBSCRIBE missing packet identifier".to_string(),
117            ));
118        }
119        let packet_id = buf.get_u16();
120
121        let properties = if protocol_version == 5 {
122            Properties::decode(buf)?
123        } else {
124            Properties::default()
125        };
126
127        let mut filters = Vec::new();
128
129        if !buf.has_remaining() {
130            return Err(MqttError::MalformedPacket(
131                "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
132            ));
133        }
134
135        while buf.has_remaining() {
136            let filter = decode_string(buf)?;
137            filters.push(filter);
138        }
139
140        Ok(Self {
141            packet_id,
142            filters,
143            properties,
144            protocol_version,
145        })
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::v5::properties::PropertyId;
    use bytes::BytesMut;

    /// The builder records the packet id and keeps filters in insertion order.
    #[test]
    fn test_unsubscribe_basic() {
        let unsubscribe = UnsubscribePacket::new(123)
            .add_filter("temperature/+")
            .add_filter("humidity/#");

        assert_eq!(unsubscribe.packet_id, 123);
        assert_eq!(unsubscribe.filters, ["temperature/+", "humidity/#"]);
    }

    /// Adding a user property makes it visible through the packet's properties.
    #[test]
    fn test_unsubscribe_with_properties() {
        let unsubscribe = UnsubscribePacket::new(456)
            .add_filter("test/topic")
            .with_user_property(String::from("reason"), String::from("cleanup"));

        assert_eq!(unsubscribe.filters.len(), 1);
        assert!(unsubscribe.properties.contains(PropertyId::UserProperty));
    }

    /// A full encode followed by header + body decode yields the same id and filters.
    #[test]
    fn test_unsubscribe_encode_decode() {
        let topics = ["sensor/temp", "sensor/humidity", "sensor/pressure"];
        let original = topics
            .iter()
            .fold(UnsubscribePacket::new(789), |pkt, topic| {
                pkt.add_filter(*topic)
            });

        let mut wire = BytesMut::new();
        original.encode(&mut wire).unwrap();

        let header = FixedHeader::decode(&mut wire).unwrap();
        assert_eq!(header.packet_type, PacketType::Unsubscribe);
        assert_eq!(header.flags, 0x02);

        let round_tripped = UnsubscribePacket::decode_body(&mut wire, &header).unwrap();
        assert_eq!(round_tripped.packet_id, 789);
        assert_eq!(round_tripped.filters, topics);
    }

    /// Header flags other than 0x02 are rejected by the decoder.
    #[test]
    fn test_unsubscribe_invalid_flags() {
        let mut body = BytesMut::new();
        body.put_u16(123);

        let bad_header = FixedHeader::new(PacketType::Unsubscribe, 0x00, 2); // Wrong flags
        assert!(UnsubscribePacket::decode_body(&mut body, &bad_header).is_err());
    }

    /// Encoding a packet without any topic filter fails.
    #[test]
    fn test_unsubscribe_empty_filters() {
        let mut sink = BytesMut::new();
        assert!(UnsubscribePacket::new(123).encode(&mut sink).is_err());
    }
}