1use crate::MqttString;
2
3use super::{
4 len_len, length, property, read_mqtt_string, read_u16, read_u8, write_mqtt_string,
5 write_remaining_length, Debug, Error, FixedHeader, PropertyType,
6};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11#[repr(u8)]
12pub enum PubRecReason {
13 Success,
14 NoMatchingSubscribers,
15 UnspecifiedError,
16 ImplementationSpecificError,
17 NotAuthorized,
18 TopicNameInvalid,
19 PacketIdentifierInUse,
20 QuotaExceeded,
21 PayloadFormatInvalid,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct PubRec {
27 pub pkid: u16,
28 pub reason: PubRecReason,
29 pub properties: Option<PubRecProperties>,
30}
31
32impl PubRec {
33 #[must_use]
34 pub fn new(pkid: u16, properties: Option<PubRecProperties>) -> Self {
35 Self {
36 pkid,
37 reason: PubRecReason::Success,
38 properties,
39 }
40 }
41
42 #[must_use]
43 pub fn size(&self) -> usize {
44 let len = self.len();
45 let remaining_len_size = len_len(len);
46
47 1 + remaining_len_size + len
48 }
49
50 fn len(&self) -> usize {
51 let mut len = 2 + 1; if self.reason == PubRecReason::Success && self.properties.is_none() {
57 return 2;
58 }
59
60 if let Some(p) = &self.properties {
61 let properties_len = p.len();
62 let properties_len_len = len_len(properties_len);
63 len += properties_len_len + properties_len;
64 } else {
65 len += 1;
66 }
67
68 len
69 }
70
71 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubRec, Error> {
72 let variable_header_index = fixed_header.fixed_header_len;
73 bytes.advance(variable_header_index);
74 let pkid = read_u16(&mut bytes)?;
75 if fixed_header.remaining_len == 2 {
76 return Ok(PubRec {
77 pkid,
78 reason: PubRecReason::Success,
79 properties: None,
80 });
81 }
82
83 let ack_reason = read_u8(&mut bytes)?;
84 if fixed_header.remaining_len < 4 {
85 return Ok(PubRec {
86 pkid,
87 reason: reason(ack_reason)?,
88 properties: None,
89 });
90 }
91
92 let properties = PubRecProperties::read(&mut bytes)?;
93 let puback = PubRec {
94 pkid,
95 reason: reason(ack_reason)?,
96 properties,
97 };
98 Ok(puback)
99 }
100
101 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
102 let len = self.len();
103 buffer.put_u8(0x50);
104 let count = write_remaining_length(buffer, len)?;
105 buffer.put_u16(self.pkid);
106
107 if self.reason == PubRecReason::Success && self.properties.is_none() {
109 return Ok(4);
110 }
111
112 buffer.put_u8(code(self.reason));
113
114 if let Some(p) = &self.properties {
115 p.write(buffer)?;
116 } else {
117 write_remaining_length(buffer, 0)?;
118 }
119
120 Ok(1 + count + len)
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct PubRecProperties {
126 pub reason_string: Option<MqttString>,
127 pub user_properties: Vec<(MqttString, MqttString)>,
128}
129
130impl PubRecProperties {
131 fn len(&self) -> usize {
132 let mut len = 0;
133
134 if let Some(reason) = &self.reason_string {
135 len += 1 + 2 + reason.len();
136 }
137
138 for (key, value) in &self.user_properties {
139 len += 1 + 2 + key.len() + 2 + value.len();
140 }
141
142 len
143 }
144
145 pub fn read(bytes: &mut Bytes) -> Result<Option<PubRecProperties>, Error> {
146 let mut reason_string = None;
147 let mut user_properties = Vec::new();
148
149 let (properties_len_len, properties_len) = length(bytes.iter())?;
150 bytes.advance(properties_len_len);
151 if properties_len == 0 {
152 return Ok(None);
153 }
154
155 let mut cursor = 0;
156 while cursor < properties_len {
158 let prop = read_u8(bytes)?;
159 cursor += 1;
160
161 match property(prop)? {
162 PropertyType::ReasonString => {
163 let reason = read_mqtt_string(bytes)?;
164 cursor += 2 + reason.len();
165 reason_string = Some(reason);
166 }
167 PropertyType::UserProperty => {
168 let key = read_mqtt_string(bytes)?;
169 let value = read_mqtt_string(bytes)?;
170 cursor += 2 + key.len() + 2 + value.len();
171 user_properties.push((key, value));
172 }
173 _ => return Err(Error::InvalidPropertyType(prop)),
174 }
175 }
176
177 Ok(Some(PubRecProperties {
178 reason_string,
179 user_properties,
180 }))
181 }
182
183 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
184 let len = self.len();
185 write_remaining_length(buffer, len)?;
186
187 if let Some(reason) = &self.reason_string {
188 buffer.put_u8(PropertyType::ReasonString as u8);
189 write_mqtt_string(buffer, reason)?;
190 }
191
192 for (key, value) in &self.user_properties {
193 buffer.put_u8(PropertyType::UserProperty as u8);
194 write_mqtt_string(buffer, key)?;
195 write_mqtt_string(buffer, value)?;
196 }
197
198 Ok(())
199 }
200}
201
202fn reason(num: u8) -> Result<PubRecReason, Error> {
204 let code = match num {
205 0 => PubRecReason::Success,
206 16 => PubRecReason::NoMatchingSubscribers,
207 128 => PubRecReason::UnspecifiedError,
208 131 => PubRecReason::ImplementationSpecificError,
209 135 => PubRecReason::NotAuthorized,
210 144 => PubRecReason::TopicNameInvalid,
211 145 => PubRecReason::PacketIdentifierInUse,
212 151 => PubRecReason::QuotaExceeded,
213 153 => PubRecReason::PayloadFormatInvalid,
214 num => return Err(Error::InvalidConnectReturnCode(num)),
215 };
216
217 Ok(code)
218}
219
220fn code(reason: PubRecReason) -> u8 {
221 match reason {
222 PubRecReason::Success => 0,
223 PubRecReason::NoMatchingSubscribers => 16,
224 PubRecReason::UnspecifiedError => 128,
225 PubRecReason::ImplementationSpecificError => 131,
226 PubRecReason::NotAuthorized => 135,
227 PubRecReason::TopicNameInvalid => 144,
228 PubRecReason::PacketIdentifierInUse => 145,
229 PubRecReason::QuotaExceeded => 151,
230 PubRecReason::PayloadFormatInvalid => 153,
231 }
232}
233
234#[cfg(test)]
235mod test {
236 use crate::test::read_write_packets;
237 use crate::Packet;
238
239 use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
240 use super::*;
241 use bytes::BytesMut;
242 use pretty_assertions::assert_eq;
243
244 #[test]
245 fn length_calculation() {
246 let mut dummy_bytes = BytesMut::new();
247 let pubrec_props = PubRecProperties {
250 reason_string: None,
251 user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
252 };
253
254 let pubrec_pkt = PubRec::new(1, Some(pubrec_props));
255
256 let size_from_size = pubrec_pkt.size();
257 let size_from_write = pubrec_pkt.write(&mut dummy_bytes).unwrap();
258 let size_from_bytes = dummy_bytes.len();
259
260 assert_eq!(size_from_write, size_from_bytes);
261 assert_eq!(size_from_size, size_from_bytes);
262 }
263
264 #[test]
265 fn test_write_read() {
266 read_write_packets(write_read_provider());
267 }
268
269 fn write_read_provider() -> Vec<Packet> {
270 vec![
271 Packet::PubRec(PubRec {
272 pkid: 42,
273 reason: PubRecReason::Success,
274 properties: None,
275 }),
276 Packet::PubRec(PubRec {
277 pkid: 42,
278 reason: PubRecReason::Success,
279 properties: Some(PubRecProperties {
280 reason_string: None,
281 user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
282 }),
283 }),
284 ]
285 }
286}