1use super::*;
2use alloc::vec::Vec;
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use std::convert::{TryFrom, TryInto};
5
6#[derive(Debug, Clone, PartialEq)]
8pub struct SubAck {
9 pub pkid: u16,
10 pub return_codes: Vec<SubscribeReasonCode>,
11 pub properties: Option<SubAckProperties>,
12}
13
14impl SubAck {
15 pub fn new(pkid: u16, return_codes: Vec<SubscribeReasonCode>) -> SubAck {
16 SubAck {
17 pkid,
18 return_codes,
19 properties: None,
20 }
21 }
22
23 pub fn len(&self) -> usize {
24 let mut len = 2 + self.return_codes.len();
25
26 match &self.properties {
27 Some(properties) => {
28 let properties_len = properties.len();
29 let properties_len_len = len_len(properties_len);
30 len += properties_len_len + properties_len;
31 }
32 None => {
33 len += 1;
35 }
36 }
37
38 len
39 }
40
41 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
42 let variable_header_index = fixed_header.fixed_header_len;
43 bytes.advance(variable_header_index);
44
45 let pkid = read_u16(&mut bytes)?;
46 let properties = SubAckProperties::extract(&mut bytes)?;
47
48 if !bytes.has_remaining() {
49 return Err(Error::MalformedPacket);
50 }
51
52 let mut return_codes = Vec::new();
53 while bytes.has_remaining() {
54 let return_code = read_u8(&mut bytes)?;
55 return_codes.push(return_code.try_into()?);
56 }
57
58 let suback = SubAck {
59 pkid,
60 return_codes,
61 properties,
62 };
63
64 Ok(suback)
65 }
66
67 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
68 buffer.put_u8(0x90);
69 let remaining_len = self.len();
70 let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
71
72 buffer.put_u16(self.pkid);
73
74 match &self.properties {
75 Some(properties) => properties.write(buffer)?,
76 None => {
77 write_remaining_length(buffer, 0)?;
78 }
79 };
80
81 let p: Vec<u8> = self.return_codes.iter().map(|code| *code as u8).collect();
82 buffer.extend_from_slice(&p);
83 Ok(1 + remaining_len_bytes + remaining_len)
84 }
85}
86
87#[derive(Debug, Clone, PartialEq)]
88pub struct SubAckProperties {
89 pub reason_string: Option<String>,
90 pub user_properties: Vec<(String, String)>,
91}
92
93impl SubAckProperties {
94 pub fn len(&self) -> usize {
95 let mut len = 0;
96
97 if let Some(reason) = &self.reason_string {
98 len += 1 + 2 + reason.len();
99 }
100
101 for (key, value) in self.user_properties.iter() {
102 len += 1 + 2 + key.len() + 2 + value.len();
103 }
104
105 len
106 }
107
108 pub fn extract(mut bytes: &mut Bytes) -> Result<Option<SubAckProperties>, Error> {
109 let mut reason_string = None;
110 let mut user_properties = Vec::new();
111
112 let (properties_len_len, properties_len) = length(bytes.iter())?;
113 bytes.advance(properties_len_len);
114 if properties_len == 0 {
115 return Ok(None);
116 }
117
118 let mut cursor = 0;
119 while cursor < properties_len {
121 let prop = read_u8(&mut bytes)?;
122 cursor += 1;
123
124 match property(prop)? {
125 PropertyType::ReasonString => {
126 let reason = read_mqtt_string(&mut bytes)?;
127 cursor += 2 + reason.len();
128 reason_string = Some(reason);
129 }
130 PropertyType::UserProperty => {
131 let key = read_mqtt_string(&mut bytes)?;
132 let value = read_mqtt_string(&mut bytes)?;
133 cursor += 2 + key.len() + 2 + value.len();
134 user_properties.push((key, value));
135 }
136 _ => return Err(Error::InvalidPropertyType(prop)),
137 }
138 }
139
140 Ok(Some(SubAckProperties {
141 reason_string,
142 user_properties,
143 }))
144 }
145
146 fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
147 let len = self.len();
148 write_remaining_length(buffer, len)?;
149
150 if let Some(reason) = &self.reason_string {
151 buffer.put_u8(PropertyType::ReasonString as u8);
152 write_mqtt_string(buffer, reason);
153 }
154
155 for (key, value) in self.user_properties.iter() {
156 buffer.put_u8(PropertyType::UserProperty as u8);
157 write_mqtt_string(buffer, key);
158 write_mqtt_string(buffer, value);
159 }
160
161 Ok(())
162 }
163}
164
165#[repr(u8)]
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum SubscribeReasonCode {
168 QoS0 = 0,
169 QoS1 = 1,
170 QoS2 = 2,
171 Unspecified = 128,
172 ImplementationSpecific = 131,
173 NotAuthorized = 135,
174 TopicFilterInvalid = 143,
175 PkidInUse = 145,
176 QuotaExceeded = 151,
177 SharedSubscriptionsNotSupported = 158,
178 SubscriptionIdNotSupported = 161,
179 WildcardSubscriptionsNotSupported = 162,
180}
181
182impl TryFrom<u8> for SubscribeReasonCode {
183 type Error = crate::Error;
184
185 fn try_from(value: u8) -> Result<Self, Self::Error> {
186 let v = match value {
187 0 => SubscribeReasonCode::QoS0,
188 1 => SubscribeReasonCode::QoS1,
189 2 => SubscribeReasonCode::QoS2,
190 128 => SubscribeReasonCode::Unspecified,
191 131 => SubscribeReasonCode::ImplementationSpecific,
192 135 => SubscribeReasonCode::NotAuthorized,
193 143 => SubscribeReasonCode::TopicFilterInvalid,
194 145 => SubscribeReasonCode::PkidInUse,
195 151 => SubscribeReasonCode::QuotaExceeded,
196 158 => SubscribeReasonCode::SharedSubscriptionsNotSupported,
197 161 => SubscribeReasonCode::SubscriptionIdNotSupported,
198 162 => SubscribeReasonCode::WildcardSubscriptionsNotSupported,
199 v => return Err(crate::Error::InvalidSubscribeReasonCode(v)),
200 };
201
202 Ok(v)
203 }
204}
205
206#[cfg(test)]
207mod test {
208 use super::*;
209 use alloc::vec;
210 use bytes::BytesMut;
211 use pretty_assertions::assert_eq;
212
213 fn sample() -> SubAck {
214 let properties = SubAckProperties {
215 reason_string: Some("test".to_owned()),
216 user_properties: vec![("test".to_owned(), "test".to_owned())],
217 };
218
219 SubAck {
220 pkid: 42,
221 return_codes: vec![
222 SubscribeReasonCode::QoS0,
223 SubscribeReasonCode::QoS1,
224 SubscribeReasonCode::QoS2,
225 SubscribeReasonCode::Unspecified,
226 ],
227 properties: Some(properties),
228 }
229 }
230
231 fn sample_bytes() -> Vec<u8> {
232 vec![
233 0x90, 0x1b, 0x00, 0x2a, 0x14, 0x1f, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x26, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
238 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x00, 0x01, 0x02, 0x80, ]
241 }
242
243 #[test]
244 fn suback_parsing_works() {
245 let mut stream = BytesMut::new();
246 let packetstream = &sample_bytes();
247
248 stream.extend_from_slice(&packetstream[..]);
249 let fixed_header = parse_fixed_header(stream.iter()).unwrap();
250 let suback_bytes = stream.split_to(fixed_header.frame_length()).freeze();
251 let suback = SubAck::read(fixed_header, suback_bytes).unwrap();
252 assert_eq!(suback, sample());
253 }
254
255 #[test]
256 fn suback_encoding_works() {
257 let publish = sample();
258 let mut buf = BytesMut::new();
259 publish.write(&mut buf).unwrap();
260
261 assert_eq!(&buf[..], sample_bytes());
264 }
265}