1use crate::MqttString;
2
3use super::{
4 len_len, length, property, read_mqtt_string, read_u16, read_u8, write_mqtt_string,
5 write_remaining_length, Debug, Error, FixedHeader, PropertyType, QoS,
6};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct SubAck {
12 pub pkid: u16,
13 pub return_codes: Vec<SubscribeReasonCode>,
14 pub properties: Option<SubAckProperties>,
15}
16
17impl SubAck {
18 fn len(&self) -> usize {
19 let mut len = 2 + self.return_codes.len();
20
21 if let Some(p) = &self.properties {
22 let properties_len = p.len();
23 let properties_len_len = len_len(properties_len);
24 len += properties_len_len + properties_len;
25 } else {
26 len += 1;
28 }
29
30 len
31 }
32
33 #[must_use]
34 pub fn size(&self) -> usize {
35 let len = self.len();
36 let remaining_len_size = len_len(len);
37
38 1 + remaining_len_size + len
39 }
40
41 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<SubAck, Error> {
42 let variable_header_index = fixed_header.fixed_header_len;
43 bytes.advance(variable_header_index);
44
45 let pkid = read_u16(&mut bytes)?;
46 let properties = SubAckProperties::read(&mut bytes)?;
47
48 if !bytes.has_remaining() {
49 return Err(Error::MalformedPacket);
50 }
51
52 let mut return_codes = Vec::new();
53 while bytes.has_remaining() {
54 let return_code = read_u8(&mut bytes)?;
55 return_codes.push(reason(return_code)?);
56 }
57
58 let suback = SubAck {
59 pkid,
60 return_codes,
61 properties,
62 };
63
64 Ok(suback)
65 }
66
67 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
68 buffer.put_u8(0x90);
69 let remaining_len = self.len();
70 let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
71
72 buffer.put_u16(self.pkid);
73
74 if let Some(p) = &self.properties {
75 p.write(buffer)?;
76 } else {
77 write_remaining_length(buffer, 0)?;
78 }
79
80 let p: Vec<u8> = self.return_codes.iter().map(|&c| code(c)).collect();
81
82 buffer.extend_from_slice(&p);
83 Ok(1 + remaining_len_bytes + remaining_len)
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88pub enum SubscribeReasonCode {
89 Success(QoS),
90 Unspecified,
91 ImplementationSpecific,
92 NotAuthorized,
93 TopicFilterInvalid,
94 PkidInUse,
95 QuotaExceeded,
96 SharedSubscriptionsNotSupported,
97 SubscriptionIdNotSupported,
98 WildcardSubscriptionsNotSupported,
99}
100
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct SubAckProperties {
103 pub reason_string: Option<MqttString>,
104 pub user_properties: Vec<(MqttString, MqttString)>,
105}
106
107impl SubAckProperties {
108 fn len(&self) -> usize {
109 let mut len = 0;
110
111 if let Some(reason) = &self.reason_string {
112 len += 1 + 2 + reason.len();
113 }
114
115 for (key, value) in &self.user_properties {
116 len += 1 + 2 + key.len() + 2 + value.len();
117 }
118
119 len
120 }
121
122 pub fn read(bytes: &mut Bytes) -> Result<Option<SubAckProperties>, Error> {
123 let mut reason_string = None;
124 let mut user_properties = Vec::new();
125
126 let (properties_len_len, properties_len) = length(bytes.iter())?;
127 bytes.advance(properties_len_len);
128 if properties_len == 0 {
129 return Ok(None);
130 }
131
132 let mut cursor = 0;
133 while cursor < properties_len {
135 let prop = read_u8(bytes)?;
136 cursor += 1;
137
138 match property(prop)? {
139 PropertyType::ReasonString => {
140 let reason = read_mqtt_string(bytes)?;
141 cursor += 2 + reason.len();
142 reason_string = Some(reason);
143 }
144 PropertyType::UserProperty => {
145 let key = read_mqtt_string(bytes)?;
146 let value = read_mqtt_string(bytes)?;
147 cursor += 2 + key.len() + 2 + value.len();
148 user_properties.push((key, value));
149 }
150 _ => return Err(Error::InvalidPropertyType(prop)),
151 }
152 }
153
154 Ok(Some(SubAckProperties {
155 reason_string,
156 user_properties,
157 }))
158 }
159
160 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
161 let len = self.len();
162 write_remaining_length(buffer, len)?;
163
164 if let Some(reason) = &self.reason_string {
165 buffer.put_u8(PropertyType::ReasonString as u8);
166 write_mqtt_string(buffer, reason)?;
167 }
168
169 for (key, value) in &self.user_properties {
170 buffer.put_u8(PropertyType::UserProperty as u8);
171 write_mqtt_string(buffer, key)?;
172 write_mqtt_string(buffer, value)?;
173 }
174
175 Ok(())
176 }
177}
178
179fn reason(code: u8) -> Result<SubscribeReasonCode, Error> {
180 let v = match code {
181 0 => SubscribeReasonCode::Success(QoS::AtMostOnce),
182 1 => SubscribeReasonCode::Success(QoS::AtLeastOnce),
183 2 => SubscribeReasonCode::Success(QoS::ExactlyOnce),
184 128 => SubscribeReasonCode::Unspecified,
185 131 => SubscribeReasonCode::ImplementationSpecific,
186 135 => SubscribeReasonCode::NotAuthorized,
187 143 => SubscribeReasonCode::TopicFilterInvalid,
188 145 => SubscribeReasonCode::PkidInUse,
189 151 => SubscribeReasonCode::QuotaExceeded,
190 158 => SubscribeReasonCode::SharedSubscriptionsNotSupported,
191 161 => SubscribeReasonCode::SubscriptionIdNotSupported,
192 162 => SubscribeReasonCode::WildcardSubscriptionsNotSupported,
193 v => return Err(Error::InvalidSubscribeReasonCode(v)),
194 };
195
196 Ok(v)
197}
198
199fn code(value: SubscribeReasonCode) -> u8 {
200 match value {
201 SubscribeReasonCode::Success(qos) => qos as u8,
202 SubscribeReasonCode::Unspecified => 128,
203 SubscribeReasonCode::ImplementationSpecific => 131,
204 SubscribeReasonCode::NotAuthorized => 135,
205 SubscribeReasonCode::TopicFilterInvalid => 143,
206 SubscribeReasonCode::PkidInUse => 145,
207 SubscribeReasonCode::QuotaExceeded => 151,
208 SubscribeReasonCode::SharedSubscriptionsNotSupported => 158,
209 SubscribeReasonCode::SubscriptionIdNotSupported => 161,
210 SubscribeReasonCode::WildcardSubscriptionsNotSupported => 162,
211 }
212}
213
#[cfg(test)]
mod test {
    use crate::test::read_write_packets;
    use crate::Packet;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()` and the value returned by `write()` must both equal the number
    /// of bytes actually written.
    #[test]
    fn length_calculation() {
        let packet = SubAck {
            pkid: 1,
            return_codes: vec![SubscribeReasonCode::Success(QoS::ExactlyOnce)],
            properties: Some(SubAckProperties {
                reason_string: None,
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            }),
        };

        let predicted = packet.size();

        let mut encoded = BytesMut::new();
        let reported = packet.write(&mut encoded).unwrap();
        let actual = encoded.len();

        assert_eq!(reported, actual);
        assert_eq!(predicted, actual);
    }

    /// Runs every packet from `write_read_provider` through `read_write_packets`.
    #[test]
    fn test_write_read() {
        read_write_packets(write_read_provider());
    }

    /// Sample packets: one without properties, one with a reason string and two
    /// user properties.
    fn write_read_provider() -> Vec<Packet> {
        let without_properties = SubAck {
            pkid: 42,
            return_codes: vec![SubscribeReasonCode::Success(QoS::AtMostOnce)],
            properties: None,
        };

        let with_properties = SubAck {
            pkid: 42,
            return_codes: vec![SubscribeReasonCode::Success(QoS::AtMostOnce)],
            properties: Some(SubAckProperties {
                reason_string: Some("test".into()),
                user_properties: vec![
                    (USER_PROP_KEY.into(), USER_PROP_VAL.into()),
                    ("property2".into(), "value2".into()),
                ],
            }),
        };

        vec![
            Packet::SubAck(without_properties),
            Packet::SubAck(with_properties),
        ]
    }
}