mqtt_bytes_v5/
pubcomp.rs

1use crate::MqttString;
2
3use super::{
4    len_len, length, property, read_mqtt_string, read_u16, read_u8, write_mqtt_string,
5    write_remaining_length, Debug, Error, FixedHeader, PropertyType,
6};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
/// Reason code carried by a `PubComp` packet.
///
/// No explicit discriminants are assigned, so casting a variant with `as u8` yields
/// its declaration index (`0`, `1`) rather than the byte used on the wire. The wire
/// mapping in this file is performed by the `reason` and `code` helper functions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubCompReason {
    /// Mapped to/from the wire byte `0`.
    Success,
    /// Mapped to/from the wire byte `146`.
    PacketIdentifierNotFound,
}
16
17/// `QoS2` Assured publish complete, in response to PUBREL packet
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct PubComp {
20    pub pkid: u16,
21    pub reason: PubCompReason,
22    pub properties: Option<PubCompProperties>,
23}
24
25impl PubComp {
26    #[must_use]
27    pub fn new(pkid: u16, properties: Option<PubCompProperties>) -> Self {
28        Self {
29            pkid,
30            reason: PubCompReason::Success,
31            properties,
32        }
33    }
34
35    #[must_use]
36    pub fn size(&self) -> usize {
37        if self.reason == PubCompReason::Success && self.properties.is_none() {
38            return 4;
39        }
40        let len = self.len();
41        let remaining_len_size = len_len(len);
42
43        1 + remaining_len_size + len
44    }
45
46    fn len(&self) -> usize {
47        let mut len = 2 + 1; // pkid + reason
48
49        // The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success)
50        // and there are no Properties. In this case the PUBCOMP has a Remaining Length of 2.
51        // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901154>
52        if self.reason == PubCompReason::Success && self.properties.is_none() {
53            return 2;
54        }
55
56        if let Some(p) = &self.properties {
57            let properties_len = p.len();
58            let properties_len_len = len_len(properties_len);
59            len += properties_len_len + properties_len;
60        } else {
61            len += 1;
62        }
63
64        len
65    }
66
67    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubComp, Error> {
68        let variable_header_index = fixed_header.fixed_header_len;
69        bytes.advance(variable_header_index);
70        let pkid = read_u16(&mut bytes)?;
71
72        if fixed_header.remaining_len == 2 {
73            return Ok(PubComp {
74                pkid,
75                reason: PubCompReason::Success,
76                properties: None,
77            });
78        }
79
80        let ack_reason = read_u8(&mut bytes)?;
81        if fixed_header.remaining_len < 4 {
82            return Ok(PubComp {
83                pkid,
84                reason: reason(ack_reason)?,
85                properties: None,
86            });
87        }
88
89        let properties = PubCompProperties::read(&mut bytes)?;
90        let puback = PubComp {
91            pkid,
92            reason: reason(ack_reason)?,
93            properties,
94        };
95
96        Ok(puback)
97    }
98
99    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
100        let len = self.len();
101        buffer.put_u8(0x70);
102        let count = write_remaining_length(buffer, len)?;
103        buffer.put_u16(self.pkid);
104
105        // If there are no properties during success, sending reason code is optional
106        if self.reason == PubCompReason::Success && self.properties.is_none() {
107            return Ok(4);
108        }
109
110        buffer.put_u8(code(self.reason));
111
112        if let Some(p) = &self.properties {
113            p.write(buffer)?;
114        } else {
115            write_remaining_length(buffer, 0)?;
116        }
117
118        Ok(1 + count + len)
119    }
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct PubCompProperties {
124    pub reason_string: Option<MqttString>,
125    pub user_properties: Vec<(MqttString, MqttString)>,
126}
127
128impl PubCompProperties {
129    fn len(&self) -> usize {
130        let mut len = 0;
131
132        if let Some(reason) = &self.reason_string {
133            len += 1 + 2 + reason.len();
134        }
135
136        for (key, value) in &self.user_properties {
137            len += 1 + 2 + key.len() + 2 + value.len();
138        }
139
140        len
141    }
142
143    pub fn read(bytes: &mut Bytes) -> Result<Option<PubCompProperties>, Error> {
144        let mut reason_string = None;
145        let mut user_properties = Vec::new();
146
147        let (properties_len_len, properties_len) = length(bytes.iter())?;
148        bytes.advance(properties_len_len);
149        if properties_len == 0 {
150            return Ok(None);
151        }
152
153        let mut cursor = 0;
154        // read until cursor reaches property length. properties_len = 0 will skip this loop
155        while cursor < properties_len {
156            let prop = read_u8(bytes)?;
157            cursor += 1;
158
159            match property(prop)? {
160                PropertyType::ReasonString => {
161                    let reason = read_mqtt_string(bytes)?;
162                    cursor += 2 + reason.len();
163                    reason_string = Some(reason);
164                }
165                PropertyType::UserProperty => {
166                    let key = read_mqtt_string(bytes)?;
167                    let value = read_mqtt_string(bytes)?;
168                    cursor += 2 + key.len() + 2 + value.len();
169                    user_properties.push((key, value));
170                }
171                _ => return Err(Error::InvalidPropertyType(prop)),
172            }
173        }
174
175        Ok(Some(PubCompProperties {
176            reason_string,
177            user_properties,
178        }))
179    }
180
181    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
182        let len = self.len();
183        write_remaining_length(buffer, len)?;
184
185        if let Some(reason) = &self.reason_string {
186            buffer.put_u8(PropertyType::ReasonString as u8);
187            write_mqtt_string(buffer, reason)?;
188        }
189
190        for (key, value) in &self.user_properties {
191            buffer.put_u8(PropertyType::UserProperty as u8);
192            write_mqtt_string(buffer, key)?;
193            write_mqtt_string(buffer, value)?;
194        }
195
196        Ok(())
197    }
198}
199
200/// Connection return code type
201fn reason(num: u8) -> Result<PubCompReason, Error> {
202    let code = match num {
203        0 => PubCompReason::Success,
204        146 => PubCompReason::PacketIdentifierNotFound,
205        num => return Err(Error::InvalidConnectReturnCode(num)),
206    };
207
208    Ok(code)
209}
210
211fn code(reason: PubCompReason) -> u8 {
212    match reason {
213        PubCompReason::Success => 0,
214        PubCompReason::PacketIdentifierNotFound => 146,
215    }
216}
217
#[cfg(test)]
mod test {
    use crate::test::read_write_packets;
    use crate::Packet;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()` and the number of bytes actually
    /// written must all agree.
    #[test]
    fn length_calculation() {
        // The user property is meant to pad the packet so that the remaining-length
        // field needs 2 bytes.
        // NOTE(review): whether it does depends on the lengths of USER_PROP_KEY and
        // USER_PROP_VAL, which are defined outside this file - confirm.
        let packet = PubComp::new(
            1,
            Some(PubCompProperties {
                reason_string: None,
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            }),
        );

        let predicted = packet.size();

        let mut sink = BytesMut::new();
        let reported = packet.write(&mut sink).unwrap();
        let actual = sink.len();

        assert_eq!(reported, actual);
        assert_eq!(predicted, actual);
    }

    /// Runs every packet from `write_read_provider` through the shared helper.
    #[test]
    fn test_write_read() {
        read_write_packets(write_read_provider());
    }

    /// Packets used by `test_write_read`: the minimal form and one with properties.
    fn write_read_provider() -> Vec<Packet> {
        let minimal = PubComp {
            pkid: 42,
            reason: PubCompReason::Success,
            properties: None,
        };

        let with_properties = PubComp {
            pkid: 42,
            reason: PubCompReason::Success,
            properties: Some(PubCompProperties {
                reason_string: Some("hello".into()),
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            }),
        };

        vec![Packet::PubComp(minimal), Packet::PubComp(with_properties)]
    }
}