mqute_codec/protocol/v5/
connect.rs

1//! # Connect Packet V5
2//!
3//! This module provides the complete implementation of the MQTT v5 Connect packet,
4//! including its properties, will message handling, and authentication support.
5//! The Connect packet is the first packet sent by a client to initiate a connection
6//! with an MQTT broker.
7
8use super::property::{property_decode, property_decode_non_zero, property_encode};
9use super::property::{property_len, Property, PropertyFrame};
10use crate::codec::util::{
11    decode_byte, decode_bytes, decode_string, decode_variable_integer, encode_bytes, encode_string,
12    encode_variable_integer,
13};
14use crate::protocol::common::{connect, ConnectHeader};
15use crate::protocol::common::{ConnectFrame, WillFrame};
16use crate::protocol::util::len_bytes;
17use crate::protocol::{Credentials, Protocol, QoS};
18use crate::Error;
19use bit_field::BitField;
20use bytes::{Buf, Bytes, BytesMut};
21use std::ops::RangeInclusive;
22use std::time::Duration;
23
/// Bit index of the Will flag inside the Connect flags byte
const WILL_FLAG: usize = 2;
/// Inclusive bit range holding the Will QoS inside the Connect flags byte
const WILL_QOS: RangeInclusive<usize> = RangeInclusive::new(3, 4);
/// Bit index of the Will Retain flag inside the Connect flags byte
const WILL_RETAIN: usize = 5;
28
29/// Represents the properties of a Connect packet in MQTT v5.
30///
31/// These properties provide extended functionality beyond the basic connection
32/// parameters, including session management, flow control, and authentication.
33///
34/// # Example
35///
36/// ```rust
37/// use mqute_codec::protocol::v5::ConnectProperties;
38///
39/// let connect_properties = ConnectProperties {
40///     session_expiry_interval: Some(3600u32),
41///     maximum_packet_size: Some(4096u32),
42///     ..Default::default()
43/// };
44/// ```
45#[derive(Debug, Default, Clone, PartialEq, Eq)]
46pub struct ConnectProperties {
47    /// Duration in seconds after which the session expires
48    pub session_expiry_interval: Option<u32>,
49    /// Maximum number of QoS 1 and 2 publishes the client will process
50    pub receive_maximum: Option<u16>,
51    /// Maximum packet size the client will accept
52    pub maximum_packet_size: Option<u32>,
53    /// Highest value the client will accept as a topic alias
54    pub topic_alias_maximum: Option<u16>,
55    /// Whether the server should include response information
56    pub request_response_info: Option<bool>,
57    /// Whether the server should include reason strings
58    pub request_problem_info: Option<bool>,
59    /// User-defined key-value properties
60    pub user_properties: Vec<(String, String)>,
61    /// Authentication method name
62    pub auth_method: Option<String>,
63    /// Authentication data
64    pub auth_data: Option<Bytes>,
65}
66
67impl PropertyFrame for ConnectProperties {
68    /// Calculates the encoded length of the properties
69    fn encoded_len(&self) -> usize {
70        let mut len = 0;
71
72        len += property_len!(&self.session_expiry_interval);
73        len += property_len!(&self.receive_maximum);
74        len += property_len!(&self.maximum_packet_size);
75        len += property_len!(&self.topic_alias_maximum);
76        len += property_len!(&self.request_response_info);
77        len += property_len!(&self.request_problem_info);
78        len += property_len!(&self.user_properties);
79        len += property_len!(&self.auth_method);
80        len += property_len!(&self.auth_data);
81
82        len
83    }
84
85    /// Encodes the properties into a byte buffer
86    fn encode(&self, buf: &mut BytesMut) {
87        property_encode!(
88            &self.session_expiry_interval,
89            Property::SessionExpiryInterval,
90            buf
91        );
92        property_encode!(&self.receive_maximum, Property::ReceiveMaximum, buf);
93        property_encode!(&self.maximum_packet_size, Property::MaximumPacketSize, buf);
94        property_encode!(&self.topic_alias_maximum, Property::TopicAliasMaximum, buf);
95        property_encode!(
96            &self.request_response_info,
97            Property::RequestResponseInformation,
98            buf
99        );
100        property_encode!(
101            &self.request_problem_info,
102            Property::RequestProblemInformation,
103            buf
104        );
105        property_encode!(&self.user_properties, Property::UserProp, buf);
106        property_encode!(&self.auth_method, Property::AuthenticationMethod, buf);
107        property_encode!(&self.auth_data, Property::AuthenticationData, buf);
108    }
109
110    /// Decodes properties from a byte buffer
111    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
112        if buf.is_empty() {
113            return Ok(None);
114        }
115
116        let mut properties = ConnectProperties::default();
117
118        while buf.has_remaining() {
119            let property: Property = decode_byte(buf)?.try_into()?;
120            match property {
121                Property::SessionExpiryInterval => {
122                    property_decode!(&mut properties.session_expiry_interval, buf);
123                }
124                Property::ReceiveMaximum => {
125                    property_decode_non_zero!(&mut properties.receive_maximum, buf);
126                }
127                Property::MaximumPacketSize => {
128                    property_decode_non_zero!(&mut properties.maximum_packet_size, buf);
129                }
130                Property::TopicAliasMaximum => {
131                    property_decode!(&mut properties.topic_alias_maximum, buf);
132                }
133                Property::RequestResponseInformation => {
134                    property_decode!(&mut properties.request_response_info, buf);
135                }
136                Property::RequestProblemInformation => {
137                    property_decode!(&mut properties.request_problem_info, buf);
138                }
139                Property::UserProp => {
140                    property_decode!(&mut properties.user_properties, buf);
141                }
142                Property::AuthenticationMethod => {
143                    property_decode!(&mut properties.auth_method, buf);
144                }
145                Property::AuthenticationData => {
146                    property_decode!(&mut properties.auth_data, buf);
147                }
148                _ => return Err(Error::PropertyMismatch),
149            };
150        }
151
152        if properties.auth_data.is_some() && properties.auth_method.is_none() {
153            return Err(Error::ProtocolError);
154        }
155
156        Ok(Some(properties))
157    }
158}
159
160impl ConnectFrame for ConnectHeader<ConnectProperties> {
161    /// Calculates the encoded length of the Connect header
162    fn encoded_len(&self) -> usize {
163        let properties_len = self
164            .properties
165            .as_ref()
166            .map(|properties| properties.encoded_len())
167            .unwrap_or(0);
168        properties_len + len_bytes(properties_len) + self.primary_encoded_len()
169    }
170
171    /// Encodes the Connect header into a byte buffer
172    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
173        self.primary_encode(buf);
174
175        let properties_len = self
176            .properties
177            .as_ref()
178            .map(|properties| properties.encoded_len())
179            .unwrap_or(0) as u32;
180
181        encode_variable_integer(buf, properties_len)?;
182
183        if let Some(properties) = self.properties.as_ref() {
184            properties.encode(buf);
185        }
186        Ok(())
187    }
188
189    /// Decodes a Connect header from a byte buffer
190    fn decode(buf: &mut Bytes) -> Result<Self, Error> {
191        let mut header = Self::primary_decode(buf)?;
192
193        let properties_len = decode_variable_integer(buf)? as usize;
194        if buf.len() < properties_len + len_bytes(properties_len) {
195            return Err(Error::MalformedPacket);
196        }
197
198        // Skip variable byte
199        buf.advance(len_bytes(properties_len));
200
201        let mut properties_buf = buf.split_to(properties_len);
202
203        // Deserialize properties
204        header.properties = ConnectProperties::decode(&mut properties_buf)?;
205
206        Ok(header)
207    }
208}
209
210/// Represents the properties of a Will message in MQTT v5.
211///
212/// These properties provide extended functionality for the last will and testament
213/// message, including delivery timing, content format, and correlation data.
214/// # Example
215///
216/// ```rust
217/// use mqute_codec::protocol::v5::WillProperties;
218///
219/// let will_properties = WillProperties {
220///     delay_interval: Some(10u32),
221///     content_type: Some("json".to_string()),
222///     ..Default::default()
223/// };
224/// ```
225#[derive(Debug, Default, Clone, PartialEq, Eq)]
226pub struct WillProperties {
227    /// Delay before sending the Will message after connection loss
228    pub delay_interval: Option<u32>,
229    /// Format of the Will message payload (0=bytes, 1=UTF-8)
230    pub payload_format_indicator: Option<u8>,
231    /// Lifetime of the Will message in seconds
232    pub message_expiry_interval: Option<u32>,
233    /// Content type descriptor (MIME type)
234    pub content_type: Option<String>,
235    /// Topic name for the response message
236    pub response_topic: Option<String>,
237    /// Correlation data for the response message
238    pub correlation_data: Option<Bytes>,
239    /// User-defined key-value properties
240    pub user_properties: Vec<(String, String)>,
241}
242
243impl PropertyFrame for WillProperties {
244    /// Calculates the encoded length of the Will properties
245    fn encoded_len(&self) -> usize {
246        let mut len = 0;
247
248        len += property_len!(&self.delay_interval);
249        len += property_len!(&self.payload_format_indicator);
250        len += property_len!(&self.message_expiry_interval);
251        len += property_len!(&self.content_type);
252        len += property_len!(&self.response_topic);
253        len += property_len!(&self.correlation_data);
254        len += property_len!(&self.user_properties);
255
256        len
257    }
258
259    /// Encodes the Will properties into a byte buffer
260    fn encode(&self, buf: &mut BytesMut) {
261        property_encode!(&self.delay_interval, Property::WillDelayInterval, buf);
262        property_encode!(
263            &self.payload_format_indicator,
264            Property::PayloadFormatIndicator,
265            buf
266        );
267        property_encode!(
268            &self.message_expiry_interval,
269            Property::MessageExpiryInterval,
270            buf
271        );
272        property_encode!(&self.content_type, Property::ContentType, buf);
273        property_encode!(&self.response_topic, Property::ResponseTopic, buf);
274        property_encode!(&self.correlation_data, Property::CorrelationData, buf);
275        property_encode!(&self.user_properties, Property::UserProp, buf);
276    }
277
278    /// Decodes Will properties from a byte buffer
279    fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error> {
280        if buf.is_empty() {
281            return Ok(None);
282        }
283
284        let mut properties = WillProperties::default();
285
286        while buf.has_remaining() {
287            let property: Property = decode_byte(buf)?.try_into()?;
288            match property {
289                Property::WillDelayInterval => {
290                    property_decode!(&mut properties.delay_interval, buf);
291                }
292                Property::PayloadFormatIndicator => {
293                    property_decode!(&mut properties.payload_format_indicator, buf);
294                    if let Some(value) = properties.payload_format_indicator {
295                        if value != 0 && value != 1 {
296                            return Err(Error::ProtocolError);
297                        }
298                    }
299                }
300                Property::MessageExpiryInterval => {
301                    property_decode!(&mut properties.message_expiry_interval, buf);
302                }
303                Property::ContentType => {
304                    property_decode!(&mut properties.content_type, buf);
305                }
306                Property::ResponseTopic => {
307                    property_decode!(&mut properties.response_topic, buf);
308                }
309                Property::CorrelationData => {
310                    property_decode!(&mut properties.correlation_data, buf);
311                }
312                Property::UserProp => {
313                    property_decode!(&mut properties.user_properties, buf);
314                }
315                _ => return Err(Error::PropertyMismatch),
316            }
317        }
318
319        Ok(Some(properties))
320    }
321}
322
323/// Represents a Will message in MQTT v5.
324///
325/// The Will message is published by the broker when the client disconnects
326/// unexpectedly. It includes the message content, delivery options, and properties.
327///
328/// # Example
329///
330/// ```rust
331/// use mqute_codec::protocol::v5::Will;
332/// use bytes::Bytes;
333/// use mqute_codec::protocol::QoS;
334///
335/// let will = Will::new(None, "tpoic", Bytes::new(), QoS::ExactlyOnce, false);
336/// ```
337#[derive(Debug, Clone, PartialEq, Eq)]
338pub struct Will {
339    /// Will message properties
340    pub properties: Option<WillProperties>,
341    /// Topic name to publish the Will message to
342    pub topic: String,
343    /// Will message payload
344    pub payload: Bytes,
345    /// Quality of Service level for the Will message
346    pub qos: QoS,
347    /// Whether the Will message should be retained
348    pub retain: bool,
349}
350
351impl Will {
352    /// Creates a new `Will` packet
353    pub fn new<T: Into<String>>(
354        properties: Option<WillProperties>,
355        topic: T,
356        payload: Bytes,
357        qos: QoS,
358        retain: bool,
359    ) -> Will {
360        Will {
361            properties,
362            topic: topic.into(),
363            payload,
364            qos,
365            retain,
366        }
367    }
368}
369
370impl WillFrame for Will {
371    /// Calculates the encoded length of the Will message
372    fn encoded_len(&self) -> usize {
373        let properties_len = self
374            .properties
375            .as_ref()
376            .map(|properties| properties.encoded_len())
377            .unwrap_or(0);
378
379        2 + self.topic.len() + 2 + self.payload.len() + len_bytes(properties_len) + properties_len
380    }
381
382    /// Updates the Connect packet flags based on Will message settings
383    fn update_flags(&self, flags: &mut u8) {
384        // Update the 'Will' flag
385        flags.set_bit(WILL_FLAG, true);
386
387        // Update 'Qos' flags
388        flags.set_bits(WILL_QOS, self.qos as u8);
389
390        // Update the 'Will Retain' flag
391        flags.set_bit(WILL_RETAIN, self.retain);
392    }
393
394    /// Encodes the Will message into a byte buffer
395    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
396        let properties_len = self
397            .properties
398            .as_ref()
399            .map(|properties| properties.encoded_len())
400            .unwrap_or(0) as u32;
401
402        encode_variable_integer(buf, properties_len)?;
403
404        if let Some(properties) = self.properties.as_ref() {
405            properties.encode(buf);
406        }
407
408        encode_string(buf, &self.topic);
409        encode_bytes(buf, &self.payload);
410        Ok(())
411    }
412
413    /// Decodes a Will message from a byte buffer
414    fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
415        if !flags.get_bit(WILL_FLAG) {
416            // No 'Will'
417            return Ok(None);
418        }
419
420        let properties_len = decode_variable_integer(buf)? as usize;
421        if buf.len() < properties_len + len_bytes(properties_len) {
422            return Err(Error::MalformedPacket);
423        }
424
425        // Skip properties len
426        buf.advance(len_bytes(properties_len));
427        let mut properties_buf = buf.split_to(properties_len);
428        let properties = WillProperties::decode(&mut properties_buf)?;
429        let qos = flags.get_bits(WILL_QOS).try_into()?;
430        let retain = flags.get_bit(WILL_RETAIN);
431
432        let topic = decode_string(buf)?;
433        let payload = decode_bytes(buf)?;
434
435        Ok(Some(Will {
436            properties,
437            topic,
438            payload,
439            qos,
440            retain,
441        }))
442    }
443}
444
// Defines the `Connect` packet for MQTT V5 through the shared `connect!` macro,
// using `ConnectProperties` as the header properties type and `Will` as the
// will message type.
connect!(Connect<ConnectProperties, Will>, Protocol::V5);
447
448impl Connect {
449    /// Creates a new Connect packet with properties
450    ///
451    /// # Panics
452    ///
453    /// Panics if the value of the "keep alive" parameter exceeds 65535
454    pub fn with_properties<S: Into<String>>(
455        client_id: S,
456        auth: Option<Credentials>,
457        will: Option<Will>,
458        properties: ConnectProperties,
459        keep_alive: Duration,
460        clean_session: bool,
461    ) -> Self {
462        Self::from_scratch(
463            client_id,
464            auth,
465            will,
466            Some(properties),
467            keep_alive,
468            clean_session,
469        )
470    }
471
472    /// Returns the Connect properties if present
473    pub fn properties(&self) -> Option<ConnectProperties> {
474        self.header.properties.clone()
475    }
476}