mqtt_packet_3_5/
connect.rs1use crate::byte_reader::ByteReader;
2use crate::mqtt_writer::MqttWriter;
3use crate::structure::*;
4#[cfg(feature = "serde_support")]
5use serde::{Deserialize, Serialize};
6use std::io;
7
/// Protocol name bytes for `Protocol::MQIsdp`.
const MQISDP_BUF: [u8; 6] = *b"MQIsdp";
/// Protocol name bytes for `Protocol::Mqtt`.
const MQTT_BUF: [u8; 4] = *b"MQTT";
10
11impl Packet for ConnectPacket {
12 fn encode(&self, _: u8) -> Res<Vec<u8>> {
14 let ConnectPacket {
15 properties,
16 protocol_id,
17 protocol_version,
18 password,
19 client_id,
20 will,
21 clean_session,
22 keep_alive,
23 user_name,
24 ..
25 } = self;
26 let protocol_version = *protocol_version;
27 let mut length = 0;
28
29 length += 2 + match protocol_id {
31 Protocol::Mqtt => 4,
32 Protocol::MQIsdp => 6,
33 };
34
35 if let 3 | 4 | 5 = protocol_version {
37 length += 1;
38 } else {
39 return Err("Invalid protocol version".to_string());
40 }
41
42 if (client_id.is_empty() && protocol_version >= 4 && *clean_session)
44 || !client_id.is_empty()
45 {
46 length += client_id.len() + 2;
47 } else {
48 if protocol_version < 4 {
49 return Err("client_id must be supplied before 3.1.1".to_string());
50 }
51 if !clean_session {
52 return Err("client_id must be given if clean_session set to false".to_string());
53 }
54 }
55
56 length += 2 + 1;
59
60 let (props_len, properties_data) =
62 Properties::encode_option(properties.as_ref(), protocol_version)?;
63 length += properties_data.len() + props_len.len();
64
65 let mut will_retain = false;
67 let mut will_qos = None;
68 let mut has_will = false;
69 let mut will_properties = vec![];
70 let mut will_props_len = vec![];
71 let mut will_topic: &str = "";
72 let mut will_payload: &str = "";
73 if let Some(will) = will {
74 let LastWill {
75 topic,
76 payload,
77 properties,
78 qos,
79 retain,
80 } = will;
81 has_will = true;
82 will_retain = *retain;
83 will_qos = Some(qos);
84 if let Some(t) = topic.as_ref() {
87 if t.is_empty() {
88 return Err("Not allowed to use empty will topic".to_string());
89 }
90 will_topic = t;
91 length += t.len() + 2;
92 }
93 length += 2; if let Some(data) = payload.as_ref() {
96 will_payload = data;
97 length += data.len();
98 }
99 if protocol_version == 5 {
101 let (l, w) = Properties::encode_option(properties.as_ref(), protocol_version)?;
102 will_properties = w;
103 will_props_len = l;
104 length += will_properties.len() + will_props_len.len();
105 }
106 }
107
108 let mut has_username = false;
110 if let Some(user_name) = &user_name {
111 has_username = true;
112 length += user_name.len() + 2;
113 }
114
115 let mut has_password = false;
117 if let Some(pass) = &password {
118 if !has_username {
119 return Err("Username is required to use password".to_string());
120 }
121 has_password = true;
122 length += pass.len() + 2;
123 }
124
125 let mut writer = MqttWriter::new(length);
126 writer.write_u8(FixedHeader::for_type(PacketType::Connect).encode());
128 writer.write_variable_num(length as u32)?;
130 let proto_vec = match protocol_id {
132 Protocol::MQIsdp => MQISDP_BUF.to_vec(),
133 Protocol::Mqtt => MQTT_BUF.to_vec(),
134 };
135 writer.write_u16(proto_vec.len() as u16);
136 writer.write_vec(proto_vec);
137 writer.write_u8(protocol_version);
138 writer.write_u8(
140 ((has_username as u8) * 0x80) | ((has_password as u8) * 0x40) | ((will_retain as u8) * 0x20) | ((*will_qos.unwrap_or(&0) << 3) & 0x18) | ((has_will as u8) * 0x4) | ((*clean_session as u8) * 0x2), );
147 writer.write_u16(*keep_alive);
149
150 writer.write_sized(&properties_data, &props_len)?;
151 writer.write_utf8_str(client_id);
153 if protocol_version == 5 {
155 writer.write_sized(&will_properties, &will_props_len)?;
156 }
157 if has_will {
159 writer.write_utf8_str(will_topic);
160 writer.write_utf8_str(will_payload);
161 }
162
163 if let Some(u) = user_name {
165 writer.write_utf8_str(u);
166 }
167 if let Some(p) = password {
169 writer.write_utf8_str(p);
170 }
171 Ok(writer.into_vec())
172 }
173
174 fn decode<R: io::Read>(reader: &mut ByteReader<R>, _: FixedHeader, _: u32, _: u8) -> Res<Self> {
176 let protocol_id = reader.read_utf8_string()?;
178 let protocol_id = Protocol::from_source(&protocol_id)?;
179 let mut protocol_version = reader.read_u8()?;
181 if !reader.has_more() {
182 return Err("Packet too short".to_string());
183 }
184
185 if protocol_version >= 128 {
186 protocol_version -= 128
188 }
189
190 if protocol_version != 3 && protocol_version != 4 && protocol_version != 5 {
191 return Err("Invalid protocol version".to_string());
192 }
193
194 let (connect_flags, last_will) = ConnectFlags::from_byte(reader.read_u8()?)?;
195 let keep_alive = reader.read_u16()?;
197 let connect_properties = if protocol_version == 5 {
198 match reader.read_properties()? {
199 None => None,
200 Some(props) => Some(ConnectProperties::from_properties(props)?),
201 }
202 } else {
203 None
204 };
205 let client_id = reader.read_utf8_string()?;
208 let last_will = if let (Some(mut will), true) = (last_will, connect_flags.will) {
209 if protocol_version == 5 {
210 will.properties = match reader.read_properties()? {
211 None => None,
212 Some(props) => Some(WillProperties::from_properties(props)?),
213 };
214 }
215 will.topic = Some(reader.read_utf8_string()?);
217 will.payload = Some(reader.read_utf8_string()?);
219 Some(will)
220 } else {
221 None
223 };
224
225 let mut user_name = None;
227 if connect_flags.user_name {
228 user_name = Some(reader.read_utf8_string()?);
229 }
230
231 let mut password = None;
233 if connect_flags.password {
234 password = Some(reader.read_utf8_string()?);
235 }
236 Ok(ConnectPacket {
238 client_id,
239 protocol_version,
240 protocol_id,
241 clean_session: connect_flags.clean_session,
242 keep_alive,
243 properties: connect_properties,
244 user_name,
245 password,
246 will: last_will,
247 })
248 }
249}
250
/// The individual fields of the CONNECT packet's flag byte.
///
/// Bit 0 of the byte has no field here; the remaining bits map to the
/// members below.
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))]
pub struct ConnectFlags {
    /// Bit 7 (`0x80`): a user name is present in the packet.
    pub user_name: bool,
    /// Bit 6 (`0x40`): a password is present in the packet.
    pub password: bool,
    /// Bit 5 (`0x20`): the will message is to be retained.
    pub will_retain: bool,
    /// Bits 4-3 (`0x18`): the QoS level of the will message.
    pub will_qos: u8,
    /// Bit 2 (`0x04`): a will message is present in the packet.
    pub will: bool,
    /// Bit 1 (`0x02`): the clean session flag.
    pub clean_session: bool,
}
261
262impl ConnectFlags {
263 pub fn new(byte: u8) -> ConnectFlags {
264 ConnectFlags {
265 user_name: (byte & 0x80) != 0, password: (byte & 0x40) != 0, will_retain: (byte & 0x20) != 0, will_qos: (byte & 0x18) >> 3, will: (byte & 0x4) != 0, clean_session: (byte & 0x2) != 0, }
272 }
273
274 pub fn from_byte(connect_flags: u8) -> Result<(ConnectFlags, Option<LastWill>), String> {
275 if connect_flags & 0x1 == 1 {
276 return Err("Connect flag bit 0 must be 0, but got 1".to_string());
278 }
279 let connect_flags = ConnectFlags::new(connect_flags);
280
281 if !connect_flags.will {
282 if connect_flags.will_retain {
283 return Err(
284 "Will Retain Flag must be set to zero when Will Flag is set to 0".to_string(),
285 );
286 }
287 if connect_flags.will_qos != 0 {
288 return Err("Will QoS must be set to zero when Will Flag is set to 0".to_string());
289 }
290 }
291
292 let will_flags = if connect_flags.will {
293 Some(LastWill {
294 topic: None,
295 payload: None,
296 qos: connect_flags.will_qos,
297 retain: connect_flags.will_retain,
298 properties: None,
299 })
300 } else {
301 None
302 };
303 Ok((connect_flags, will_flags))
304 }
305}