1use std::convert::{TryFrom, TryInto};
2
3use bytes::{BufMut, Bytes, BytesMut};
4
5use super::{
6 Buf, Error, FixedHeader, PacketType, len_len, length, read_mqtt_string, read_u8, read_u32,
7 write_mqtt_string, write_remaining_length,
8};
9
10use super::{PropertyType, property};
11
/// Reason code carried in a DISCONNECT packet.
///
/// The enum is `#[repr(u8)]`: every discriminant is the exact byte that appears
/// on the wire, so a variant can be turned into its encoded form with `as u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DisconnectReasonCode {
    /// Normal disconnection.
    NormalDisconnection = 0x00,
    /// Disconnect with will message.
    DisconnectWithWillMessage = 0x04,
    /// Unspecified error.
    UnspecifiedError = 0x80,
    /// Malformed packet.
    MalformedPacket = 0x81,
    /// Protocol error.
    ProtocolError = 0x82,
    /// Implementation specific error.
    ImplementationSpecificError = 0x83,
    /// Not authorized.
    NotAuthorized = 0x87,
    /// Server busy.
    ServerBusy = 0x89,
    /// Server shutting down.
    ServerShuttingDown = 0x8B,
    /// Keep alive timeout.
    KeepAliveTimeout = 0x8D,
    /// Session taken over.
    SessionTakenOver = 0x8E,
    /// Topic filter invalid.
    TopicFilterInvalid = 0x8F,
    /// Topic name invalid.
    TopicNameInvalid = 0x90,
    /// Receive maximum exceeded.
    ReceiveMaximumExceeded = 0x93,
    /// Topic alias invalid.
    TopicAliasInvalid = 0x94,
    /// Packet too large.
    PacketTooLarge = 0x95,
    /// Message rate too high.
    MessageRateTooHigh = 0x96,
    /// Quota exceeded.
    QuotaExceeded = 0x97,
    /// Administrative action.
    AdministrativeAction = 0x98,
    /// Payload format invalid.
    PayloadFormatInvalid = 0x99,
    /// Retain not supported.
    RetainNotSupported = 0x9A,
    /// QoS not supported.
    QoSNotSupported = 0x9B,
    /// Use another server.
    UseAnotherServer = 0x9C,
    /// Server moved.
    ServerMoved = 0x9D,
    /// Shared subscription not supported.
    SharedSubscriptionNotSupported = 0x9E,
    /// Connection rate exceeded.
    ConnectionRateExceeded = 0x9F,
    /// Maximum connect time.
    MaximumConnectTime = 0xA0,
    /// Subscription identifiers not supported.
    SubscriptionIdentifiersNotSupported = 0xA1,
    /// Wildcard subscriptions not supported.
    WildcardSubscriptionsNotSupported = 0xA2,
}
74
75impl TryFrom<u8> for DisconnectReasonCode {
76 type Error = Error;
77
78 fn try_from(value: u8) -> Result<Self, Self::Error> {
79 let rc = match value {
80 0x00 => Self::NormalDisconnection,
81 0x04 => Self::DisconnectWithWillMessage,
82 0x80 => Self::UnspecifiedError,
83 0x81 => Self::MalformedPacket,
84 0x82 => Self::ProtocolError,
85 0x83 => Self::ImplementationSpecificError,
86 0x87 => Self::NotAuthorized,
87 0x89 => Self::ServerBusy,
88 0x8B => Self::ServerShuttingDown,
89 0x8D => Self::KeepAliveTimeout,
90 0x8E => Self::SessionTakenOver,
91 0x8F => Self::TopicFilterInvalid,
92 0x90 => Self::TopicNameInvalid,
93 0x93 => Self::ReceiveMaximumExceeded,
94 0x94 => Self::TopicAliasInvalid,
95 0x95 => Self::PacketTooLarge,
96 0x96 => Self::MessageRateTooHigh,
97 0x97 => Self::QuotaExceeded,
98 0x98 => Self::AdministrativeAction,
99 0x99 => Self::PayloadFormatInvalid,
100 0x9A => Self::RetainNotSupported,
101 0x9B => Self::QoSNotSupported,
102 0x9C => Self::UseAnotherServer,
103 0x9D => Self::ServerMoved,
104 0x9E => Self::SharedSubscriptionNotSupported,
105 0x9F => Self::ConnectionRateExceeded,
106 0xA0 => Self::MaximumConnectTime,
107 0xA1 => Self::SubscriptionIdentifiersNotSupported,
108 0xA2 => Self::WildcardSubscriptionsNotSupported,
109 other => return Err(Error::InvalidConnectReturnCode(other)),
110 };
111
112 Ok(rc)
113 }
114}
115
/// Optional properties that may accompany a DISCONNECT packet.
///
/// Every field is optional on the wire: an `Option` left as `None` (or an empty
/// `user_properties` list) is simply not encoded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DisconnectProperties {
    /// Session expiry interval, encoded as a big-endian `u32` after its
    /// property identifier.
    pub session_expiry_interval: Option<u32>,

    /// Free-form reason text, encoded as a length-prefixed string.
    pub reason_string: Option<String>,

    /// Arbitrary `(key, value)` string pairs; each pair is encoded as its own
    /// property entry, in list order.
    pub user_properties: Vec<(String, String)>,

    /// Server reference, encoded as a length-prefixed string.
    pub server_reference: Option<String>,
}
130
131#[derive(Debug, Clone, PartialEq, Eq)]
132pub struct Disconnect {
133 pub reason_code: DisconnectReasonCode,
135
136 pub properties: Option<DisconnectProperties>,
138}
139
140impl DisconnectProperties {
141 fn len(&self) -> usize {
151 let mut length = 0;
152
153 if self.session_expiry_interval.is_some() {
154 length += 1 + 4;
155 }
156
157 if let Some(reason) = &self.reason_string {
158 length += 1 + 2 + reason.len();
159 }
160
161 for (key, value) in &self.user_properties {
162 length += 1 + 2 + key.len() + 2 + value.len();
163 }
164
165 if let Some(server_reference) = &self.server_reference {
166 length += 1 + 2 + server_reference.len();
167 }
168
169 length
170 }
171
172 pub fn extract(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
173 let (properties_len_len, properties_len) = length(bytes.iter())?;
174
175 bytes.advance(properties_len_len);
176
177 if properties_len == 0 {
178 return Ok(None);
179 }
180
181 let mut session_expiry_interval = None;
182 let mut reason_string = None;
183 let mut user_properties = Vec::new();
184 let mut server_reference = None;
185
186 let mut cursor = 0;
187
188 while cursor < properties_len {
190 let prop = read_u8(bytes)?;
191 cursor += 1;
192
193 match property(prop)? {
194 PropertyType::SessionExpiryInterval => {
195 session_expiry_interval = Some(read_u32(bytes)?);
196 cursor += 4;
197 }
198 PropertyType::ReasonString => {
199 let reason = read_mqtt_string(bytes)?;
200 cursor += 2 + reason.len();
201 reason_string = Some(reason);
202 }
203 PropertyType::UserProperty => {
204 let key = read_mqtt_string(bytes)?;
205 let value = read_mqtt_string(bytes)?;
206 cursor += 2 + key.len() + 2 + value.len();
207 user_properties.push((key, value));
208 }
209 PropertyType::ServerReference => {
210 let reference = read_mqtt_string(bytes)?;
211 cursor += 2 + reference.len();
212 server_reference = Some(reference);
213 }
214 _ => return Err(Error::InvalidPropertyType(prop)),
215 }
216 }
217
218 let properties = Self {
219 session_expiry_interval,
220 reason_string,
221 user_properties,
222 server_reference,
223 };
224
225 Ok(Some(properties))
226 }
227
228 fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
229 let length = self.len();
230 write_remaining_length(buffer, length)?;
231
232 if let Some(session_expiry_interval) = self.session_expiry_interval {
233 buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
234 buffer.put_u32(session_expiry_interval);
235 }
236
237 if let Some(reason) = &self.reason_string {
238 buffer.put_u8(PropertyType::ReasonString as u8);
239 write_mqtt_string(buffer, reason);
240 }
241
242 for (key, value) in &self.user_properties {
243 buffer.put_u8(PropertyType::UserProperty as u8);
244 write_mqtt_string(buffer, key);
245 write_mqtt_string(buffer, value);
246 }
247
248 if let Some(reference) = &self.server_reference {
249 buffer.put_u8(PropertyType::ServerReference as u8);
250 write_mqtt_string(buffer, reference);
251 }
252
253 Ok(())
254 }
255}
256
257impl Disconnect {
258 #[must_use]
259 pub const fn new(reason: DisconnectReasonCode) -> Self {
260 Self {
261 reason_code: reason,
262 properties: None,
263 }
264 }
265
266 #[must_use]
267 pub const fn new_with_properties(
268 reason: DisconnectReasonCode,
269 properties: DisconnectProperties,
270 ) -> Self {
271 Self {
272 reason_code: reason,
273 properties: Some(properties),
274 }
275 }
276
277 fn len(&self) -> usize {
278 if self.reason_code == DisconnectReasonCode::NormalDisconnection
279 && self.properties.is_none()
280 {
281 return 2; }
283
284 let mut length = 1; if let Some(properties) = &self.properties {
287 let properties_len = properties.len();
288 let properties_len_len = len_len(properties_len);
289 length += properties_len_len + properties_len;
290 }
291
292 length
293 }
294
295 #[must_use]
296 pub fn size(&self) -> usize {
297 let len = self.len();
298 if len == 2 {
299 return len;
300 }
301
302 let remaining_len_size = len_len(len);
303
304 1 + remaining_len_size + len
305 }
306
307 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
308 let packet_type = fixed_header.byte1 >> 4;
309 let flags = fixed_header.byte1 & 0b0000_1111;
310
311 bytes.advance(fixed_header.header_len);
312
313 if packet_type != PacketType::Disconnect as u8 {
314 return Err(Error::InvalidPacketType(packet_type));
315 }
316
317 if flags != 0x00 {
318 return Err(Error::MalformedPacket);
319 }
320
321 if fixed_header.remaining_len == 0 {
322 return Ok(Self::new(DisconnectReasonCode::NormalDisconnection));
323 }
324
325 let reason_code = read_u8(&mut bytes)?;
326
327 let disconnect = Self {
328 reason_code: reason_code.try_into()?,
329 properties: DisconnectProperties::extract(&mut bytes)?,
330 };
331
332 Ok(disconnect)
333 }
334
335 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
336 buffer.put_u8(0xE0);
337
338 let length = self.len();
339
340 if length == 2 {
341 buffer.put_u8(0x00);
342 return Ok(length);
343 }
344
345 let len_len = write_remaining_length(buffer, length)?;
346
347 buffer.put_u8(self.reason_code as u8);
348
349 if let Some(properties) = &self.properties {
350 properties.write(buffer)?;
351 } else {
352 write_remaining_length(buffer, 0)?;
353 }
354
355 Ok(1 + len_len + length)
356 }
357}
358
#[cfg(test)]
mod test {
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::{Disconnect, DisconnectProperties, DisconnectReasonCode};
    use crate::mqttbytes::v5::parse_fixed_header;

