1use super::{
2 Error, FixedHeader, PropertyType, QoS, len_len, length, property, read_mqtt_string, read_u8,
3 read_u16, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
7#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct SubAck {
10 pub pkid: u16,
11 pub return_codes: Vec<SubscribeReasonCode>,
12 pub properties: Option<SubAckProperties>,
13}
14
15impl SubAck {
16 fn len(&self) -> usize {
17 let mut len = 2 + self.return_codes.len();
18
19 if let Some(p) = &self.properties {
20 let properties_len = p.len();
21 let properties_len_len = len_len(properties_len);
22 len += properties_len_len + properties_len;
23 } else {
24 len += 1;
26 }
27
28 len
29 }
30
31 #[must_use]
32 pub fn size(&self) -> usize {
33 let len = self.len();
34 let remaining_len_size = len_len(len);
35
36 1 + remaining_len_size + len
37 }
38
39 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
40 let variable_header_index = fixed_header.header_len;
41 bytes.advance(variable_header_index);
42
43 let pkid = read_u16(&mut bytes)?;
44 let properties = SubAckProperties::read(&mut bytes)?;
45
46 if !bytes.has_remaining() {
47 return Err(Error::MalformedPacket);
48 }
49
50 let mut return_codes = Vec::new();
51 while bytes.has_remaining() {
52 let return_code = read_u8(&mut bytes)?;
53 return_codes.push(reason(return_code)?);
54 }
55
56 let suback = Self {
57 pkid,
58 return_codes,
59 properties,
60 };
61
62 Ok(suback)
63 }
64
65 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
66 buffer.put_u8(0x90);
67 let remaining_len = self.len();
68 let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
69
70 buffer.put_u16(self.pkid);
71
72 if let Some(p) = &self.properties {
73 p.write(buffer)?;
74 } else {
75 write_remaining_length(buffer, 0)?;
76 }
77
78 for &return_code in &self.return_codes {
79 buffer.put_u8(code(return_code));
80 }
81 Ok(1 + remaining_len_bytes + remaining_len)
82 }
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum SubscribeReasonCode {
87 Success(QoS),
88 Failure,
89 Unspecified,
90 ImplementationSpecific,
91 NotAuthorized,
92 TopicFilterInvalid,
93 PkidInUse,
94 QuotaExceeded,
95 SharedSubscriptionsNotSupported,
96 SubscriptionIdNotSupported,
97 WildcardSubscriptionsNotSupported,
98}
99
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct SubAckProperties {
102 pub reason_string: Option<String>,
103 pub user_properties: Vec<(String, String)>,
104}
105
106impl SubAckProperties {
107 fn len(&self) -> usize {
108 let mut len = 0;
109
110 if let Some(reason) = &self.reason_string {
111 len += 1 + 2 + reason.len();
112 }
113
114 for (key, value) in &self.user_properties {
115 len += 1 + 2 + key.len() + 2 + value.len();
116 }
117
118 len
119 }
120
121 pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
122 let mut reason_string = None;
123 let mut user_properties = Vec::new();
124
125 let (properties_len_len, properties_len) = length(bytes.iter())?;
126 bytes.advance(properties_len_len);
127 if properties_len == 0 {
128 return Ok(None);
129 }
130
131 let mut cursor = 0;
132 while cursor < properties_len {
134 let prop = read_u8(bytes)?;
135 cursor += 1;
136
137 match property(prop)? {
138 PropertyType::ReasonString => {
139 let reason = read_mqtt_string(bytes)?;
140 cursor += 2 + reason.len();
141 reason_string = Some(reason);
142 }
143 PropertyType::UserProperty => {
144 let key = read_mqtt_string(bytes)?;
145 let value = read_mqtt_string(bytes)?;
146 cursor += 2 + key.len() + 2 + value.len();
147 user_properties.push((key, value));
148 }
149 _ => return Err(Error::InvalidPropertyType(prop)),
150 }
151 }
152
153 Ok(Some(Self {
154 reason_string,
155 user_properties,
156 }))
157 }
158
159 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
160 let len = self.len();
161 write_remaining_length(buffer, len)?;
162
163 if let Some(reason) = &self.reason_string {
164 buffer.put_u8(PropertyType::ReasonString as u8);
165 write_mqtt_string(buffer, reason);
166 }
167
168 for (key, value) in &self.user_properties {
169 buffer.put_u8(PropertyType::UserProperty as u8);
170 write_mqtt_string(buffer, key);
171 write_mqtt_string(buffer, value);
172 }
173
174 Ok(())
175 }
176}
177
178fn reason(code: u8) -> Result<SubscribeReasonCode, Error> {
179 code.try_into()
180}
181
182const fn code(value: SubscribeReasonCode) -> u8 {
183 match value {
184 SubscribeReasonCode::Success(qos) => qos as u8,
185 SubscribeReasonCode::Failure | SubscribeReasonCode::Unspecified => 0x80,
186 SubscribeReasonCode::ImplementationSpecific => 131,
187 SubscribeReasonCode::NotAuthorized => 135,
188 SubscribeReasonCode::TopicFilterInvalid => 143,
189 SubscribeReasonCode::PkidInUse => 145,
190 SubscribeReasonCode::QuotaExceeded => 151,
191 SubscribeReasonCode::SharedSubscriptionsNotSupported => 158,
192 SubscribeReasonCode::SubscriptionIdNotSupported => 161,
193 SubscribeReasonCode::WildcardSubscriptionsNotSupported => 162,
194 }
195}
196
197impl TryFrom<u8> for SubscribeReasonCode {
198 type Error = Error;
199
200 fn try_from(value: u8) -> Result<Self, Self::Error> {
201 match value {
202 0 => Ok(Self::Success(QoS::AtMostOnce)),
203 1 => Ok(Self::Success(QoS::AtLeastOnce)),
204 2 => Ok(Self::Success(QoS::ExactlyOnce)),
205 128 => Ok(Self::Unspecified),
206 131 => Ok(Self::ImplementationSpecific),
207 135 => Ok(Self::NotAuthorized),
208 143 => Ok(Self::TopicFilterInvalid),
209 145 => Ok(Self::PkidInUse),
210 151 => Ok(Self::QuotaExceeded),
211 158 => Ok(Self::SharedSubscriptionsNotSupported),
212 161 => Ok(Self::SubscriptionIdNotSupported),
213 162 => Ok(Self::WildcardSubscriptionsNotSupported),
214 _ => Err(Error::InvalidSubscribeReasonCode(value)),
215 }
216 }
217}
218
219#[cfg(test)]
220mod test {
221 use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
222 use super::*;
223 use bytes::{Bytes, BytesMut};
224 use pretty_assertions::assert_eq;
225
226 #[test]
227 fn length_calculation() {
228 let mut dummy_bytes = BytesMut::new();
229 let suback_props = SubAckProperties {
232 reason_string: None,
233 user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
234 };
235
236 let suback_pkt = SubAck {
237 pkid: 1,
238 return_codes: vec![SubscribeReasonCode::Success(QoS::ExactlyOnce)],
239 properties: Some(suback_props),
240 };
241
242 let size_from_size = suback_pkt.size();
243 let size_from_write = suback_pkt.write(&mut dummy_bytes).unwrap();
244 let size_from_bytes = dummy_bytes.len();
245
246 assert_eq!(size_from_write, size_from_bytes);
247 assert_eq!(size_from_size, size_from_bytes);
248 }
249
250 #[test]
251 fn reason_and_code_round_trip() {
252 let values = [
253 (0, SubscribeReasonCode::Success(QoS::AtMostOnce)),
254 (1, SubscribeReasonCode::Success(QoS::AtLeastOnce)),
255 (2, SubscribeReasonCode::Success(QoS::ExactlyOnce)),
256 (128, SubscribeReasonCode::Unspecified),
257 (131, SubscribeReasonCode::ImplementationSpecific),
258 (135, SubscribeReasonCode::NotAuthorized),
259 (143, SubscribeReasonCode::TopicFilterInvalid),
260 (145, SubscribeReasonCode::PkidInUse),
261 (151, SubscribeReasonCode::QuotaExceeded),
262 (158, SubscribeReasonCode::SharedSubscriptionsNotSupported),
263 (161, SubscribeReasonCode::SubscriptionIdNotSupported),
264 (162, SubscribeReasonCode::WildcardSubscriptionsNotSupported),
265 ];
266
267 for (raw, parsed) in values {
268 assert_eq!(reason(raw).unwrap(), parsed);
269 assert_eq!(code(parsed), raw);
270 }
271 }
272
273 #[test]
274 fn reason_invalid_code_errors() {
275 assert!(matches!(
276 reason(42),
277 Err(Error::InvalidSubscribeReasonCode(42))
278 ));
279 }
280
281 #[test]
282 fn failure_encodes_like_unspecified() {
283 assert_eq!(code(SubscribeReasonCode::Failure), 0x80);
284 assert_eq!(code(SubscribeReasonCode::Unspecified), 0x80);
285 }
286
287 #[test]
288 fn write_multiple_reasons_bytes() {
289 let mut buffer = BytesMut::new();
290 let suback = SubAck {
291 pkid: 10,
292 return_codes: vec![
293 SubscribeReasonCode::Success(QoS::AtMostOnce),
294 SubscribeReasonCode::ImplementationSpecific,
295 SubscribeReasonCode::WildcardSubscriptionsNotSupported,
296 ],
297 properties: None,
298 };
299
300 suback.write(&mut buffer).unwrap();
301
302 let expected = [
303 0x90, 0x06, 0x00, 0x0A, 0x00, 0x00, 0x83, 0xA2, ];
311 assert_eq!(&buffer[..], &expected);
312 }
313
314 #[test]
315 fn read_errors_on_invalid_first_reason_code() {
316 let packet = Bytes::from_static(&[
317 0x90, 0x0C, 0x00, 0x0A, 0x00, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ]);
323
324 let fixed_header = FixedHeader::new(0x90, 1, 0x0C);
325 assert!(matches!(
326 SubAck::read(fixed_header, packet),
327 Err(Error::InvalidSubscribeReasonCode(0xFF))
328 ));
329 }
330}