1use super::property::{Property, PropertyFrame, property_len};
9use super::property::{property_decode, property_decode_non_zero, property_encode};
10use crate::Error;
11use crate::codec::util::{
12 decode_byte, decode_bytes, decode_string, decode_variable_integer, encode_bytes, encode_string,
13 encode_variable_integer,
14};
15use crate::protocol::common::{ConnectFrame, WillFrame};
16use crate::protocol::common::{ConnectHeader, connect};
17use crate::protocol::util::len_bytes;
18use crate::protocol::{Credentials, Protocol, QoS, util};
19use bit_field::BitField;
20use bytes::{Buf, Bytes, BytesMut};
21use std::ops::RangeInclusive;
22use std::time::Duration;
23
24const WILL_FLAG: usize = 2;
26const WILL_QOS: RangeInclusive<usize> = 3..=4;
27const WILL_RETAIN: usize = 5;
28
29#[derive(Debug, Default, Clone, PartialEq, Eq)]
47pub struct ConnectProperties {
48 pub session_expiry_interval: Option<Duration>,
50 pub receive_maximum: Option<u16>,
52 pub maximum_packet_size: Option<u32>,
54 pub topic_alias_maximum: Option<u16>,
56 pub request_response_info: Option<bool>,
58 pub request_problem_info: Option<bool>,
60 pub user_properties: Vec<(String, String)>,
62 pub auth_method: Option<String>,
64 pub auth_data: Option<Bytes>,
66}
67
68impl PropertyFrame for ConnectProperties {
69 fn encoded_len(&self) -> usize {
71 let mut len = 0;
72
73 len += property_len!(&self.session_expiry_interval);
74 len += property_len!(&self.receive_maximum);
75 len += property_len!(&self.maximum_packet_size);
76 len += property_len!(&self.topic_alias_maximum);
77 len += property_len!(&self.request_response_info);
78 len += property_len!(&self.request_problem_info);
79 len += property_len!(&self.user_properties);
80 len += property_len!(&self.auth_method);
81 len += property_len!(&self.auth_data);
82
83 len
84 }
85
86 fn encode(&self, buf: &mut BytesMut) {
88 property_encode!(
89 &self.session_expiry_interval,
90 Property::SessionExpiryInterval,
91 buf
92 );
93 property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
94 property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
95 property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
96 property_encode!(
97 &self.request_response_info,
98 Property::RequestResponseInformation,
99 buf
100 );
101 property_encode!(
102 &self.request_problem_info,
103 Property::RequestProblemInformation,
104 buf
105 );
106 property_encode!(&self.user_properties, Property::UserProp, buf);
107 property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
108 property_encode!(&self.auth_data, Property::AuthenticationData, buf);
109 }
110
111 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
113 if buf.is_empty() {
114 return Ok(None);
115 }
116
117 let mut properties = ConnectProperties::default();
118
119 while buf.has_remaining() {
120 let property: Property = decode_byte(buf)?.try_into()?;
121 match property {
122 Property::SessionExpiryInterval => {
123 property_decode!(&mut properties.session_expiry_interval, buf);
124 }
125 Property::ReceiveMaximum => {
126 property_decode_non_zero!(&mut properties.receive_maximum, buf);
127 }
128 Property::MaximumPacketSize => {
129 property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
130 }
131 Property::TopicAliasMaximum => {
132 property_decode!(&mut properties.topic_alias_maximum, buf);
133 }
134 Property::RequestResponseInformation => {
135 property_decode!(&mut properties.request_response_info, buf);
136 }
137 Property::RequestProblemInformation => {
138 property_decode!(&mut properties.request_problem_info, buf);
139 }
140 Property::UserProp => {
141 property_decode!(&mut properties.user_properties, buf);
142 }
143 Property::AuthenticationMethod => {
144 property_decode!(&mut properties.auth_method, buf);
145 }
146 Property::AuthenticationData => {
147 property_decode!(&mut properties.auth_data, buf);
148 }
149 _ => return Err(Error::PropertyMismatch),
150 };
151 }
152
153 if properties.auth_data.is_some() && properties.auth_method.is_none() {
154 return Err(Error::ProtocolError);
155 }
156
157 Ok(Some(properties))
158 }
159}
160
161impl ConnectFrame for ConnectHeader<ConnectProperties> {
162 fn encoded_len(&self) -> usize {
164 let properties_len = self
165 .properties
166 .as_ref()
167 .map(|properties| properties.encoded_len())
168 .unwrap_or(0);
169 properties_len + len_bytes(properties_len) + self.primary_encoded_len()
170 }
171
172 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
174 self.primary_encode(buf);
175
176 let properties_len = self
177 .properties
178 .as_ref()
179 .map(|properties| properties.encoded_len())
180 .unwrap_or(0) as u32;
181
182 encode_variable_integer(buf, properties_len)?;
183
184 if let Some(properties) = self.properties.as_ref() {
185 properties.encode(buf);
186 }
187 Ok(())
188 }
189
190 fn decode(buf: &mut Bytes) -> Result<Self, Error> {
192 let mut header = Self::primary_decode(buf)?;
193
194 let properties_len = decode_variable_integer(buf)? as usize;
195 if buf.len() < properties_len + len_bytes(properties_len) {
196 return Err(Error::MalformedPacket);
197 }
198
199 buf.advance(len_bytes(properties_len));
201
202 let mut properties_buf = buf.split_to(properties_len);
203
204 header.properties = ConnectProperties::decode(&mut properties_buf)?;
206
207 Ok(header)
208 }
209}
210
211#[derive(Debug, Default, Clone, PartialEq, Eq)]
228pub struct WillProperties {
229 pub delay_interval: Option<Duration>,
231 pub payload_format_indicator: Option<u8>,
233 pub message_expiry_interval: Option<Duration>,
235 pub content_type: Option<String>,
237 pub response_topic: Option<String>,
239 pub correlation_data: Option<Bytes>,
241 pub user_properties: Vec<(String, String)>,
243}
244
245impl PropertyFrame for WillProperties {
246 fn encoded_len(&self) -> usize {
248 let mut len = 0;
249
250 len += property_len!(&self.delay_interval);
251 len += property_len!(&self.payload_format_indicator);
252 len += property_len!(&self.message_expiry_interval);
253 len += property_len!(&self.content_type);
254 len += property_len!(&self.response_topic);
255 len += property_len!(&self.correlation_data);
256 len += property_len!(&self.user_properties);
257
258 len
259 }
260
261 fn encode(&self, buf: &mut BytesMut) {
263 property_encode!(&self.delay_interval, Property::WillDelayInterval, buf);
264 property_encode!(
265 &self.payload_format_indicator,
266 Property::PayloadFormatIndicator,
267 buf
268 );
269 property_encode!(
270 &self.message_expiry_interval,
271 Property::MessageExpiryInterval,
272 buf
273 );
274 property_encode!(&self.content_type, Property::ContentType, buf);
275 property_encode!(&self.response_topic, Property::ResponseTopic, buf);
276 property_encode!(&self.correlation_data, Property::CorrelationData, buf);
277 property_encode!(&self.user_properties, Property::UserProp, buf);
278 }
279
280 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
282 if buf.is_empty() {
283 return Ok(None);
284 }
285
286 let mut properties = WillProperties::default();
287
288 while buf.has_remaining() {
289 let property: Property = decode_byte(buf)?.try_into()?;
290 match property {
291 Property::WillDelayInterval => {
292 property_decode!(&mut properties.delay_interval, buf);
293 }
294 Property::PayloadFormatIndicator => {
295 property_decode!(&mut properties.payload_format_indicator, buf);
296 if let Some(value) = properties.payload_format_indicator {
297 if value != 0 && value != 1 {
298 return Err(Error::ProtocolError);
299 }
300 }
301 }
302 Property::MessageExpiryInterval => {
303 property_decode!(&mut properties.message_expiry_interval, buf);
304 }
305 Property::ContentType => {
306 property_decode!(&mut properties.content_type, buf);
307 }
308 Property::ResponseTopic => {
309 property_decode!(&mut properties.response_topic, buf);
310 }
311 Property::CorrelationData => {
312 property_decode!(&mut properties.correlation_data, buf);
313 }
314 Property::UserProp => {
315 property_decode!(&mut properties.user_properties, buf);
316 }
317 _ => return Err(Error::PropertyMismatch),
318 }
319 }
320
321 Ok(Some(properties))
322 }
323}
324
325#[derive(Debug, Clone, PartialEq, Eq)]
341pub struct Will {
342 pub properties: Option<WillProperties>,
344 pub topic: String,
346 pub payload: Bytes,
348 pub qos: QoS,
350 pub retain: bool,
352}
353
354impl Will {
355 pub fn new<T: Into<String>>(
361 properties: Option<WillProperties>,
362 topic: T,
363 payload: Bytes,
364 qos: QoS,
365 retain: bool,
366 ) -> Self {
367 let topic = topic.into();
368
369 if !util::is_valid_topic_name(&topic) {
370 panic!("Invalid topic name: '{}'", topic);
371 }
372
373 Will {
374 properties,
375 topic,
376 payload,
377 qos,
378 retain,
379 }
380 }
381}
382
383impl WillFrame for Will {
384 fn encoded_len(&self) -> usize {
386 let properties_len = self
387 .properties
388 .as_ref()
389 .map(|properties| properties.encoded_len())
390 .unwrap_or(0);
391
392 2 + self.topic.len() + 2 + self.payload.len() + len_bytes(properties_len) + properties_len
393 }
394
395 fn update_flags(&self, flags: &mut u8) {
397 flags.set_bit(WILL_FLAG, true);
399
400 flags.set_bits(WILL_QOS, self.qos as u8);
402
403 flags.set_bit(WILL_RETAIN, self.retain);
405 }
406
407 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
409 let properties_len = self
410 .properties
411 .as_ref()
412 .map(|properties| properties.encoded_len())
413 .unwrap_or(0) as u32;
414
415 encode_variable_integer(buf, properties_len)?;
416
417 if let Some(properties) = self.properties.as_ref() {
418 properties.encode(buf);
419 }
420
421 encode_string(buf, &self.topic);
422 encode_bytes(buf, &self.payload);
423 Ok(())
424 }
425
426 fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
428 if !flags.get_bit(WILL_FLAG) {
429 return Ok(None);
431 }
432
433 let properties_len = decode_variable_integer(buf)? as usize;
434 if buf.len() < properties_len + len_bytes(properties_len) {
435 return Err(Error::MalformedPacket);
436 }
437
438 buf.advance(len_bytes(properties_len));
440 let mut properties_buf = buf.split_to(properties_len);
441 let properties = WillProperties::decode(&mut properties_buf)?;
442 let qos = flags.get_bits(WILL_QOS).try_into()?;
443 let retain = flags.get_bit(WILL_RETAIN);
444
445 let topic = decode_string(buf)?;
446
447 if !util::is_valid_topic_name(&topic) {
448 return Err(Error::InvalidTopicName(topic));
449 }
450
451 let payload = decode_bytes(buf)?;
452
453 Ok(Some(Will {
454 properties,
455 topic,
456 payload,
457 qos,
458 retain,
459 }))
460 }
461}
462
463connect!(Connect<ConnectProperties, Will>, Protocol::V5);
465
466impl Connect {
467 pub fn with_properties<S: Into<String>>(
473 client_id: S,
474 auth: Option<Credentials>,
475 will: Option<Will>,
476 properties: ConnectProperties,
477 keep_alive: Duration,
478 clean_session: bool,
479 ) -> Self {
480 Self::from_scratch(
481 client_id,
482 auth,
483 will,
484 Some(properties),
485 keep_alive,
486 clean_session,
487 )
488 }
489
490 pub fn properties(&self) -> Option<ConnectProperties> {
492 self.header.properties.clone()
493 }
494}