    /// Parses `raw` as one complete DISCONNECT frame, panicking on any failure.
    fn decode(raw: &[u8]) -> Disconnect {
        let mut stream = BytesMut::new();
        stream.extend_from_slice(raw);

        let fixed_header = parse_fixed_header(stream.iter()).unwrap();
        let frame = stream.split_to(fixed_header.frame_length()).freeze();

        Disconnect::read(fixed_header, frame).unwrap()
    }

    /// Encodes `packet` into a fresh buffer, panicking on any failure.
    fn encode(packet: &Disconnect) -> BytesMut {
        let mut out = BytesMut::new();
        packet.write(&mut out).unwrap();
        out
    }

    #[test]
    fn disconnect1_parsing_works() {
        // Packet type byte followed by a zero remaining length.
        let packet_bytes = [0xE0, 0x00];
        let expected = Disconnect::new(DisconnectReasonCode::NormalDisconnection);

        assert_eq!(decode(&packet_bytes[..]), expected);
    }

    #[test]
    fn disconnect1_encoding_works() {
        let disconnect = Disconnect::new(DisconnectReasonCode::NormalDisconnection);
        // Packet type byte followed by a zero remaining length.
        let expected = [0xE0, 0x00];

        let buffer = encode(&disconnect);

        assert_eq!(&buffer[..], &expected);
    }

