1use super::{
2 Error, FixedHeader, PropertyType, len_len, length, property, read_mqtt_string, read_u8,
3 read_u16, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
/// Reason code carried by a `PubRec` packet.
///
/// The enum is `#[repr(u8)]` and each discriminant is the raw byte used for
/// that reason, so `reason as u8` yields the encoded value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubRecReason {
    Success = 0x00,                     // 0
    NoMatchingSubscribers = 0x10,       // 16
    UnspecifiedError = 0x80,            // 128
    ImplementationSpecificError = 0x83, // 131
    NotAuthorized = 0x87,               // 135
    TopicNameInvalid = 0x90,            // 144
    PacketIdentifierInUse = 0x91,       // 145
    QuotaExceeded = 0x97,               // 151
    PayloadFormatInvalid = 0x99,        // 153
}
21
/// In-memory form of a PUBREC packet: a packet identifier, a reason code and
/// an optional set of properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubRec {
    /// Packet identifier; encoded as a `u16` right after the fixed header.
    pub pkid: u16,
    /// Reason code; `PubRec::new` always sets this to `PubRecReason::Success`.
    pub reason: PubRecReason,
    /// Optional properties; `None` when the packet carries none.
    pub properties: Option<PubRecProperties>,
}
29
30impl PubRec {
31 #[must_use]
32 pub const fn new(pkid: u16, properties: Option<PubRecProperties>) -> Self {
33 Self {
34 pkid,
35 reason: PubRecReason::Success,
36 properties,
37 }
38 }
39
40 #[must_use]
41 pub fn size(&self) -> usize {
42 let len = self.len();
43 let remaining_len_size = len_len(len);
44
45 1 + remaining_len_size + len
46 }
47
48 fn len(&self) -> usize {
49 let mut len = 2 + 1; if self.reason == PubRecReason::Success && self.properties.is_none() {
55 return 2;
56 }
57
58 if let Some(p) = &self.properties {
59 let properties_len = p.len();
60 let properties_len_len = len_len(properties_len);
61 len += properties_len_len + properties_len;
62 } else {
63 len += 1;
64 }
65
66 len
67 }
68
69 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
70 let variable_header_index = fixed_header.header_len;
71 bytes.advance(variable_header_index);
72 let pkid = read_u16(&mut bytes)?;
73 if fixed_header.remaining_len == 2 {
74 return Ok(Self {
75 pkid,
76 reason: PubRecReason::Success,
77 properties: None,
78 });
79 }
80
81 let ack_reason = read_u8(&mut bytes)?;
82 if fixed_header.remaining_len < 4 {
83 return Ok(Self {
84 pkid,
85 reason: reason(ack_reason)?,
86 properties: None,
87 });
88 }
89
90 let properties = PubRecProperties::read(&mut bytes)?;
91 let puback = Self {
92 pkid,
93 reason: reason(ack_reason)?,
94 properties,
95 };
96 Ok(puback)
97 }
98
99 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
100 let len = self.len();
101 buffer.put_u8(0x50);
102 let count = write_remaining_length(buffer, len)?;
103 buffer.put_u16(self.pkid);
104
105 if self.reason == PubRecReason::Success && self.properties.is_none() {
107 return Ok(4);
108 }
109
110 buffer.put_u8(code(self.reason));
111
112 if let Some(p) = &self.properties {
113 p.write(buffer)?;
114 } else {
115 write_remaining_length(buffer, 0)?;
116 }
117
118 Ok(1 + count + len)
119 }
120}
121
/// Properties that can accompany a `PubRec`: an optional reason string and a
/// list of user-defined key/value pairs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubRecProperties {
    /// Optional reason string; `None` when no such property is present.
    pub reason_string: Option<String>,
    /// User properties as `(key, value)` pairs, in the order they are written.
    pub user_properties: Vec<(String, String)>,
}
127
128impl PubRecProperties {
129 fn len(&self) -> usize {
130 let mut len = 0;
131
132 if let Some(reason) = &self.reason_string {
133 len += 1 + 2 + reason.len();
134 }
135
136 for (key, value) in &self.user_properties {
137 len += 1 + 2 + key.len() + 2 + value.len();
138 }
139
140 len
141 }
142
143 pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
144 let mut reason_string = None;
145 let mut user_properties = Vec::new();
146
147 let (properties_len_len, properties_len) = length(bytes.iter())?;
148 bytes.advance(properties_len_len);
149 if properties_len == 0 {
150 return Ok(None);
151 }
152
153 let mut cursor = 0;
154 while cursor < properties_len {
156 let prop = read_u8(bytes)?;
157 cursor += 1;
158
159 match property(prop)? {
160 PropertyType::ReasonString => {
161 let reason = read_mqtt_string(bytes)?;
162 cursor += 2 + reason.len();
163 reason_string = Some(reason);
164 }
165 PropertyType::UserProperty => {
166 let key = read_mqtt_string(bytes)?;
167 let value = read_mqtt_string(bytes)?;
168 cursor += 2 + key.len() + 2 + value.len();
169 user_properties.push((key, value));
170 }
171 _ => return Err(Error::InvalidPropertyType(prop)),
172 }
173 }
174
175 Ok(Some(Self {
176 reason_string,
177 user_properties,
178 }))
179 }
180
181 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
182 let len = self.len();
183 write_remaining_length(buffer, len)?;
184
185 if let Some(reason) = &self.reason_string {
186 buffer.put_u8(PropertyType::ReasonString as u8);
187 write_mqtt_string(buffer, reason);
188 }
189
190 for (key, value) in &self.user_properties {
191 buffer.put_u8(PropertyType::UserProperty as u8);
192 write_mqtt_string(buffer, key);
193 write_mqtt_string(buffer, value);
194 }
195
196 Ok(())
197 }
198}
199
200fn reason(num: u8) -> Result<PubRecReason, Error> {
202 num.try_into()
203}
204
/// Returns the raw byte for `reason`.
///
/// `PubRecReason` is `#[repr(u8)]` with explicit discriminants, so the cast
/// yields exactly the value declared on the variant.
const fn code(reason: PubRecReason) -> u8 {
    reason as u8
}
208
209impl TryFrom<u8> for PubRecReason {
210 type Error = Error;
211
212 fn try_from(value: u8) -> Result<Self, Self::Error> {
213 match value {
214 0 => Ok(Self::Success),
215 16 => Ok(Self::NoMatchingSubscribers),
216 128 => Ok(Self::UnspecifiedError),
217 131 => Ok(Self::ImplementationSpecificError),
218 135 => Ok(Self::NotAuthorized),
219 144 => Ok(Self::TopicNameInvalid),
220 145 => Ok(Self::PacketIdentifierInUse),
221 151 => Ok(Self::QuotaExceeded),
222 153 => Ok(Self::PayloadFormatInvalid),
223 _ => Err(Error::InvalidConnectReturnCode(value)),
224 }
225 }
226}
227
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()`, and the number of bytes
    /// actually written must all agree.
    #[test]
    fn length_calculation() {
        let properties = PubRecProperties {
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
        };
        let packet = PubRec::new(1, Some(properties));

        let mut buffer = BytesMut::new();
        let expected_size = packet.size();
        let reported_by_write = packet.write(&mut buffer).unwrap();
        let written = buffer.len();

        assert_eq!(reported_by_write, written);
        assert_eq!(expected_size, written);
    }

    /// Casting a variant to `u8` yields its expected raw value.
    #[test]
    fn reason_code_cast_matches_spec() {
        let expectations = [
            (PubRecReason::Success as u8, 0),
            (PubRecReason::NoMatchingSubscribers as u8, 16),
            (PubRecReason::UnspecifiedError as u8, 128),
            (PubRecReason::ImplementationSpecificError as u8, 131),
            (PubRecReason::NotAuthorized as u8, 135),
            (PubRecReason::TopicNameInvalid as u8, 144),
            (PubRecReason::PacketIdentifierInUse as u8, 145),
            (PubRecReason::QuotaExceeded as u8, 151),
            (PubRecReason::PayloadFormatInvalid as u8, 153),
        ];

        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }

    /// `reason` and `code` are inverses for every known reason.
    #[test]
    fn reason_and_code_round_trip() {
        let pairs = [
            (0, PubRecReason::Success),
            (16, PubRecReason::NoMatchingSubscribers),
            (128, PubRecReason::UnspecifiedError),
            (131, PubRecReason::ImplementationSpecificError),
            (135, PubRecReason::NotAuthorized),
            (144, PubRecReason::TopicNameInvalid),
            (145, PubRecReason::PacketIdentifierInUse),
            (151, PubRecReason::QuotaExceeded),
            (153, PubRecReason::PayloadFormatInvalid),
        ];

        for (raw, parsed) in pairs {
            assert_eq!(reason(raw).unwrap(), parsed);
            assert_eq!(code(parsed), raw);
        }
    }

    /// A byte that matches no variant is rejected with the offending value.
    #[test]
    fn reason_invalid_code_errors() {
        let outcome = reason(42);
        assert!(matches!(
            outcome,
            Err(Error::InvalidConnectReturnCode(42))
        ));
    }
}