mqute_codec/protocol/v4/
connect.rs

//! # Connect Packet V4
//!
//! This module defines the `Will` struct, which represents the Last Will and Testament (LWT)
//! feature in MQTT, and implements the `WillFrame` trait for encoding and decoding the `Will`
//! payload. It also defines the `Connect` packet for protocol version 4 through the `connect!`
//! macro.
6
7use crate::codec::util::{decode_bytes, decode_string, encode_bytes, encode_string};
8use crate::protocol::common::{connect, ConnectHeader};
9use crate::protocol::common::{ConnectFrame, WillFrame};
10use crate::protocol::{Protocol, QoS};
11use crate::Error;
12use bit_field::BitField;
13use bytes::{Bytes, BytesMut};
14use std::ops::RangeInclusive;
15
/// Bit position of the 'Will' flag within the connect flags byte.
const WILL_FLAG: usize = 2;
/// Bit range holding the will QoS level within the connect flags byte.
const WILL_QOS: RangeInclusive<usize> = 3..=4;
/// Bit position of the 'Will Retain' flag within the connect flags byte.
const WILL_RETAIN: usize = 5;
19
/// Represents the Last Will and Testament (LWT) feature in MQTT.
///
/// A `Will` bundles the topic, payload, QoS level, and retain flag of the LWT message.
/// Its `WillFrame` implementation writes the topic and payload into the packet payload and
/// stores the QoS level and retain flag in the connect flags byte.
///
/// # Examples
///
/// ```rust
/// use mqute_codec::protocol::v4::Will;
/// use mqute_codec::protocol::QoS;
/// use bytes::Bytes;
///
/// let will = Will::new("topic", Bytes::from("message"), QoS::AtLeastOnce, true);
/// assert_eq!(will.topic, "topic");
/// assert_eq!(will.payload, Bytes::from("message"));
/// assert_eq!(will.qos, QoS::AtLeastOnce);
/// assert_eq!(will.retain, true);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Will {
    /// The topic to which the LWT message will be published.
    ///
    /// Encoded first in the will payload.
    pub topic: String,

    /// The payload of the LWT message.
    ///
    /// Encoded right after the topic.
    pub payload: Bytes,

    /// The QoS level for the LWT message.
    ///
    /// Stored in the `WILL_QOS` bits of the connect flags.
    pub qos: QoS,

    /// Whether the LWT message should be retained by the broker.
    ///
    /// Stored in the `WILL_RETAIN` bit of the connect flags.
    pub retain: bool,
}
51
52impl Will {
53    /// Creates a new `Will` instance.
54    pub fn new<T: Into<String>>(topic: T, payload: Bytes, qos: QoS, retain: bool) -> Self {
55        Will {
56            topic: topic.into(),
57            payload,
58            qos,
59            retain,
60        }
61    }
62}
63
64impl WillFrame for Will {
65    /// Calculates the encoded length of the `Will` payload.
66    ///
67    /// The length includes the topic length, payload length, and their respective size prefixes.
68    fn encoded_len(&self) -> usize {
69        2 + self.topic.len() + 2 + self.payload.len()
70    }
71
72    /// Updates the connection flags to reflect the `Will` settings.
73    fn update_flags(&self, flags: &mut u8) {
74        // Update the 'Will' flag
75        flags.set_bit(WILL_FLAG, true);
76
77        // Update 'Qos' flags
78        flags.set_bits(WILL_QOS, self.qos as u8);
79
80        // Update the 'Will Retain' flag
81        flags.set_bit(WILL_RETAIN, self.retain);
82    }
83
84    /// Encodes the `Will` payload into a byte buffer.
85    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
86        encode_string(buf, &self.topic);
87        encode_bytes(buf, &self.payload);
88        Ok(())
89    }
90
91    /// Decodes a `Will` payload from a byte buffer.
92    fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
93        if !flags.get_bit(WILL_FLAG) {
94            // No 'Will' payload
95            return Ok(None);
96        }
97
98        let qos = flags.get_bits(WILL_QOS).try_into()?;
99        let retain = flags.get_bit(WILL_RETAIN);
100
101        let topic = decode_string(buf)?;
102        let message = decode_bytes(buf)?;
103        Ok(Some(Will::new(topic, message, qos, retain)))
104    }
105}
106
/// A placeholder struct indicating that no properties are associated with the `Connect` packet.
///
/// It is used as the type argument of `ConnectHeader` and as the first type argument of the
/// `connect!` invocation that defines the V4 `Connect` packet.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub(crate) struct Propertyless;
110
impl ConnectFrame for ConnectHeader<Propertyless> {
    /// Calculates the encoded length of the `ConnectHeader`.
    ///
    /// Delegates to `primary_encoded_len`; nothing is added on top of it here.
    fn encoded_len(&self) -> usize {
        self.primary_encoded_len()
    }

