1use crate::{
2 codec,
3 defaults::{
4 DEFAULT_MAXIMUM_QOS, DEFAULT_RECEIVE_MAXIMUM, DEFAULT_RETAIN_AVAILABLE,
5 DEFAULT_SHARED_SUBSCRIPTION_AVAILABLE, DEFAULT_SUBSCRIPTION_IDENTIFIER_AVAILABLE,
6 DEFAULT_TOPIC_ALIAS_MAXIMUM, DEFAULT_WILCARD_SUBSCRIPTION_AVAILABLE,
7 },
8 Authentication, ClientID, PropertiesDecoder, Property, QoS,
9 ReasonCode::{self, ProtocolError},
10 Result as SageResult,
11};
12use std::{convert::TryInto, marker::Unpin};
13use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
14
15#[derive(PartialEq, Debug, Clone)]
19pub struct ConnAck {
20 pub session_present: bool,
23
24 pub reason_code: ReasonCode,
37
38 pub session_expiry_interval: Option<u32>,
41
42 pub receive_maximum: u16,
45
46 pub maximum_qos: QoS,
50
51 pub retain_available: bool,
55
56 pub maximum_packet_size: Option<u32>,
59
60 pub assigned_client_id: Option<ClientID>,
63
64 pub topic_alias_maximum: u16,
67
68 pub reason_string: Option<String>,
71
72 pub user_properties: Vec<(String, String)>,
74
75 pub wildcard_subscription_available: bool,
77
78 pub subscription_identifiers_available: bool,
80
81 pub shared_subscription_available: bool,
83
84 pub keep_alive: Option<u16>,
87
88 pub response_information: Option<String>,
93
94 pub reference: Option<String>,
98
99 pub authentication: Option<Authentication>,
102}
103
104impl Default for ConnAck {
105 fn default() -> Self {
106 ConnAck {
107 session_present: false,
108 reason_code: ReasonCode::Success,
109 session_expiry_interval: None,
110 receive_maximum: DEFAULT_RECEIVE_MAXIMUM,
111 maximum_qos: DEFAULT_MAXIMUM_QOS,
112 retain_available: DEFAULT_RETAIN_AVAILABLE,
113 maximum_packet_size: None,
114 assigned_client_id: None,
115 topic_alias_maximum: DEFAULT_TOPIC_ALIAS_MAXIMUM,
116 reason_string: Default::default(),
117 user_properties: Default::default(),
118 wildcard_subscription_available: DEFAULT_WILCARD_SUBSCRIPTION_AVAILABLE,
119 subscription_identifiers_available: DEFAULT_SUBSCRIPTION_IDENTIFIER_AVAILABLE,
120 shared_subscription_available: DEFAULT_SHARED_SUBSCRIPTION_AVAILABLE,
121 keep_alive: None,
122 response_information: Default::default(),
123 reference: None,
124 authentication: None,
125 }
126 }
127}
128
129impl ConnAck {
130 pub(crate) async fn write<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
131 let mut n_bytes = codec::write_bool(self.session_present, writer).await?;
132 n_bytes += codec::write_reason_code(self.reason_code, writer).await?;
133
134 let mut properties = Vec::new();
135
136 if let Some(v) = self.session_expiry_interval {
137 n_bytes += Property::SessionExpiryInterval(v)
138 .encode(&mut properties)
139 .await?;
140 }
141 n_bytes += Property::ReceiveMaximum(self.receive_maximum)
142 .encode(&mut properties)
143 .await?;
144 n_bytes += Property::MaximumQoS(self.maximum_qos)
145 .encode(&mut properties)
146 .await?;
147 n_bytes += Property::RetainAvailable(self.retain_available)
148 .encode(&mut properties)
149 .await?;
150 if let Some(v) = self.maximum_packet_size {
151 n_bytes += Property::MaximumPacketSize(v)
152 .encode(&mut properties)
153 .await?;
154 }
155 if let Some(v) = self.assigned_client_id {
156 n_bytes += Property::AssignedClientIdentifier(v)
157 .encode(&mut properties)
158 .await?;
159 }
160 n_bytes += Property::TopicAliasMaximum(self.topic_alias_maximum)
161 .encode(&mut properties)
162 .await?;
163 if let Some(reason_string) = self.reason_string {
164 if !reason_string.is_empty() {
165 n_bytes += Property::ReasonString(reason_string)
166 .encode(&mut properties)
167 .await?;
168 }
169 }
170 for (k, v) in self.user_properties {
171 n_bytes += Property::UserProperty(k, v).encode(&mut properties).await?;
172 }
173 n_bytes += Property::WildcardSubscriptionAvailable(self.wildcard_subscription_available)
174 .encode(&mut properties)
175 .await?;
176 n_bytes += Property::SharedSubscriptionAvailable(self.shared_subscription_available)
177 .encode(&mut properties)
178 .await?;
179 if let Some(v) = self.keep_alive {
180 n_bytes += Property::ServerKeepAlive(v).encode(&mut properties).await?;
181 }
182
183 if let Some(v) = self.response_information {
184 n_bytes += Property::ResponseInformation(v)
185 .encode(&mut properties)
186 .await?;
187 }
188
189 if let Some(v) = self.reference {
190 n_bytes += Property::ServerReference(v).encode(&mut properties).await?;
191 }
192 if let Some(authentication) = self.authentication {
193 n_bytes += Property::AuthenticationMethod(authentication.method)
194 .encode(&mut properties)
195 .await?;
196 if !authentication.data.is_empty() {
197 n_bytes += Property::AuthenticationData(authentication.data)
198 .encode(&mut properties)
199 .await?;
200 }
201 }
202
203 n_bytes += codec::write_variable_byte_integer(properties.len() as u32, writer).await?;
204 writer.write_all(&properties).await?;
205
206 Ok(n_bytes)
207 }
208
209 pub(crate) async fn read<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
210 let session_present = codec::read_bool(reader).await?;
211
212 let reason_code = codec::read_byte(reader).await?.try_into()?;
213
214 let mut session_expiry_interval = None;
215 let mut receive_maximum = DEFAULT_RECEIVE_MAXIMUM;
216 let mut maximum_qos = DEFAULT_MAXIMUM_QOS;
217 let mut retain_available = DEFAULT_RETAIN_AVAILABLE;
218 let mut maximum_packet_size = None;
219 let mut assigned_client_id = None;
220 let mut topic_alias_maximum = DEFAULT_TOPIC_ALIAS_MAXIMUM;
221 let mut reason_string = None;
222 let mut user_properties = Vec::new();
223 let mut wildcard_subscription_available = DEFAULT_WILCARD_SUBSCRIPTION_AVAILABLE;
224 let mut subscription_identifiers_available = DEFAULT_SUBSCRIPTION_IDENTIFIER_AVAILABLE;
225 let mut shared_subscription_available = DEFAULT_SHARED_SUBSCRIPTION_AVAILABLE;
226 let mut keep_alive = None;
227 let mut response_information = None;
228 let mut reference = None;
229 let mut authentication_method = None;
230 let mut authentication_data = Default::default();
231
232 let mut decoder = PropertiesDecoder::take(reader).await?;
233 while decoder.has_properties() {
234 match decoder.read().await? {
235 Property::SessionExpiryInterval(v) => session_expiry_interval = Some(v),
236 Property::ReceiveMaximum(v) => receive_maximum = v,
237 Property::MaximumQoS(v) => maximum_qos = v,
238 Property::RetainAvailable(v) => retain_available = v,
239 Property::MaximumPacketSize(v) => maximum_packet_size = Some(v),
240 Property::AssignedClientIdentifier(v) => assigned_client_id = Some(v),
241 Property::TopicAliasMaximum(v) => topic_alias_maximum = v,
242 Property::ReasonString(v) => {
243 reason_string = if v.is_empty() { None } else { Some(v) }
244 }
245 Property::UserProperty(k, v) => user_properties.push((k, v)),
246 Property::WildcardSubscriptionAvailable(v) => wildcard_subscription_available = v,
247 Property::SubscriptionIdentifiersAvailable(v) => {
248 subscription_identifiers_available = v
249 }
250 Property::SharedSubscriptionAvailable(v) => shared_subscription_available = v,
251 Property::ServerKeepAlive(v) => keep_alive = Some(v),
252 Property::ResponseInformation(v) => response_information = Some(v),
253 Property::ServerReference(v) => reference = Some(v),
254 Property::AuthenticationMethod(v) => authentication_method = Some(v),
255 Property::AuthenticationData(v) => authentication_data = v,
256 _ => return Err(ProtocolError.into()),
257 }
258 }
259
260 let authentication = if let Some(method) = authentication_method {
261 Some(Authentication {
262 method,
263 data: authentication_data,
264 })
265 } else {
266 if !authentication_data.is_empty() {
267 return Err(ProtocolError.into());
268 }
269 None
270 };
271
272 Ok(ConnAck {
273 session_present,
274 reason_code,
275 session_expiry_interval,
276 receive_maximum,
277 maximum_qos,
278 retain_available,
279 maximum_packet_size,
280 assigned_client_id,
281 topic_alias_maximum,
282 reason_string,
283 user_properties,
284 wildcard_subscription_available,
285 subscription_identifiers_available,
286 shared_subscription_available,
287 keep_alive,
288 response_information,
289 reference,
290 authentication,
291 })
292 }
293}
294
295#[cfg(test)]
296mod unit {
297
298 use super::*;
299 use std::io::Cursor;
300
301 fn encoded() -> Vec<u8> {
302 vec![
303 1, 138, 111, 17, 0, 0, 5, 57, 33, 0, 30, 36, 1, 37, 0, 39, 0, 0, 1, 0, 18, 0, 11, 87,
304 97, 108, 107, 84, 104, 105, 115, 87, 97, 121, 34, 0, 10, 31, 0, 7, 82, 85, 78, 45, 68,
305 77, 67, 38, 0, 7, 77, 111, 103, 119, 97, 195, 175, 0, 3, 67, 97, 116, 40, 0, 42, 0, 19,
306 0, 17, 26, 0, 9, 65, 101, 114, 111, 115, 109, 105, 116, 104, 28, 0, 14, 80, 97, 105,
307 110, 116, 32, 73, 116, 32, 66, 108, 97, 99, 107, 21, 0, 6, 87, 105, 108, 108, 111, 119,
308 22, 0, 4, 13, 21, 234, 94,
309 ]
310 }
311
312 fn decoded() -> ConnAck {
313 ConnAck {
314 session_present: true,
315 reason_code: ReasonCode::Banned,
316 session_expiry_interval: Some(1337),
317 receive_maximum: 30,
318 maximum_qos: QoS::AtLeastOnce,
319 retain_available: false,
320 maximum_packet_size: Some(256),
321 assigned_client_id: Some("WalkThisWay".into()),
322 topic_alias_maximum: 10,
323 reason_string: Some("RUN-DMC".into()),
324 user_properties: vec![("Mogwaï".into(), "Cat".into())],
325 wildcard_subscription_available: false,
326 subscription_identifiers_available: true,
327 shared_subscription_available: false,
328 keep_alive: Some(17),
329 response_information: Some("Aerosmith".into()),
330 reference: Some("Paint It Black".into()),
331 authentication: Some(Authentication {
332 method: "Willow".into(),
333 data: vec![0x0D, 0x15, 0xEA, 0x5E],
334 }),
335 }
336 }
337
338 #[tokio::test]
339 async fn encode() {
340 let test_data = decoded();
341 let mut tested_result = Vec::new();
342 let n_bytes = test_data.write(&mut tested_result).await.unwrap();
343 assert_eq!(tested_result, encoded());
344 assert_eq!(n_bytes, 114);
345 }
346
347 #[tokio::test]
348 async fn decode() {
349 let mut test_data = Cursor::new(encoded());
350 let tested_result = ConnAck::read(&mut test_data).await.unwrap();
351 assert_eq!(tested_result, decoded());
352 }
353}