1use super::property::{property_decode, property_decode_non_zero, property_encode};
9use super::property::{property_len, Property, PropertyFrame};
10use crate::codec::util::{
11 decode_byte, decode_bytes, decode_string, decode_variable_integer, encode_bytes, encode_string,
12 encode_variable_integer,
13};
14use crate::protocol::common::{connect, ConnectHeader};
15use crate::protocol::common::{ConnectFrame, WillFrame};
16use crate::protocol::util::len_bytes;
17use crate::protocol::{Credentials, Protocol, QoS};
18use crate::Error;
19use bit_field::BitField;
20use bytes::{Buf, Bytes, BytesMut};
21use std::ops::RangeInclusive;
22use std::time::Duration;
23
24const WILL_FLAG: usize = 2;
26const WILL_QOS: RangeInclusive<usize> = 3..=4;
27const WILL_RETAIN: usize = 5;
28
29#[derive(Debug, Default, Clone, PartialEq, Eq)]
46pub struct ConnectProperties {
47 pub session_expiry_interval: Option<u32>,
49 pub receive_maximum: Option<u16>,
51 pub maximum_packet_size: Option<u32>,
53 pub topic_alias_maximum: Option<u16>,
55 pub request_response_info: Option<bool>,
57 pub request_problem_info: Option<bool>,
59 pub user_properties: Vec<(String, String)>,
61 pub auth_method: Option<String>,
63 pub auth_data: Option<Bytes>,
65}
66
67impl PropertyFrame for ConnectProperties {
68 fn encoded_len(&self) -> usize {
70 let mut len = 0;
71
72 len += property_len!(&self.session_expiry_interval);
73 len += property_len!(&self.receive_maximum);
74 len += property_len!(&self.maximum_packet_size);
75 len += property_len!(&self.topic_alias_maximum);
76 len += property_len!(&self.request_response_info);
77 len += property_len!(&self.request_problem_info);
78 len += property_len!(&self.user_properties);
79 len += property_len!(&self.auth_method);
80 len += property_len!(&self.auth_data);
81
82 len
83 }
84
85 fn encode(&self, buf: &mut BytesMut) {
87 property_encode!(
88 &self.session_expiry_interval,
89 Property::SessionExpiryInterval,
90 buf
91 );
92 property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
93 property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
94 property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
95 property_encode!(
96 &self.request_response_info,
97 Property::RequestResponseInformation,
98 buf
99 );
100 property_encode!(
101 &self.request_problem_info,
102 Property::RequestProblemInformation,
103 buf
104 );
105 property_encode!(&self.user_properties, Property::UserProp, buf);
106 property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
107 property_encode!(&self.auth_data, Property::AuthenticationData, buf);
108 }
109
110 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
112 if buf.is_empty() {
113 return Ok(None);
114 }
115
116 let mut properties = ConnectProperties::default();
117
118 while buf.has_remaining() {
119 let property: Property = decode_byte(buf)?.try_into()?;
120 match property {
121 Property::SessionExpiryInterval => {
122 property_decode!(&mut properties.session_expiry_interval, buf);
123 }
124 Property::ReceiveMaximum => {
125 property_decode_non_zero!(&mut properties.receive_maximum, buf);
126 }
127 Property::MaximumPacketSize => {
128 property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
129 }
130 Property::TopicAliasMaximum => {
131 property_decode!(&mut properties.topic_alias_maximum, buf);
132 }
133 Property::RequestResponseInformation => {
134 property_decode!(&mut properties.request_response_info, buf);
135 }
136 Property::RequestProblemInformation => {
137 property_decode!(&mut properties.request_problem_info, buf);
138 }
139 Property::UserProp => {
140 property_decode!(&mut properties.user_properties, buf);
141 }
142 Property::AuthenticationMethod => {
143 property_decode!(&mut properties.auth_method, buf);
144 }
145 Property::AuthenticationData => {
146 property_decode!(&mut properties.auth_data, buf);
147 }
148 _ => return Err(Error::PropertyMismatch),
149 };
150 }
151
152 if properties.auth_data.is_some() && properties.auth_method.is_none() {
153 return Err(Error::ProtocolError);
154 }
155
156 Ok(Some(properties))
157 }
158}
159
160impl ConnectFrame for ConnectHeader<ConnectProperties> {
161 fn encoded_len(&self) -> usize {
163 let properties_len = self
164 .properties
165 .as_ref()
166 .map(|properties| properties.encoded_len())
167 .unwrap_or(0);
168 properties_len + len_bytes(properties_len) + self.primary_encoded_len()
169 }
170
171 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
173 self.primary_encode(buf);
174
175 let properties_len = self
176 .properties
177 .as_ref()
178 .map(|properties| properties.encoded_len())
179 .unwrap_or(0) as u32;
180
181 encode_variable_integer(buf, properties_len)?;
182
183 if let Some(properties) = self.properties.as_ref() {
184 properties.encode(buf);
185 }
186 Ok(())
187 }
188
189 fn decode(buf: &mut Bytes) -> Result<Self, Error> {
191 let mut header = Self::primary_decode(buf)?;
192
193 let properties_len = decode_variable_integer(buf)? as usize;
194 if buf.len() < properties_len + len_bytes(properties_len) {
195 return Err(Error::MalformedPacket);
196 }
197
198 buf.advance(len_bytes(properties_len));
200
201 let mut properties_buf = buf.split_to(properties_len);
202
203 header.properties = ConnectProperties::decode(&mut properties_buf)?;
205
206 Ok(header)
207 }
208}
209
210#[derive(Debug, Default, Clone, PartialEq, Eq)]
226pub struct WillProperties {
227 pub delay_interval: Option<u32>,
229 pub payload_format_indicator: Option<u8>,
231 pub message_expiry_interval: Option<u32>,
233 pub content_type: Option<String>,
235 pub response_topic: Option<String>,
237 pub correlation_data: Option<Bytes>,
239 pub user_properties: Vec<(String, String)>,
241}
242
243impl PropertyFrame for WillProperties {
244 fn encoded_len(&self) -> usize {
246 let mut len = 0;
247
248 len += property_len!(&self.delay_interval);
249 len += property_len!(&self.payload_format_indicator);
250 len += property_len!(&self.message_expiry_interval);
251 len += property_len!(&self.content_type);
252 len += property_len!(&self.response_topic);
253 len += property_len!(&self.correlation_data);
254 len += property_len!(&self.user_properties);
255
256 len
257 }
258
259 fn encode(&self, buf: &mut BytesMut) {
261 property_encode!(&self.delay_interval, Property::WillDelayInterval, buf);
262 property_encode!(
263 &self.payload_format_indicator,
264 Property::PayloadFormatIndicator,
265 buf
266 );
267 property_encode!(
268 &self.message_expiry_interval,
269 Property::MessageExpiryInterval,
270 buf
271 );
272 property_encode!(&self.content_type, Property::ContentType, buf);
273 property_encode!(&self.response_topic, Property::ResponseTopic, buf);
274 property_encode!(&self.correlation_data, Property::CorrelationData, buf);
275 property_encode!(&self.user_properties, Property::UserProp, buf);
276 }
277
278 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
280 if buf.is_empty() {
281 return Ok(None);
282 }
283
284 let mut properties = WillProperties::default();
285
286 while buf.has_remaining() {
287 let property: Property = decode_byte(buf)?.try_into()?;
288 match property {
289 Property::WillDelayInterval => {
290 property_decode!(&mut properties.delay_interval, buf);
291 }
292 Property::PayloadFormatIndicator => {
293 property_decode!(&mut properties.payload_format_indicator, buf);
294 if let Some(value) = properties.payload_format_indicator {
295 if value != 0 && value != 1 {
296 return Err(Error::ProtocolError);
297 }
298 }
299 }
300 Property::MessageExpiryInterval => {
301 property_decode!(&mut properties.message_expiry_interval, buf);
302 }
303 Property::ContentType => {
304 property_decode!(&mut properties.content_type, buf);
305 }
306 Property::ResponseTopic => {
307 property_decode!(&mut properties.response_topic, buf);
308 }
309 Property::CorrelationData => {
310 property_decode!(&mut properties.correlation_data, buf);
311 }
312 Property::UserProp => {
313 property_decode!(&mut properties.user_properties, buf);
314 }
315 _ => return Err(Error::PropertyMismatch),
316 }
317 }
318
319 Ok(Some(properties))
320 }
321}
322
323#[derive(Debug, Clone, PartialEq, Eq)]
338pub struct Will {
339 pub properties: Option<WillProperties>,
341 pub topic: String,
343 pub payload: Bytes,
345 pub qos: QoS,
347 pub retain: bool,
349}
350
351impl Will {
352 pub fn new<T: Into<String>>(
354 properties: Option<WillProperties>,
355 topic: T,
356 payload: Bytes,
357 qos: QoS,
358 retain: bool,
359 ) -> Will {
360 Will {
361 properties,
362 topic: topic.into(),
363 payload,
364 qos,
365 retain,
366 }
367 }
368}
369
370impl WillFrame for Will {
371 fn encoded_len(&self) -> usize {
373 let properties_len = self
374 .properties
375 .as_ref()
376 .map(|properties| properties.encoded_len())
377 .unwrap_or(0);
378
379 2 + self.topic.len() + 2 + self.payload.len() + len_bytes(properties_len) + properties_len
380 }
381
382 fn update_flags(&self, flags: &mut u8) {
384 flags.set_bit(WILL_FLAG, true);
386
387 flags.set_bits(WILL_QOS, self.qos as u8);
389
390 flags.set_bit(WILL_RETAIN, self.retain);
392 }
393
394 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
396 let properties_len = self
397 .properties
398 .as_ref()
399 .map(|properties| properties.encoded_len())
400 .unwrap_or(0) as u32;
401
402 encode_variable_integer(buf, properties_len)?;
403
404 if let Some(properties) = self.properties.as_ref() {
405 properties.encode(buf);
406 }
407
408 encode_string(buf, &self.topic);
409 encode_bytes(buf, &self.payload);
410 Ok(())
411 }
412
413 fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
415 if !flags.get_bit(WILL_FLAG) {
416 return Ok(None);
418 }
419
420 let properties_len = decode_variable_integer(buf)? as usize;
421 if buf.len() < properties_len + len_bytes(properties_len) {
422 return Err(Error::MalformedPacket);
423 }
424
425 buf.advance(len_bytes(properties_len));
427 let mut properties_buf = buf.split_to(properties_len);
428 let properties = WillProperties::decode(&mut properties_buf)?;
429 let qos = flags.get_bits(WILL_QOS).try_into()?;
430 let retain = flags.get_bit(WILL_RETAIN);
431
432 let topic = decode_string(buf)?;
433 let payload = decode_bytes(buf)?;
434
435 Ok(Some(Will {
436 properties,
437 topic,
438 payload,
439 qos,
440 retain,
441 }))
442 }
443}
444
445connect!(Connect<ConnectProperties, Will>, Protocol::V5);
447
448impl Connect {
449 pub fn with_properties<S: Into<String>>(
455 client_id: S,
456 auth: Option<Credentials>,
457 will: Option<Will>,
458 properties: ConnectProperties,
459 keep_alive: Duration,
460 clean_session: bool,
461 ) -> Self {
462 Self::from_scratch(
463 client_id,
464 auth,
465 will,
466 Some(properties),
467 keep_alive,
468 clean_session,
469 )
470 }
471
472 pub fn properties(&self) -> Option<ConnectProperties> {
474 self.header.properties.clone()
475 }
476}