    /// Encodes the `ConnectHeader` into a byte buffer.
    ///
    /// Delegates to `primary_encode` and always returns `Ok(())`.
    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
        self.primary_encode(buf);
        Ok(())
    }

    /// Decodes a `ConnectHeader` from a byte buffer.
    ///
    /// Delegates to `primary_decode` and returns its result unchanged.
    fn decode(buf: &mut Bytes) -> Result<Self, Error> {
        Self::primary_decode(buf)
    }
}
128
// Defines the `Connect` packet for MQTT V4 through the `connect!` macro, with
// `Propertyless` and `Will` as its type arguments.
connect!(Connect<Propertyless, Will>, Protocol::V4);
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::PacketCodec;
    use crate::codec::*;
    use crate::protocol::*;
    use bytes::{Bytes, BytesMut};
    use std::time::Duration;
    use tokio_util::codec::Decoder;

    /// Wire image of the `Connect` packet built by `connect_packet`.
    #[rustfmt::skip]
    fn connect_sample() -> [u8; 43] {
        [
            // Fixed header: packet type in the high nibble, then the remaining length
            (PacketType::Connect as u8) << 4, 0x29,
            // Protocol name: length prefix + "MQTT"
            0x00, 0x04, b'M', b'Q', b'T', b'T',
            // Protocol level
            Protocol::V4.into(),
            // Flags: 'Will' flag (bit 2) set, will QoS (bits 3-4) = 2, 'Will Retain' (bit 5)
            // clear; the other set bits (7, 6 and 1) are interpreted outside this file
            0b1101_0110,
            // Keep alive: 16
            0x00, 0x10,
            // Client ID: length prefix + "client"
            0x00, 0x06, b'c', b'l', b'i', b'e', b'n', b't',
            // Will topic: length prefix + "/abc"
            0x00, 0x04, b'/', b'a', b'b', b'c',
            // Will message: length prefix + "bye"
            0x00, 0x03, b'b', b'y', b'e',
            // Username: length prefix + "user"
            0x00, 0x04, b'u', b's', b'e', b'r',
            // Password: length prefix + "pass"
            0x00, 0x04, b'p', b'a', b's', b's',
        ]
    }

    /// The `Connect` value that `connect_sample` is expected to round-trip with.
    fn connect_packet() -> Connect {
        let credentials = Credentials::login("user", "pass");
        let last_will = Will::new("/abc", Bytes::from("bye"), QoS::ExactlyOnce, false);

        Connect::new(
            "client",
            Some(credentials),
            Some(last_will),
            Duration::from_secs(16),
            true,
        )
    }

    /// Decoding the sample bytes must yield the reference packet.
    #[test]
    fn connect_decode() {
        let mut stream = BytesMut::from(&connect_sample()[..]);
        let mut codec = PacketCodec::new(None, None);

        let frame = codec.decode(&mut stream).unwrap().unwrap();
        let decoded = Connect::decode(frame).unwrap();

        assert_eq!(decoded, connect_packet());
    }

    /// Encoding the reference packet must reproduce the sample bytes.
    #[test]
    fn connect_encode_v4() {
        let mut encoded = BytesMut::new();
        connect_packet().encode(&mut encoded).unwrap();

        assert_eq!(encoded, Vec::from(connect_sample()));
    }
}