1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
use crate::{
    codec, Authentication, ClientID, Error, PropertiesDecoder, Property, QoS, Result as SageResult,
    DEFAULT_PAYLOAD_FORMAT_INDICATOR, DEFAULT_RECEIVE_MAXIMUM, DEFAULT_REQUEST_PROBLEM_INFORMATION,
    DEFAULT_REQUEST_RESPONSE_INFORMATION, DEFAULT_SESSION_EXPIRY_INTERVAL,
    DEFAULT_TOPIC_ALIAS_MAXIMUM, DEFAULT_WILL_DELAY_INTERVAL,
};
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use std::convert::TryInto;
use std::marker::Unpin;

/// Last Will message attached to a `Connect` packet.
///
/// Connections are unreliable by nature, so a client can lose its
/// connection to the server. Such an ungraceful disconnection can be
/// notified to the other clients by specifying a Last Will message, which is
/// given upon connection.
/// When a client ungracefully disconnects from a server (when the keep alive
/// is reached), the server will publish the Last Will message to anyone
/// subscribed to its topic.
#[derive(Debug, PartialEq, Clone)]
pub struct Will {
    /// The quality of service for the will message.
    pub qos: QoS,

    /// Whether the message is to be retained. A retained message is kept
    /// in memory by a broker (one per topic) to be sent to future
    /// subscriptions.
    pub retain: bool,

    /// Delay in seconds the broker will wait after a disconnection before
    /// publishing the will message. The will message can also be published
    /// when the session expires if that happens first.
    pub delay_interval: u32,

    /// If true, the will message is a valid UTF-8 encoded string. If not,
    /// the will message can be any sequence of bytes.
    pub payload_format_indicator: bool,

    /// Corresponds to the expiry interval of the `Publish` message sent.
    pub message_expiry_interval: Option<u32>,

    /// Describes the type of content of the payload. Is generally a MIME
    /// descriptor.
    pub content_type: String,

    /// Optional topic used as response if the Will message is a request.
    pub response_topic: Option<String>,

    /// Optional correlation data used if the Will message is a request.
    pub correlation_data: Option<Vec<u8>>,

    /// General purpose properties.
    pub user_properties: Vec<(String, String)>,

    /// The Last Will topic. Cannot be empty.
    pub topic: String,

    /// The Last Will payload.
    pub message: Vec<u8>,
}

impl Default for Will {
    /// Builds a `Will` with `QoS::AtMostOnce`, no retention, the crate-wide
    /// default delay interval and payload format indicator, and every
    /// optional or collection field left empty.
    fn default() -> Self {
        Self {
            // Flags carried by the connect-flags byte.
            qos: QoS::AtMostOnce,
            retain: false,
            // Will properties.
            delay_interval: DEFAULT_WILL_DELAY_INTERVAL,
            payload_format_indicator: DEFAULT_PAYLOAD_FORMAT_INDICATOR,
            message_expiry_interval: None,
            content_type: String::new(),
            response_topic: None,
            correlation_data: None,
            user_properties: Vec::new(),
            // Payload fields.
            topic: String::new(),
            message: Vec::new(),
        }
    }
}

/// The `Connect` control packet is used to open a session. It is the first
/// packet a client must send to a server once the connection is established.
/// A _Connect_ packet can only be sent once for each connection.
///
/// # Session and connection
///
/// A given connection can only send a _Connect_ packet once. If the server
/// receives a second _Connect_ packet over the same connection, it is
/// considered a protocol error.
/// Yet, the same session can continue across different sequences of
/// connections. In that case, `clean_start` must be set to `false` (default)
/// to continue the session.
///
/// # Client identifier
///
/// The client identifier is a server-unique `String` used to identify the
/// client across operations. It is possible not to give a client identifier
/// to the server by setting `client_id` to either `None` or an empty string.
/// In that case the server will decide on an identifier itself and return
/// it in the _CONNACK_ packet.
#[derive(PartialEq, Debug, Clone)]
pub struct Connect {
    /// If set, the server will start a new session and drop the existing one,
    /// if any.
    pub clean_start: bool,

    /// An optional user name to send to the server.
    pub user_name: Option<String>,

    /// An optional password to send to the server.
    pub password: Option<Vec<u8>>,

