mqttbytes/v5/
suback.rs

1use super::*;
2use alloc::vec::Vec;
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use std::convert::{TryFrom, TryInto};
5
6/// Acknowledgement to subscribe
7#[derive(Debug, Clone, PartialEq)]
8pub struct SubAck {
9    pub pkid: u16,
10    pub return_codes: Vec<SubscribeReasonCode>,
11    pub properties: Option<SubAckProperties>,
12}
13
14impl SubAck {
15    pub fn new(pkid: u16, return_codes: Vec<SubscribeReasonCode>) -> SubAck {
16        SubAck {
17            pkid,
18            return_codes,
19            properties: None,
20        }
21    }
22
23    pub fn len(&self) -> usize {
24        let mut len = 2 + self.return_codes.len();
25
26        match &self.properties {
27            Some(properties) => {
28                let properties_len = properties.len();
29                let properties_len_len = len_len(properties_len);
30                len += properties_len_len + properties_len;
31            }
32            None => {
33                // just 1 byte representing 0 len
34                len += 1;
35            }
36        }
37
38        len
39    }
40
41    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
42        let variable_header_index = fixed_header.fixed_header_len;
43        bytes.advance(variable_header_index);
44
45        let pkid = read_u16(&mut bytes)?;
46        let properties = SubAckProperties::extract(&mut bytes)?;
47
48        if !bytes.has_remaining() {
49            return Err(Error::MalformedPacket);
50        }
51
52        let mut return_codes = Vec::new();
53        while bytes.has_remaining() {
54            let return_code = read_u8(&mut bytes)?;
55            return_codes.push(return_code.try_into()?);
56        }
57
58        let suback = SubAck {
59            pkid,
60            return_codes,
61            properties,
62        };
63
64        Ok(suback)
65    }
66
67    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
68        buffer.put_u8(0x90);
69        let remaining_len = self.len();
70        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
71
72        buffer.put_u16(self.pkid);
73
74        match &self.properties {
75            Some(properties) => properties.write(buffer)?,
76            None => {
77                write_remaining_length(buffer, 0)?;
78            }
79        };
80
81        let p: Vec<u8> = self.return_codes.iter().map(|code| *code as u8).collect();
82        buffer.extend_from_slice(&p);
83        Ok(1 + remaining_len_bytes + remaining_len)
84    }
85}
86
/// Properties that can be attached to a SUBACK packet.
#[derive(Debug, Clone, PartialEq)]
pub struct SubAckProperties {
    /// Optional reason string.
    pub reason_string: Option<String>,
    /// User properties as `(key, value)` string pairs, in wire order.
    pub user_properties: Vec<(String, String)>,
}
92
93impl SubAckProperties {
94    pub fn len(&self) -> usize {
95        let mut len = 0;
96
97        if let Some(reason) = &self.reason_string {
98            len += 1 + 2 + reason.len();
99        }
100
101        for (key, value) in self.user_properties.iter() {
102            len += 1 + 2 + key.len() + 2 + value.len();
103        }
104
105        len
106    }
107
108    pub fn extract(mut bytes: &mut Bytes) -> Result<Option<SubAckProperties>, Error> {
109        let mut reason_string = None;
110        let mut user_properties = Vec::new();
111
112        let (properties_len_len, properties_len) = length(bytes.iter())?;
113        bytes.advance(properties_len_len);
114        if properties_len == 0 {
115            return Ok(None);
116        }
117
118        let mut cursor = 0;
119        // read until cursor reaches property length. properties_len = 0 will skip this loop
120        while cursor < properties_len {
121            let prop = read_u8(&mut bytes)?;
122            cursor += 1;
123
124            match property(prop)? {
125                PropertyType::ReasonString => {
126                    let reason = read_mqtt_string(&mut bytes)?;
127                    cursor += 2 + reason.len();
128                    reason_string = Some(reason);
129                }
130                PropertyType::UserProperty => {
131                    let key = read_mqtt_string(&mut bytes)?;
132                    let value = read_mqtt_string(&mut bytes)?;
133                    cursor += 2 + key.len() + 2 + value.len();
134                    user_properties.push((key, value));
135                }
136                _ => return Err(Error::InvalidPropertyType(prop)),
137            }
138        }
139
140        Ok(Some(SubAckProperties {
141            reason_string,
142            user_properties,
143        }))
144    }
145
146    fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
147        let len = self.len();
148        write_remaining_length(buffer, len)?;
149
150        if let Some(reason) = &self.reason_string {
151            buffer.put_u8(PropertyType::ReasonString as u8);
152            write_mqtt_string(buffer, reason);
153        }
154
155        for (key, value) in self.user_properties.iter() {
156            buffer.put_u8(PropertyType::UserProperty as u8);
157            write_mqtt_string(buffer, key);
158            write_mqtt_string(buffer, value);
159        }
160
161        Ok(())
162    }
163}
164
/// Reason code returned for each subscription in a SUBACK packet.
///
/// The discriminant of every variant is its one-byte wire value.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeReasonCode {
    /// Subscription granted with maximum QoS 0 (0x00).
    QoS0 = 0,
    /// Subscription granted with maximum QoS 1 (0x01).
    QoS1 = 1,
    /// Subscription granted with maximum QoS 2 (0x02).
    QoS2 = 2,
    /// Unspecified error (0x80).
    Unspecified = 128,
    /// Implementation specific error (0x83).
    ImplementationSpecific = 131,
    /// Not authorized (0x87).
    NotAuthorized = 135,
    /// Topic filter invalid (0x8F).
    TopicFilterInvalid = 143,
    /// Packet identifier in use (0x91).
    PkidInUse = 145,
    /// Quota exceeded (0x97).
    QuotaExceeded = 151,
    /// Shared subscriptions not supported (0x9E).
    SharedSubscriptionsNotSupported = 158,
    /// Subscription identifiers not supported (0xA1).
    SubscriptionIdNotSupported = 161,
    /// Wildcard subscriptions not supported (0xA2).
    WildcardSubscriptionsNotSupported = 162,
}
181
182impl TryFrom<u8> for SubscribeReasonCode {
183    type Error = crate::Error;
184
185    fn try_from(value: u8) -> Result<Self, Self::Error> {
186        let v = match value {
187            0 => SubscribeReasonCode::QoS0,
188            1 => SubscribeReasonCode::QoS1,
189            2 => SubscribeReasonCode::QoS2,
190            128 => SubscribeReasonCode::Unspecified,
191            131 => SubscribeReasonCode::ImplementationSpecific,
192            135 => SubscribeReasonCode::NotAuthorized,
193            143 => SubscribeReasonCode::TopicFilterInvalid,
194            145 => SubscribeReasonCode::PkidInUse,
195            151 => SubscribeReasonCode::QuotaExceeded,
196            158 => SubscribeReasonCode::SharedSubscriptionsNotSupported,
197            161 => SubscribeReasonCode::SubscriptionIdNotSupported,
198            162 => SubscribeReasonCode::WildcardSubscriptionsNotSupported,
199            v => return Err(crate::Error::InvalidSubscribeReasonCode(v)),
200        };
201
202        Ok(v)
203    }
204}
205
#[cfg(test)]
mod test {
    use super::*;
    use alloc::vec;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// Decoded form of the packet encoded by `sample_bytes`.
    fn sample() -> SubAck {
        SubAck {
            pkid: 42,
            return_codes: vec![
                SubscribeReasonCode::QoS0,
                SubscribeReasonCode::QoS1,
                SubscribeReasonCode::QoS2,
                SubscribeReasonCode::Unspecified,
            ],
            properties: Some(SubAckProperties {
                reason_string: Some("test".to_owned()),
                user_properties: vec![("test".to_owned(), "test".to_owned())],
            }),
        }
    }

    /// Wire form of the packet returned by `sample`.
    fn sample_bytes() -> Vec<u8> {
        vec![
            0x90, // packet type
            0x1b, // remaining len
            0x00, 0x2a, // pkid
            0x14, // properties len
            0x1f, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // reason string "test"
            0x26, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // user property key "test"
            0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // user property value "test"
            0x00, 0x01, 0x02, 0x80, // return codes
        ]
    }

    #[test]
    fn suback_parsing_works() {
        let encoded = sample_bytes();
        let mut stream = BytesMut::new();
        stream.extend_from_slice(&encoded);

        let fixed_header = parse_fixed_header(stream.iter()).unwrap();
        let frame = stream.split_to(fixed_header.frame_length()).freeze();
        let decoded = SubAck::read(fixed_header, frame).unwrap();

        assert_eq!(decoded, sample());
    }

    #[test]
    fn suback_encoding_works() {
        let suback = sample();
        let mut buf = BytesMut::new();
        suback.write(&mut buf).unwrap();

        assert_eq!(&buf[..], sample_bytes());
    }
}