hebo_codec/v5/connect.rs
1// Copyright (c) 2020 Xu Shaohua <shaohua@biofan.org>. All rights reserved.
2// Use of this source is governed by Apache-2.0 License that can be found
3// in the LICENSE file.
4
5use std::convert::TryFrom;
6
7use super::property::check_property_type_list;
8use super::{Properties, PropertyType};
9use crate::base::PROTOCOL_NAME;
10use crate::connect_flags::ConnectFlags;
11use crate::utils::validate_client_id;
12use crate::{
13 validate_keep_alive, BinaryData, ByteArray, DecodeError, DecodePacket, EncodeError,
14 EncodePacket, FixedHeader, KeepAlive, Packet, PacketType, ProtocolLevel, PubTopic, QoS,
15 StringData, VarIntError,
16};
17
18/// `ConnectPacket` consists of three parts:
19/// * `FixedHeader`
20/// * `VariableHeader`
21/// * `Payload`
22///
23/// Note that fixed header part is same in all packets so that we just ignore it.
24///
25/// Basic struct of `ConnectPacket` is as below:
26/// ```txt
27/// 7 0
28/// +----------------------------+
29/// | Fixed header |
30/// | |
31/// +----------------------------+
32/// | Protocol level |
33/// +----------------------------+
34/// | Connect flags |
35/// +----------------------------+
36/// | Keep alive |
37/// | |
38/// +----------------------------+
39/// | Properties Length |
40/// +----------------------------+
41/// | Properties |
42/// | |
43/// +----------------------------+
44/// | Client id length |
45/// | |
46/// +----------------------------+
47/// | Client id string ... |
48/// +----------------------------+
49/// | Will Properties Length |
50/// +----------------------------+
51/// | Will Properties |
52/// | |
53/// +----------------------------+
54/// | Will topic length |
55/// | |
56/// +----------------------------+
57/// | Will topic string ... |
58/// +----------------------------+
59/// | Will message length |
60/// | |
61/// +----------------------------+
62/// | Will message bytes ... |
63/// +----------------------------+
64/// | Username length |
65/// | |
66/// +----------------------------+
67/// | Username string ... |
68/// +----------------------------+
69/// | Password length |
70/// | |
71/// +----------------------------+
72/// | Password bytes ... |
73/// +----------------------------+
74/// ```
75/// After a Network Connection is established by a Client to a Server, the first packet
76/// sent from the Client to the Server MUST be a CONNECT packet [MQTT-3.1.0-1].
77///
78/// A Client can only send the CONNECT packet once over a Network Connection. The Server MUST
79/// process a second CONNECT packet sent from a Client as a Protocol Error and close the Network
80/// Connection [MQTT-3.1.0-2].
81///
82/// The Payload of the CONNECT packet contains one or more length-prefixed fields,
83/// whose presence is determined by the flags in the Variable Header. These fields,
84/// if present, MUST appear in the order Client Identifier, Will Properties, Will Topic,
85/// Will Payload, User Name, Password [MQTT-3.1.3-1].
86#[allow(clippy::module_name_repetitions)]
87#[derive(Clone, Debug, Default, PartialEq, Eq)]
88pub struct ConnectPacket {
89 /// Protocol name can only be `MQTT` in specification.
90 protocol_name: StringData,
91
92 protocol_level: ProtocolLevel,
93
94 connect_flags: ConnectFlags,
95
96 /// The Keep Alive is a Two Byte Integer which is a time interval measured in seconds.
97 ///
98 /// It is the maximum time interval that is permitted to elapse between the point
99 /// at which the Client finishes transmitting one MQTT Control Packet and the point
100 /// it starts sending the next. It is the responsibility of the Client to ensure
101 /// that the interval between MQTT Control Packets being sent does not exceed the Keep Alive value.
102 /// If Keep Alive is non-zero and in the absence of sending any other MQTT Control Packets,
103 /// the Client MUST send a PINGREQ packet [MQTT-3.1.2-20].
104 ///
105 /// If the Server returns a Server Keep Alive on the CONNACK packet, the Client MUST
106 /// use that value instead of the value it sent as the Keep Alive [MQTT-3.1.2-21].
107 ///
108 /// The Client can send PINGREQ at any time, irrespective of the Keep Alive value,
109 /// and check for a corresponding PINGRESP to determine that the network and
110 /// the Server are available.
111 ///
112 /// If the Keep Alive value is non-zero and the Server does not receive an MQTT Control Packet
113 /// from the Client within one and a half times the Keep Alive time period,
114 /// it MUST close the Network Connection to the Client as if the network had failed [MQTT-3.1.2-22].
115 ///
116 /// If a Client does not receive a PINGRESP packet within a reasonable amount of time
117 /// after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server.
118 ///
119 /// A Keep Alive value of 0 has the effect of turning off the Keep Alive mechanism.
120 /// If Keep Alive is 0 the Client is not obliged to send MQTT Control Packets
121 /// on any particular schedule.
122 keep_alive: KeepAlive,
123
124 properties: Properties,
125
126 // <-- variable body begins -->
127 /// Payload is `client_id`.
128 /// `client_id` is generated in client side. Normally it can be `device_id` or just
129 /// randomly generated string.
130 /// `client_id` is used to identify client connections in server. Session is based on this field.
131 /// It must be valid UTF-8 string, length shall be between 1 and 23 bytes.
132 /// It can only contain the characters: "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
133 /// If `client_id` is invalid, the Server will reply ConnectAck Packet with return code
134 /// 0x02(Identifier rejected).
135 ///
136 /// The Client Identifier (ClientID) identifies the Client to the Server. Each Client
137 /// connecting to the Server has a unique ClientID. The ClientID MUST be used by Clients
138 /// and by Servers to identify state that they hold relating to this MQTT Session
139 /// between the Client and the Server [MQTT-3.1.3-2].
140 ///
141 /// The ClientID MUST be present and is the first field in the CONNECT packet Payload [MQTT-3.1.3-3].
142 ///
143 /// The ClientID MUST be a UTF-8 Encoded String [MQTT-3.1.3-4].
144 ///
145 /// The Server MUST allow ClientID’s which are between 1 and 23 UTF-8 encoded bytes
146 /// in length, and that contain only the characters
147 /// "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" [MQTT-3.1.3-5].
148 ///
149 /// A Server MAY allow a Client to supply a ClientID that has a length of zero bytes,
150 /// however if it does so the Server MUST treat this as a special case and
151 /// assign a unique ClientID to that Client [MQTT-3.1.3-6].
152 ///
153 /// It MUST then process the CONNECT packet as if the Client had provided
154 /// that unique ClientID, and MUST return the Assigned Client Identifier
155 /// in the CONNACK packet [MQTT-3.1.3-7]
156 ///
157 /// If the Server rejects the ClientID it MAY respond to the CONNECT packet
158 /// with a CONNACK using Reason Code 0x85 (Client Identifier not valid),
159 /// and then it MUST close the Network Connection [MQTT-3.1.3-8].
160 client_id: StringData,
161
162 /// If the Will Flag is set to 1, the Will Properties is the next field in the Payload.
163 ///
164 /// The Will Properties field defines the Application Message properties to be sent
165 /// with the Will Message when it is published, and properties which define
166 /// when to publish the Will Message. The Will Properties consists of
167 /// a Property Length and the Properties.
168 will_properties: Properties,
169
170 /// If the `will` flag is true in `connect_flags`, then `will_topic` field must be set.
171 /// It will be used as the topic of Will Message.
172 will_topic: Option<PubTopic>,
173
174 /// If the `will` flag is true in `connect_flags`, then `will_message` field must be set.
175 /// It will be used as the payload of Will Message.
176 /// It consists of 0 to 64k bytes of binary data.
177 will_message: BinaryData,
178
179 /// If the `username` flag is true in `connect_flags`, then `username` field must be set.
180 /// It is a valid UTF-8 string.
181 username: StringData,
182
183 /// If the `password` flag is true in `connect_flags`, then `password` field must be set.
184 /// It consists of 0 to 64k bytes of binary data.
185 password: BinaryData,
186}
187
188/// Properties available in connect packet.
189pub const CONNECT_PROPERTIES: &[PropertyType] = &[
190 PropertyType::SessionExpiryInterval,
191 PropertyType::ReceiveMaximum,
192 PropertyType::MaximumPacketSize,
193 PropertyType::TopicAliasMaximum,
194 PropertyType::RequestProblemInformation,
195 PropertyType::UserProperty,
196 PropertyType::AuthenticationMethod,
197 PropertyType::AuthenticationData,
198];
199
200/// Properties available in connect-will.
201pub const CONNECT_WILL_PROPERTIES: &[PropertyType] = &[
202 PropertyType::WillDelayInterval,
203 PropertyType::PayloadFormatIndicator,
204 PropertyType::MessageExpiryInterval,
205 PropertyType::ContentType,
206 PropertyType::ResponseTopic,
207 PropertyType::CorrelationData,
208 PropertyType::UserProperty,
209];
210
211impl ConnectPacket {
212 /// Create a new connect packet.
213 ///
214 /// # Errors
215 ///
216 /// Returns error if `client_id` is invalid.
217 pub fn new(client_id: &str) -> Result<Self, EncodeError> {
218 let protocol_name = StringData::from(PROTOCOL_NAME)?;
219 validate_client_id(client_id).map_err(|_err| EncodeError::InvalidClientId)?;
220 let client_id = StringData::from(client_id)?;
221 Ok(Self {
222 protocol_name,
223 keep_alive: KeepAlive::new(60),
224 client_id,
225 ..Self::default()
226 })
227 }
228
229 /// Update protocol level.
230 pub fn set_protcol_level(&mut self, level: ProtocolLevel) -> &mut Self {
231 self.protocol_level = level;
232 self
233 }
234
235 /// Get current mqtt protocol level.
236 #[must_use]
237 pub const fn protocol_level(&self) -> ProtocolLevel {
238 self.protocol_level
239 }
240
241 /// Update connect flags
242 pub fn set_connect_flags(&mut self, flags: ConnectFlags) -> &Self {
243 self.connect_flags = flags;
244 self
245 }
246
247 /// Get current connect flags.
248 #[must_use]
249 #[inline]
250 pub const fn connect_flags(&self) -> &ConnectFlags {
251 &self.connect_flags
252 }
253
254 /// Update keep-alive value.
255 pub fn set_keep_alive(&mut self, keep_alive: u16) -> &mut Self {
256 self.keep_alive = KeepAlive::new(keep_alive);
257 self
258 }
259
260 /// Get current keep-alive value.
261 #[must_use]
262 pub const fn keep_alive(&self) -> u16 {
263 self.keep_alive.value()
264 }
265
266 /// Update will-retain flag.
267 pub fn set_will_retain(&mut self, will_retain: bool) -> &mut Self {
268 self.connect_flags.set_will_retain(will_retain);
269 self
270 }
271
272 /// Get current will-retain flag.
273 #[must_use]
274 pub const fn will_retain(&self) -> bool {
275 self.connect_flags.will_retain()
276 }
277
278 /// Update will-qos value.
279 pub fn set_will_qos(&mut self, qos: QoS) -> &mut Self {
280 self.connect_flags.set_will_qos(qos);
281 self
282 }
283
284 /// Get current will-qos value.
285 #[must_use]
286 pub const fn will_qos(&self) -> QoS {
287 self.connect_flags.will_qos()
288 }
289
290 /// Update will flag.
291 pub fn set_will(&mut self, will: bool) -> &mut Self {
292 self.connect_flags.set_will(will);
293 self
294 }
295
296 /// Get current will flag.
297 #[must_use]
298 pub const fn will(&self) -> bool {
299 self.connect_flags.will()
300 }
301
302 /// Update clean-session flag.
303 pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
304 self.connect_flags.set_clean_session(clean_session);
305 self
306 }
307
308 /// Get clean-session flag.
309 #[must_use]
310 pub const fn clean_session(&self) -> bool {
311 self.connect_flags.clean_session()
312 }
313
314 /// Get a mutable reference to property list.
315 pub fn properties_mut(&mut self) -> &mut Properties {
316 &mut self.properties
317 }
318
319 /// Get a reference to property list.
320 #[must_use]
321 pub const fn properties(&self) -> &Properties {
322 &self.properties
323 }
324
325 /// Update client id.
326 ///
327 /// # Errors
328 ///
329 /// Returns error if `client_id` is invalid.
330 pub fn set_client_id(&mut self, client_id: &str) -> Result<&mut Self, EncodeError> {
331 validate_client_id(client_id).map_err(|_err| EncodeError::InvalidClientId)?;
332 self.client_id = StringData::from(client_id)?;
333 Ok(self)
334 }
335
336 /// Get current client id.
337 #[must_use]
338 pub fn client_id(&self) -> &str {
339 self.client_id.as_ref()
340 }
341
342 /// Update username value.
343 ///
344 /// # Errors
345 ///
346 /// Returns error if `username` is out of range.
347 pub fn set_username(&mut self, username: Option<&str>) -> Result<&mut Self, DecodeError> {
348 if let Some(username) = username {
349 self.username = StringData::from(username)?;
350 self.connect_flags.set_has_username(true);
351 } else {
352 self.connect_flags.set_has_username(false);
353 self.username = StringData::new();
354 }
355 Ok(self)
356 }
357
358 /// Get current username value.
359 #[must_use]
360 pub fn username(&self) -> &str {
361 self.username.as_ref()
362 }
363
364 /// Update password value.
365 ///
366 /// # Errors
367 ///
368 /// Returns error if `password` is out of range.
369 pub fn set_password(&mut self, password: Option<&[u8]>) -> Result<&mut Self, EncodeError> {
370 if let Some(password) = password {
371 self.connect_flags.set_has_password(true);
372 self.password = BinaryData::from_slice(password)?;
373 } else {
374 self.connect_flags.set_has_password(false);
375 self.password.clear();
376 }
377 Ok(self)
378 }
379
380 /// Get current password value.
381 #[must_use]
382 pub fn password(&self) -> &[u8] {
383 self.password.as_ref()
384 }
385
386 /// Get a mutable reference to will property list.
387 pub fn will_properties_mut(&mut self) -> &mut Properties {
388 &mut self.will_properties
389 }
390
391 /// Get a reference to will property list.
392 #[must_use]
393 pub const fn will_properties(&self) -> &Properties {
394 &self.will_properties
395 }
396
397 /// Update will topic.
398 ///
399 /// # Errors
400 ///
401 /// Returns error if `topic` is invalid.
402 pub fn set_will_topic(&mut self, topic: &str) -> Result<&mut Self, EncodeError> {
403 if topic.is_empty() {
404 self.will_topic = None;
405 } else {
406 self.will_topic = Some(PubTopic::new(topic)?);
407 }
408 Ok(self)
409 }
410
411 /// Get current will topic.
412 pub fn will_topic(&self) -> Option<&str> {
413 self.will_topic.as_ref().map(AsRef::as_ref)
414 }
415
416 /// Update will message bytes.
417 ///
418 /// # Errors
419 ///
420 /// Returns error if `message` is out of range.
421 pub fn set_will_message(&mut self, message: &[u8]) -> Result<&mut Self, EncodeError> {
422 self.will_message = BinaryData::from_slice(message)?;
423 Ok(self)
424 }
425
426 /// Get will message bytes.
427 #[must_use]
428 pub fn will_message(&self) -> &[u8] {
429 self.will_message.as_ref()
430 }
431
432 fn get_fixed_header(&self) -> Result<FixedHeader, VarIntError> {
433 let mut remaining_length = self.protocol_name.bytes()
434 + ProtocolLevel::bytes()
435 + ConnectFlags::bytes()
436 + KeepAlive::bytes()
437 + self.client_id.bytes();
438
439 // Check username/password/topic/message.
440 if self.connect_flags.will() {
441 assert!(self.will_topic.is_some());
442 if let Some(will_topic) = &self.will_topic {
443 remaining_length += will_topic.bytes();
444 }
445 remaining_length += self.will_message.bytes();
446 }
447 if self.connect_flags.has_username() {
448 remaining_length += self.username.bytes();
449 }
450 if self.connect_flags.has_password() {
451 remaining_length += self.password.bytes();
452 }
453
454 FixedHeader::new(PacketType::Connect, remaining_length)
455 }
456}
457
458impl EncodePacket for ConnectPacket {
459 fn encode(&self, v: &mut Vec<u8>) -> Result<usize, EncodeError> {
460 let old_len = v.len();
461
462 // Write fixed header
463 let fixed_header = self.get_fixed_header()?;
464 fixed_header.encode(v)?;
465
466 // Write variable header
467 self.protocol_name.encode(v)?;
468 self.protocol_level.encode(v)?;
469 self.connect_flags.encode(v)?;
470 self.keep_alive.encode(v)?;
471
472 // Write payload
473 self.client_id.encode(v)?;
474
475 if self.connect_flags.will() {
476 assert!(self.will_topic.is_some());
477 if let Some(will_topic) = &self.will_topic {
478 will_topic.encode(v)?;
479 }
480
481 self.will_message.encode(v)?;
482 }
483 if self.connect_flags.has_username() {
484 self.username.encode(v)?;
485 }
486 if self.connect_flags.has_password() {
487 self.password.encode(v)?;
488 }
489
490 Ok(v.len() - old_len)
491 }
492}
493
494impl DecodePacket for ConnectPacket {
495 fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
496 let fixed_header = FixedHeader::decode(ba)?;
497 if fixed_header.packet_type() != PacketType::Connect {
498 return Err(DecodeError::InvalidPacketType);
499 }
500
501 // A Server which support multiple protocols uses the Protocol Name to determine
502 // whether the data is MQTT. The protocol name MUST be the UTF-8 String "MQTT".
503 // If the Server does not want to accept the CONNECT, and wishes to reveal that
504 // it is an MQTT Server it MAY send a CONNACK packet with
505 // Reason Code of 0x84 (Unsupported Protocol Version), and then
506 // it MUST close the Network Connection [MQTT-3.1.2-1].
507 let protocol_name = StringData::decode(ba)?;
508 if protocol_name.as_ref() != PROTOCOL_NAME {
509 return Err(DecodeError::InvalidProtocolName);
510 }
511
512 // A Server which supports multiple versions of the MQTT protocol
513 // uses the Protocol Version to determine which version of MQTT
514 // the Client is using. If the Protocol Version is not 5 and the Server does not want
515 // to accept the CONNECT packet, the Server MAY send a CONNACK packet
516 // with Reason Code 0x84 (Unsupported Protocol Version) and then
517 // MUST close the Network Connection [MQTT-3.1.2-2].
518 let protocol_level = ProtocolLevel::try_from(ba.read_byte()?)?;
519
520 let connect_flags = ConnectFlags::decode(ba)?;
521 if !connect_flags.will()
522 && (connect_flags.will_qos() != QoS::AtMostOnce || connect_flags.will_retain())
523 {
524 return Err(DecodeError::InvalidConnectFlags);
525 }
526
527 // If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].
528 if !connect_flags.has_username() && connect_flags.has_password() {
529 return Err(DecodeError::InvalidConnectFlags);
530 }
531
532 let keep_alive = KeepAlive::decode(ba)?;
533 validate_keep_alive(keep_alive)?;
534
535 let properties = Properties::decode(ba);
536 let properties = match properties {
537 Ok(properties) => properties,
538 Err(err) => {
539 log::error!("err: {:?}", err);
540 return Err(DecodeError::InvalidPropertyType);
541 }
542 };
543 if let Err(property_type) = check_property_type_list(properties.props(), CONNECT_PROPERTIES)
544 {
545 log::error!(
546 "v5/ConnectPacket: property type {:?} cannot be used in properties!",
547 property_type
548 );
549 return Err(DecodeError::InvalidPropertyType);
550 }
551
552 let client_id = StringData::decode(ba).map_err(|_err| DecodeError::InvalidClientId)?;
553 if client_id.is_empty() && !connect_flags.clean_session() {
554 // If clean_session is false, a client_id is always required.
555 return Err(DecodeError::InvalidClientId);
556 }
557 validate_client_id(client_id.as_ref())?;
558
559 let will_properties = if connect_flags.will() {
560 Properties::decode(ba)?
561 } else {
562 Properties::new()
563 };
564 if let Err(property_type) =
565 check_property_type_list(will_properties.props(), CONNECT_WILL_PROPERTIES)
566 {
567 log::error!(
568 "v5/ConnectPacket: property type {:?} cannot be used in will properties!",
569 property_type
570 );
571 return Err(DecodeError::InvalidPropertyType);
572 }
573
574 let will_topic = if connect_flags.will() {
575 Some(PubTopic::decode(ba)?)
576 } else {
577 None
578 };
579 let will_message = if connect_flags.will() {
580 BinaryData::decode(ba)?
581 } else {
582 BinaryData::new()
583 };
584
585 let username = if connect_flags.has_username() {
586 StringData::decode(ba)?
587 } else {
588 StringData::new()
589 };
590
591 let password = if connect_flags.has_password() {
592 BinaryData::decode(ba)?
593 } else {
594 BinaryData::new()
595 };
596
597 Ok(Self {
598 protocol_name,
599 protocol_level,
600 connect_flags,
601 keep_alive,
602 properties,
603 client_id,
604 will_properties,
605 will_topic,
606 will_message,
607 username,
608 password,
609 })
610 }
611}
612
613impl Packet for ConnectPacket {
614 fn packet_type(&self) -> PacketType {
615 PacketType::Connect
616 }
617
618 fn bytes(&self) -> Result<usize, VarIntError> {
619 let fixed_header = self.get_fixed_header()?;
620 Ok(fixed_header.bytes() + fixed_header.remaining_length())
621 }
622}
623
#[cfg(test)]
mod tests {
    use super::{ByteArray, ConnectPacket, DecodePacket};

    /// Decodes a minimal CONNECT frame (no will, no credentials, empty property list)
    /// and checks that the client id is recovered.
    #[test]
    fn test_decode() {
        #[rustfmt::skip]
        let frame: Vec<u8> = vec![
            // Fixed header: CONNECT, remaining length 21.
            0x10, 0x15,
            // Protocol name "MQTT".
            0x00, 0x04, 0x4d, 0x51, 0x54, 0x54,
            // Protocol level, connect flags, keep alive (60), property length (0).
            0x05, 0x02, 0x00, 0x3c, 0x00,
            // Client id "wvPTXcCw".
            0x00, 0x08, 0x77, 0x76, 0x50, 0x54, 0x58, 0x63, 0x43, 0x77,
        ];
        let mut reader = ByteArray::new(&frame);
        let decoded = ConnectPacket::decode(&mut reader);
        assert!(decoded.is_ok());
        assert_eq!(decoded.unwrap().client_id(), "wvPTXcCw");
    }
}