1use super::property::{property_decode, property_decode_non_zero, property_encode};
9use super::property::{property_len, Property, PropertyFrame};
10use crate::codec::util::{
11 decode_byte, decode_bytes, decode_string, decode_variable_integer, encode_bytes, encode_string,
12 encode_variable_integer,
13};
14use crate::protocol::common::{connect, ConnectHeader};
15use crate::protocol::common::{ConnectFrame, WillFrame};
16use crate::protocol::util::len_bytes;
17use crate::protocol::{Credentials, Protocol, QoS};
18use crate::Error;
19use bit_field::BitField;
20use bytes::{Buf, Bytes, BytesMut};
21use std::ops::RangeInclusive;
22
23const WILL_FLAG: usize = 2;
25const WILL_QOS: RangeInclusive<usize> = 3..=4;
26const WILL_RETAIN: usize = 5;
27
28#[derive(Debug, Default, Clone, PartialEq, Eq)]
45pub struct ConnectProperties {
46 pub session_expiry_interval: Option<u32>,
48 pub receive_maximum: Option<u16>,
50 pub maximum_packet_size: Option<u32>,
52 pub topic_alias_maximum: Option<u16>,
54 pub request_response_info: Option<bool>,
56 pub request_problem_info: Option<bool>,
58 pub user_properties: Vec<(String, String)>,
60 pub auth_method: Option<String>,
62 pub auth_data: Option<Bytes>,
64}
65
66impl PropertyFrame for ConnectProperties {
67 fn encoded_len(&self) -> usize {
69 let mut len = 0;
70
71 len += property_len!(&self.session_expiry_interval);
72 len += property_len!(&self.receive_maximum);
73 len += property_len!(&self.maximum_packet_size);
74 len += property_len!(&self.topic_alias_maximum);
75 len += property_len!(&self.request_response_info);
76 len += property_len!(&self.request_problem_info);
77 len += property_len!(&self.user_properties);
78 len += property_len!(&self.auth_method);
79 len += property_len!(&self.auth_data);
80
81 len
82 }
83
84 fn encode(&self, buf: &mut BytesMut) {
86 property_encode!(
87 &self.session_expiry_interval,
88 Property::SessionExpiryInterval,
89 buf
90 );
91 property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
92 property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
93 property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
94 property_encode!(
95 &self.request_response_info,
96 Property::RequestResponseInformation,
97 buf
98 );
99 property_encode!(
100 &self.request_problem_info,
101 Property::RequestProblemInformation,
102 buf
103 );
104 property_encode!(&self.user_properties, Property::UserProp, buf);
105 property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
106 property_encode!(&self.auth_data, Property::AuthenticationData, buf);
107 }
108
109 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
111 if buf.is_empty() {
112 return Ok(None);
113 }
114
115 let mut properties = ConnectProperties::default();
116
117 while buf.has_remaining() {
118 let property: Property = decode_byte(buf)?.try_into()?;
119 match property {
120 Property::SessionExpiryInterval => {
121 property_decode!(&mut properties.session_expiry_interval, buf);
122 }
123 Property::ReceiveMaximum => {
124 property_decode_non_zero!(&mut properties.receive_maximum, buf);
125 }
126 Property::MaximumPacketSize => {
127 property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
128 }
129 Property::TopicAliasMaximum => {
130 property_decode!(&mut properties.topic_alias_maximum, buf);
131 }
132 Property::RequestResponseInformation => {
133 property_decode!(&mut properties.request_response_info, buf);
134 }
135 Property::RequestProblemInformation => {
136 property_decode!(&mut properties.request_problem_info, buf);
137 }
138 Property::UserProp => {
139 property_decode!(&mut properties.user_properties, buf);
140 }
141 Property::AuthenticationMethod => {
142 property_decode!(&mut properties.auth_method, buf);
143 }
144 Property::AuthenticationData => {
145 property_decode!(&mut properties.auth_data, buf);
146 }
147 _ => return Err(Error::PropertyMismatch),
148 };
149 }
150
151 if properties.auth_data.is_some() && properties.auth_method.is_none() {
152 return Err(Error::ProtocolError);
153 }
154
155 Ok(Some(properties))
156 }
157}
158
159impl ConnectFrame for ConnectHeader<ConnectProperties> {
160 fn encoded_len(&self) -> usize {
162 let properties_len = self
163 .properties
164 .as_ref()
165 .map(|properties| properties.encoded_len())
166 .unwrap_or(0);
167 properties_len + len_bytes(properties_len) + self.primary_encoded_len()
168 }
169
170 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
172 self.primary_encode(buf);
173
174 let properties_len = self
175 .properties
176 .as_ref()
177 .map(|properties| properties.encoded_len())
178 .unwrap_or(0) as u32;
179
180 encode_variable_integer(buf, properties_len)?;
181
182 if let Some(properties) = self.properties.as_ref() {
183 properties.encode(buf);
184 }
185 Ok(())
186 }
187
188 fn decode(buf: &mut Bytes) -> Result<Self, Error> {
190 let mut header = Self::primary_decode(buf)?;
191
192 let properties_len = decode_variable_integer(buf)? as usize;
193 if buf.len() < properties_len + len_bytes(properties_len) {
194 return Err(Error::MalformedPacket);
195 }
196
197 buf.advance(len_bytes(properties_len));
199
200 let mut properties_buf = buf.split_to(properties_len);
201
202 header.properties = ConnectProperties::decode(&mut properties_buf)?;
204
205 Ok(header)
206 }
207}
208
209#[derive(Debug, Default, Clone, PartialEq, Eq)]
225pub struct WillProperties {
226 pub delay_interval: Option<u32>,
228 pub payload_format_indicator: Option<u8>,
230 pub message_expiry_interval: Option<u32>,
232 pub content_type: Option<String>,
234 pub response_topic: Option<String>,
236 pub correlation_data: Option<Bytes>,
238 pub user_properties: Vec<(String, String)>,
240}
241
242impl PropertyFrame for WillProperties {
243 fn encoded_len(&self) -> usize {
245 let mut len = 0;
246
247 len += property_len!(&self.delay_interval);
248 len += property_len!(&self.payload_format_indicator);
249 len += property_len!(&self.message_expiry_interval);
250 len += property_len!(&self.content_type);
251 len += property_len!(&self.response_topic);
252 len += property_len!(&self.correlation_data);
253 len += property_len!(&self.user_properties);
254
255 len
256 }
257
258 fn encode(&self, buf: &mut BytesMut) {
260 property_encode!(&self.delay_interval, Property::WillDelayInterval, buf);
261 property_encode!(
262 &self.payload_format_indicator,
263 Property::PayloadFormatIndicator,
264 buf
265 );
266 property_encode!(
267 &self.message_expiry_interval,
268 Property::MessageExpiryInterval,
269 buf
270 );
271 property_encode!(&self.content_type, Property::ContentType, buf);
272 property_encode!(&self.response_topic, Property::ResponseTopic, buf);
273 property_encode!(&self.correlation_data, Property::CorrelationData, buf);
274 property_encode!(&self.user_properties, Property::UserProp, buf);
275 }
276
277 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
279 if buf.is_empty() {
280 return Ok(None);
281 }
282
283 let mut properties = WillProperties::default();
284
285 while buf.has_remaining() {
286 let property: Property = decode_byte(buf)?.try_into()?;
287 match property {
288 Property::WillDelayInterval => {
289 property_decode!(&mut properties.delay_interval, buf);
290 }
291 Property::PayloadFormatIndicator => {
292 property_decode!(&mut properties.payload_format_indicator, buf);
293 if let Some(value) = properties.payload_format_indicator {
294 if value != 0 && value != 1 {
295 return Err(Error::ProtocolError);
296 }
297 }
298 }
299 Property::MessageExpiryInterval => {
300 property_decode!(&mut properties.message_expiry_interval, buf);
301 }
302 Property::ContentType => {
303 property_decode!(&mut properties.content_type, buf);
304 }
305 Property::ResponseTopic => {
306 property_decode!(&mut properties.response_topic, buf);
307 }
308 Property::CorrelationData => {
309 property_decode!(&mut properties.correlation_data, buf);
310 }
311 Property::UserProp => {
312 property_decode!(&mut properties.user_properties, buf);
313 }
314 _ => return Err(Error::PropertyMismatch),
315 }
316 }
317
318 Ok(Some(properties))
319 }
320}
321
322#[derive(Debug, Clone, PartialEq, Eq)]
337pub struct Will {
338 pub properties: Option<WillProperties>,
340 pub topic: String,
342 pub payload: Bytes,
344 pub qos: QoS,
346 pub retain: bool,
348}
349
350impl Will {
351 pub fn new<T: Into<String>>(
353 properties: Option<WillProperties>,
354 topic: T,
355 payload: Bytes,
356 qos: QoS,
357 retain: bool,
358 ) -> Will {
359 Will {
360 properties,
361 topic: topic.into(),
362 payload,
363 qos,
364 retain,
365 }
366 }
367}
368
369impl WillFrame for Will {
370 fn encoded_len(&self) -> usize {
372 let properties_len = self
373 .properties
374 .as_ref()
375 .map(|properties| properties.encoded_len())
376 .unwrap_or(0);
377
378 2 + self.topic.len() + 2 + self.payload.len() + len_bytes(properties_len) + properties_len
379 }
380
381 fn update_flags(&self, flags: &mut u8) {
383 flags.set_bit(WILL_FLAG, true);
385
386 flags.set_bits(WILL_QOS, self.qos as u8);
388
389 flags.set_bit(WILL_RETAIN, self.retain);
391 }
392
393 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
395 let properties_len = self
396 .properties
397 .as_ref()
398 .map(|properties| properties.encoded_len())
399 .unwrap_or(0) as u32;
400
401 encode_variable_integer(buf, properties_len)?;
402
403 if let Some(properties) = self.properties.as_ref() {
404 properties.encode(buf);
405 }
406
407 encode_string(buf, &self.topic);
408 encode_bytes(buf, &self.payload);
409 Ok(())
410 }
411
412 fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
414 if !flags.get_bit(WILL_FLAG) {
415 return Ok(None);
417 }
418
419 let properties_len = decode_variable_integer(buf)? as usize;
420 if buf.len() < properties_len + len_bytes(properties_len) {
421 return Err(Error::MalformedPacket);
422 }
423
424 buf.advance(len_bytes(properties_len));
426 let mut properties_buf = buf.split_to(properties_len);
427 let properties = WillProperties::decode(&mut properties_buf)?;
428 let qos = flags.get_bits(WILL_QOS).try_into()?;
429 let retain = flags.get_bit(WILL_RETAIN);
430
431 let topic = decode_string(buf)?;
432 let payload = decode_bytes(buf)?;
433
434 Ok(Some(Will {
435 properties,
436 topic,
437 payload,
438 qos,
439 retain,
440 }))
441 }
442}
443
444connect!(Connect<ConnectProperties, Will>, Protocol::V5);
446
447impl Connect {
448 pub fn with_properties<S: Into<String>>(
450 client_id: S,
451 auth: Option<Credentials>,
452 will: Option<Will>,
453 properties: ConnectProperties,
454 keep_alive: u16,
455 clean_session: bool,
456 ) -> Self {
457 Self::from_scratch(
458 client_id,
459 auth,
460 will,
461 Some(properties),
462 keep_alive,
463 clean_session,
464 )
465 }
466
467 pub fn properties(&self) -> Option<ConnectProperties> {
469 self.header.properties.clone()
470 }
471}