mqtt5_protocol/packet/
unsubscribe.rs

1use crate::encoding::{decode_string, encode_string};
2use crate::error::{MqttError, Result};
3use crate::packet::{FixedHeader, MqttPacket, PacketType};
4use crate::protocol::v5::properties::Properties;
5use bytes::{Buf, BufMut};
6
7/// MQTT UNSUBSCRIBE packet
8#[derive(Debug, Clone)]
9pub struct UnsubscribePacket {
10    /// Packet identifier
11    pub packet_id: u16,
12    /// Topic filters to unsubscribe from
13    pub filters: Vec<String>,
14    /// UNSUBSCRIBE properties (v5.0 only)
15    pub properties: Properties,
16}
17
18impl UnsubscribePacket {
19    /// Creates a new UNSUBSCRIBE packet
20    #[must_use]
21    pub fn new(packet_id: u16) -> Self {
22        Self {
23            packet_id,
24            filters: Vec::new(),
25            properties: Properties::new(),
26        }
27    }
28
29    /// Adds a topic filter to unsubscribe from
30    #[must_use]
31    pub fn add_filter(mut self, filter: impl Into<String>) -> Self {
32        self.filters.push(filter.into());
33        self
34    }
35
36    /// Adds a user property
37    #[must_use]
38    pub fn with_user_property(mut self, key: String, value: String) -> Self {
39        self.properties.add_user_property(key, value);
40        self
41    }
42}
43
44impl MqttPacket for UnsubscribePacket {
45    fn packet_type(&self) -> PacketType {
46        PacketType::Unsubscribe
47    }
48
49    fn flags(&self) -> u8 {
50        0x02 // UNSUBSCRIBE must have flags = 0x02
51    }
52
53    fn encode_body<B: BufMut>(&self, buf: &mut B) -> Result<()> {
54        // Variable header
55        buf.put_u16(self.packet_id);
56
57        // Properties (v5.0)
58        self.properties.encode(buf)?;
59
60        // Payload - topic filters
61        if self.filters.is_empty() {
62            return Err(MqttError::MalformedPacket(
63                "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
64            ));
65        }
66
67        for filter in &self.filters {
68            encode_string(buf, filter)?;
69        }
70
71        Ok(())
72    }
73
74    fn decode_body<B: Buf>(buf: &mut B, fixed_header: &FixedHeader) -> Result<Self> {
75        // Validate flags
76        if fixed_header.flags != 0x02 {
77            return Err(MqttError::MalformedPacket(format!(
78                "Invalid UNSUBSCRIBE flags: expected 0x02, got 0x{:02X}",
79                fixed_header.flags
80            )));
81        }
82
83        // Packet identifier
84        if buf.remaining() < 2 {
85            return Err(MqttError::MalformedPacket(
86                "UNSUBSCRIBE missing packet identifier".to_string(),
87            ));
88        }
89        let packet_id = buf.get_u16();
90
91        // Properties (v5.0)
92        let properties = Properties::decode(buf)?;
93
94        // Payload - topic filters
95        let mut filters = Vec::new();
96
97        if !buf.has_remaining() {
98            return Err(MqttError::MalformedPacket(
99                "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
100            ));
101        }
102
103        while buf.has_remaining() {
104            let filter = decode_string(buf)?;
105            filters.push(filter);
106        }
107
108        Ok(Self {
109            packet_id,
110            filters,
111            properties,
112        })
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::v5::properties::PropertyId;
    use bytes::BytesMut;

    /// The builder keeps the packet id and stores filters in insertion order.
    #[test]
    fn test_unsubscribe_basic() {
        let unsubscribe = UnsubscribePacket::new(123)
            .add_filter("temperature/+")
            .add_filter("humidity/#");

        assert_eq!(unsubscribe.packet_id, 123);
        assert_eq!(unsubscribe.filters, ["temperature/+", "humidity/#"]);
    }

    /// Adding a user property is reflected in the packet's properties.
    #[test]
    fn test_unsubscribe_with_properties() {
        let key = "reason".to_string();
        let value = "cleanup".to_string();
        let unsubscribe = UnsubscribePacket::new(456)
            .add_filter("test/topic")
            .with_user_property(key, value);

        assert_eq!(unsubscribe.filters.len(), 1);
        assert!(unsubscribe.properties.contains(PropertyId::UserProperty));
    }

    /// A packet survives an encode followed by a header + body decode.
    #[test]
    fn test_unsubscribe_encode_decode() {
        let original = UnsubscribePacket::new(789)
            .add_filter("sensor/temp")
            .add_filter("sensor/humidity")
            .add_filter("sensor/pressure");

        let mut wire = BytesMut::new();
        original.encode(&mut wire).unwrap();

        let header = FixedHeader::decode(&mut wire).unwrap();
        assert_eq!(header.packet_type, PacketType::Unsubscribe);
        assert_eq!(header.flags, 0x02);

        let round_tripped = UnsubscribePacket::decode_body(&mut wire, &header).unwrap();
        assert_eq!(round_tripped.packet_id, 789);
        assert_eq!(
            round_tripped.filters,
            ["sensor/temp", "sensor/humidity", "sensor/pressure"]
        );
    }

    /// Decoding rejects a fixed header whose flags are not 0x02.
    #[test]
    fn test_unsubscribe_invalid_flags() {
        let mut body = BytesMut::new();
        body.put_u16(123);

        // Wrong flags: 0x00 instead of 0x02.
        let header = FixedHeader::new(PacketType::Unsubscribe, 0x00, 2);
        assert!(UnsubscribePacket::decode_body(&mut body, &header).is_err());
    }

    /// Encoding rejects a packet that has no topic filters.
    #[test]
    fn test_unsubscribe_empty_filters() {
        let without_filters = UnsubscribePacket::new(123);

        let mut wire = BytesMut::new();
        assert!(without_filters.encode(&mut wire).is_err());
    }
}