Skip to main content

rumqttc/mqttbytes/v5/
suback.rs

1use super::{
2    Error, FixedHeader, PropertyType, QoS, len_len, length, property, read_mqtt_string, read_u8,
3    read_u16, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
/// Acknowledgement to subscribe
///
/// Serialized as the packet identifier, a properties section, and then one
/// reason-code byte per entry of `return_codes`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubAck {
    /// 16-bit packet identifier carried at the start of the variable header.
    pub pkid: u16,
    /// One reason code per payload byte, in wire order.
    pub return_codes: Vec<SubscribeReasonCode>,
    /// Optional properties; `None` is encoded as a zero-length properties section,
    /// and a zero-length section is decoded back as `None`.
    pub properties: Option<SubAckProperties>,
}
14
15impl SubAck {
16    fn len(&self) -> usize {
17        let mut len = 2 + self.return_codes.len();
18
19        if let Some(p) = &self.properties {
20            let properties_len = p.len();
21            let properties_len_len = len_len(properties_len);
22            len += properties_len_len + properties_len;
23        } else {
24            // just 1 byte representing 0 len
25            len += 1;
26        }
27
28        len
29    }
30
31    #[must_use]
32    pub fn size(&self) -> usize {
33        let len = self.len();
34        let remaining_len_size = len_len(len);
35
36        1 + remaining_len_size + len
37    }
38
39    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
40        let variable_header_index = fixed_header.header_len;
41        bytes.advance(variable_header_index);
42
43        let pkid = read_u16(&mut bytes)?;
44        let properties = SubAckProperties::read(&mut bytes)?;
45
46        if !bytes.has_remaining() {
47            return Err(Error::MalformedPacket);
48        }
49
50        let mut return_codes = Vec::new();
51        while bytes.has_remaining() {
52            let return_code = read_u8(&mut bytes)?;
53            return_codes.push(reason(return_code)?);
54        }
55
56        let suback = Self {
57            pkid,
58            return_codes,
59            properties,
60        };
61
62        Ok(suback)
63    }
64
65    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
66        buffer.put_u8(0x90);
67        let remaining_len = self.len();
68        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
69
70        buffer.put_u16(self.pkid);
71
72        if let Some(p) = &self.properties {
73            p.write(buffer)?;
74        } else {
75            write_remaining_length(buffer, 0)?;
76        }
77
78        for &return_code in &self.return_codes {
79            buffer.put_u8(code(return_code));
80        }
81        Ok(1 + remaining_len_bytes + remaining_len)
82    }
83}
84
/// Per-subscription result carried in the SUBACK payload, one byte each.
///
/// The wire values noted on the variants are the ones used by `code` and by
/// the `TryFrom<u8>` implementation in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeReasonCode {
    /// Subscription accepted; wire values 0, 1 and 2 decode to the matching `QoS`.
    Success(QoS),
    /// Encoded as `0x80`, the same byte as `Unspecified`; decoding a byte never
    /// produces this variant.
    Failure,
    /// Wire value 128 (`0x80`).
    Unspecified,
    /// Wire value 131 (`0x83`).
    ImplementationSpecific,
    /// Wire value 135 (`0x87`).
    NotAuthorized,
    /// Wire value 143 (`0x8F`).
    TopicFilterInvalid,
    /// Wire value 145 (`0x91`).
    PkidInUse,
    /// Wire value 151 (`0x97`).
    QuotaExceeded,
    /// Wire value 158 (`0x9E`).
    SharedSubscriptionsNotSupported,
    /// Wire value 161 (`0xA1`).
    SubscriptionIdNotSupported,
    /// Wire value 162 (`0xA2`).
    WildcardSubscriptionsNotSupported,
}
99
/// Properties that may accompany a SUBACK packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubAckProperties {
    /// Value of the `ReasonString` property, if one is present.
    pub reason_string: Option<String>,
    /// Key/value pairs of every `UserProperty` entry, in the order they appear.
    pub user_properties: Vec<(String, String)>,
}
105
106impl SubAckProperties {
107    fn len(&self) -> usize {
108        let mut len = 0;
109
110        if let Some(reason) = &self.reason_string {
111            len += 1 + 2 + reason.len();
112        }
113
114        for (key, value) in &self.user_properties {
115            len += 1 + 2 + key.len() + 2 + value.len();
116        }
117
118        len
119    }
120
121    pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
122        let mut reason_string = None;
123        let mut user_properties = Vec::new();
124
125        let (properties_len_len, properties_len) = length(bytes.iter())?;
126        bytes.advance(properties_len_len);
127        if properties_len == 0 {
128            return Ok(None);
129        }
130
131        let mut cursor = 0;
132        // read until cursor reaches property length. properties_len = 0 will skip this loop
133        while cursor < properties_len {
134            let prop = read_u8(bytes)?;
135            cursor += 1;
136
137            match property(prop)? {
138                PropertyType::ReasonString => {
139                    let reason = read_mqtt_string(bytes)?;
140                    cursor += 2 + reason.len();
141                    reason_string = Some(reason);
142                }
143                PropertyType::UserProperty => {
144                    let key = read_mqtt_string(bytes)?;
145                    let value = read_mqtt_string(bytes)?;
146                    cursor += 2 + key.len() + 2 + value.len();
147                    user_properties.push((key, value));
148                }
149                _ => return Err(Error::InvalidPropertyType(prop)),
150            }
151        }
152
153        Ok(Some(Self {
154            reason_string,
155            user_properties,
156        }))
157    }
158
159    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
160        let len = self.len();
161        write_remaining_length(buffer, len)?;
162
163        if let Some(reason) = &self.reason_string {
164            buffer.put_u8(PropertyType::ReasonString as u8);
165            write_mqtt_string(buffer, reason);
166        }
167
168        for (key, value) in &self.user_properties {
169            buffer.put_u8(PropertyType::UserProperty as u8);
170            write_mqtt_string(buffer, key);
171            write_mqtt_string(buffer, value);
172        }
173
174        Ok(())
175    }
176}
177
178fn reason(code: u8) -> Result<SubscribeReasonCode, Error> {
179    code.try_into()
180}
181
182const fn code(value: SubscribeReasonCode) -> u8 {
183    match value {
184        SubscribeReasonCode::Success(qos) => qos as u8,
185        SubscribeReasonCode::Failure | SubscribeReasonCode::Unspecified => 0x80,
186        SubscribeReasonCode::ImplementationSpecific => 131,
187        SubscribeReasonCode::NotAuthorized => 135,
188        SubscribeReasonCode::TopicFilterInvalid => 143,
189        SubscribeReasonCode::PkidInUse => 145,
190        SubscribeReasonCode::QuotaExceeded => 151,
191        SubscribeReasonCode::SharedSubscriptionsNotSupported => 158,
192        SubscribeReasonCode::SubscriptionIdNotSupported => 161,
193        SubscribeReasonCode::WildcardSubscriptionsNotSupported => 162,
194    }
195}
196
197impl TryFrom<u8> for SubscribeReasonCode {
198    type Error = Error;
199
200    fn try_from(value: u8) -> Result<Self, Self::Error> {
201        match value {
202            0 => Ok(Self::Success(QoS::AtMostOnce)),
203            1 => Ok(Self::Success(QoS::AtLeastOnce)),
204            2 => Ok(Self::Success(QoS::ExactlyOnce)),
205            128 => Ok(Self::Unspecified),
206            131 => Ok(Self::ImplementationSpecific),
207            135 => Ok(Self::NotAuthorized),
208            143 => Ok(Self::TopicFilterInvalid),
209            145 => Ok(Self::PkidInUse),
210            151 => Ok(Self::QuotaExceeded),
211            158 => Ok(Self::SharedSubscriptionsNotSupported),
212            161 => Ok(Self::SubscriptionIdNotSupported),
213            162 => Ok(Self::WildcardSubscriptionsNotSupported),
214            _ => Err(Error::InvalidSubscribeReasonCode(value)),
215        }
216    }
217}
218
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::{Bytes, BytesMut};
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()` and the number of bytes
    /// actually written must all agree.
    #[test]
    fn length_calculation() {
        let mut dummy_bytes = BytesMut::new();
        // Use user_properties to pad the size to exceed ~128 bytes to make the
        // remaining_length field in the packet be 2 bytes long.
        let suback_props = SubAckProperties {
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
        };

        let suback_pkt = SubAck {
            pkid: 1,
            return_codes: vec![SubscribeReasonCode::Success(QoS::ExactlyOnce)],
            properties: Some(suback_props),
        };

        let size_from_size = suback_pkt.size();
        let size_from_write = suback_pkt.write(&mut dummy_bytes).unwrap();
        let size_from_bytes = dummy_bytes.len();

        assert_eq!(size_from_write, size_from_bytes);
        assert_eq!(size_from_size, size_from_bytes);
    }

    /// Every byte accepted by `reason` must encode back to itself via `code`.
    #[test]
    fn reason_and_code_round_trip() {
        let values = [
            (0, SubscribeReasonCode::Success(QoS::AtMostOnce)),
            (1, SubscribeReasonCode::Success(QoS::AtLeastOnce)),
            (2, SubscribeReasonCode::Success(QoS::ExactlyOnce)),
            (128, SubscribeReasonCode::Unspecified),
            (131, SubscribeReasonCode::ImplementationSpecific),
            (135, SubscribeReasonCode::NotAuthorized),
            (143, SubscribeReasonCode::TopicFilterInvalid),
            (145, SubscribeReasonCode::PkidInUse),
            (151, SubscribeReasonCode::QuotaExceeded),
            (158, SubscribeReasonCode::SharedSubscriptionsNotSupported),
            (161, SubscribeReasonCode::SubscriptionIdNotSupported),
            (162, SubscribeReasonCode::WildcardSubscriptionsNotSupported),
        ];

        for (raw, parsed) in values {
            assert_eq!(reason(raw).unwrap(), parsed);
            assert_eq!(code(parsed), raw);
        }
    }

    #[test]
    fn reason_invalid_code_errors() {
        assert!(matches!(
            reason(42),
            Err(Error::InvalidSubscribeReasonCode(42))
        ));
    }

    #[test]
    fn failure_encodes_like_unspecified() {
        assert_eq!(code(SubscribeReasonCode::Failure), 0x80);
        assert_eq!(code(SubscribeReasonCode::Unspecified), 0x80);
    }

    #[test]
    fn write_multiple_reasons_bytes() {
        let mut buffer = BytesMut::new();
        let suback = SubAck {
            pkid: 10,
            return_codes: vec![
                SubscribeReasonCode::Success(QoS::AtMostOnce),
                SubscribeReasonCode::ImplementationSpecific,
                SubscribeReasonCode::WildcardSubscriptionsNotSupported,
            ],
            properties: None,
        };

        suback.write(&mut buffer).unwrap();

        let expected = [
            0x90, // Packet type
            0x06, // Remaining length
            0x00, 0x0A, // pkid
            0x00, // properties length
            0x00, // Success QoS0
            0x83, // ImplementationSpecific
            0xA2, // WildcardSubscriptionsNotSupported
        ];
        assert_eq!(&buffer[..], &expected);
    }

    #[test]
    fn read_errors_on_invalid_first_reason_code() {
        // Remaining length is 11 (0x0B): 2 pkid bytes + 1 properties-length
        // byte + 8 reason-code bytes.
        let packet = Bytes::from_static(&[
            0x90, 0x0B, // packet type + remaining length
            0x00, 0x0A, // pkid
            0x00, // properties length
            0xFF, // invalid first reason code
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // trailing reason bytes
        ]);

        let fixed_header = FixedHeader::new(0x90, 1, 0x0B);
        assert!(matches!(
            SubAck::read(fixed_header, packet),
            Err(Error::InvalidSubscribeReasonCode(0xFF))
        ));
    }
}