rumqttd/protocol/v5/
puback.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4fn len(puback: &PubAck, properties: &Option<PubAckProperties>) -> usize {
5    let mut len = 2 + 1; // pkid + reason
6
7    // If there are no properties, sending reason code is optional
8    if puback.reason == PubAckReason::Success && properties.is_none() {
9        return 2;
10    }
11
12    if let Some(p) = properties {
13        let properties_len = properties::len(p);
14        let properties_len_len = len_len(properties_len);
15        len += properties_len_len + properties_len;
16    } else {
17        // just 1 byte representing 0 len properties
18        len += 1;
19    }
20
21    len
22}
23
24pub fn read(
25    fixed_header: FixedHeader,
26    mut bytes: Bytes,
27) -> Result<(PubAck, Option<PubAckProperties>), Error> {
28    let variable_header_index = fixed_header.fixed_header_len;
29    bytes.advance(variable_header_index);
30    let pkid = read_u16(&mut bytes)?;
31
32    // No reason code or properties if remaining length == 2
33    if fixed_header.remaining_len == 2 {
34        return Ok((
35            PubAck {
36                pkid,
37                reason: PubAckReason::Success,
38            },
39            None,
40        ));
41    }
42
43    // No properties len or properties if remaining len > 2 but < 4
44    let ack_reason = read_u8(&mut bytes)?;
45    if fixed_header.remaining_len < 4 {
46        return Ok((
47            PubAck {
48                pkid,
49                reason: reason(ack_reason)?,
50            },
51            None,
52        ));
53    }
54
55    let puback = PubAck {
56        pkid,
57        reason: reason(ack_reason)?,
58    };
59
60    let properties = properties::read(&mut bytes)?;
61    Ok((puback, properties))
62}
63
64pub fn write(
65    puback: &PubAck,
66    properties: &Option<PubAckProperties>,
67    buffer: &mut BytesMut,
68) -> Result<usize, Error> {
69    let len = len(puback, properties);
70    buffer.put_u8(0x40);
71
72    let count = write_remaining_length(buffer, len)?;
73    buffer.put_u16(puback.pkid);
74
75    // Reason code is optional with success if there are no properties
76    if puback.reason == PubAckReason::Success && properties.is_none() {
77        return Ok(4);
78    }
79
80    buffer.put_u8(code(puback.reason));
81    if let Some(p) = properties {
82        properties::write(p, buffer)?;
83    } else {
84        write_remaining_length(buffer, 0)?;
85    }
86
87    Ok(1 + count + len)
88}
89
90mod properties {
91    use super::*;
92
93    pub fn len(properties: &PubAckProperties) -> usize {
94        let mut len = 0;
95
96        if let Some(reason) = &properties.reason_string {
97            len += 1 + 2 + reason.len();
98        }
99
100        for (key, value) in properties.user_properties.iter() {
101            len += 1 + 2 + key.len() + 2 + value.len();
102        }
103
104        len
105    }
106
107    pub fn read(mut bytes: &mut Bytes) -> Result<Option<PubAckProperties>, Error> {
108        let mut reason_string = None;
109        let mut user_properties = Vec::new();
110
111        let (properties_len_len, properties_len) = length(bytes.iter())?;
112        bytes.advance(properties_len_len);
113        if properties_len == 0 {
114            return Ok(None);
115        }
116
117        let mut cursor = 0;
118        // read until cursor reaches property length. properties_len = 0 will skip this loop
119        while cursor < properties_len {
120            let prop = read_u8(bytes)?;
121            cursor += 1;
122
123            match property(prop)? {
124                PropertyType::ReasonString => {
125                    let reason = read_mqtt_string(bytes)?;
126                    cursor += 2 + reason.len();
127                    reason_string = Some(reason);
128                }
129                PropertyType::UserProperty => {
130                    let key = read_mqtt_string(bytes)?;
131                    let value = read_mqtt_string(bytes)?;
132                    cursor += 2 + key.len() + 2 + value.len();
133                    user_properties.push((key, value));
134                }
135                _ => return Err(Error::InvalidPropertyType(prop)),
136            }
137        }
138
139        Ok(Some(PubAckProperties {
140            reason_string,
141            user_properties,
142        }))
143    }
144
145    pub fn write(properties: &PubAckProperties, buffer: &mut BytesMut) -> Result<(), Error> {
146        let len = len(properties);
147        write_remaining_length(buffer, len)?;
148
149        if let Some(reason) = &properties.reason_string {
150            buffer.put_u8(PropertyType::ReasonString as u8);
151            write_mqtt_string(buffer, reason);
152        }
153
154        for (key, value) in properties.user_properties.iter() {
155            buffer.put_u8(PropertyType::UserProperty as u8);
156            write_mqtt_string(buffer, key);
157            write_mqtt_string(buffer, value);
158        }
159
160        Ok(())
161    }
162}
163
164/// Connection return code type
165fn reason(num: u8) -> Result<PubAckReason, Error> {
166    let code = match num {
167        0 => PubAckReason::Success,
168        16 => PubAckReason::NoMatchingSubscribers,
169        128 => PubAckReason::UnspecifiedError,
170        131 => PubAckReason::ImplementationSpecificError,
171        135 => PubAckReason::NotAuthorized,
172        144 => PubAckReason::TopicNameInvalid,
173        145 => PubAckReason::PacketIdentifierInUse,
174        151 => PubAckReason::QuotaExceeded,
175        153 => PubAckReason::PayloadFormatInvalid,
176        num => return Err(Error::InvalidConnectReturnCode(num)),
177    };
178
179    Ok(code)
180}
181
182// TODO: Is typecasting significantly faster than functions?
183fn code(reason: PubAckReason) -> u8 {
184    match reason {
185        PubAckReason::Success => 0,
186        PubAckReason::NoMatchingSubscribers => 16,
187        PubAckReason::UnspecifiedError => 128,
188        PubAckReason::ImplementationSpecificError => 131,
189        PubAckReason::NotAuthorized => 135,
190        PubAckReason::TopicNameInvalid => 144,
191        PubAckReason::PacketIdentifierInUse => 145,
192        PubAckReason::QuotaExceeded => 151,
193        PubAckReason::PayloadFormatInvalid => 153,
194    }
195}