mqtt_bytes_v5/
unsuback.rs

1use crate::MqttString;
2
3use super::{
4    len_len, length, property, read_mqtt_string, read_u16, read_u8, write_mqtt_string,
5    write_remaining_length, Debug, Error, FixedHeader, PropertyType,
6};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
9/// Acknowledgement to unsubscribe
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub struct UnsubAck {
12    pub pkid: u16,
13    pub reasons: Vec<UnsubAckReason>,
14    pub properties: Option<UnsubAckProperties>,
15}
16
17impl UnsubAck {
18    fn len(&self) -> usize {
19        let mut len = 2 + self.reasons.len();
20
21        if let Some(p) = &self.properties {
22            let properties_len = p.len();
23            let properties_len_len = len_len(properties_len);
24            len += properties_len_len + properties_len;
25        } else {
26            // just 1 byte representing 0 len
27            len += 1;
28        }
29
30        len
31    }
32
33    #[must_use]
34    pub fn size(&self) -> usize {
35        let len = self.len();
36        let remaining_len_size = len_len(len);
37
38        1 + remaining_len_size + len
39    }
40
41    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<UnsubAck, Error> {
42        let variable_header_index = fixed_header.fixed_header_len;
43        bytes.advance(variable_header_index);
44
45        let pkid = read_u16(&mut bytes)?;
46        let properties = UnsubAckProperties::read(&mut bytes)?;
47
48        if !bytes.has_remaining() {
49            return Err(Error::MalformedPacket);
50        }
51
52        let mut reasons = Vec::new();
53        while bytes.has_remaining() {
54            let r = read_u8(&mut bytes)?;
55            reasons.push(reason(r)?);
56        }
57
58        let unsuback = UnsubAck {
59            pkid,
60            reasons,
61            properties,
62        };
63
64        Ok(unsuback)
65    }
66
67    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
68        buffer.put_u8(0xB0);
69        let remaining_len = self.len();
70        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
71
72        buffer.put_u16(self.pkid);
73
74        if let Some(p) = &self.properties {
75            p.write(buffer)?;
76        } else {
77            write_remaining_length(buffer, 0)?;
78        }
79
80        let p: Vec<u8> = self.reasons.iter().map(|&c| code(c)).collect();
81        buffer.extend_from_slice(&p);
82        Ok(1 + remaining_len_bytes + remaining_len)
83    }
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87#[repr(u8)]
88pub enum UnsubAckReason {
89    Success,
90    NoSubscriptionExisted,
91    UnspecifiedError,
92    ImplementationSpecificError,
93    NotAuthorized,
94    TopicFilterInvalid,
95    PacketIdentifierInUse,
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub struct UnsubAckProperties {
100    pub reason_string: Option<MqttString>,
101    pub user_properties: Vec<(MqttString, MqttString)>,
102}
103
104impl UnsubAckProperties {
105    fn len(&self) -> usize {
106        let mut len = 0;
107
108        if let Some(reason) = &self.reason_string {
109            len += 1 + 2 + reason.len();
110        }
111
112        for (key, value) in &self.user_properties {
113            len += 1 + 2 + key.len() + 2 + value.len();
114        }
115
116        len
117    }
118
119    pub fn read(bytes: &mut Bytes) -> Result<Option<UnsubAckProperties>, Error> {
120        let mut reason_string = None;
121        let mut user_properties = Vec::new();
122
123        let (properties_len_len, properties_len) = length(bytes.iter())?;
124        bytes.advance(properties_len_len);
125        if properties_len == 0 {
126            return Ok(None);
127        }
128
129        let mut cursor = 0;
130        // read until cursor reaches property length. properties_len = 0 will skip this loop
131        while cursor < properties_len {
132            let prop = read_u8(bytes)?;
133            cursor += 1;
134
135            match property(prop)? {
136                PropertyType::ReasonString => {
137                    let reason = read_mqtt_string(bytes)?;
138                    cursor += 2 + reason.len();
139                    reason_string = Some(reason);
140                }
141                PropertyType::UserProperty => {
142                    let key = read_mqtt_string(bytes)?;
143                    let value = read_mqtt_string(bytes)?;
144                    cursor += 2 + key.len() + 2 + value.len();
145                    user_properties.push((key, value));
146                }
147                _ => return Err(Error::InvalidPropertyType(prop)),
148            }
149        }
150
151        Ok(Some(UnsubAckProperties {
152            reason_string,
153            user_properties,
154        }))
155    }
156
157    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
158        let len = self.len();
159        write_remaining_length(buffer, len)?;
160
161        if let Some(reason) = &self.reason_string {
162            buffer.put_u8(PropertyType::ReasonString as u8);
163            write_mqtt_string(buffer, reason)?;
164        }
165
166        for (key, value) in &self.user_properties {
167            buffer.put_u8(PropertyType::UserProperty as u8);
168            write_mqtt_string(buffer, key)?;
169            write_mqtt_string(buffer, value)?;
170        }
171
172        Ok(())
173    }
174}
175
176/// Connection return code type
177fn reason(num: u8) -> Result<UnsubAckReason, Error> {
178    let code = match num {
179        0x00 => UnsubAckReason::Success,
180        0x11 => UnsubAckReason::NoSubscriptionExisted,
181        0x80 => UnsubAckReason::UnspecifiedError,
182        0x83 => UnsubAckReason::ImplementationSpecificError,
183        0x87 => UnsubAckReason::NotAuthorized,
184        0x8F => UnsubAckReason::TopicFilterInvalid,
185        0x91 => UnsubAckReason::PacketIdentifierInUse,
186        num => return Err(Error::InvalidSubscribeReasonCode(num)),
187    };
188
189    Ok(code)
190}
191
192fn code(reason: UnsubAckReason) -> u8 {
193    match reason {
194        UnsubAckReason::Success => 0x00,
195        UnsubAckReason::NoSubscriptionExisted => 0x11,
196        UnsubAckReason::UnspecifiedError => 0x80,
197        UnsubAckReason::ImplementationSpecificError => 0x83,
198        UnsubAckReason::NotAuthorized => 0x87,
199        UnsubAckReason::TopicFilterInvalid => 0x8F,
200        UnsubAckReason::PacketIdentifierInUse => 0x91,
201    }
202}
203
204#[cfg(test)]
205mod test {
206    use crate::test::read_write_packets;
207    use crate::Packet;
208
209    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
210    use super::*;
211    use bytes::BytesMut;
212    use pretty_assertions::assert_eq;
213
214    #[test]
215    fn length_calculation() {
216        let mut dummy_bytes = BytesMut::new();
217        // Use user_properties to pad the size to exceed ~128 bytes to make the
218        // remaining_length field in the packet be 2 bytes long.
219        let unsuback_props = UnsubAckProperties {
220            reason_string: None,
221            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
222        };
223
224        let unsuback_pkt = UnsubAck {
225            pkid: 1,
226            reasons: vec![UnsubAckReason::Success],
227            properties: Some(unsuback_props),
228        };
229
230        let size_from_size = unsuback_pkt.size();
231        let size_from_write = unsuback_pkt.write(&mut dummy_bytes).unwrap();
232        let size_from_bytes = dummy_bytes.len();
233
234        assert_eq!(size_from_write, size_from_bytes);
235        assert_eq!(size_from_size, size_from_bytes);
236    }
237
238    #[test]
239    fn test_write_read() {
240        read_write_packets(write_read_provider());
241    }
242
243    fn write_read_provider() -> Vec<Packet> {
244        vec![
245            Packet::UnsubAck(UnsubAck {
246                pkid: 42,
247                reasons: vec![UnsubAckReason::Success],
248                properties: None,
249            }),
250            Packet::UnsubAck(UnsubAck {
251                pkid: 42,
252                reasons: vec![UnsubAckReason::Success],
253                properties: Some(UnsubAckProperties {
254                    reason_string: Some("Resaon".into()),
255                    user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
256                }),
257            }),
258        ]
259    }
260}