    /// Specifies the maximum amount of time the client and the server may not
    /// communicate with each other. This value is expressed in seconds.
    /// If the server does not receive any packet from the client in one and
    /// a half times this interval, it will close the connection. Likewise, the
    /// client will close the connection under the same condition. The default
    /// keep alive value is `600` (10mn).
    /// Note that the keep alive mechanism is deactivated if the value is `0`.
    pub keep_alive: u16,

    /// Once the connection is closed, the client and server still keep the
    /// session active during a certain amount of time expressed in seconds.
    /// - If the value is `0` the session ends when the connection is closed.
    /// - If the value is `0xFFFFFFFF` the session never expires.
    ///
    /// The default is `DEFAULT_SESSION_EXPIRY_INTERVAL`.
    /// The client can override the session expiry interval within the
    /// DISCONNECT packet.
    pub session_expiry_interval: u32,

    /// This value sets the maximum number of _AtLeastOnce_ and _ExactlyOnce_
    /// packets that should be processed concurrently.
    /// There is no such limit for QoS `AtMostOnce` packets.
    /// The default is `DEFAULT_RECEIVE_MAXIMUM`.
    pub receive_maximum: u16,

    /// Defines the maximum size per packet the client is willing to receive
    /// from the server. It is a protocol error to send a packet whose size
    /// exceeds this value and the client is expected to disconnect from the
    /// server with a `PacketTooLarge` error.
    /// This value cannot be `0`. Sending or receiving a CONNECT packet with a
    /// `maximum_packet_size` of value `0` is a protocol error.
    /// If `maximum_packet_size` is `None` (default), there is no size limit.
    pub maximum_packet_size: Option<u32>,

    /// Topic aliases are a way to reduce the size of packets by substituting
    /// topic names (which are strings) with integer aliases.
    /// The number of aliases allowed by the client from the server is defined
    /// with the `topic_alias_maximum`. It can be `0`, meaning aliases are
    /// entirely disallowed.
    pub topic_alias_maximum: u16,

    /// This flag can be set to ask the server to send back response information
    /// that can be used as a hint by the client to determine a response topic
    /// used in Request/Response type communication.
    /// This is only an optional hint and the server is allowed not to send any
    /// response information even if the value of the field is `true`.
    /// The default is `DEFAULT_REQUEST_RESPONSE_INFORMATION`.
    pub request_response_information: bool,

    /// In any packet sent by the server that contains a `ReasonCode`, the
    /// latter can be described using a reason string or user properties. These
    /// are called "problem information".
    /// If `request_problem_information` is `true` the server is allowed to
    /// send problem information in any packet with a `ReasonCode`.
    /// If `false`, the server is only allowed to send problem
    /// information on `Publish`, `Connack` and `Disconnect` packets.
    /// The default is `DEFAULT_REQUEST_PROBLEM_INFORMATION`.
    pub request_problem_information: bool,

    /// General purpose properties.
    /// By default, a Connect packet has no properties.
    pub user_properties: Vec<(String, String)>,

    /// Define an `Authentication` structure to provide enhanced authentication.
    /// By default, `authentication` is `None`, which means no or basic
    /// authentication using only `user_name` and `password`.
    pub authentication: Option<Authentication>,

    /// The client id is an identifier that uniquely represents the client
    /// from the server point of view. The client id is used to ensure `AtLeastOnce`
    /// and `ExactlyOnce` qualities of service.
    /// A client id is mandatory within a session. Yet, the `Connect` packet
    /// may omit it by setting `client_id` to `None` (default). In that case
    /// the id is created by the server and returned to the client with the
    /// `Connack` packet.
    pub client_id: Option<ClientID>,

    /// The client's Last Will to send in case of ungraceful disconnection.
    /// This is optional and default is `None`.
    pub will: Option<Will>,
}

impl Default for Connect {
    /// Builds a `Connect` that resumes any existing session
    /// (`clean_start` is `false`), uses a 600 second keep alive, takes the
    /// crate-wide defaults for every property and carries no credentials,
    /// client id, authentication or will.
    fn default() -> Self {
        Self {
            // Variable header.
            clean_start: false,
            keep_alive: 600,
            // Properties.
            session_expiry_interval: DEFAULT_SESSION_EXPIRY_INTERVAL,
            receive_maximum: DEFAULT_RECEIVE_MAXIMUM,
            maximum_packet_size: None,
            topic_alias_maximum: DEFAULT_TOPIC_ALIAS_MAXIMUM,
            request_response_information: DEFAULT_REQUEST_RESPONSE_INFORMATION,
            request_problem_information: DEFAULT_REQUEST_PROBLEM_INFORMATION,
            user_properties: Vec::new(),
            authentication: None,
            // Payload.
            client_id: None,
            will: None,
            user_name: None,
            password: None,
        }
    }
}

