mqtt5_protocol/packet/
unsubscribe.rs1use crate::encoding::{decode_string, encode_string};
2use crate::error::{MqttError, Result};
3use crate::packet::{FixedHeader, MqttPacket, PacketType};
4use crate::protocol::v5::properties::Properties;
5use bytes::{Buf, BufMut};
6
7#[derive(Debug, Clone)]
9pub struct UnsubscribePacket {
10 pub packet_id: u16,
12 pub filters: Vec<String>,
14 pub properties: Properties,
16}
17
18impl UnsubscribePacket {
19 #[must_use]
21 pub fn new(packet_id: u16) -> Self {
22 Self {
23 packet_id,
24 filters: Vec::new(),
25 properties: Properties::new(),
26 }
27 }
28
29 #[must_use]
31 pub fn add_filter(mut self, filter: impl Into<String>) -> Self {
32 self.filters.push(filter.into());
33 self
34 }
35
36 #[must_use]
38 pub fn with_user_property(mut self, key: String, value: String) -> Self {
39 self.properties.add_user_property(key, value);
40 self
41 }
42}
43
44impl MqttPacket for UnsubscribePacket {
45 fn packet_type(&self) -> PacketType {
46 PacketType::Unsubscribe
47 }
48
49 fn flags(&self) -> u8 {
50 0x02 }
52
53 fn encode_body<B: BufMut>(&self, buf: &mut B) -> Result<()> {
54 buf.put_u16(self.packet_id);
56
57 self.properties.encode(buf)?;
59
60 if self.filters.is_empty() {
62 return Err(MqttError::MalformedPacket(
63 "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
64 ));
65 }
66
67 for filter in &self.filters {
68 encode_string(buf, filter)?;
69 }
70
71 Ok(())
72 }
73
74 fn decode_body<B: Buf>(buf: &mut B, fixed_header: &FixedHeader) -> Result<Self> {
75 if fixed_header.flags != 0x02 {
77 return Err(MqttError::MalformedPacket(format!(
78 "Invalid UNSUBSCRIBE flags: expected 0x02, got 0x{:02X}",
79 fixed_header.flags
80 )));
81 }
82
83 if buf.remaining() < 2 {
85 return Err(MqttError::MalformedPacket(
86 "UNSUBSCRIBE missing packet identifier".to_string(),
87 ));
88 }
89 let packet_id = buf.get_u16();
90
91 let properties = Properties::decode(buf)?;
93
94 let mut filters = Vec::new();
96
97 if !buf.has_remaining() {
98 return Err(MqttError::MalformedPacket(
99 "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
100 ));
101 }
102
103 while buf.has_remaining() {
104 let filter = decode_string(buf)?;
105 filters.push(filter);
106 }
107
108 Ok(Self {
109 packet_id,
110 filters,
111 properties,
112 })
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::v5::properties::PropertyId;
    use bytes::BytesMut;

    /// The builder records the packet id and keeps filters in insertion order.
    #[test]
    fn test_unsubscribe_basic() {
        let unsubscribe = UnsubscribePacket::new(123)
            .add_filter("temperature/+")
            .add_filter("humidity/#");

        assert_eq!(unsubscribe.packet_id, 123);
        assert_eq!(unsubscribe.filters, ["temperature/+", "humidity/#"]);
    }

    /// Adding a user property makes it visible in the packet's properties.
    #[test]
    fn test_unsubscribe_with_properties() {
        let unsubscribe = UnsubscribePacket::new(456)
            .add_filter("test/topic")
            .with_user_property("reason".to_string(), "cleanup".to_string());

        assert_eq!(unsubscribe.filters.len(), 1);
        assert!(unsubscribe.properties.contains(PropertyId::UserProperty));
    }

    /// A full encode followed by header + body decode round-trips the packet.
    #[test]
    fn test_unsubscribe_encode_decode() {
        let original = UnsubscribePacket::new(789)
            .add_filter("sensor/temp")
            .add_filter("sensor/humidity")
            .add_filter("sensor/pressure");

        let mut wire = BytesMut::new();
        original.encode(&mut wire).unwrap();

        let header = FixedHeader::decode(&mut wire).unwrap();
        assert_eq!(header.packet_type, PacketType::Unsubscribe);
        assert_eq!(header.flags, 0x02);

        let round_tripped = UnsubscribePacket::decode_body(&mut wire, &header).unwrap();
        assert_eq!(round_tripped.packet_id, 789);
        assert_eq!(
            round_tripped.filters,
            ["sensor/temp", "sensor/humidity", "sensor/pressure"]
        );
    }

    /// Decoding rejects a fixed header whose flags are not 0x02.
    #[test]
    fn test_unsubscribe_invalid_flags() {
        let mut body = BytesMut::new();
        body.put_u16(123);

        let header = FixedHeader::new(PacketType::Unsubscribe, 0x00, 2);
        assert!(UnsubscribePacket::decode_body(&mut body, &header).is_err());
    }

    /// Encoding a packet without any topic filter fails.
    #[test]
    fn test_unsubscribe_empty_filters() {
        let unsubscribe = UnsubscribePacket::new(123);

        let mut wire = BytesMut::new();
        assert!(unsubscribe.encode(&mut wire).is_err());
    }
}