mqute_codec/protocol/v5/
connect.rs

1//! # Connect Packet V5
2//!
3//! This module provides the complete implementation of the MQTT v5 Connect packet,
4//! including its properties, will message handling, and authentication support.
5//! The Connect packet is the first packet sent by a client to initiate a connection
6//! with an MQTT broker.
7
8use super::property::{property_decode, property_decode_non_zero, property_encode};
9use super::property::{property_len, Property, PropertyFrame};
10use crate::codec::util::{
11    decode_byte, decode_bytes, decode_string, decode_variable_integer, encode_bytes, encode_string,
12    encode_variable_integer,
13};
14use crate::protocol::common::{connect, ConnectHeader};
15use crate::protocol::common::{ConnectFrame, WillFrame};
16use crate::protocol::util::len_bytes;
17use crate::protocol::{Credentials, Protocol, QoS};
18use crate::Error;
19use bit_field::BitField;
20use bytes::{Buf, Bytes, BytesMut};
21use std::ops::RangeInclusive;
22
/// Bit position of the Will flag inside the Connect flags byte.
const WILL_FLAG: usize = 2;
/// Inclusive bit range holding the Will QoS inside the Connect flags byte.
const WILL_QOS: RangeInclusive<usize> = RangeInclusive::new(3, 4);
/// Bit position of the Will Retain flag inside the Connect flags byte.
const WILL_RETAIN: usize = 5;
27
28/// Represents the properties of a Connect packet in MQTT v5.
29///
30/// These properties provide extended functionality beyond the basic connection
31/// parameters, including session management, flow control, and authentication.
32///
33/// # Example
34///
35/// ```rust
36/// use mqute_codec::protocol::v5::ConnectProperties;
37///
38/// let connect_properties = ConnectProperties {
39///     session_expiry_interval: Some(3600u32),
40///     maximum_packet_size: Some(4096u32),
41///     ..Default::default()
42/// };
43/// ```
44#[derive(Debug, Default, Clone, PartialEq, Eq)]
45pub struct ConnectProperties {
46    /// Duration in seconds after which the session expires
47    pub session_expiry_interval: Option<u32>,
48    /// Maximum number of QoS 1 and 2 publishes the client will process
49    pub receive_maximum: Option<u16>,
50    /// Maximum packet size the client will accept
51    pub maximum_packet_size: Option<u32>,
52    /// Highest value the client will accept as a topic alias
53    pub topic_alias_maximum: Option<u16>,
54    /// Whether the server should include response information
55    pub request_response_info: Option<bool>,
56    /// Whether the server should include reason strings
57    pub request_problem_info: Option<bool>,
58    /// User-defined key-value properties
59    pub user_properties: Vec<(String, String)>,
60    /// Authentication method name
61    pub auth_method: Option<String>,
62    /// Authentication data
63    pub auth_data: Option<Bytes>,
64}
65
66impl PropertyFrame for ConnectProperties {
67    /// Calculates the encoded length of the properties
68    fn encoded_len(&self) -> usize {
69        let mut len = 0;
70
71        len += property_len!(&self.session_expiry_interval);
72        len += property_len!(&self.receive_maximum);
73        len += property_len!(&self.maximum_packet_size);
74        len += property_len!(&self.topic_alias_maximum);
75        len += property_len!(&self.request_response_info);
76        len += property_len!(&self.request_problem_info);
77        len += property_len!(&self.user_properties);
78        len += property_len!(&self.auth_method);
79        len += property_len!(&self.auth_data);
80
81        len
82    }
83
84    /// Encodes the properties into a byte buffer
85    fn encode(&self, buf: &mut BytesMut) {
86        property_encode!(
87            &self.session_expiry_interval,
88            Property::SessionExpiryInterval,
89            buf
90        );
91        property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
92        property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
93        property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
94        property_encode!(
95            &self.request_response_info,
96            Property::RequestResponseInformation,
97            buf
98        );
99        property_encode!(
100            &self.request_problem_info,
101            Property::RequestProblemInformation,
102            buf
103        );
104        property_encode!(&self.user_properties, Property::UserProp, buf);
105        property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
106        property_encode!(&self.auth_data, Property::AuthenticationData, buf);
107    }
108
109    /// Decodes properties from a byte buffer
110    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
111        if buf.is_empty() {
112            return Ok(None);
113        }
114
115        let mut properties = ConnectProperties::default();
116
117        while buf.has_remaining() {
118            let property: Property = decode_byte(buf)?.try_into()?;
119            match property {
120                Property::SessionExpiryInterval => {
121                    property_decode!(&mut properties.session_expiry_interval, buf);
122                }
123                Property::ReceiveMaximum => {
124                    property_decode_non_zero!(&mut properties.receive_maximum, buf);
125                }
126                Property::MaximumPacketSize => {
127                    property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
128                }
129                Property::TopicAliasMaximum => {
130                    property_decode!(&mut properties.topic_alias_maximum, buf);
131                }
132                Property::RequestResponseInformation => {
133                    property_decode!(&mut properties.request_response_info, buf);
134                }
135                Property::RequestProblemInformation => {
136                    property_decode!(&mut properties.request_problem_info, buf);
137                }
138                Property::UserProp => {
139                    property_decode!(&mut properties.user_properties, buf);
140                }
141                Property::AuthenticationMethod => {
142                    property_decode!(&mut properties.auth_method, buf);
143                }
144                Property::AuthenticationData => {
145                    property_decode!(&mut properties.auth_data, buf);
146                }
147                _ => return Err(Error::PropertyMismatch),
148            };
149        }
150
151        if properties.auth_data.is_some() && properties.auth_method.is_none() {
152            return Err(Error::ProtocolError);
153        }
154
155        Ok(Some(properties))
156    }
157}
158
159impl ConnectFrame for ConnectHeader<ConnectProperties> {
160    /// Calculates the encoded length of the Connect header
161    fn encoded_len(&self) -> usize {
162        let properties_len = self
163            .properties
164            .as_ref()
165            .map(|properties| properties.encoded_len())
166            .unwrap_or(0);
167        properties_len + len_bytes(properties_len) + self.primary_encoded_len()
168    }
169
170    /// Encodes the Connect header into a byte buffer
171    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
172        self.primary_encode(buf);
173
174        let properties_len = self
175            .properties
176            .as_ref()
177            .map(|properties| properties.encoded_len())
178            .unwrap_or(0) as u32;
179
180        encode_variable_integer(buf, properties_len)?;
181
182        if let Some(properties) = self.properties.as_ref() {
183            properties.encode(buf);
184        }
185        Ok(())
186    }
187
188    /// Decodes a Connect header from a byte buffer
189    fn decode(buf: &mut Bytes) -> Result<Self, Error> {
190        let mut header = Self::primary_decode(buf)?;
191
192        let properties_len = decode_variable_integer(buf)? as usize;
193        if buf.len() < properties_len + len_bytes(properties_len) {
194            return Err(Error::MalformedPacket);
195        }
196
197        // Skip variable byte
198        buf.advance(len_bytes(properties_len));
199
200        let mut properties_buf = buf.split_to(properties_len);
201
202        // Deserialize properties
203        header.properties = ConnectProperties::decode(&mut properties_buf)?;
204
205        Ok(header)
206    }
207}
208
209/// Represents the properties of a Will message in MQTT v5.
210///
211/// These properties provide extended functionality for the last will and testament
212/// message, including delivery timing, content format, and correlation data.
213/// # Example
214///
215/// ```rust
216/// use mqute_codec::protocol::v5::WillProperties;
217///
218/// let will_properties = WillProperties {
219///     delay_interval: Some(10u32),
220///     content_type: Some("json".to_string()),
221///     ..Default::default()
222/// };
223/// ```
224#[derive(Debug, Default, Clone, PartialEq, Eq)]
225pub struct WillProperties {
226    /// Delay before sending the Will message after connection loss
227    pub delay_interval: Option<u32>,
228    /// Format of the Will message payload (0=bytes, 1=UTF-8)
229    pub payload_format_indicator: Option<u8>,
230    /// Lifetime of the Will message in seconds
231    pub message_expiry_interval: Option<u32>,
232    /// Content type descriptor (MIME type)
233    pub content_type: Option<String>,
234    /// Topic name for the response message
235    pub response_topic: Option<String>,
236    /// Correlation data for the response message
237    pub correlation_data: Option<Bytes>,
238    /// User-defined key-value properties
239    pub user_properties: Vec<(String, String)>,
240}
241
242impl PropertyFrame for WillProperties {
243    /// Calculates the encoded length of the Will properties
244    fn encoded_len(&self) -> usize {
245        let mut len = 0;
246
247        len += property_len!(&self.delay_interval);
248        len += property_len!(&self.payload_format_indicator);
249        len += property_len!(&self.message_expiry_interval);
250        len += property_len!(&self.content_type);
251        len += property_len!(&self.response_topic);
252        len += property_len!(&self.correlation_data);
253        len += property_len!(&self.user_properties);
254
255        len
256    }
257
258    /// Encodes the Will properties into a byte buffer
259    fn encode(&self, buf: &mut BytesMut) {
260        property_encode!(&self.delay_interval, Property::WillDelayInterval, buf);
261        property_encode!(
262            &self.payload_format_indicator,
263            Property::PayloadFormatIndicator,
264            buf
265        );
266        property_encode!(
267            &self.message_expiry_interval,
268            Property::MessageExpiryInterval,
269            buf
270        );
271        property_encode!(&self.content_type, Property::ContentType, buf);
272        property_encode!(&self.response_topic, Property::ResponseTopic, buf);
273        property_encode!(&self.correlation_data, Property::CorrelationData, buf);
274        property_encode!(&self.user_properties, Property::UserProp, buf);
275    }
276
277    /// Decodes Will properties from a byte buffer
278    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
279        if buf.is_empty() {
280            return Ok(None);
281        }
282
283        let mut properties = WillProperties::default();
284
285        while buf.has_remaining() {
286            let property: Property = decode_byte(buf)?.try_into()?;
287            match property {
288                Property::WillDelayInterval => {
289                    property_decode!(&mut properties.delay_interval, buf);
290                }
291                Property::PayloadFormatIndicator => {
292                    property_decode!(&mut properties.payload_format_indicator, buf);
293                    if let Some(value) = properties.payload_format_indicator {
294                        if value != 0 && value != 1 {
295                            return Err(Error::ProtocolError);
296                        }
297                    }
298                }
299                Property::MessageExpiryInterval => {
300                    property_decode!(&mut properties.message_expiry_interval, buf);
301                }
302                Property::ContentType => {
303                    property_decode!(&mut properties.content_type, buf);
304                }
305                Property::ResponseTopic => {
306                    property_decode!(&mut properties.response_topic, buf);
307                }
308                Property::CorrelationData => {
309                    property_decode!(&mut properties.correlation_data, buf);
310                }
311                Property::UserProp => {
312                    property_decode!(&mut properties.user_properties, buf);
313                }
314                _ => return Err(Error::PropertyMismatch),
315            }
316        }
317
318        Ok(Some(properties))
319    }
320}
321
322/// Represents a Will message in MQTT v5.
323///
324/// The Will message is published by the broker when the client disconnects
325/// unexpectedly. It includes the message content, delivery options, and properties.
326///
327/// # Example
328///
329/// ```rust
330/// use mqute_codec::protocol::v5::Will;
331/// use bytes::Bytes;
332/// use mqute_codec::protocol::QoS;
333///
334/// let will = Will::new(None, "tpoic", Bytes::new(), QoS::ExactlyOnce, false);
335/// ```
336#[derive(Debug, Clone, PartialEq, Eq)]
337pub struct Will {
338    /// Will message properties
339    pub properties: Option<WillProperties>,
340    /// Topic name to publish the Will message to
341    pub topic: String,
342    /// Will message payload
343    pub payload: Bytes,
344    /// Quality of Service level for the Will message
345    pub qos: QoS,
346    /// Whether the Will message should be retained
347    pub retain: bool,
348}
349
350impl Will {
351    /// Creates a new `Will` packet
352    pub fn new<T: Into<String>>(
353        properties: Option<WillProperties>,
354        topic: T,
355        payload: Bytes,
356        qos: QoS,
357        retain: bool,
358    ) -> Will {
359        Will {
360            properties,
361            topic: topic.into(),
362            payload,
363            qos,
364            retain,
365        }
366    }
367}
368
369impl WillFrame for Will {
370    /// Calculates the encoded length of the Will message
371    fn encoded_len(&self) -> usize {
372        let properties_len = self
373            .properties
374            .as_ref()
375            .map(|properties| properties.encoded_len())
376            .unwrap_or(0);
377
378        2 + self.topic.len() + 2 + self.payload.len() + len_bytes(properties_len) + properties_len
379    }
380
381    /// Updates the Connect packet flags based on Will message settings
382    fn update_flags(&self, flags: &mut u8) {
383        // Update the 'Will' flag
384        flags.set_bit(WILL_FLAG, true);
385
386        // Update 'Qos' flags
387        flags.set_bits(WILL_QOS, self.qos as u8);
388
389        // Update the 'Will Retain' flag
390        flags.set_bit(WILL_RETAIN, self.retain);
391    }
392
393    /// Encodes the Will message into a byte buffer
394    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
395        let properties_len = self
396            .properties
397            .as_ref()
398            .map(|properties| properties.encoded_len())
399            .unwrap_or(0) as u32;
400
401        encode_variable_integer(buf, properties_len)?;
402
403        if let Some(properties) = self.properties.as_ref() {
404            properties.encode(buf);
405        }
406
407        encode_string(buf, &self.topic);
408        encode_bytes(buf, &self.payload);
409        Ok(())
410    }
411
412    /// Decodes a Will message from a byte buffer
413    fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
414        if !flags.get_bit(WILL_FLAG) {
415            // No 'Will'
416            return Ok(None);
417        }
418
419        let properties_len = decode_variable_integer(buf)? as usize;
420        if buf.len() < properties_len + len_bytes(properties_len) {
421            return Err(Error::MalformedPacket);
422        }
423
424        // Skip properties len
425        buf.advance(len_bytes(properties_len));
426        let mut properties_buf = buf.split_to(properties_len);
427        let properties = WillProperties::decode(&mut properties_buf)?;
428        let qos = flags.get_bits(WILL_QOS).try_into()?;
429        let retain = flags.get_bit(WILL_RETAIN);
430
431        let topic = decode_string(buf)?;
432        let payload = decode_bytes(buf)?;
433
434        Ok(Some(Will {
435            properties,
436            topic,
437            payload,
438            qos,
439            retain,
440        }))
441    }
442}
443
444// Defines the `Connect` packet for MQTT V5
445connect!(Connect<ConnectProperties, Will>, Protocol::V5);
446
447impl Connect {
448    /// Creates a new Connect packet with properties
449    pub fn with_properties<S: Into<String>>(
450        client_id: S,
451        auth: Option<Credentials>,
452        will: Option<Will>,
453        properties: ConnectProperties,
454        keep_alive: u16,
455        clean_session: bool,
456    ) -> Self {
457        Self::from_scratch(
458            client_id,
459            auth,
460            will,
461            Some(properties),
462            keep_alive,
463            clean_session,
464        )
465    }
466
467    /// Returns the Connect properties if present
468    pub fn properties(&self) -> Option<ConnectProperties> {
469        self.header.properties.clone()
470    }
471}