/// Decoded form of the single flags byte of a `Connect` packet's variable
/// header. Bit positions below are the ones used by `ConnectFlags::write`
/// and `ConnectFlags::read`; bit 0 must be zero when reading.
struct ConnectFlags {
    /// Bit 1: mirrors `Connect::clean_start`.
    pub clean_start: bool,
    /// Bit 2: set when the packet carries a `Will`.
    pub will: bool,
    /// Bits 3-4: quality of service of the will message.
    pub will_qos: QoS,
    /// Bit 5: retain flag of the will message.
    pub will_retain: bool,
    /// Bit 7: set when the payload carries a user name.
    pub user_name: bool,
    /// Bit 6: set when the payload carries a password.
    pub password: bool,
}

impl Connect {
    /// Encodes the variable header and payload of the `Connect` packet into
    /// `w`, returning the number of bytes written in case of success.
    /// This function does not encode the fixed header part. To generate a
    /// full write, use `ControlPacket`.
    ///
    /// In case of failure, the underlying system can raise a `std::io::Error`.
    /// If the data are not valid according to MQTT 5 specifications, the
    /// function will return a `ProtocolError`.
    pub async fn write<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
        // Variable Header (into content)
        let mut n_bytes = codec::write_utf8_string("MQTT", writer).await?;
        n_bytes += codec::write_byte(0x05, writer).await?;

        n_bytes += ConnectFlags {
            clean_start: self.clean_start,
            will: self.will.is_some(),
            will_qos: if let Some(w) = &self.will {
                w.qos
            } else {
                QoS::AtMostOnce
            },
            will_retain: if let Some(w) = &self.will {
                w.retain
            } else {
                false
            },
            user_name: self.user_name.is_some(),
            password: self.password.is_some(),
        }
        .write(writer)
        .await?;

        n_bytes += codec::write_two_byte_integer(self.keep_alive, writer).await?;

        // Properties
        let mut properties = Vec::new();
        n_bytes += Property::SessionExpiryInterval(self.session_expiry_interval)
            .encode(&mut properties)
            .await?;
        n_bytes += Property::ReceiveMaximum(self.receive_maximum)
            .encode(&mut properties)
            .await?;
        if let Some(maximum_packet_size) = self.maximum_packet_size {
            n_bytes += Property::MaximumPacketSize(maximum_packet_size)
                .encode(&mut properties)
                .await?;
        }
        n_bytes += Property::TopicAliasMaximum(self.topic_alias_maximum)
            .encode(&mut properties)
            .await?;
        n_bytes += Property::RequestResponseInformation(self.request_response_information)
            .encode(&mut properties)
            .await?;
        n_bytes += Property::RequestProblemInformation(self.request_problem_information)
            .encode(&mut properties)
            .await?;
        for (k, v) in self.user_properties {
            n_bytes += Property::UserProperty(k, v).encode(&mut properties).await?;
        }

        if let Some(authentication) = self.authentication {
            n_bytes += authentication.write(writer).await?;
        }

        n_bytes += codec::write_variable_byte_integer(properties.len() as u32, writer).await?;
        writer.write_all(&properties).await?;

        // Payload
        if let Some(client_id) = self.client_id {
            if client_id.len() > 23 || client_id.chars().any(|c| c < '0' || c > 'z') {
                return Err(Error::MalformedPacket);
            }
            n_bytes += codec::write_utf8_string(&client_id, writer).await?;
        } else {
            // Still write empty client id
            n_bytes += codec::write_utf8_string("", writer).await?;
        }

