1use super::property::{Property, PropertyFrame, property_len};
9use super::property::{property_decode, property_decode_non_zero, property_encode};
10use crate::Error;
11use crate::codec::util::{
12 decode_byte, decode_bytes, decode_string, decode_variable_integer, encode_bytes, encode_string,
13 encode_variable_integer,
14};
15use crate::protocol::common::{ConnectFrame, WillFrame};
16use crate::protocol::common::{ConnectHeader, connect};
17use crate::protocol::util::len_bytes;
18use crate::protocol::{Credentials, Protocol, QoS};
19use bit_field::BitField;
20use bytes::{Buf, Bytes, BytesMut};
21use std::ops::RangeInclusive;
22use std::time::Duration;
23
// Bit positions inside the `u8` flags value that `Will::update_flags` writes
// and `Will::decode` reads.

/// Bit index set when a will message is present.
const WILL_FLAG: usize = 2;
/// Inclusive bit range that stores the will QoS level.
const WILL_QOS: RangeInclusive<usize> = 3..=4;
/// Bit index that stores the will retain flag.
const WILL_RETAIN: usize = 5;
28
/// Properties attached to the header of a version 5 `Connect` packet.
///
/// Each field corresponds to one `Property` identifier used by the
/// `PropertyFrame` implementation of this type. The derived `Default` leaves
/// every option `None` and the user property list empty.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ConnectProperties {
    /// Value of `Property::SessionExpiryInterval`.
    pub session_expiry_interval: Option<Duration>,
    /// Value of `Property::ReceiveMaximum`; decoded through
    /// `property_decode_non_zero!` (presumably rejecting zero — confirm in the macro).
    pub receive_maximum: Option<u16>,
    /// Value of `Property::MaximumPacketSize`; decoded through
    /// `property_decode_non_zero!` (presumably rejecting zero — confirm in the macro).
    pub maximum_packet_size: Option<u32>,
    /// Value of `Property::TopicAliasMaximum`.
    pub topic_alias_maximum: Option<u16>,
    /// Value of `Property::RequestResponseInformation`.
    pub request_response_info: Option<bool>,
    /// Value of `Property::RequestProblemInformation`.
    pub request_problem_info: Option<bool>,
    /// String pairs carried as `Property::UserProp`.
    pub user_properties: Vec<(String, String)>,
    /// Value of `Property::AuthenticationMethod`.
    pub auth_method: Option<String>,
    /// Value of `Property::AuthenticationData`. Decoding fails with
    /// `Error::ProtocolError` when this is present without `auth_method`.
    pub auth_data: Option<Bytes>,
}
67
68impl PropertyFrame for ConnectProperties {
69 fn encoded_len(&self) -> usize {
71 let mut len = 0;
72
73 len += property_len!(&self.session_expiry_interval);
74 len += property_len!(&self.receive_maximum);
75 len += property_len!(&self.maximum_packet_size);
76 len += property_len!(&self.topic_alias_maximum);
77 len += property_len!(&self.request_response_info);
78 len += property_len!(&self.request_problem_info);
79 len += property_len!(&self.user_properties);
80 len += property_len!(&self.auth_method);
81 len += property_len!(&self.auth_data);
82
83 len
84 }
85
86 fn encode(&self, buf: &mut BytesMut) {
88 property_encode!(
89 &self.session_expiry_interval,
90 Property::SessionExpiryInterval,
91 buf
92 );
93 property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
94 property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
95 property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
96 property_encode!(
97 &self.request_response_info,
98 Property::RequestResponseInformation,
99 buf
100 );
101 property_encode!(
102 &self.request_problem_info,
103 Property::RequestProblemInformation,
104 buf
105 );
106 property_encode!(&self.user_properties, Property::UserProp, buf);
107 property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
108 property_encode!(&self.auth_data, Property::AuthenticationData, buf);
109 }
110
111 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
113 if buf.is_empty() {
114 return Ok(None);
115 }
116
117 let mut properties = ConnectProperties::default();
118
119 while buf.has_remaining() {
120 let property: Property = decode_byte(buf)?.try_into()?;
121 match property {
122 Property::SessionExpiryInterval => {
123 property_decode!(&mut properties.session_expiry_interval, buf);
124 }
125 Property::ReceiveMaximum => {
126 property_decode_non_zero!(&mut properties.receive_maximum, buf);
127 }
128 Property::MaximumPacketSize => {
129 property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
130 }
131 Property::TopicAliasMaximum => {
132 property_decode!(&mut properties.topic_alias_maximum, buf);
133 }
134 Property::RequestResponseInformation => {
135 property_decode!(&mut properties.request_response_info, buf);
136 }
137 Property::RequestProblemInformation => {
138 property_decode!(&mut properties.request_problem_info, buf);
139 }
140 Property::UserProp => {
141 property_decode!(&mut properties.user_properties, buf);
142 }
143 Property::AuthenticationMethod => {
144 property_decode!(&mut properties.auth_method, buf);
145 }
146 Property::AuthenticationData => {
147 property_decode!(&mut properties.auth_data, buf);
148 }
149 _ => return Err(Error::PropertyMismatch),
150 };
151 }
152
153 if properties.auth_data.is_some() && properties.auth_method.is_none() {
154 return Err(Error::ProtocolError);
155 }
156
157 Ok(Some(properties))
158 }
159}
160
161impl ConnectFrame for ConnectHeader<ConnectProperties> {
162 fn encoded_len(&self) -> usize {
164 let properties_len = self
165 .properties
166 .as_ref()
167 .map(|properties| properties.encoded_len())
168 .unwrap_or(0);
169 properties_len + len_bytes(properties_len) + self.primary_encoded_len()
170 }
171
172 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
174 self.primary_encode(buf);
175
176 let properties_len = self
177 .properties
178 .as_ref()
179 .map(|properties| properties.encoded_len())
180 .unwrap_or(0) as u32;
181
182 encode_variable_integer(buf, properties_len)?;
183
184 if let Some(properties) = self.properties.as_ref() {
185 properties.encode(buf);
186 }
187 Ok(())
188 }
189
190 fn decode(buf: &mut Bytes) -> Result<Self, Error> {
192 let mut header = Self::primary_decode(buf)?;
193
194 let properties_len = decode_variable_integer(buf)? as usize;
195 if buf.len() < properties_len + len_bytes(properties_len) {
196 return Err(Error::MalformedPacket);
197 }
198
199 buf.advance(len_bytes(properties_len));
201
202 let mut properties_buf = buf.split_to(properties_len);
203
204 header.properties = ConnectProperties::decode(&mut properties_buf)?;
206
207 Ok(header)
208 }
209}
210
/// Properties attached to the will message of a version 5 `Connect` packet.
///
/// Each field corresponds to one `Property` identifier used by the
/// `PropertyFrame` implementation of this type. The derived `Default` leaves
/// every option `None` and the user property list empty.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct WillProperties {
    /// Value of `Property::WillDelayInterval`.
    pub delay_interval: Option<Duration>,
    /// Value of `Property::PayloadFormatIndicator`; decoding accepts only `0` or `1`.
    pub payload_format_indicator: Option<u8>,
    /// Value of `Property::MessageExpiryInterval`.
    pub message_expiry_interval: Option<Duration>,
    /// Value of `Property::ContentType`.
    pub content_type: Option<String>,
    /// Value of `Property::ResponseTopic`.
    pub response_topic: Option<String>,
    /// Value of `Property::CorrelationData`.
    pub correlation_data: Option<Bytes>,
    /// String pairs carried as `Property::UserProp`.
    pub user_properties: Vec<(String, String)>,
}
244
245impl PropertyFrame for WillProperties {
246 fn encoded_len(&self) -> usize {
248 let mut len = 0;
249
250 len += property_len!(&self.delay_interval);
251 len += property_len!(&self.payload_format_indicator);
252 len += property_len!(&self.message_expiry_interval);
253 len += property_len!(&self.content_type);
254 len += property_len!(&self.response_topic);
255 len += property_len!(&self.correlation_data);
256 len += property_len!(&self.user_properties);
257
258 len
259 }
260
261 fn encode(&self, buf: &mut BytesMut) {
263 property_encode!(&self.delay_interval, Property::WillDelayInterval, buf);
264 property_encode!(
265 &self.payload_format_indicator,
266 Property::PayloadFormatIndicator,
267 buf
268 );
269 property_encode!(
270 &self.message_expiry_interval,
271 Property::MessageExpiryInterval,
272 buf
273 );
274 property_encode!(&self.content_type, Property::ContentType, buf);
275 property_encode!(&self.response_topic, Property::ResponseTopic, buf);
276 property_encode!(&self.correlation_data, Property::CorrelationData, buf);
277 property_encode!(&self.user_properties, Property::UserProp, buf);
278 }
279
280 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
282 if buf.is_empty() {
283 return Ok(None);
284 }
285
286 let mut properties = WillProperties::default();
287
288 while buf.has_remaining() {
289 let property: Property = decode_byte(buf)?.try_into()?;
290 match property {
291 Property::WillDelayInterval => {
292 property_decode!(&mut properties.delay_interval, buf);
293 }
294 Property::PayloadFormatIndicator => {
295 property_decode!(&mut properties.payload_format_indicator, buf);
296 if let Some(value) = properties.payload_format_indicator {
297 if value != 0 && value != 1 {
298 return Err(Error::ProtocolError);
299 }
300 }
301 }
302 Property::MessageExpiryInterval => {
303 property_decode!(&mut properties.message_expiry_interval, buf);
304 }
305 Property::ContentType => {
306 property_decode!(&mut properties.content_type, buf);
307 }
308 Property::ResponseTopic => {
309 property_decode!(&mut properties.response_topic, buf);
310 }
311 Property::CorrelationData => {
312 property_decode!(&mut properties.correlation_data, buf);
313 }
314 Property::UserProp => {
315 property_decode!(&mut properties.user_properties, buf);
316 }
317 _ => return Err(Error::PropertyMismatch),
318 }
319 }
320
321 Ok(Some(properties))
322 }
323}
324
/// Will message carried by a version 5 `Connect` packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Will {
    /// Optional will properties; encoded ahead of the topic and payload.
    pub properties: Option<WillProperties>,
    /// Will topic, written with `encode_string`.
    pub topic: String,
    /// Will payload, written with `encode_bytes`.
    pub payload: Bytes,
    /// QoS level, stored in the `WILL_QOS` bits of the flags value.
    pub qos: QoS,
    /// Retain flag, stored in the `WILL_RETAIN` bit of the flags value.
    pub retain: bool,
}
352
353impl Will {
354 pub fn new<T: Into<String>>(
356 properties: Option<WillProperties>,
357 topic: T,
358 payload: Bytes,
359 qos: QoS,
360 retain: bool,
361 ) -> Will {
362 Will {
363 properties,
364 topic: topic.into(),
365 payload,
366 qos,
367 retain,
368 }
369 }
370}
371
372impl WillFrame for Will {
373 fn encoded_len(&self) -> usize {
375 let properties_len = self
376 .properties
377 .as_ref()
378 .map(|properties| properties.encoded_len())
379 .unwrap_or(0);
380
381 2 + self.topic.len() + 2 + self.payload.len() + len_bytes(properties_len) + properties_len
382 }
383
384 fn update_flags(&self, flags: &mut u8) {
386 flags.set_bit(WILL_FLAG, true);
388
389 flags.set_bits(WILL_QOS, self.qos as u8);
391
392 flags.set_bit(WILL_RETAIN, self.retain);
394 }
395
396 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
398 let properties_len = self
399 .properties
400 .as_ref()
401 .map(|properties| properties.encoded_len())
402 .unwrap_or(0) as u32;
403
404 encode_variable_integer(buf, properties_len)?;
405
406 if let Some(properties) = self.properties.as_ref() {
407 properties.encode(buf);
408 }
409
410 encode_string(buf, &self.topic);
411 encode_bytes(buf, &self.payload);
412 Ok(())
413 }
414
415 fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
417 if !flags.get_bit(WILL_FLAG) {
418 return Ok(None);
420 }
421
422 let properties_len = decode_variable_integer(buf)? as usize;
423 if buf.len() < properties_len + len_bytes(properties_len) {
424 return Err(Error::MalformedPacket);
425 }
426
427 buf.advance(len_bytes(properties_len));
429 let mut properties_buf = buf.split_to(properties_len);
430 let properties = WillProperties::decode(&mut properties_buf)?;
431 let qos = flags.get_bits(WILL_QOS).try_into()?;
432 let retain = flags.get_bit(WILL_RETAIN);
433
434 let topic = decode_string(buf)?;
435 let payload = decode_bytes(buf)?;
436
437 Ok(Some(Will {
438 properties,
439 topic,
440 payload,
441 qos,
442 retain,
443 }))
444 }
445}
446
// Invokes the shared `connect!` macro with `ConnectProperties`, `Will` and
// `Protocol::V5`.
// NOTE(review): the macro body is not visible from this file; judging by the
// `impl Connect` block below, it produces a `Connect` type that offers
// `from_scratch` and holds a `header` with optional properties — confirm.
connect!(Connect<ConnectProperties, Will>, Protocol::V5);
449
450impl Connect {
451 pub fn with_properties<S: Into<String>>(
457 client_id: S,
458 auth: Option<Credentials>,
459 will: Option<Will>,
460 properties: ConnectProperties,
461 keep_alive: Duration,
462 clean_session: bool,
463 ) -> Self {
464 Self::from_scratch(
465 client_id,
466 auth,
467 will,
468 Some(properties),
469 keep_alive,
470 clean_session,
471 )
472 }
473
474 pub fn properties(&self) -> Option<ConnectProperties> {
476 self.header.properties.clone()
477 }
478}