Skip to main content

rumqttc/mqttbytes/v5/
pubrec.rs

1use super::{
2    Error, FixedHeader, PropertyType, len_len, length, property, read_mqtt_string, read_u8,
3    read_u16, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
7/// Return code in `PubRec`
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9#[repr(u8)]
10pub enum PubRecReason {
11    Success = 0,
12    NoMatchingSubscribers = 16,
13    UnspecifiedError = 128,
14    ImplementationSpecificError = 131,
15    NotAuthorized = 135,
16    TopicNameInvalid = 144,
17    PacketIdentifierInUse = 145,
18    QuotaExceeded = 151,
19    PayloadFormatInvalid = 153,
20}
21
22/// Acknowledgement to `QoS1` publish
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct PubRec {
25    pub pkid: u16,
26    pub reason: PubRecReason,
27    pub properties: Option<PubRecProperties>,
28}
29
30impl PubRec {
31    #[must_use]
32    pub const fn new(pkid: u16, properties: Option<PubRecProperties>) -> Self {
33        Self {
34            pkid,
35            reason: PubRecReason::Success,
36            properties,
37        }
38    }
39
40    #[must_use]
41    pub fn size(&self) -> usize {
42        let len = self.len();
43        let remaining_len_size = len_len(len);
44
45        1 + remaining_len_size + len
46    }
47
48    fn len(&self) -> usize {
49        let mut len = 2 + 1; // pkid + reason
50
51        // The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success)
52        // and there are no Properties. In this case the PUBREC has a Remaining Length of 2.
53        // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901134>
54        if self.reason == PubRecReason::Success && self.properties.is_none() {
55            return 2;
56        }
57
58        if let Some(p) = &self.properties {
59            let properties_len = p.len();
60            let properties_len_len = len_len(properties_len);
61            len += properties_len_len + properties_len;
62        } else {
63            len += 1;
64        }
65
66        len
67    }
68
69    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
70        let variable_header_index = fixed_header.header_len;
71        bytes.advance(variable_header_index);
72        let pkid = read_u16(&mut bytes)?;
73        if fixed_header.remaining_len == 2 {
74            return Ok(Self {
75                pkid,
76                reason: PubRecReason::Success,
77                properties: None,
78            });
79        }
80
81        let ack_reason = read_u8(&mut bytes)?;
82        if fixed_header.remaining_len < 4 {
83            return Ok(Self {
84                pkid,
85                reason: reason(ack_reason)?,
86                properties: None,
87            });
88        }
89
90        let properties = PubRecProperties::read(&mut bytes)?;
91        let puback = Self {
92            pkid,
93            reason: reason(ack_reason)?,
94            properties,
95        };
96        Ok(puback)
97    }
98
99    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
100        let len = self.len();
101        buffer.put_u8(0x50);
102        let count = write_remaining_length(buffer, len)?;
103        buffer.put_u16(self.pkid);
104
105        // If there are no properties during success, sending reason code is optional
106        if self.reason == PubRecReason::Success && self.properties.is_none() {
107            return Ok(4);
108        }
109
110        buffer.put_u8(code(self.reason));
111
112        if let Some(p) = &self.properties {
113            p.write(buffer)?;
114        } else {
115            write_remaining_length(buffer, 0)?;
116        }
117
118        Ok(1 + count + len)
119    }
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct PubRecProperties {
124    pub reason_string: Option<String>,
125    pub user_properties: Vec<(String, String)>,
126}
127
128impl PubRecProperties {
129    fn len(&self) -> usize {
130        let mut len = 0;
131
132        if let Some(reason) = &self.reason_string {
133            len += 1 + 2 + reason.len();
134        }
135
136        for (key, value) in &self.user_properties {
137            len += 1 + 2 + key.len() + 2 + value.len();
138        }
139
140        len
141    }
142
143    pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
144        let mut reason_string = None;
145        let mut user_properties = Vec::new();
146
147        let (properties_len_len, properties_len) = length(bytes.iter())?;
148        bytes.advance(properties_len_len);
149        if properties_len == 0 {
150            return Ok(None);
151        }
152
153        let mut cursor = 0;
154        // read until cursor reaches property length. properties_len = 0 will skip this loop
155        while cursor < properties_len {
156            let prop = read_u8(bytes)?;
157            cursor += 1;
158
159            match property(prop)? {
160                PropertyType::ReasonString => {
161                    let reason = read_mqtt_string(bytes)?;
162                    cursor += 2 + reason.len();
163                    reason_string = Some(reason);
164                }
165                PropertyType::UserProperty => {
166                    let key = read_mqtt_string(bytes)?;
167                    let value = read_mqtt_string(bytes)?;
168                    cursor += 2 + key.len() + 2 + value.len();
169                    user_properties.push((key, value));
170                }
171                _ => return Err(Error::InvalidPropertyType(prop)),
172            }
173        }
174
175        Ok(Some(Self {
176            reason_string,
177            user_properties,
178        }))
179    }
180
181    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
182        let len = self.len();
183        write_remaining_length(buffer, len)?;
184
185        if let Some(reason) = &self.reason_string {
186            buffer.put_u8(PropertyType::ReasonString as u8);
187            write_mqtt_string(buffer, reason);
188        }
189
190        for (key, value) in &self.user_properties {
191            buffer.put_u8(PropertyType::UserProperty as u8);
192            write_mqtt_string(buffer, key);
193            write_mqtt_string(buffer, value);
194        }
195
196        Ok(())
197    }
198}
199
200/// Connection return code type
201fn reason(num: u8) -> Result<PubRecReason, Error> {
202    num.try_into()
203}
204
205const fn code(reason: PubRecReason) -> u8 {
206    reason as u8
207}
208
209impl TryFrom<u8> for PubRecReason {
210    type Error = Error;
211
212    fn try_from(value: u8) -> Result<Self, Self::Error> {
213        match value {
214            0 => Ok(Self::Success),
215            16 => Ok(Self::NoMatchingSubscribers),
216            128 => Ok(Self::UnspecifiedError),
217            131 => Ok(Self::ImplementationSpecificError),
218            135 => Ok(Self::NotAuthorized),
219            144 => Ok(Self::TopicNameInvalid),
220            145 => Ok(Self::PacketIdentifierInUse),
221            151 => Ok(Self::QuotaExceeded),
222            153 => Ok(Self::PayloadFormatInvalid),
223            _ => Err(Error::InvalidConnectReturnCode(value)),
224        }
225    }
226}
227
228#[cfg(test)]
229mod test {
230    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
231    use super::*;
232    use bytes::BytesMut;
233    use pretty_assertions::assert_eq;
234
235    #[test]
236    fn length_calculation() {
237        let mut dummy_bytes = BytesMut::new();
238        // Use user_properties to pad the size to exceed ~128 bytes to make the
239        // remaining_length field in the packet be 2 bytes long.
240        let pubrec_props = PubRecProperties {
241            reason_string: None,
242            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
243        };
244
245        let pubrec_pkt = PubRec::new(1, Some(pubrec_props));
246
247        let size_from_size = pubrec_pkt.size();
248        let size_from_write = pubrec_pkt.write(&mut dummy_bytes).unwrap();
249        let size_from_bytes = dummy_bytes.len();
250
251        assert_eq!(size_from_write, size_from_bytes);
252        assert_eq!(size_from_size, size_from_bytes);
253    }
254
255    #[test]
256    fn reason_code_cast_matches_spec() {
257        assert_eq!(PubRecReason::Success as u8, 0);
258        assert_eq!(PubRecReason::NoMatchingSubscribers as u8, 16);
259        assert_eq!(PubRecReason::UnspecifiedError as u8, 128);
260        assert_eq!(PubRecReason::ImplementationSpecificError as u8, 131);
261        assert_eq!(PubRecReason::NotAuthorized as u8, 135);
262        assert_eq!(PubRecReason::TopicNameInvalid as u8, 144);
263        assert_eq!(PubRecReason::PacketIdentifierInUse as u8, 145);
264        assert_eq!(PubRecReason::QuotaExceeded as u8, 151);
265        assert_eq!(PubRecReason::PayloadFormatInvalid as u8, 153);
266    }
267
268    #[test]
269    fn reason_and_code_round_trip() {
270        let values = [
271            (0, PubRecReason::Success),
272            (16, PubRecReason::NoMatchingSubscribers),
273            (128, PubRecReason::UnspecifiedError),
274            (131, PubRecReason::ImplementationSpecificError),
275            (135, PubRecReason::NotAuthorized),
276            (144, PubRecReason::TopicNameInvalid),
277            (145, PubRecReason::PacketIdentifierInUse),
278            (151, PubRecReason::QuotaExceeded),
279            (153, PubRecReason::PayloadFormatInvalid),
280        ];
281
282        for (raw, parsed) in values {
283            assert_eq!(reason(raw).unwrap(), parsed);
284            assert_eq!(code(parsed), raw);
285        }
286    }
287
288    #[test]
289    fn reason_invalid_code_errors() {
290        assert!(matches!(
291            reason(42),
292            Err(Error::InvalidConnectReturnCode(42))
293        ));
294    }
295}