1use crate::MqttString;
2
3use super::{
4 len_len, length, property, read_mqtt_string, read_u16, read_u8, write_mqtt_string,
5 write_remaining_length, Debug, Error, FixedHeader, PropertyType,
6};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
/// Reason code carried by a PubAck packet.
///
/// The numeric wire value of each variant is defined by the `code` and
/// `reason` functions in this module; it is listed next to each variant
/// for convenience.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubAckReason {
    /// Wire value 0.
    Success,
    /// Wire value 16.
    NoMatchingSubscribers,
    /// Wire value 128.
    UnspecifiedError,
    /// Wire value 131.
    ImplementationSpecificError,
    /// Wire value 135.
    NotAuthorized,
    /// Wire value 144.
    TopicNameInvalid,
    /// Wire value 145.
    PacketIdentifierInUse,
    /// Wire value 151.
    QuotaExceeded,
    /// Wire value 153.
    PayloadFormatInvalid,
}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct PubAck {
26 pub pkid: u16,
27 pub reason: PubAckReason,
28 pub properties: Option<PubAckProperties>,
29}
30
31impl PubAck {
32 #[must_use]
33 pub fn new(pkid: u16, properties: Option<PubAckProperties>) -> Self {
34 Self {
35 pkid,
36 reason: PubAckReason::Success,
37 properties,
38 }
39 }
40
41 #[must_use]
42 pub fn size(&self) -> usize {
43 if self.reason == PubAckReason::Success && self.properties.is_none() {
44 return 4;
45 }
46 let len = self.len();
47 let remaining_len_size = len_len(len);
48
49 1 + remaining_len_size + len
50 }
51
52 fn len(&self) -> usize {
53 let mut len = 2 + 1; if self.reason == PubAckReason::Success && self.properties.is_none() {
57 return 2;
58 }
59
60 if let Some(p) = &self.properties {
61 let properties_len = p.len();
62 let properties_len_len = len_len(properties_len);
63 len += properties_len_len + properties_len;
64 } else {
65 len += 1;
67 }
68
69 len
70 }
71
72 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubAck, Error> {
73 let variable_header_index = fixed_header.fixed_header_len;
74 bytes.advance(variable_header_index);
75 let pkid = read_u16(&mut bytes)?;
76
77 if fixed_header.remaining_len == 2 {
79 return Ok(PubAck {
80 pkid,
81 reason: PubAckReason::Success,
82 properties: None,
83 });
84 }
85
86 let ack_reason = read_u8(&mut bytes)?;
88 if fixed_header.remaining_len < 4 {
89 return Ok(PubAck {
90 pkid,
91 reason: reason(ack_reason)?,
92 properties: None,
93 });
94 }
95
96 let properties = PubAckProperties::read(&mut bytes)?;
97 let puback = PubAck {
98 pkid,
99 reason: reason(ack_reason)?,
100 properties,
101 };
102
103 Ok(puback)
104 }
105
106 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
107 let len = self.len();
108 buffer.put_u8(0x40);
109
110 let count = write_remaining_length(buffer, len)?;
111 buffer.put_u16(self.pkid);
112
113 if self.reason == PubAckReason::Success && self.properties.is_none() {
115 return Ok(4);
116 }
117
118 buffer.put_u8(code(self.reason));
119 if let Some(p) = &self.properties {
120 p.write(buffer)?;
121 } else {
122 write_remaining_length(buffer, 0)?;
123 }
124
125 Ok(1 + count + len)
126 }
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct PubAckProperties {
131 pub reason_string: Option<MqttString>,
132 pub user_properties: Vec<(MqttString, MqttString)>,
133}
134
135impl PubAckProperties {
136 fn len(&self) -> usize {
137 let mut len = 0;
138
139 if let Some(reason) = &self.reason_string {
140 len += 1 + 2 + reason.len();
141 }
142
143 for (key, value) in &self.user_properties {
144 len += 1 + 2 + key.len() + 2 + value.len();
145 }
146
147 len
148 }
149
150 pub fn read(bytes: &mut Bytes) -> Result<Option<PubAckProperties>, Error> {
151 let mut reason_string = None;
152 let mut user_properties = Vec::new();
153
154 let (properties_len_len, properties_len) = length(bytes.iter())?;
155 bytes.advance(properties_len_len);
156 if properties_len == 0 {
157 return Ok(None);
158 }
159
160 let mut cursor = 0;
161 while cursor < properties_len {
163 let prop = read_u8(bytes)?;
164 cursor += 1;
165
166 match property(prop)? {
167 PropertyType::ReasonString => {
168 let reason = read_mqtt_string(bytes)?;
169 cursor += 2 + reason.len();
170 reason_string = Some(reason);
171 }
172 PropertyType::UserProperty => {
173 let key = read_mqtt_string(bytes)?;
174 let value = read_mqtt_string(bytes)?;
175 cursor += 2 + key.len() + 2 + value.len();
176 user_properties.push((key, value));
177 }
178 _ => return Err(Error::InvalidPropertyType(prop)),
179 }
180 }
181
182 Ok(Some(PubAckProperties {
183 reason_string,
184 user_properties,
185 }))
186 }
187
188 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
189 let len = self.len();
190 write_remaining_length(buffer, len)?;
191
192 if let Some(reason) = &self.reason_string {
193 buffer.put_u8(PropertyType::ReasonString as u8);
194 write_mqtt_string(buffer, reason)?;
195 }
196
197 for (key, value) in &self.user_properties {
198 buffer.put_u8(PropertyType::UserProperty as u8);
199 write_mqtt_string(buffer, key)?;
200 write_mqtt_string(buffer, value)?;
201 }
202
203 Ok(())
204 }
205}
206
207fn reason(num: u8) -> Result<PubAckReason, Error> {
209 let code = match num {
210 0 => PubAckReason::Success,
211 16 => PubAckReason::NoMatchingSubscribers,
212 128 => PubAckReason::UnspecifiedError,
213 131 => PubAckReason::ImplementationSpecificError,
214 135 => PubAckReason::NotAuthorized,
215 144 => PubAckReason::TopicNameInvalid,
216 145 => PubAckReason::PacketIdentifierInUse,
217 151 => PubAckReason::QuotaExceeded,
218 153 => PubAckReason::PayloadFormatInvalid,
219 num => return Err(Error::InvalidConnectReturnCode(num)),
220 };
221
222 Ok(code)
223}
224
225fn code(reason: PubAckReason) -> u8 {
227 match reason {
228 PubAckReason::Success => 0,
229 PubAckReason::NoMatchingSubscribers => 16,
230 PubAckReason::UnspecifiedError => 128,
231 PubAckReason::ImplementationSpecificError => 131,
232 PubAckReason::NotAuthorized => 135,
233 PubAckReason::TopicNameInvalid => 144,
234 PubAckReason::PacketIdentifierInUse => 145,
235 PubAckReason::QuotaExceeded => 151,
236 PubAckReason::PayloadFormatInvalid => 153,
237 }
238}
239
#[cfg(test)]
mod test {
    use crate::test::read_write_packets;
    use crate::Packet;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the count returned by `write()`, and the number of bytes
    /// actually placed in the buffer must all agree.
    #[test]
    fn length_calculation() {
        // No reason string: only a single user property contributes length.
        let properties = PubAckProperties {
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
        };
        let packet = PubAck::new(1, Some(properties));

        let mut sink = BytesMut::new();
        let reported_by_size = packet.size();
        let reported_by_write = packet.write(&mut sink).unwrap();
        let actually_written = sink.len();

        assert_eq!(reported_by_write, actually_written);
        assert_eq!(reported_by_size, actually_written);
    }

    /// Runs the sample packets through the shared `read_write_packets` helper.
    #[test]
    fn test_write_read() {
        read_write_packets(write_read_provider());
    }

    /// Sample packets: the minimal form and one carrying every property kind.
    fn write_read_provider() -> Vec<Packet> {
        let minimal = PubAck {
            pkid: 42,
            reason: PubAckReason::Success,
            properties: None,
        };

        let with_properties = PubAck {
            pkid: 42,
            reason: PubAckReason::Success,
            properties: Some(PubAckProperties {
                reason_string: Some("hello".into()),
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            }),
        };

        vec![Packet::PubAck(minimal), Packet::PubAck(with_properties)]
    }
}