rumqttd/protocol/v5/
puback.rs1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4fn len(puback: &PubAck, properties: &Option<PubAckProperties>) -> usize {
5 let mut len = 2 + 1; if puback.reason == PubAckReason::Success && properties.is_none() {
9 return 2;
10 }
11
12 if let Some(p) = properties {
13 let properties_len = properties::len(p);
14 let properties_len_len = len_len(properties_len);
15 len += properties_len_len + properties_len;
16 } else {
17 len += 1;
19 }
20
21 len
22}
23
24pub fn read(
25 fixed_header: FixedHeader,
26 mut bytes: Bytes,
27) -> Result<(PubAck, Option<PubAckProperties>), Error> {
28 let variable_header_index = fixed_header.fixed_header_len;
29 bytes.advance(variable_header_index);
30 let pkid = read_u16(&mut bytes)?;
31
32 if fixed_header.remaining_len == 2 {
34 return Ok((
35 PubAck {
36 pkid,
37 reason: PubAckReason::Success,
38 },
39 None,
40 ));
41 }
42
43 let ack_reason = read_u8(&mut bytes)?;
45 if fixed_header.remaining_len < 4 {
46 return Ok((
47 PubAck {
48 pkid,
49 reason: reason(ack_reason)?,
50 },
51 None,
52 ));
53 }
54
55 let puback = PubAck {
56 pkid,
57 reason: reason(ack_reason)?,
58 };
59
60 let properties = properties::read(&mut bytes)?;
61 Ok((puback, properties))
62}
63
64pub fn write(
65 puback: &PubAck,
66 properties: &Option<PubAckProperties>,
67 buffer: &mut BytesMut,
68) -> Result<usize, Error> {
69 let len = len(puback, properties);
70 buffer.put_u8(0x40);
71
72 let count = write_remaining_length(buffer, len)?;
73 buffer.put_u16(puback.pkid);
74
75 if puback.reason == PubAckReason::Success && properties.is_none() {
77 return Ok(4);
78 }
79
80 buffer.put_u8(code(puback.reason));
81 if let Some(p) = properties {
82 properties::write(p, buffer)?;
83 } else {
84 write_remaining_length(buffer, 0)?;
85 }
86
87 Ok(1 + count + len)
88}
89
90mod properties {
91 use super::*;
92
93 pub fn len(properties: &PubAckProperties) -> usize {
94 let mut len = 0;
95
96 if let Some(reason) = &properties.reason_string {
97 len += 1 + 2 + reason.len();
98 }
99
100 for (key, value) in properties.user_properties.iter() {
101 len += 1 + 2 + key.len() + 2 + value.len();
102 }
103
104 len
105 }
106
107 pub fn read(mut bytes: &mut Bytes) -> Result<Option<PubAckProperties>, Error> {
108 let mut reason_string = None;
109 let mut user_properties = Vec::new();
110
111 let (properties_len_len, properties_len) = length(bytes.iter())?;
112 bytes.advance(properties_len_len);
113 if properties_len == 0 {
114 return Ok(None);
115 }
116
117 let mut cursor = 0;
118 while cursor < properties_len {
120 let prop = read_u8(bytes)?;
121 cursor += 1;
122
123 match property(prop)? {
124 PropertyType::ReasonString => {
125 let reason = read_mqtt_string(bytes)?;
126 cursor += 2 + reason.len();
127 reason_string = Some(reason);
128 }
129 PropertyType::UserProperty => {
130 let key = read_mqtt_string(bytes)?;
131 let value = read_mqtt_string(bytes)?;
132 cursor += 2 + key.len() + 2 + value.len();
133 user_properties.push((key, value));
134 }
135 _ => return Err(Error::InvalidPropertyType(prop)),
136 }
137 }
138
139 Ok(Some(PubAckProperties {
140 reason_string,
141 user_properties,
142 }))
143 }
144
145 pub fn write(properties: &PubAckProperties, buffer: &mut BytesMut) -> Result<(), Error> {
146 let len = len(properties);
147 write_remaining_length(buffer, len)?;
148
149 if let Some(reason) = &properties.reason_string {
150 buffer.put_u8(PropertyType::ReasonString as u8);
151 write_mqtt_string(buffer, reason);
152 }
153
154 for (key, value) in properties.user_properties.iter() {
155 buffer.put_u8(PropertyType::UserProperty as u8);
156 write_mqtt_string(buffer, key);
157 write_mqtt_string(buffer, value);
158 }
159
160 Ok(())
161 }
162}
163
164fn reason(num: u8) -> Result<PubAckReason, Error> {
166 let code = match num {
167 0 => PubAckReason::Success,
168 16 => PubAckReason::NoMatchingSubscribers,
169 128 => PubAckReason::UnspecifiedError,
170 131 => PubAckReason::ImplementationSpecificError,
171 135 => PubAckReason::NotAuthorized,
172 144 => PubAckReason::TopicNameInvalid,
173 145 => PubAckReason::PacketIdentifierInUse,
174 151 => PubAckReason::QuotaExceeded,
175 153 => PubAckReason::PayloadFormatInvalid,
176 num => return Err(Error::InvalidConnectReturnCode(num)),
177 };
178
179 Ok(code)
180}
181
182fn code(reason: PubAckReason) -> u8 {
184 match reason {
185 PubAckReason::Success => 0,
186 PubAckReason::NoMatchingSubscribers => 16,
187 PubAckReason::UnspecifiedError => 128,
188 PubAckReason::ImplementationSpecificError => 131,
189 PubAckReason::NotAuthorized => 135,
190 PubAckReason::TopicNameInvalid => 144,
191 PubAckReason::PacketIdentifierInUse => 145,
192 PubAckReason::QuotaExceeded => 151,
193 PubAckReason::PayloadFormatInvalid => 153,
194 }
195}