Skip to main content

rumqttc/mqttbytes/v5/
pubcomp.rs

1use super::{
2    Error, FixedHeader, PropertyType, len_len, length, property, read_mqtt_string, read_u8,
3    read_u16, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
/// Reason code carried by a `PubComp` packet.
///
/// The enum is `#[repr(u8)]`, so each discriminant is exactly the byte that is
/// written to / parsed from the packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubCompReason {
    /// Byte value `0x00`.
    Success = 0x00,
    /// Byte value `0x92` (146 decimal).
    PacketIdentifierNotFound = 0x92,
}
14
15/// `QoS2` Assured publish complete, in response to PUBREL packet
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct PubComp {
18    pub pkid: u16,
19    pub reason: PubCompReason,
20    pub properties: Option<PubCompProperties>,
21}
22
23impl PubComp {
24    #[must_use]
25    pub const fn new(pkid: u16, properties: Option<PubCompProperties>) -> Self {
26        Self {
27            pkid,
28            reason: PubCompReason::Success,
29            properties,
30        }
31    }
32
33    #[must_use]
34    pub fn size(&self) -> usize {
35        if self.reason == PubCompReason::Success && self.properties.is_none() {
36            return 4;
37        }
38        let len = self.len();
39        let remaining_len_size = len_len(len);
40
41        1 + remaining_len_size + len
42    }
43
44    fn len(&self) -> usize {
45        let mut len = 2 + 1; // pkid + reason
46
47        // The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success)
48        // and there are no Properties. In this case the PUBCOMP has a Remaining Length of 2.
49        // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901154>
50        if self.reason == PubCompReason::Success && self.properties.is_none() {
51            return 2;
52        }
53
54        if let Some(p) = &self.properties {
55            let properties_len = p.len();
56            let properties_len_len = len_len(properties_len);
57            len += properties_len_len + properties_len;
58        } else {
59            len += 1;
60        }
61
62        len
63    }
64
65    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
66        let variable_header_index = fixed_header.header_len;
67        bytes.advance(variable_header_index);
68        let pkid = read_u16(&mut bytes)?;
69
70        if fixed_header.remaining_len == 2 {
71            return Ok(Self {
72                pkid,
73                reason: PubCompReason::Success,
74                properties: None,
75            });
76        }
77
78        let ack_reason = read_u8(&mut bytes)?;
79        if fixed_header.remaining_len < 4 {
80            return Ok(Self {
81                pkid,
82                reason: reason(ack_reason)?,
83                properties: None,
84            });
85        }
86
87        let properties = PubCompProperties::read(&mut bytes)?;
88        let puback = Self {
89            pkid,
90            reason: reason(ack_reason)?,
91            properties,
92        };
93
94        Ok(puback)
95    }
96
97    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
98        let len = self.len();
99        buffer.put_u8(0x70);
100        let count = write_remaining_length(buffer, len)?;
101        buffer.put_u16(self.pkid);
102
103        // If there are no properties during success, sending reason code is optional
104        if self.reason == PubCompReason::Success && self.properties.is_none() {
105            return Ok(4);
106        }
107
108        buffer.put_u8(code(self.reason));
109
110        if let Some(p) = &self.properties {
111            p.write(buffer)?;
112        } else {
113            write_remaining_length(buffer, 0)?;
114        }
115
116        Ok(1 + count + len)
117    }
118}
119
/// Properties that may accompany a `PubComp` packet.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PubCompProperties {
    /// Optional reason string.
    pub reason_string: Option<String>,
    /// User properties as `(key, value)` pairs, kept in order.
    pub user_properties: Vec<(String, String)>,
}
125
126impl PubCompProperties {
127    fn len(&self) -> usize {
128        let mut len = 0;
129
130        if let Some(reason) = &self.reason_string {
131            len += 1 + 2 + reason.len();
132        }
133
134        for (key, value) in &self.user_properties {
135            len += 1 + 2 + key.len() + 2 + value.len();
136        }
137
138        len
139    }
140
141    pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
142        let mut reason_string = None;
143        let mut user_properties = Vec::new();
144
145        let (properties_len_len, properties_len) = length(bytes.iter())?;
146        bytes.advance(properties_len_len);
147        if properties_len == 0 {
148            return Ok(None);
149        }
150
151        let mut cursor = 0;
152        // read until cursor reaches property length. properties_len = 0 will skip this loop
153        while cursor < properties_len {
154            let prop = read_u8(bytes)?;
155            cursor += 1;
156
157            match property(prop)? {
158                PropertyType::ReasonString => {
159                    let reason = read_mqtt_string(bytes)?;
160                    cursor += 2 + reason.len();
161                    reason_string = Some(reason);
162                }
163                PropertyType::UserProperty => {
164                    let key = read_mqtt_string(bytes)?;
165                    let value = read_mqtt_string(bytes)?;
166                    cursor += 2 + key.len() + 2 + value.len();
167                    user_properties.push((key, value));
168                }
169                _ => return Err(Error::InvalidPropertyType(prop)),
170            }
171        }
172
173        Ok(Some(Self {
174            reason_string,
175            user_properties,
176        }))
177    }
178
179    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
180        let len = self.len();
181        write_remaining_length(buffer, len)?;
182
183        if let Some(reason) = &self.reason_string {
184            buffer.put_u8(PropertyType::ReasonString as u8);
185            write_mqtt_string(buffer, reason);
186        }
187
188        for (key, value) in &self.user_properties {
189            buffer.put_u8(PropertyType::UserProperty as u8);
190            write_mqtt_string(buffer, key);
191            write_mqtt_string(buffer, value);
192        }
193
194        Ok(())
195    }
196}
197
198/// Connection return code type
199fn reason(num: u8) -> Result<PubCompReason, Error> {
200    num.try_into()
201}
202
203const fn code(reason: PubCompReason) -> u8 {
204    reason as u8
205}
206
207impl TryFrom<u8> for PubCompReason {
208    type Error = Error;
209
210    fn try_from(value: u8) -> Result<Self, Self::Error> {
211        match value {
212            0 => Ok(Self::Success),
213            146 => Ok(Self::PacketIdentifierNotFound),
214            _ => Err(Error::InvalidConnectReturnCode(value)),
215        }
216    }
217}
218
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the count returned by `write()` and the bytes actually written
    /// must all agree.
    #[test]
    fn length_calculation() {
        // The shared user-property fixture is intended to push the packet past
        // ~128 bytes so that the remaining-length field takes two bytes.
        let properties = PubCompProperties {
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
        };
        let packet = PubComp::new(1, Some(properties));

        let mut buffer = BytesMut::new();
        let predicted = packet.size();
        let reported = packet.write(&mut buffer).unwrap();
        let written = buffer.len();

        assert_eq!(reported, written);
        assert_eq!(predicted, written);
    }

    /// Casting a variant to `u8` yields the expected reason-code byte.
    #[test]
    fn reason_code_cast_matches_spec() {
        let expectations = [
            (PubCompReason::Success, 0),
            (PubCompReason::PacketIdentifierNotFound, 146),
        ];

        for (variant, byte) in expectations {
            assert_eq!(variant as u8, byte);
        }
    }

    /// `reason` and `code` are inverses of each other for every known value.
    #[test]
    fn reason_and_code_round_trip() {
        let pairs = [
            (0x00, PubCompReason::Success),
            (0x92, PubCompReason::PacketIdentifierNotFound),
        ];

        for (byte, variant) in pairs {
            assert_eq!(reason(byte).unwrap(), variant);
            assert_eq!(code(variant), byte);
        }
    }

    /// An unknown byte is rejected and echoed back inside the error.
    #[test]
    fn reason_invalid_code_errors() {
        let outcome = reason(42);
        assert!(matches!(
            outcome,
            Err(Error::InvalidConnectReturnCode(42))
        ));
    }
}