mqtt_bytes_v5/
pubrec.rs

1use crate::MqttString;
2
3use super::{
4    len_len, length, property, read_mqtt_string, read_u16, read_u8, write_mqtt_string,
5    write_remaining_length, Debug, Error, FixedHeader, PropertyType,
6};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
9/// Return code in `PubRec`
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11#[repr(u8)]
12pub enum PubRecReason {
13    Success,
14    NoMatchingSubscribers,
15    UnspecifiedError,
16    ImplementationSpecificError,
17    NotAuthorized,
18    TopicNameInvalid,
19    PacketIdentifierInUse,
20    QuotaExceeded,
21    PayloadFormatInvalid,
22}
23
24/// Acknowledgement to `QoS1` publish
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct PubRec {
27    pub pkid: u16,
28    pub reason: PubRecReason,
29    pub properties: Option<PubRecProperties>,
30}
31
32impl PubRec {
33    #[must_use]
34    pub fn new(pkid: u16, properties: Option<PubRecProperties>) -> Self {
35        Self {
36            pkid,
37            reason: PubRecReason::Success,
38            properties,
39        }
40    }
41
42    #[must_use]
43    pub fn size(&self) -> usize {
44        let len = self.len();
45        let remaining_len_size = len_len(len);
46
47        1 + remaining_len_size + len
48    }
49
50    fn len(&self) -> usize {
51        let mut len = 2 + 1; // pkid + reason
52
53        // The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success)
54        // and there are no Properties. In this case the PUBREC has a Remaining Length of 2.
55        // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901134>
56        if self.reason == PubRecReason::Success && self.properties.is_none() {
57            return 2;
58        }
59
60        if let Some(p) = &self.properties {
61            let properties_len = p.len();
62            let properties_len_len = len_len(properties_len);
63            len += properties_len_len + properties_len;
64        } else {
65            len += 1;
66        }
67
68        len
69    }
70
71    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubRec, Error> {
72        let variable_header_index = fixed_header.fixed_header_len;
73        bytes.advance(variable_header_index);
74        let pkid = read_u16(&mut bytes)?;
75        if fixed_header.remaining_len == 2 {
76            return Ok(PubRec {
77                pkid,
78                reason: PubRecReason::Success,
79                properties: None,
80            });
81        }
82
83        let ack_reason = read_u8(&mut bytes)?;
84        if fixed_header.remaining_len < 4 {
85            return Ok(PubRec {
86                pkid,
87                reason: reason(ack_reason)?,
88                properties: None,
89            });
90        }
91
92        let properties = PubRecProperties::read(&mut bytes)?;
93        let puback = PubRec {
94            pkid,
95            reason: reason(ack_reason)?,
96            properties,
97        };
98        Ok(puback)
99    }
100
101    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
102        let len = self.len();
103        buffer.put_u8(0x50);
104        let count = write_remaining_length(buffer, len)?;
105        buffer.put_u16(self.pkid);
106
107        // If there are no properties during success, sending reason code is optional
108        if self.reason == PubRecReason::Success && self.properties.is_none() {
109            return Ok(4);
110        }
111
112        buffer.put_u8(code(self.reason));
113
114        if let Some(p) = &self.properties {
115            p.write(buffer)?;
116        } else {
117            write_remaining_length(buffer, 0)?;
118        }
119
120        Ok(1 + count + len)
121    }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct PubRecProperties {
126    pub reason_string: Option<MqttString>,
127    pub user_properties: Vec<(MqttString, MqttString)>,
128}
129
130impl PubRecProperties {
131    fn len(&self) -> usize {
132        let mut len = 0;
133
134        if let Some(reason) = &self.reason_string {
135            len += 1 + 2 + reason.len();
136        }
137
138        for (key, value) in &self.user_properties {
139            len += 1 + 2 + key.len() + 2 + value.len();
140        }
141
142        len
143    }
144
145    pub fn read(bytes: &mut Bytes) -> Result<Option<PubRecProperties>, Error> {
146        let mut reason_string = None;
147        let mut user_properties = Vec::new();
148
149        let (properties_len_len, properties_len) = length(bytes.iter())?;
150        bytes.advance(properties_len_len);
151        if properties_len == 0 {
152            return Ok(None);
153        }
154
155        let mut cursor = 0;
156        // read until cursor reaches property length. properties_len = 0 will skip this loop
157        while cursor < properties_len {
158            let prop = read_u8(bytes)?;
159            cursor += 1;
160
161            match property(prop)? {
162                PropertyType::ReasonString => {
163                    let reason = read_mqtt_string(bytes)?;
164                    cursor += 2 + reason.len();
165                    reason_string = Some(reason);
166                }
167                PropertyType::UserProperty => {
168                    let key = read_mqtt_string(bytes)?;
169                    let value = read_mqtt_string(bytes)?;
170                    cursor += 2 + key.len() + 2 + value.len();
171                    user_properties.push((key, value));
172                }
173                _ => return Err(Error::InvalidPropertyType(prop)),
174            }
175        }
176
177        Ok(Some(PubRecProperties {
178            reason_string,
179            user_properties,
180        }))
181    }
182
183    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
184        let len = self.len();
185        write_remaining_length(buffer, len)?;
186
187        if let Some(reason) = &self.reason_string {
188            buffer.put_u8(PropertyType::ReasonString as u8);
189            write_mqtt_string(buffer, reason)?;
190        }
191
192        for (key, value) in &self.user_properties {
193            buffer.put_u8(PropertyType::UserProperty as u8);
194            write_mqtt_string(buffer, key)?;
195            write_mqtt_string(buffer, value)?;
196        }
197
198        Ok(())
199    }
200}
201
202/// Connection return code type
203fn reason(num: u8) -> Result<PubRecReason, Error> {
204    let code = match num {
205        0 => PubRecReason::Success,
206        16 => PubRecReason::NoMatchingSubscribers,
207        128 => PubRecReason::UnspecifiedError,
208        131 => PubRecReason::ImplementationSpecificError,
209        135 => PubRecReason::NotAuthorized,
210        144 => PubRecReason::TopicNameInvalid,
211        145 => PubRecReason::PacketIdentifierInUse,
212        151 => PubRecReason::QuotaExceeded,
213        153 => PubRecReason::PayloadFormatInvalid,
214        num => return Err(Error::InvalidConnectReturnCode(num)),
215    };
216
217    Ok(code)
218}
219
220fn code(reason: PubRecReason) -> u8 {
221    match reason {
222        PubRecReason::Success => 0,
223        PubRecReason::NoMatchingSubscribers => 16,
224        PubRecReason::UnspecifiedError => 128,
225        PubRecReason::ImplementationSpecificError => 131,
226        PubRecReason::NotAuthorized => 135,
227        PubRecReason::TopicNameInvalid => 144,
228        PubRecReason::PacketIdentifierInUse => 145,
229        PubRecReason::QuotaExceeded => 151,
230        PubRecReason::PayloadFormatInvalid => 153,
231    }
232}
233
234#[cfg(test)]
235mod test {
236    use crate::test::read_write_packets;
237    use crate::Packet;
238
239    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
240    use super::*;
241    use bytes::BytesMut;
242    use pretty_assertions::assert_eq;
243
244    #[test]
245    fn length_calculation() {
246        let mut dummy_bytes = BytesMut::new();
247        // Use user_properties to pad the size to exceed ~128 bytes to make the
248        // remaining_length field in the packet be 2 bytes long.
249        let pubrec_props = PubRecProperties {
250            reason_string: None,
251            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
252        };
253
254        let pubrec_pkt = PubRec::new(1, Some(pubrec_props));
255
256        let size_from_size = pubrec_pkt.size();
257        let size_from_write = pubrec_pkt.write(&mut dummy_bytes).unwrap();
258        let size_from_bytes = dummy_bytes.len();
259
260        assert_eq!(size_from_write, size_from_bytes);
261        assert_eq!(size_from_size, size_from_bytes);
262    }
263
264    #[test]
265    fn test_write_read() {
266        read_write_packets(write_read_provider());
267    }
268
269    fn write_read_provider() -> Vec<Packet> {
270        vec![
271            Packet::PubRec(PubRec {
272                pkid: 42,
273                reason: PubRecReason::Success,
274                properties: None,
275            }),
276            Packet::PubRec(PubRec {
277                pkid: 42,
278                reason: PubRecReason::Success,
279                properties: Some(PubRecProperties {
280                    reason_string: None,
281                    user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
282                }),
283            }),
284        ]
285    }
286}