mqtt5_protocol/packet/
unsubscribe.rs1use crate::encoding::{decode_string, encode_string};
2use crate::error::{MqttError, Result};
3use crate::packet::{FixedHeader, MqttPacket, PacketType};
4use crate::protocol::v5::properties::Properties;
5use crate::types::ProtocolVersion;
6use bytes::{Buf, BufMut};
7
8#[derive(Debug, Clone)]
10pub struct UnsubscribePacket {
11 pub packet_id: u16,
13 pub filters: Vec<String>,
15 pub properties: Properties,
17 pub protocol_version: u8,
19}
20
21impl UnsubscribePacket {
22 #[must_use]
24 pub fn new(packet_id: u16) -> Self {
25 Self {
26 packet_id,
27 filters: Vec::new(),
28 properties: Properties::new(),
29 protocol_version: 5,
30 }
31 }
32
33 #[must_use]
35 pub fn new_v311(packet_id: u16) -> Self {
36 Self {
37 packet_id,
38 filters: Vec::new(),
39 properties: Properties::new(),
40 protocol_version: 4,
41 }
42 }
43
44 #[must_use]
46 pub fn add_filter(mut self, filter: impl Into<String>) -> Self {
47 self.filters.push(filter.into());
48 self
49 }
50
51 #[must_use]
53 pub fn with_user_property(mut self, key: String, value: String) -> Self {
54 self.properties.add_user_property(key, value);
55 self
56 }
57}
58
59impl MqttPacket for UnsubscribePacket {
60 fn packet_type(&self) -> PacketType {
61 PacketType::Unsubscribe
62 }
63
64 fn flags(&self) -> u8 {
65 0x02 }
67
68 fn encode_body<B: BufMut>(&self, buf: &mut B) -> Result<()> {
69 buf.put_u16(self.packet_id);
70
71 if self.protocol_version == 5 {
72 self.properties.encode(buf)?;
73 }
74
75 if self.filters.is_empty() {
76 return Err(MqttError::MalformedPacket(
77 "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
78 ));
79 }
80
81 for filter in &self.filters {
82 encode_string(buf, filter)?;
83 }
84
85 Ok(())
86 }
87
88 fn decode_body<B: Buf>(buf: &mut B, fixed_header: &FixedHeader) -> Result<Self> {
89 Self::decode_body_with_version(buf, fixed_header, 5)
90 }
91}
92
93impl UnsubscribePacket {
94 pub fn decode_body_with_version<B: Buf>(
100 buf: &mut B,
101 fixed_header: &FixedHeader,
102 protocol_version: u8,
103 ) -> Result<Self> {
104 ProtocolVersion::try_from(protocol_version)
105 .map_err(|()| MqttError::UnsupportedProtocolVersion)?;
106
107 if fixed_header.flags != 0x02 {
108 return Err(MqttError::MalformedPacket(format!(
109 "Invalid UNSUBSCRIBE flags: expected 0x02, got 0x{:02X}",
110 fixed_header.flags
111 )));
112 }
113
114 if buf.remaining() < 2 {
115 return Err(MqttError::MalformedPacket(
116 "UNSUBSCRIBE missing packet identifier".to_string(),
117 ));
118 }
119 let packet_id = buf.get_u16();
120
121 let properties = if protocol_version == 5 {
122 Properties::decode(buf)?
123 } else {
124 Properties::default()
125 };
126
127 let mut filters = Vec::new();
128
129 if !buf.has_remaining() {
130 return Err(MqttError::MalformedPacket(
131 "UNSUBSCRIBE packet must contain at least one topic filter".to_string(),
132 ));
133 }
134
135 while buf.has_remaining() {
136 let filter = decode_string(buf)?;
137 filters.push(filter);
138 }
139
140 Ok(Self {
141 packet_id,
142 filters,
143 properties,
144 protocol_version,
145 })
146 }
147}
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152 use crate::protocol::v5::properties::PropertyId;
153 use bytes::BytesMut;
154
155 #[test]
156 fn test_unsubscribe_basic() {
157 let packet = UnsubscribePacket::new(123)
158 .add_filter("temperature/+")
159 .add_filter("humidity/#");
160
161 assert_eq!(packet.packet_id, 123);
162 assert_eq!(packet.filters.len(), 2);
163 assert_eq!(packet.filters[0], "temperature/+");
164 assert_eq!(packet.filters[1], "humidity/#");
165 }
166
167 #[test]
168 fn test_unsubscribe_with_properties() {
169 let packet = UnsubscribePacket::new(456)
170 .add_filter("test/topic")
171 .with_user_property("reason".to_string(), "cleanup".to_string());
172
173 assert_eq!(packet.filters.len(), 1);
174 assert!(packet.properties.contains(PropertyId::UserProperty));
175 }
176
177 #[test]
178 fn test_unsubscribe_encode_decode() {
179 let packet = UnsubscribePacket::new(789)
180 .add_filter("sensor/temp")
181 .add_filter("sensor/humidity")
182 .add_filter("sensor/pressure");
183
184 let mut buf = BytesMut::new();
185 packet.encode(&mut buf).unwrap();
186
187 let fixed_header = FixedHeader::decode(&mut buf).unwrap();
188 assert_eq!(fixed_header.packet_type, PacketType::Unsubscribe);
189 assert_eq!(fixed_header.flags, 0x02);
190
191 let decoded = UnsubscribePacket::decode_body(&mut buf, &fixed_header).unwrap();
192 assert_eq!(decoded.packet_id, 789);
193 assert_eq!(decoded.filters.len(), 3);
194 assert_eq!(decoded.filters[0], "sensor/temp");
195 assert_eq!(decoded.filters[1], "sensor/humidity");
196 assert_eq!(decoded.filters[2], "sensor/pressure");
197 }
198
199 #[test]
200 fn test_unsubscribe_invalid_flags() {
201 let mut buf = BytesMut::new();
202 buf.put_u16(123);
203
204 let fixed_header = FixedHeader::new(PacketType::Unsubscribe, 0x00, 2); let result = UnsubscribePacket::decode_body(&mut buf, &fixed_header);
206 assert!(result.is_err());
207 }
208
209 #[test]
210 fn test_unsubscribe_empty_filters() {
211 let packet = UnsubscribePacket::new(123);
212
213 let mut buf = BytesMut::new();
214 let result = packet.encode(&mut buf);
215 assert!(result.is_err());
216 }
217}