mqute_codec/protocol/v4/
connect.rs

1//! # Connect Packet V4
2//!
3//! This module defines the `Will` struct, which represents the Last Will and Testament (LWT)
4//! feature in MQTT. It also implements the `WillFrame` trait for encoding and decoding the `Will`
5//! payload.
6
7use crate::codec::util::{decode_bytes, decode_string, encode_bytes, encode_string};
8use crate::protocol::common::{connect, ConnectHeader};
9use crate::protocol::common::{ConnectFrame, WillFrame};
10use crate::protocol::{Protocol, QoS};
11use crate::Error;
12use bit_field::BitField;
13use bytes::{Bytes, BytesMut};
14use std::ops::RangeInclusive;
15
16const WILL_FLAG: usize = 2;
17const WILL_QOS: RangeInclusive<usize> = 3..=4;
18const WILL_RETAIN: usize = 5;
19
20/// Represents the Last Will and Testament (LWT) feature in MQTT.
21///
22/// The `Will` struct includes the topic, payload, QoS level, and retain flag for the LWT message.
23///
24/// # Example
25///
26/// ```rust
27/// use mqute_codec::protocol::v4::Will;
28/// use mqute_codec::protocol::QoS;
29/// use bytes::Bytes;
30///
31/// let will = Will::new("topic", Bytes::from("message"), QoS::AtLeastOnce, true);
32/// assert_eq!(will.topic, "topic");
33/// assert_eq!(will.payload, Bytes::from("message"));
34/// assert_eq!(will.qos, QoS::AtLeastOnce);
35/// assert_eq!(will.retain, true);
36/// ```
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct Will {
39    /// The topic to which the LWT message will be published.
40    pub topic: String,
41
42    /// The payload of the LWT message.
43    pub payload: Bytes,
44
45    /// The QoS level for the LWT message.
46    pub qos: QoS,
47
48    /// Whether the LWT message should be retained by the broker.
49    pub retain: bool,
50}
51
52impl Will {
53    /// Creates a new `Will` instance.
54    pub fn new<T: Into<String>>(topic: T, payload: Bytes, qos: QoS, retain: bool) -> Self {
55        Will {
56            topic: topic.into(),
57            payload,
58            qos,
59            retain,
60        }
61    }
62}
63
64impl WillFrame for Will {
65    /// Calculates the encoded length of the `Will` payload.
66    ///
67    /// The length includes the topic length, payload length, and their respective size prefixes.
68    fn encoded_len(&self) -> usize {
69        2 + self.topic.len() + 2 + self.payload.len()
70    }
71
72    /// Updates the connection flags to reflect the `Will` settings.
73    fn update_flags(&self, flags: &mut u8) {
74        // Update the 'Will' flag
75        flags.set_bit(WILL_FLAG, true);
76
77        // Update 'Qos' flags
78        flags.set_bits(WILL_QOS, self.qos as u8);
79
80        // Update the 'Will Retain' flag
81        flags.set_bit(WILL_RETAIN, self.retain);
82    }
83
84    /// Encodes the `Will` payload into a byte buffer.
85    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
86        encode_string(buf, &self.topic);
87        encode_bytes(buf, &self.payload);
88        Ok(())
89    }
90
91    /// Decodes a `Will` payload from a byte buffer.
92    fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
93        if !flags.get_bit(WILL_FLAG) {
94            // No 'Will' payload
95            return Ok(None);
96        }
97
98        let qos = flags.get_bits(WILL_QOS).try_into()?;
99        let retain = flags.get_bit(WILL_RETAIN);
100
101        let topic = decode_string(buf)?;
102        let message = decode_bytes(buf)?;
103        Ok(Some(Will::new(topic, message, qos, retain)))
104    }
105}
106
107/// A placeholder struct indicating that no properties are associated with the `Connect` packet.
108#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
109pub(crate) struct Propertyless;
110
111impl ConnectFrame for ConnectHeader<Propertyless> {
112    /// Calculates the encoded length of the `ConnectHeader`.
113    fn encoded_len(&self) -> usize {
114        self.primary_encoded_len()
115    }
116
117    /// Encodes the `ConnectHeader` into a byte buffer.
118    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
119        self.primary_encode(buf);
120        Ok(())
121    }
122
123    /// Decodes a `ConnectHeader` from a byte buffer.
124    fn decode(buf: &mut Bytes) -> Result<Self, Error> {
125        Self::primary_decode(buf)
126    }
127}
128
129// Defines the `Connect` packet for MQTT V4
130connect!(Connect<Propertyless, Will>, Protocol::V4);
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135    use crate::codec::PacketCodec;
136    use crate::codec::*;
137    use crate::protocol::*;
138    use bytes::{Bytes, BytesMut};
139    use tokio_util::codec::Decoder;
140
141    fn connect_sample() -> [u8; 43] {
142        [
143            (PacketType::Connect as u8) << 4, // Packet type
144            0x29,                             // Remaining len
145            0x00,                             // Protocol name len
146            0x04,
147            b'M', // Protocol name
148            b'Q',
149            b'T',
150            b'T',
151            Protocol::V4.into(), // Protocol level
152            0b1101_0110,         // Flags
153            0x00,                // Keep alive
154            0x10,
155            0x00, // Client ID
156            0x06,
157            b'c',
158            b'l',
159            b'i',
160            b'e',
161            b'n',
162            b't',
163            0x00, // Will topic
164            0x04,
165            b'/',
166            b'a',
167            b'b',
168            b'c',
169            0x00, // Will message
170            0x03,
171            b'b',
172            b'y',
173            b'e',
174            0x00, // Username
175            0x04,
176            b'u',
177            b's',
178            b'e',
179            b'r',
180            0x00, // Password
181            0x04,
182            b'p',
183            b'a',
184            b's',
185            b's',
186        ]
187    }
188
189    fn connect_packet() -> Connect {
190        let auth = Some(Credentials::login("user", "pass"));
191        let will = Some(Will::new(
192            "/abc",
193            Bytes::from("bye"),
194            QoS::ExactlyOnce,
195            false,
196        ));
197
198        Connect::new("client", auth, will, 16, true)
199    }
200
201    #[test]
202    fn connect_decode() {
203        let mut codec = PacketCodec::new(None, None);
204
205        let mut buf = BytesMut::new();
206
207        buf.extend_from_slice(&connect_sample());
208
209        let raw_packet = codec.decode(&mut buf).unwrap().unwrap();
210        let packet = Connect::decode(raw_packet).unwrap();
211        assert_eq!(packet, connect_packet());
212    }
213
214    #[test]
215    fn connect_encode_v4() {
216        let packet = connect_packet();
217        let mut buf = BytesMut::new();
218        packet.encode(&mut buf).unwrap();
219        assert_eq!(buf, Vec::from(connect_sample()));
220    }
221}