sage_mqtt/control/
pubcomp.rs1use crate::{
2 codec, PropertiesDecoder, Property,
3 ReasonCode::{self, ProtocolError},
4 Result as SageResult,
5};
6use std::{convert::TryInto, marker::Unpin};
7use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
8
9#[derive(Debug, PartialEq, Clone)]
12pub struct PubComp {
13 pub packet_identifier: u16,
16
17 pub reason_code: ReasonCode,
28
29 pub reason_string: Option<String>,
31
32 pub user_properties: Vec<(String, String)>,
34}
35
36impl Default for PubComp {
37 fn default() -> Self {
38 PubComp {
39 packet_identifier: 0,
40 reason_code: ReasonCode::Success,
41 reason_string: None,
42 user_properties: Default::default(),
43 }
44 }
45}
46
47impl PubComp {
48 pub(crate) async fn write<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
49 let mut n_bytes = codec::write_two_byte_integer(self.packet_identifier, writer).await?;
50
51 let mut properties = Vec::new();
52
53 if let Some(v) = self.reason_string {
54 n_bytes += Property::ReasonString(v).encode(&mut properties).await?;
55 }
56 for (k, v) in self.user_properties {
57 n_bytes += Property::UserProperty(k, v).encode(&mut properties).await?;
58 }
59
60 if n_bytes == 2 && self.reason_code != ReasonCode::Success {
61 Ok(2)
62 } else {
63 n_bytes += codec::write_reason_code(self.reason_code, writer).await?;
64 n_bytes += codec::write_variable_byte_integer(properties.len() as u32, writer).await?;
65 writer.write_all(&properties).await?;
66 Ok(n_bytes)
67 }
68 }
69
70 pub(crate) async fn read<R: AsyncRead + Unpin>(
71 reader: &mut R,
72 shortened: bool,
73 ) -> SageResult<Self> {
74 let packet_identifier = codec::read_two_byte_integer(reader).await?;
75
76 let mut pubcomp = PubComp {
77 packet_identifier,
78 ..Default::default()
79 };
80
81 if shortened {
82 pubcomp.reason_code = ReasonCode::Success;
83 } else {
84 pubcomp.reason_code = codec::read_byte(reader).await?.try_into()?;
85
86 let mut properties = PropertiesDecoder::take(reader).await?;
87 while properties.has_properties() {
88 match properties.read().await? {
89 Property::ReasonString(v) => pubcomp.reason_string = Some(v),
90 Property::UserProperty(k, v) => pubcomp.user_properties.push((k, v)),
91 _ => return Err(ProtocolError.into()),
92 }
93 }
94 }
95
96 Ok(pubcomp)
97 }
98}
99
100#[cfg(test)]
101mod unit {
102
103 use super::*;
104 use std::io::Cursor;
105
106 fn encoded() -> Vec<u8> {
107 vec![
108 5, 57, 146, 28, 31, 0, 11, 66, 108, 97, 99, 107, 32, 66, 101, 116, 116, 121, 38, 0, 6,
109 72, 195, 166, 114, 121, 97, 0, 3, 67, 97, 116,
110 ]
111 }
112
113 fn decoded() -> PubComp {
114 PubComp {
115 packet_identifier: 1337,
116 reason_code: ReasonCode::PacketIdentifierNotFound,
117 reason_string: Some("Black Betty".into()),
118 user_properties: vec![("Hærya".into(), "Cat".into())],
119 }
120 }
121
122 #[tokio::test]
123 async fn encode() {
124 let test_data = decoded();
125 let mut tested_result = Vec::new();
126 let n_bytes = test_data.write(&mut tested_result).await.unwrap();
127 assert_eq!(tested_result, encoded());
128 assert_eq!(n_bytes, 32);
129 }
130
131 #[tokio::test]
132 async fn decode() {
133 let mut test_data = Cursor::new(encoded());
134 let tested_result = PubComp::read(&mut test_data, false).await.unwrap();
135 assert_eq!(tested_result, decoded());
136 }
137}