mqute_codec/protocol/v5/
connect.rs

1//! # Connect Packet V5
2//!
3//! This module provides the complete implementation of the MQTT v5 Connect packet,
4//! including its properties, will message handling, and authentication support.
5//! The Connect packet is the first packet sent by a client to initiate a connection
6//! with an MQTT broker.
7
8use super::property::{Property, PropertyFrame, property_len};
9use super::property::{property_decode, property_decode_non_zero, property_encode};
10use crate::Error;
11use crate::codec::util::{
12    decode_byte, decode_bytes, decode_string, decode_variable_integer, encode_bytes, encode_string,
13    encode_variable_integer,
14};
15use crate::protocol::common::{ConnectFrame, WillFrame};
16use crate::protocol::common::{ConnectHeader, connect};
17use crate::protocol::util::len_bytes;
18use crate::protocol::{Credentials, Protocol, QoS, util};
19use bit_field::BitField;
20use bytes::{Buf, Bytes, BytesMut};
21use std::ops::RangeInclusive;
22use std::time::Duration;
23
/// Bit index, within the Connect flags byte, of the flag signalling that a
/// Will message is present.
const WILL_FLAG: usize = 2;
/// Inclusive bit range, within the Connect flags byte, holding the Will QoS level.
const WILL_QOS: RangeInclusive<usize> = 3..=4;
/// Bit index, within the Connect flags byte, of the Will Retain flag.
const WILL_RETAIN: usize = 5;
28
29/// Represents the properties of a Connect packet in MQTT v5.
30///
31/// These properties provide extended functionality beyond the basic connection
32/// parameters, including session management, flow control, and authentication.
33///
34/// # Example
35///
36/// ```rust
37/// use mqute_codec::protocol::v5::ConnectProperties;
38/// use std::time::Duration;
39///
40/// let connect_properties = ConnectProperties {
41///     session_expiry_interval: Some(Duration::from_secs(3600)),
42///     maximum_packet_size: Some(4096u32),
43///     ..Default::default()
44/// };
45/// ```
46#[derive(Debug, Default, Clone, PartialEq, Eq)]
47pub struct ConnectProperties {
48    /// Duration in seconds after which the session expires
49    pub session_expiry_interval: Option<Duration>,
50    /// Maximum number of QoS 1 and 2 publishes the client will process
51    pub receive_maximum: Option<u16>,
52    /// Maximum packet size the client will accept
53    pub maximum_packet_size: Option<u32>,
54    /// Highest value the client will accept as a topic alias
55    pub topic_alias_maximum: Option<u16>,
56    /// Whether the server should include response information
57    pub request_response_info: Option<bool>,
58    /// Whether the server should include reason strings
59    pub request_problem_info: Option<bool>,
60    /// User-defined key-value properties
61    pub user_properties: Vec<(String, String)>,
62    /// Authentication method name
63    pub auth_method: Option<String>,
64    /// Authentication data
65    pub auth_data: Option<Bytes>,
66}
67
68impl PropertyFrame for ConnectProperties {
69    /// Calculates the encoded length of the properties
70    fn encoded_len(&self) -> usize {
71        let mut len = 0;
72
73        len += property_len!(&self.session_expiry_interval);
74        len += property_len!(&self.receive_maximum);
75        len += property_len!(&self.maximum_packet_size);
76        len += property_len!(&self.topic_alias_maximum);
77        len += property_len!(&self.request_response_info);
78        len += property_len!(&self.request_problem_info);
79        len += property_len!(&self.user_properties);
80        len += property_len!(&self.auth_method);
81        len += property_len!(&self.auth_data);
82
83        len
84    }
85
86    /// Encodes the properties into a byte buffer
87    fn encode(&self, buf: &mut BytesMut) {
88        property_encode!(
89            &self.session_expiry_interval,
90            Property::SessionExpiryInterval,
91            buf
92        );
93        property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
94        property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
95        property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
96        property_encode!(
97            &self.request_response_info,
98            Property::RequestResponseInformation,
99            buf
100        );
101        property_encode!(
102            &self.request_problem_info,
103            Property::RequestProblemInformation,
104            buf
105        );
106        property_encode!(&self.user_properties, Property::UserProp, buf);
107        property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
108        property_encode!(&self.auth_data, Property::AuthenticationData, buf);
109    }
110
111    /// Decodes properties from a byte buffer
112    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
113        if buf.is_empty() {
114            return Ok(None);
115        }
116
117        let mut properties = ConnectProperties::default();
118
119        while buf.has_remaining() {
120            let property: Property = decode_byte(buf)?.try_into()?;
121            match property {
122                Property::SessionExpiryInterval => {
123                    property_decode!(&mut properties.session_expiry_interval, buf);
124                }
125                Property::ReceiveMaximum => {
126                    property_decode_non_zero!(&mut properties.receive_maximum, buf);
127                }
128                Property::MaximumPacketSize => {
129                    property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
130                }
131                Property::TopicAliasMaximum => {
132                    property_decode!(&mut properties.topic_alias_maximum, buf);
133                }
134                Property::RequestResponseInformation => {
135                    property_decode!(&mut properties.request_response_info, buf);
136                }
137                Property::RequestProblemInformation => {
138                    property_decode!(&mut properties.request_problem_info, buf);
139                }
140                Property::UserProp => {
141                    property_decode!(&mut properties.user_properties, buf);
142                }
143                Property::AuthenticationMethod => {
144                    property_decode!(&mut properties.auth_method, buf);
145                }
146                Property::AuthenticationData => {
147                    property_decode!(&mut properties.auth_data, buf);
148                }
149                _ => return Err(Error::PropertyMismatch),
150            };
151        }
152
153        if properties.auth_data.is_some() && properties.auth_method.is_none() {
154            return Err(Error::ProtocolError);
155        }
156
157        Ok(Some(properties))
158    }
159}
160
161impl ConnectFrame for ConnectHeader<ConnectProperties> {
162    /// Calculates the encoded length of the Connect header
163    fn encoded_len(&self) -> usize {
164        let properties_len = self
165            .properties
166            .as_ref()
167            .map(|properties| properties.encoded_len())
168            .unwrap_or(0);
169        properties_len + len_bytes(properties_len) + self.primary_encoded_len()
170    }
171
172    /// Encodes the Connect header into a byte buffer
173    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
174        self.primary_encode(buf);
175
176        let properties_len = self
177            .properties
178            .as_ref()
179            .map(|properties| properties.encoded_len())
180            .unwrap_or(0) as u32;
181
182        encode_variable_integer(buf, properties_len)?;
183
184        if let Some(properties) = self.properties.as_ref() {
185            properties.encode(buf);
186        }
187        Ok(())
188    }
189
190    /// Decodes a Connect header from a byte buffer
191    fn decode(buf: &mut Bytes) -> Result<Self, Error> {
192        let mut header = Self::primary_decode(buf)?;
193
194        let properties_len = decode_variable_integer(buf)? as usize;
195        if buf.len() < properties_len + len_bytes(properties_len) {
196            return Err(Error::MalformedPacket);
197        }
198
199        // Skip variable byte
200        buf.advance(len_bytes(properties_len));
201
202        let mut properties_buf = buf.split_to(properties_len);
203
204        // Deserialize properties
205        header.properties = ConnectProperties::decode(&mut properties_buf)?;
206
207        Ok(header)
208    }
209}
210
211/// Represents the properties of a Will message in MQTT v5.
212///
213/// These properties provide extended functionality for the last will and testament
214/// message, including delivery timing, content format, and correlation data.
215/// # Example
216///
217/// ```rust
218/// use std::time::Duration;
219/// use mqute_codec::protocol::v5::WillProperties;
220///
221/// let will_properties = WillProperties {
222///     delay_interval: Some(Duration::from_secs(10)),
223///     content_type: Some("json".to_string()),
224///     ..Default::default()
225/// };
226/// ```
227#[derive(Debug, Default, Clone, PartialEq, Eq)]
228pub struct WillProperties {
229    /// Delay before sending the Will message after connection loss
230    pub delay_interval: Option<Duration>,
231    /// Format of the Will message payload (0=bytes, 1=UTF-8)
232    pub payload_format_indicator: Option<u8>,
233    /// Lifetime of the Will message in seconds
234    pub message_expiry_interval: Option<Duration>,
235    /// Content type descriptor (MIME type)
236    pub content_type: Option<String>,
237    /// Topic name for the response message
238    pub response_topic: Option<String>,
239    /// Correlation data for the response message
240    pub correlation_data: Option<Bytes>,
241    /// User-defined key-value properties
242    pub user_properties: Vec<(String, String)>,
243}
244
245impl PropertyFrame for WillProperties {
246    /// Calculates the encoded length of the Will properties
247    fn encoded_len(&self) -> usize {
248        let mut len = 0;
249
250        len += property_len!(&self.delay_interval);
251        len += property_len!(&self.payload_format_indicator);
252        len += property_len!(&self.message_expiry_interval);
253        len += property_len!(&self.content_type);
254        len += property_len!(&self.response_topic);
255        len += property_len!(&self.correlation_data);
256        len += property_len!(&self.user_properties);
257
258        len
259    }
260
261    /// Encodes the Will properties into a byte buffer
262    fn encode(&self, buf: &mut BytesMut) {
263        property_encode!(&self.delay_interval, Property::WillDelayInterval, buf);
264        property_encode!(
265            &self.payload_format_indicator,
266            Property::PayloadFormatIndicator,
267            buf
268        );
269        property_encode!(
270            &self.message_expiry_interval,
271            Property::MessageExpiryInterval,
272            buf
273        );
274        property_encode!(&self.content_type, Property::ContentType, buf);
275        property_encode!(&self.response_topic, Property::ResponseTopic, buf);
276        property_encode!(&self.correlation_data, Property::CorrelationData, buf);
277        property_encode!(&self.user_properties, Property::UserProp, buf);
278    }
279
280    /// Decodes Will properties from a byte buffer
281    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
282        if buf.is_empty() {
283            return Ok(None);
284        }
285
286        let mut properties = WillProperties::default();
287
288        while buf.has_remaining() {
289            let property: Property = decode_byte(buf)?.try_into()?;
290            match property {
291                Property::WillDelayInterval => {
292                    property_decode!(&mut properties.delay_interval, buf);
293                }
294                Property::PayloadFormatIndicator => {
295                    property_decode!(&mut properties.payload_format_indicator, buf);
296                    if let Some(value) = properties.payload_format_indicator {
297                        if value != 0 && value != 1 {
298                            return Err(Error::ProtocolError);
299                        }
300                    }
301                }
302                Property::MessageExpiryInterval => {
303                    property_decode!(&mut properties.message_expiry_interval, buf);
304                }
305                Property::ContentType => {
306                    property_decode!(&mut properties.content_type, buf);
307                }
308                Property::ResponseTopic => {
309                    property_decode!(&mut properties.response_topic, buf);
310                }
311                Property::CorrelationData => {
312                    property_decode!(&mut properties.correlation_data, buf);
313                }
314                Property::UserProp => {
315                    property_decode!(&mut properties.user_properties, buf);
316                }
317                _ => return Err(Error::PropertyMismatch),
318            }
319        }
320
321        Ok(Some(properties))
322    }
323}
324
325/// Represents a Last Will and Testament (LWT) message in MQTT v5.
326///
327/// The Will message is published by the broker when the client disconnects unexpectedly.
328/// It includes the message content, delivery options, and MQTT v5 properties that provide
329/// additional control over the will message delivery.
330///
331/// # Example
332///
333/// ```rust
334/// use mqute_codec::protocol::v5::Will;
335/// use bytes::Bytes;
336/// use mqute_codec::protocol::QoS;
337///
338/// let will = Will::new(None, "/topic", Bytes::new(), QoS::ExactlyOnce, false);
339/// ```
340#[derive(Debug, Clone, PartialEq, Eq)]
341pub struct Will {
342    /// Will message properties
343    pub properties: Option<WillProperties>,
344    /// Topic name to publish the Will message to
345    pub topic: String,
346    /// Will message payload
347    pub payload: Bytes,
348    /// Quality of Service level for the Will message
349    pub qos: QoS,
350    /// Whether the Will message should be retained
351    pub retain: bool,
352}
353
354impl Will {
355    /// Creates a new `Will` instance with the specified parameters.
356    ///
357    /// # Panics
358    ///
359    /// Panics if the topic name is invalid according to MQTT topic naming rules.
360    pub fn new<T: Into<String>>(
361        properties: Option<WillProperties>,
362        topic: T,
363        payload: Bytes,
364        qos: QoS,
365        retain: bool,
366    ) -> Self {
367        let topic = topic.into();
368
369        if !util::is_valid_topic_name(&topic) {
370            panic!("Invalid topic name: '{}'", topic);
371        }
372
373        Will {
374            properties,
375            topic,
376            payload,
377            qos,
378            retain,
379        }
380    }
381}
382
383impl WillFrame for Will {
384    /// Calculates the encoded length of the Will message
385    fn encoded_len(&self) -> usize {
386        let properties_len = self
387            .properties
388            .as_ref()
389            .map(|properties| properties.encoded_len())
390            .unwrap_or(0);
391
392        2 + self.topic.len() + 2 + self.payload.len() + len_bytes(properties_len) + properties_len
393    }
394
395    /// Updates the Connect packet flags based on Will message settings
396    fn update_flags(&self, flags: &mut u8) {
397        // Update the 'Will' flag
398        flags.set_bit(WILL_FLAG, true);
399
400        // Update 'Qos' flags
401        flags.set_bits(WILL_QOS, self.qos as u8);
402
403        // Update the 'Will Retain' flag
404        flags.set_bit(WILL_RETAIN, self.retain);
405    }
406
407    /// Encodes the Will message into a byte buffer
408    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
409        let properties_len = self
410            .properties
411            .as_ref()
412            .map(|properties| properties.encoded_len())
413            .unwrap_or(0) as u32;
414
415        encode_variable_integer(buf, properties_len)?;
416
417        if let Some(properties) = self.properties.as_ref() {
418            properties.encode(buf);
419        }
420
421        encode_string(buf, &self.topic);
422        encode_bytes(buf, &self.payload);
423        Ok(())
424    }
425
426    /// Decodes a Will message from a byte buffer
427    fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
428        if !flags.get_bit(WILL_FLAG) {
429            // No 'Will'
430            return Ok(None);
431        }
432
433        let properties_len = decode_variable_integer(buf)? as usize;
434        if buf.len() < properties_len + len_bytes(properties_len) {
435            return Err(Error::MalformedPacket);
436        }
437
438        // Skip properties len
439        buf.advance(len_bytes(properties_len));
440        let mut properties_buf = buf.split_to(properties_len);
441        let properties = WillProperties::decode(&mut properties_buf)?;
442        let qos = flags.get_bits(WILL_QOS).try_into()?;
443        let retain = flags.get_bit(WILL_RETAIN);
444
445        let topic = decode_string(buf)?;
446
447        if !util::is_valid_topic_name(&topic) {
448            return Err(Error::InvalidTopicName(topic));
449        }
450
451        let payload = decode_bytes(buf)?;
452
453        Ok(Some(Will {
454            properties,
455            topic,
456            payload,
457            qos,
458            retain,
459        }))
460    }
461}
462
// Defines the `Connect` packet type for MQTT V5 through the shared `connect!`
// macro, using `ConnectProperties` as the header properties type and `Will` as
// the will message type.
connect!(Connect<ConnectProperties, Will>, Protocol::V5);
465
466impl Connect {
467    /// Creates a new Connect packet with properties
468    ///
469    /// # Panics
470    ///
471    /// Panics if the value of the "keep alive" parameter exceeds 65535
472    pub fn with_properties<S: Into<String>>(
473        client_id: S,
474        auth: Option<Credentials>,
475        will: Option<Will>,
476        properties: ConnectProperties,
477        keep_alive: Duration,
478        clean_session: bool,
479    ) -> Self {
480        Self::from_scratch(
481            client_id,
482            auth,
483            will,
484            Some(properties),
485            keep_alive,
486            clean_session,
487        )
488    }
489
490    /// Returns the Connect properties if present
491    pub fn properties(&self) -> Option<ConnectProperties> {
492        self.header.properties.clone()
493    }
494}