        if let Some(w) = self.will {
            let mut properties = Vec::new();

            n_bytes += Property::WillDelayInterval(w.delay_interval)
                .encode(&mut properties)
                .await?;
            n_bytes += Property::PayloadFormatIndicator(w.payload_format_indicator)
                .encode(&mut properties)
                .await?;
            if let Some(v) = w.message_expiry_interval {
                n_bytes += Property::MessageExpiryInterval(v)
                    .encode(&mut properties)
                    .await?;
            }
            n_bytes += Property::ContentType(w.content_type)
                .encode(&mut properties)
                .await?;
            if let Some(response_topic) = w.response_topic {
                n_bytes += Property::ResponseTopic(response_topic)
                    .encode(&mut properties)
                    .await?;
            }
            if let Some(v) = w.correlation_data {
                n_bytes += Property::CorrelationData(v).encode(&mut properties).await?;
            }
            for (k, v) in w.user_properties {
                n_bytes += Property::UserProperty(k, v).encode(&mut properties).await?;
            }

            n_bytes += codec::write_variable_byte_integer(properties.len() as u32, writer).await?;
            writer.write_all(&properties).await?;

            if w.topic.is_empty() {
                return Err(Error::ProtocolError);
            }
            n_bytes += codec::write_utf8_string(&w.topic, writer).await?;
            n_bytes += codec::write_binary_data(&w.message, writer).await?;
        }

        if let Some(v) = self.user_name {
            n_bytes += codec::write_utf8_string(&v, writer).await?;
        }

        if let Some(v) = self.password {
            n_bytes += codec::write_binary_data(&v, writer).await?;
        }

        Ok(n_bytes)
    }

    /// Read the variable header and payload part of a connect packet from `reader`
    /// and returns a `Connect` in case of success.
    /// This function does not read the fixed header part of the packet, which
    /// is read using `ControlPacket`.
    ///
    /// The function can send a `ProtocolError` in case of invalid data or
    /// any `std::io::Error` returned by the underlying system.
    pub async fn read<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
        let protocol_name = codec::read_utf8_string(reader).await?;
        if protocol_name != "MQTT" {
            return Err(Error::MalformedPacket);
        }

        let protocol_version = codec::read_byte(reader).await?;
        if protocol_version != 0x05 {
            return Err(Error::MalformedPacket);
        }

        let flags = ConnectFlags::read(reader).await?;

        let clean_start = flags.clean_start;

        let keep_alive = codec::read_two_byte_integer(reader).await?;

        let mut session_expiry_interval = DEFAULT_SESSION_EXPIRY_INTERVAL;
        let mut receive_maximum = DEFAULT_RECEIVE_MAXIMUM;
        let mut maximum_packet_size = None;
        let mut topic_alias_maximum = DEFAULT_TOPIC_ALIAS_MAXIMUM;
        let mut request_response_information = DEFAULT_REQUEST_RESPONSE_INFORMATION;
        let mut request_problem_information = DEFAULT_REQUEST_PROBLEM_INFORMATION;
        let mut user_properties = Vec::new();
        let mut authentication_method = None;
        let mut authentication_data = Default::default();

        let mut decoder = PropertiesDecoder::take(reader).await?;

        while decoder.has_properties() {
            match decoder.read().await? {
                Property::SessionExpiryInterval(v) => session_expiry_interval = v,
                Property::ReceiveMaximum(v) => receive_maximum = v,
                Property::MaximumPacketSize(v) => maximum_packet_size = Some(v),
                Property::TopicAliasMaximum(v) => topic_alias_maximum = v,
                Property::RequestResponseInformation(v) => request_response_information = v,
                Property::RequestProblemInformation(v) => request_problem_information = v,
                Property::AuthenticationMethod(v) => authentication_method = Some(v),
                Property::AuthenticationData(v) => authentication_data = v,
                Property::UserProperty(k, v) => user_properties.push((k, v)),
                _ => return Err(Error::ProtocolError),
            };
        }
        let reader = decoder.into_inner();

        let authentication = if let Some(method) = authentication_method {
            Some(Authentication {
                method,
                data: authentication_data,
            })
        } else {
            if !authentication_data.is_empty() {
                return Err(Error::ProtocolError);
            }
            None
        };

        // Payload
        let client_id = {
            let client_id = codec::read_utf8_string(reader).await?;
            if client_id.is_empty() {
                None
            } else {
                if client_id.len() > 23 || client_id.chars().any(|c| c < '0' || c > 'z') {
                    return Err(Error::MalformedPacket);
                }
                Some(client_id)
            }
        };

        let (reader, will) = if flags.will {
            let mut decoder = PropertiesDecoder::take(reader).await?;
            let mut w = Will::default();
            w.qos = flags.will_qos;
            while decoder.has_properties() {
                match decoder.read().await? {
                    Property::WillDelayInterval(v) => w.delay_interval = v,
                    Property::PayloadFormatIndicator(v) => w.payload_format_indicator = v,
                    Property::MessageExpiryInterval(v) => w.message_expiry_interval = Some(v),
                    Property::ContentType(v) => w.content_type = v,
                    Property::ResponseTopic(v) => w.response_topic = Some(v),
                    Property::CorrelationData(v) => w.correlation_data = Some(v),
                    Property::UserProperty(k, v) => w.user_properties.push((k, v)),
                    _ => return Err(Error::ProtocolError),
                }
            }
            let reader = decoder.into_inner();
            w.topic = codec::read_utf8_string(reader).await?;
            if w.topic.is_empty() {
                return Err(Error::ProtocolError);
            }
            w.message = codec::read_binary_data(reader).await?;
            (reader, Some(w))
        } else {
            (reader, None)
        };

        let user_name = if flags.user_name {
            Some(codec::read_utf8_string(reader).await?)
        } else {
            None
        };

        let password = if flags.password {
            Some(codec::read_binary_data(reader).await?)
        } else {
            None
        };

        Ok(Connect {
            clean_start,
            user_name,
            password,
            keep_alive,
            session_expiry_interval,
            receive_maximum,
            maximum_packet_size,
            topic_alias_maximum,
            request_response_information,
            request_problem_information,
            authentication,
            user_properties,
            client_id,
            will,
        })
    }
}

