Skip to main content

rumqttc/mqttbytes/v5/
unsubscribe.rs

1use super::{
2    BufMut, BytesMut, Error, FixedHeader, PropertyType, len_len, length, property,
3    read_mqtt_string, read_u8, read_u16, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, Bytes};
6
7/// Unsubscribe packet
8#[derive(Debug, Clone, PartialEq, Eq, Default)]
9pub struct Unsubscribe {
10    pub pkid: u16,
11    pub filters: Vec<String>,
12    pub properties: Option<UnsubscribeProperties>,
13}
14
15impl Unsubscribe {
16    pub fn new<S: Into<String>>(filter: S, properties: Option<UnsubscribeProperties>) -> Self {
17        Self {
18            filters: vec![filter.into()],
19            properties,
20            ..Default::default()
21        }
22    }
23
24    #[must_use]
25    pub fn size(&self) -> usize {
26        let len = self.len();
27        let remaining_len_size = len_len(len);
28
29        1 + remaining_len_size + len
30    }
31
32    fn len(&self) -> usize {
33        // Packet id + length of filters (unlike subscribe, this just a string.
34        // Hence 2 is prefixed for len per filter)
35        let mut len = 2 + self.filters.iter().fold(0, |s, t| 2 + s + t.len());
36
37        if let Some(p) = &self.properties {
38            let properties_len = p.len();
39            let properties_len_len = len_len(properties_len);
40            len += properties_len_len + properties_len;
41        } else {
42            // just 1 byte representing 0 len
43            len += 1;
44        }
45
46        len
47    }
48
49    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
50        let variable_header_index = fixed_header.header_len;
51        bytes.advance(variable_header_index);
52
53        let pkid = read_u16(&mut bytes)?;
54        let properties = UnsubscribeProperties::read(&mut bytes)?;
55
56        let mut filters = Vec::with_capacity(1);
57        while bytes.has_remaining() {
58            let filter = read_mqtt_string(&mut bytes)?;
59            filters.push(filter);
60        }
61
62        let unsubscribe = Self {
63            pkid,
64            filters,
65            properties,
66        };
67        Ok(unsubscribe)
68    }
69
70    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
71        buffer.put_u8(0xA2);
72
73        // write remaining length
74        let remaining_len = self.len();
75        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
76
77        // write packet id
78        buffer.put_u16(self.pkid);
79
80        if let Some(p) = &self.properties {
81            p.write(buffer)?;
82        } else {
83            write_remaining_length(buffer, 0)?;
84        }
85
86        // write filters
87        for filter in &self.filters {
88            write_mqtt_string(buffer, filter);
89        }
90
91        Ok(1 + remaining_len_bytes + remaining_len)
92    }
93}
94
/// Properties that may accompany an [`Unsubscribe`] packet.
///
/// Only user properties are represented; decoding rejects any other
/// property type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnsubscribeProperties {
    /// Key/value pairs, kept in the order they appear on the wire.
    pub user_properties: Vec<(String, String)>,
}
99
100impl UnsubscribeProperties {
101    fn len(&self) -> usize {
102        let mut len = 0;
103
104        for (key, value) in &self.user_properties {
105            len += 1 + 2 + key.len() + 2 + value.len();
106        }
107
108        len
109    }
110
111    pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
112        let mut user_properties = Vec::new();
113
114        let (properties_len_len, properties_len) = length(bytes.iter())?;
115        bytes.advance(properties_len_len);
116
117        if properties_len == 0 {
118            return Ok(None);
119        }
120
121        let mut cursor = 0;
122        // read until cursor reaches property length. properties_len = 0 will skip this loop
123        while cursor < properties_len {
124            let prop = read_u8(bytes)?;
125            cursor += 1;
126
127            match property(prop)? {
128                PropertyType::UserProperty => {
129                    let key = read_mqtt_string(bytes)?;
130                    let value = read_mqtt_string(bytes)?;
131                    cursor += 2 + key.len() + 2 + value.len();
132                    user_properties.push((key, value));
133                }
134                _ => return Err(Error::InvalidPropertyType(prop)),
135            }
136        }
137
138        Ok(Some(Self { user_properties }))
139    }
140
141    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
142        let len = self.len();
143        write_remaining_length(buffer, len)?;
144
145        for (key, value) in &self.user_properties {
146            buffer.put_u8(PropertyType::UserProperty as u8);
147            write_mqtt_string(buffer, key);
148            write_mqtt_string(buffer, value);
149        }
150
151        Ok(())
152    }
153}
154
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()` and the number of bytes
    /// actually written must all agree.
    #[test]
    fn length_calculation() {
        // The user property is there to pad the packet; the intent is to push
        // it past ~128 bytes so the remaining-length field takes 2 bytes.
        let properties = UnsubscribeProperties {
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
        };
        let packet = Unsubscribe::new("hello/world", Some(properties));

        let predicted = packet.size();

        let mut out = BytesMut::new();
        let reported = packet.write(&mut out).unwrap();
        let written = out.len();

        assert_eq!(reported, written);
        assert_eq!(predicted, written);
    }
}