    /// A packet that sets every property once.
    fn sample2() -> Disconnect {
        Disconnect {
            reason_code: DisconnectReasonCode::UnspecifiedError,
            properties: Some(DisconnectProperties {
                session_expiry_interval: Some(1234),
                reason_string: Some("test".to_owned()),
                user_properties: vec![("test".to_owned(), "test".to_owned())],
                server_reference: Some("test".to_owned()),
            }),
        }
    }

    /// Wire encoding of [`sample2`].
    #[rustfmt::skip]
    fn sample_bytes2() -> Vec<u8> {
        vec![
            0xE0, 0x22, // fixed header: packet type, remaining length
            0x80, // reason code
            0x20, // properties length
            0x11, 0x00, 0x00, 0x04, 0xd2, // session expiry interval = 1234
            0x1F, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // reason string = "test"
            0x26, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // user property key = "test"
            0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // user property value = "test"
            0x1C, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // server reference = "test"
        ]
    }

    #[test]
    fn disconnect2_parsing_works() {
        let packet_bytes = sample_bytes2();
        let expected = sample2();

        assert_eq!(decode(&packet_bytes[..]), expected);
    }

    #[test]
    fn disconnect2_encoding_works() {
        let disconnect = sample2();
        let expected = sample_bytes2();

        let buffer = encode(&disconnect);

        assert_eq!(&buffer[..], &expected);
    }

    #[test]
    fn length_calculation() {
        let mut dummy_bytes = BytesMut::new();

        let mut disconn_pkt = Disconnect::new(DisconnectReasonCode::NormalDisconnection);
        disconn_pkt.properties = Some(DisconnectProperties {
            session_expiry_interval: None,
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            server_reference: None,
        });

        // `size()`, the value returned by `write`, and the bytes actually
        // produced must all agree.
        let size_from_size = disconn_pkt.size();
        let size_from_write = disconn_pkt.write(&mut dummy_bytes).unwrap();
        let size_from_bytes = dummy_bytes.len();

        assert_eq!(size_from_write, size_from_bytes);
        assert_eq!(size_from_size, size_from_bytes);
    }
}