impl ConnectFlags {
    pub async fn write<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
        let bits = ((self.user_name as u8) << 7)
            | ((self.password as u8) << 6)
            | ((self.will_retain as u8) << 5)
            | (self.will_qos as u8) << 3
            | ((self.will as u8) << 2)
            | ((self.clean_start as u8) << 1);
        codec::write_byte(bits, writer).await
    }

    pub async fn read<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
        let bits = codec::read_byte(reader).await?;

        if bits & 0x01 != 0 {
            Err(Error::MalformedPacket)
        } else {
            Ok(ConnectFlags {
                user_name: (bits & 0b1000_0000) >> 7 > 0,
                password: (bits & 0b0100_0000) >> 6 > 0,
                will_retain: (bits & 0b0010_0000) >> 5 > 0,
                will_qos: ((bits & 0b0001_1000) >> 3).try_into()?,
                will: (bits & 0b0000_00100) >> 2 > 0,
                clean_start: (bits & 0b0000_00010) >> 1 > 0,
            })
        }
    }
}

#[cfg(test)]
mod unit_connect {

    use super::*;
    use async_std::io::Cursor;

    /// Wire form of the packet returned by `decoded()`, assembled section by
    /// section.
    fn encoded() -> Vec<u8> {
        let mut bytes: Vec<u8> = Vec::with_capacity(47);
        // Protocol name "MQTT" and protocol version.
        bytes.extend_from_slice(&[0, 4, 77, 81, 84, 84, 5]);
        // Connect flags, then keep alive.
        bytes.extend_from_slice(&[206, 0, 10]);
        // Properties: length, then the session expiry interval.
        bytes.extend_from_slice(&[5, 17, 0, 0, 0, 10]);
        // Empty client id.
        bytes.extend_from_slice(&[0, 0]);
        // Will properties: length, then the (empty) content type.
        bytes.extend_from_slice(&[3, 3, 0, 0]);
        // Will topic "CloZee" followed by an empty will message.
        bytes.extend_from_slice(&[0, 6, 67, 108, 111, 90, 101, 101, 0, 0]);
        // User name "Willow".
        bytes.extend_from_slice(&[0, 6, 87, 105, 108, 108, 111, 119]);
        // Password "Jaden".
        bytes.extend_from_slice(&[0, 5, 74, 97, 100, 101, 110]);
        bytes
    }

    /// In-memory form of the packet returned by `encoded()`.
    fn decoded() -> Connect {
        let last_will = Will {
            topic: String::from("CloZee"),
            qos: QoS::AtLeastOnce,
            ..Will::default()
        };

        Connect {
            clean_start: true,
            keep_alive: 10,
            session_expiry_interval: 10,
            user_name: Some(String::from("Willow")),
            password: Some(b"Jaden".to_vec()),
            will: Some(last_will),
            ..Connect::default()
        }
    }

    #[async_std::test]
    async fn encode() {
        let mut buffer = Vec::new();

        let written = decoded().write(&mut buffer).await.unwrap();
        assert_eq!(buffer, encoded());
        assert_eq!(written, 47);
    }

    #[async_std::test]
    async fn decode() {
        let mut input = Cursor::new(encoded());
        let packet = Connect::read(&mut input).await.unwrap();
        assert_eq!(packet, decoded());
    }
}