rumqttc/mqttbytes/v5/
unsubscribe.rs1use super::{
2 BufMut, BytesMut, Error, FixedHeader, PropertyType, len_len, length, property,
3 read_mqtt_string, read_u8, read_u16, write_mqtt_string, write_remaining_length,
4};
5use bytes::{Buf, Bytes};
6
7#[derive(Debug, Clone, PartialEq, Eq, Default)]
9pub struct Unsubscribe {
10 pub pkid: u16,
11 pub filters: Vec<String>,
12 pub properties: Option<UnsubscribeProperties>,
13}
14
15impl Unsubscribe {
16 pub fn new<S: Into<String>>(filter: S, properties: Option<UnsubscribeProperties>) -> Self {
17 Self {
18 filters: vec![filter.into()],
19 properties,
20 ..Default::default()
21 }
22 }
23
24 #[must_use]
25 pub fn size(&self) -> usize {
26 let len = self.len();
27 let remaining_len_size = len_len(len);
28
29 1 + remaining_len_size + len
30 }
31
32 fn len(&self) -> usize {
33 let mut len = 2 + self.filters.iter().fold(0, |s, t| 2 + s + t.len());
36
37 if let Some(p) = &self.properties {
38 let properties_len = p.len();
39 let properties_len_len = len_len(properties_len);
40 len += properties_len_len + properties_len;
41 } else {
42 len += 1;
44 }
45
46 len
47 }
48
49 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
50 let variable_header_index = fixed_header.header_len;
51 bytes.advance(variable_header_index);
52
53 let pkid = read_u16(&mut bytes)?;
54 let properties = UnsubscribeProperties::read(&mut bytes)?;
55
56 let mut filters = Vec::with_capacity(1);
57 while bytes.has_remaining() {
58 let filter = read_mqtt_string(&mut bytes)?;
59 filters.push(filter);
60 }
61
62 let unsubscribe = Self {
63 pkid,
64 filters,
65 properties,
66 };
67 Ok(unsubscribe)
68 }
69
70 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
71 buffer.put_u8(0xA2);
72
73 let remaining_len = self.len();
75 let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
76
77 buffer.put_u16(self.pkid);
79
80 if let Some(p) = &self.properties {
81 p.write(buffer)?;
82 } else {
83 write_remaining_length(buffer, 0)?;
84 }
85
86 for filter in &self.filters {
88 write_mqtt_string(buffer, filter);
89 }
90
91 Ok(1 + remaining_len_bytes + remaining_len)
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct UnsubscribeProperties {
97 pub user_properties: Vec<(String, String)>,
98}
99
100impl UnsubscribeProperties {
101 fn len(&self) -> usize {
102 let mut len = 0;
103
104 for (key, value) in &self.user_properties {
105 len += 1 + 2 + key.len() + 2 + value.len();
106 }
107
108 len
109 }
110
111 pub fn read(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
112 let mut user_properties = Vec::new();
113
114 let (properties_len_len, properties_len) = length(bytes.iter())?;
115 bytes.advance(properties_len_len);
116
117 if properties_len == 0 {
118 return Ok(None);
119 }
120
121 let mut cursor = 0;
122 while cursor < properties_len {
124 let prop = read_u8(bytes)?;
125 cursor += 1;
126
127 match property(prop)? {
128 PropertyType::UserProperty => {
129 let key = read_mqtt_string(bytes)?;
130 let value = read_mqtt_string(bytes)?;
131 cursor += 2 + key.len() + 2 + value.len();
132 user_properties.push((key, value));
133 }
134 _ => return Err(Error::InvalidPropertyType(prop)),
135 }
136 }
137
138 Ok(Some(Self { user_properties }))
139 }
140
141 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
142 let len = self.len();
143 write_remaining_length(buffer, len)?;
144
145 for (key, value) in &self.user_properties {
146 buffer.put_u8(PropertyType::UserProperty as u8);
147 write_mqtt_string(buffer, key);
148 write_mqtt_string(buffer, value);
149 }
150
151 Ok(())
152 }
153}
154
155#[cfg(test)]
156mod test {
157 use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
158 use super::*;
159 use bytes::BytesMut;
160 use pretty_assertions::assert_eq;
161
162 #[test]
163 fn length_calculation() {
164 let mut dummy_bytes = BytesMut::new();
165 let unsubscribe_props = UnsubscribeProperties {
168 user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
169 };
170
171 let unsubscribe_pkt = Unsubscribe::new("hello/world", Some(unsubscribe_props));
172
173 let size_from_size = unsubscribe_pkt.size();
174 let size_from_write = unsubscribe_pkt.write(&mut dummy_bytes).unwrap();
175 let size_from_bytes = dummy_bytes.len();
176
177 assert_eq!(size_from_write, size_from_bytes);
178 assert_eq!(size_from_size, size_from_bytes);
179 }
180}