mqtt_packet_3_5/
connect.rs

1use crate::byte_reader::ByteReader;
2use crate::mqtt_writer::MqttWriter;
3use crate::structure::*;
4#[cfg(feature = "serde_support")]
5use serde::{Deserialize, Serialize};
6use std::io;
7
/// Protocol name bytes used by the legacy MQIsdp protocol id.
const MQISDP_BUF: [u8; 6] = *b"MQIsdp";
/// Protocol name bytes used by the MQTT protocol id.
const MQTT_BUF: [u8; 4] = *b"MQTT";
10
11impl Packet for ConnectPacket {
12    /// This
13    fn encode(&self, _: u8) -> Res<Vec<u8>> {
14        let ConnectPacket {
15            properties,
16            protocol_id,
17            protocol_version,
18            password,
19            client_id,
20            will,
21            clean_session,
22            keep_alive,
23            user_name,
24            ..
25        } = self;
26        let protocol_version = *protocol_version;
27        let mut length = 0;
28
29        // add protocol length
30        length += 2 + match protocol_id {
31            Protocol::Mqtt => 4,
32            Protocol::MQIsdp => 6,
33        };
34
35        // Must be 3 or 4 or 5
36        if let 3 | 4 | 5 = protocol_version {
37            length += 1;
38        } else {
39            return Err("Invalid protocol version".to_string());
40        }
41
42        // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1
43        if (client_id.is_empty() && protocol_version >= 4 && *clean_session)
44            || !client_id.is_empty()
45        {
46            length += client_id.len() + 2;
47        } else {
48            if protocol_version < 4 {
49                return Err("client_id must be supplied before 3.1.1".to_string());
50            }
51            if !clean_session {
52                return Err("client_id must be given if clean_session set to false".to_string());
53            }
54        }
55
56        // "keep_alive" Must be a two byte number
57        // also add connect flags
58        length += 2 + 1;
59
60        // mqtt5 properties
61        let (props_len, properties_data) =
62            Properties::encode_option(properties.as_ref(), protocol_version)?;
63        length += properties_data.len() + props_len.len();
64
65        // If will exists...
66        let mut will_retain = false;
67        let mut will_qos = None;
68        let mut has_will = false;
69        let mut will_properties = vec![];
70        let mut will_props_len = vec![];
71        let mut will_topic: &str = "";
72        let mut will_payload: &str = "";
73        if let Some(will) = will {
74            let LastWill {
75                topic,
76                payload,
77                properties,
78                qos,
79                retain,
80            } = will;
81            has_will = true;
82            will_retain = *retain;
83            will_qos = Some(qos);
84            // It must have non-empty topic
85            // add topic length if any
86            if let Some(t) = topic.as_ref() {
87                if t.is_empty() {
88                    return Err("Not allowed to use empty will topic".to_string());
89                }
90                will_topic = t;
91                length += t.len() + 2;
92            }
93            // Payload
94            length += 2; // payload length
95            if let Some(data) = payload.as_ref() {
96                will_payload = data;
97                length += data.len();
98            }
99            // will properties
100            if protocol_version == 5 {
101                let (l, w) = Properties::encode_option(properties.as_ref(), protocol_version)?;
102                will_properties = w;
103                will_props_len = l;
104                length += will_properties.len() + will_props_len.len();
105            }
106        }
107
108        // Username
109        let mut has_username = false;
110        if let Some(user_name) = &user_name {
111            has_username = true;
112            length += user_name.len() + 2;
113        }
114
115        // Password
116        let mut has_password = false;
117        if let Some(pass) = &password {
118            if !has_username {
119                return Err("Username is required to use password".to_string());
120            }
121            has_password = true;
122            length += pass.len() + 2;
123        }
124
125        let mut writer = MqttWriter::new(length);
126        // write header
127        writer.write_u8(FixedHeader::for_type(PacketType::Connect).encode());
128        // length
129        writer.write_variable_num(length as u32)?;
130        // protocol id and protocol version
131        let proto_vec = match protocol_id {
132            Protocol::MQIsdp => MQISDP_BUF.to_vec(),
133            Protocol::Mqtt => MQTT_BUF.to_vec(),
134        };
135        writer.write_u16(proto_vec.len() as u16);
136        writer.write_vec(proto_vec);
137        writer.write_u8(protocol_version);
138        // write connect flags
139        writer.write_u8(
140            ((has_username as u8) * 0x80) //user_name:  0x80 = (1 << 7)
141            | ((has_password as u8) * 0x40) //password:  0x40 = (1 << 6)
142            | ((will_retain as u8) * 0x20)  //will_retain:  0x20 = (1 << 5)
143            | ((*will_qos.unwrap_or(&0) << 3) & 0x18)     //will_qos:  0x18 = 24 = ((1 << 4) + (1 << 3)),
144            | ((has_will as u8) * 0x4) //will:  0x4 = 1 << 2
145            | ((*clean_session as u8) * 0x2), //clean_session:  0x2 = 1 << 2)
146        );
147        // write keep alive
148        writer.write_u16(*keep_alive);
149
150        writer.write_sized(&properties_data, &props_len)?;
151        // client id
152        writer.write_utf8_str(client_id);
153        // will properties
154        if protocol_version == 5 {
155            writer.write_sized(&will_properties, &will_props_len)?;
156        }
157        // will topic and payload
158        if has_will {
159            writer.write_utf8_str(will_topic);
160            writer.write_utf8_str(will_payload);
161        }
162
163        // username
164        if let Some(u) = user_name {
165            writer.write_utf8_str(u);
166        }
167        // password
168        if let Some(p) = password {
169            writer.write_utf8_str(p);
170        }
171        Ok(writer.into_vec())
172    }
173
174    /// Decode connect packet
175    fn decode<R: io::Read>(reader: &mut ByteReader<R>, _: FixedHeader, _: u32, _: u8) -> Res<Self> {
176        // Parse protocolId
177        let protocol_id = reader.read_utf8_string()?;
178        let protocol_id = Protocol::from_source(&protocol_id)?;
179        // Parse constants version number
180        let mut protocol_version = reader.read_u8()?;
181        if !reader.has_more() {
182            return Err("Packet too short".to_string());
183        }
184
185        if protocol_version >= 128 {
186            //   packet.bridgeMode = true
187            protocol_version -= 128
188        }
189
190        if protocol_version != 3 && protocol_version != 4 && protocol_version != 5 {
191            return Err("Invalid protocol version".to_string());
192        }
193
194        let (connect_flags, last_will) = ConnectFlags::from_byte(reader.read_u8()?)?;
195        // Parse keepalive
196        let keep_alive = reader.read_u16()?;
197        let connect_properties = if protocol_version == 5 {
198            match reader.read_properties()? {
199                None => None,
200                Some(props) => Some(ConnectProperties::from_properties(props)?),
201            }
202        } else {
203            None
204        };
205        // Start parsing payload
206        // Parse client_id
207        let client_id = reader.read_utf8_string()?;
208        let last_will = if let (Some(mut will), true) = (last_will, connect_flags.will) {
209            if protocol_version == 5 {
210                will.properties = match reader.read_properties()? {
211                    None => None,
212                    Some(props) => Some(WillProperties::from_properties(props)?),
213                };
214            }
215            // Parse will topic
216            will.topic = Some(reader.read_utf8_string()?);
217            // Parse will payload
218            will.payload = Some(reader.read_utf8_string()?);
219            Some(will)
220        } else {
221            // since connect_flags.will = false, we don't really care about the last will
222            None
223        };
224
225        // Parse username
226        let mut user_name = None;
227        if connect_flags.user_name {
228            user_name = Some(reader.read_utf8_string()?);
229        }
230
231        // Parse password
232        let mut password = None;
233        if connect_flags.password {
234            password = Some(reader.read_utf8_string()?);
235        }
236        // need for right parse auth packet and self set up
237        Ok(ConnectPacket {
238            client_id,
239            protocol_version,
240            protocol_id,
241            clean_session: connect_flags.clean_session,
242            keep_alive,
243            properties: connect_properties,
244            user_name,
245            password,
246            will: last_will,
247        })
248    }
249}
250
/// Individual fields of the CONNECT packet's connect-flags byte.
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))]
pub struct ConnectFlags {
    /// User name flag (bit 7).
    pub user_name: bool,
    /// Password flag (bit 6).
    pub password: bool,
    /// Will retain flag (bit 5).
    pub will_retain: bool,
    /// Will QoS (bits 4 and 3), stored as a plain number.
    pub will_qos: u8,
    /// Will flag (bit 2).
    pub will: bool,
    /// Clean session flag (bit 1).
    pub clean_session: bool,
}
261
262impl ConnectFlags {
263    pub fn new(byte: u8) -> ConnectFlags {
264        ConnectFlags {
265            user_name: (byte & 0x80) != 0,    // 0x80 = (1 << 7)
266            password: (byte & 0x40) != 0,     // 0x40 = (1 << 6)
267            will_retain: (byte & 0x20) != 0,  // 0x20 = (1 << 5)
268            will_qos: (byte & 0x18) >> 3,     // 0x18 = 24 = ((1 << 4) + (1 << 3)),
269            will: (byte & 0x4) != 0,          // 0x4 = 1 << 2
270            clean_session: (byte & 0x2) != 0, // 0x2 = 1 << 2
271        }
272    }
273
274    pub fn from_byte(connect_flags: u8) -> Result<(ConnectFlags, Option<LastWill>), String> {
275        if connect_flags & 0x1 == 1 {
276            // The Server MUST validate that the reserved flag in the CONNECT Control Packet is set to zero and disconnect the Client if it is not zero [MQTT-3.1.2-3]
277            return Err("Connect flag bit 0 must be 0, but got 1".to_string());
278        }
279        let connect_flags = ConnectFlags::new(connect_flags);
280
281        if !connect_flags.will {
282            if connect_flags.will_retain {
283                return Err(
284                    "Will Retain Flag must be set to zero when Will Flag is set to 0".to_string(),
285                );
286            }
287            if connect_flags.will_qos != 0 {
288                return Err("Will QoS must be set to zero when Will Flag is set to 0".to_string());
289            }
290        }
291
292        let will_flags = if connect_flags.will {
293            Some(LastWill {
294                topic: None,
295                payload: None,
296                qos: connect_flags.will_qos,
297                retain: connect_flags.will_retain,
298                properties: None,
299            })
300        } else {
301            None
302        };
303        Ok((connect_flags, will_flags))
304    }
305}