mqute_codec/protocol/v4/
connect.rs

1//! # Connect Packet V4
2//!
3//! This module defines the `Will` struct, which represents the Last Will and Testament (LWT)
4//! feature in MQTT. It also implements the `WillFrame` trait for encoding and decoding the `Will`
5//! payload.
6
7use crate::Error;
8use crate::codec::util::{decode_bytes, decode_string, encode_bytes, encode_string};
9use crate::protocol::common::{ConnectFrame, WillFrame};
10use crate::protocol::common::{ConnectHeader, connect};
11use crate::protocol::{Protocol, QoS, util};
12use bit_field::BitField;
13use bytes::{Bytes, BytesMut};
14use std::ops::RangeInclusive;
15
16const WILL_FLAG: usize = 2;
17const WILL_QOS: RangeInclusive<usize> = 3..=4;
18const WILL_RETAIN: usize = 5;
19
20/// Represents the Last Will and Testament (LWT) feature in MQTT.
21///
22/// The `Will` struct includes the topic, payload, QoS level, and retain flag for the LWT message.
23/// This message will be published by the broker if the client disconnects unexpectedly.
24///
25/// # Example
26///
27/// ```rust
28/// use mqute_codec::protocol::v4::Will;
29/// use mqute_codec::protocol::QoS;
30/// use bytes::Bytes;
31///
32/// let will = Will::new("topic", Bytes::from("message"), QoS::AtLeastOnce, true);
33/// assert_eq!(will.topic, "topic");
34/// assert_eq!(will.payload, Bytes::from("message"));
35/// assert_eq!(will.qos, QoS::AtLeastOnce);
36/// assert_eq!(will.retain, true);
37/// ```
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct Will {
40    /// The topic to which the LWT message will be published.
41    pub topic: String,
42
43    /// The payload of the LWT message.
44    pub payload: Bytes,
45
46    /// The QoS level for the LWT message.
47    pub qos: QoS,
48
49    /// Whether the LWT message should be retained by the broker.
50    pub retain: bool,
51}
52
53impl Will {
54    /// Creates a new `Will` instance with the specified parameters.
55    ///
56    /// # Panics
57    ///
58    /// Panics if the topic name is invalid according to MQTT topic naming rules.
59    pub fn new<T: Into<String>>(topic: T, payload: Bytes, qos: QoS, retain: bool) -> Self {
60        let topic = topic.into();
61
62        if !util::is_valid_topic_name(&topic) {
63            panic!("Invalid topic name: '{}'", topic);
64        }
65
66        Will {
67            topic,
68            payload,
69            qos,
70            retain,
71        }
72    }
73}
74
75impl WillFrame for Will {
76    /// Calculates the encoded length of the `Will` payload.
77    ///
78    /// The length includes the topic length, payload length, and their respective size prefixes.
79    fn encoded_len(&self) -> usize {
80        2 + self.topic.len() + 2 + self.payload.len()
81    }
82
83    /// Updates the connection flags to reflect the `Will` settings.
84    fn update_flags(&self, flags: &mut u8) {
85        // Update the 'Will' flag
86        flags.set_bit(WILL_FLAG, true);
87
88        // Update 'Qos' flags
89        flags.set_bits(WILL_QOS, self.qos as u8);
90
91        // Update the 'Will Retain' flag
92        flags.set_bit(WILL_RETAIN, self.retain);
93    }
94
95    /// Encodes the `Will` payload into a byte buffer.
96    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
97        encode_string(buf, &self.topic);
98        encode_bytes(buf, &self.payload);
99        Ok(())
100    }
101
102    /// Decodes a `Will` payload from a byte buffer.
103    fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
104        if !flags.get_bit(WILL_FLAG) {
105            // No 'Will' payload
106            return Ok(None);
107        }
108
109        let qos = flags.get_bits(WILL_QOS).try_into()?;
110        let retain = flags.get_bit(WILL_RETAIN);
111
112        let topic = decode_string(buf)?;
113        if !util::is_valid_topic_name(&topic) {
114            return Err(Error::InvalidTopicName(topic));
115        }
116
117        let message = decode_bytes(buf)?;
118        Ok(Some(Will::new(topic, message, qos, retain)))
119    }
120}
121
122/// A placeholder struct indicating that no properties are associated with the `Connect` packet.
123#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
124pub(crate) struct Propertyless;
125
126impl ConnectFrame for ConnectHeader<Propertyless> {
127    /// Calculates the encoded length of the `ConnectHeader`.
128    fn encoded_len(&self) -> usize {
129        self.primary_encoded_len()
130    }
131
132    /// Encodes the `ConnectHeader` into a byte buffer.
133    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
134        self.primary_encode(buf);
135        Ok(())
136    }
137
138    /// Decodes a `ConnectHeader` from a byte buffer.
139    fn decode(buf: &mut Bytes) -> Result<Self, Error> {
140        Self::primary_decode(buf)
141    }
142}
143
144// Defines the `Connect` packet for MQTT V4
145connect!(Connect<Propertyless, Will>, Protocol::V4);
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150    use crate::codec::PacketCodec;
151    use crate::codec::*;
152    use crate::protocol::*;
153    use bytes::{Bytes, BytesMut};
154    use std::time::Duration;
155    use tokio_util::codec::Decoder;
156
157    fn connect_sample() -> [u8; 43] {
158        [
159            (PacketType::Connect as u8) << 4, // Packet type
160            0x29,                             // Remaining len
161            0x00,                             // Protocol name len
162            0x04,
163            b'M', // Protocol name
164            b'Q',
165            b'T',
166            b'T',
167            Protocol::V4.into(), // Protocol level
168            0b1101_0110,         // Flags
169            0x00,                // Keep alive
170            0x10,
171            0x00, // Client ID
172            0x06,
173            b'c',
174            b'l',
175            b'i',
176            b'e',
177            b'n',
178            b't',
179            0x00, // Will topic
180            0x04,
181            b'/',
182            b'a',
183            b'b',
184            b'c',
185            0x00, // Will message
186            0x03,
187            b'b',
188            b'y',
189            b'e',
190            0x00, // Username
191            0x04,
192            b'u',
193            b's',
194            b'e',
195            b'r',
196            0x00, // Password
197            0x04,
198            b'p',
199            b'a',
200            b's',
201            b's',
202        ]
203    }
204
205    fn connect_packet() -> Connect {
206        let auth = Some(Credentials::full("user", "pass"));
207        let will = Some(Will::new(
208            "/abc",
209            Bytes::from("bye"),
210            QoS::ExactlyOnce,
211            false,
212        ));
213
214        Connect::new("client", auth, will, Duration::from_secs(16), true)
215    }
216
217    #[test]
218    fn connect_decode() {
219        let mut codec = PacketCodec::new(None, None);
220
221        let mut buf = BytesMut::new();
222
223        buf.extend_from_slice(&connect_sample());
224
225        let raw_packet = codec.decode(&mut buf).unwrap().unwrap();
226        let packet = Connect::decode(raw_packet).unwrap();
227        assert_eq!(packet, connect_packet());
228    }
229
230    #[test]
231    fn connect_encode_v4() {
232        let packet = connect_packet();
233        let mut buf = BytesMut::new();
234        packet.encode(&mut buf).unwrap();
235        assert_eq!(buf, Vec::from(connect_sample()));
236    }
237}