spotflow_rumqttc_fork/
lib.rs

1//! A pure rust MQTT client which strives to be robust, efficient and easy to use.
2//! This library is backed by an async (tokio) eventloop which handles all the
3//! robustness and and efficiency parts of MQTT but naturally fits into both sync
4//! and async worlds as we'll see
5//!
6//! Let's jump into examples right away
7//!
8//! A simple synchronous publish and subscribe
9//! ----------------------------
10//!
11//! ```no_run
12//! use rumqttc::{MqttOptions, Client, QoS};
13//! use std::time::Duration;
14//! use std::thread;
15//!
16//! let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
17//! mqttoptions.set_keep_alive(Duration::from_secs(5));
18//!
19//! let (mut client, mut connection) = Client::new(mqttoptions, 10);
20//! client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
21//! thread::spawn(move || for i in 0..10 {
22//!    client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
23//!    thread::sleep(Duration::from_millis(100));
24//! });
25//!
26//! // Iterate to poll the eventloop for connection progress
27//! for (i, notification) in connection.iter().enumerate() {
28//!     println!("Notification = {:?}", notification);
29//! }
30//! ```
31//!
32//! A simple asynchronous publish and subscribe
33//! ------------------------------
34//!
35//! ```no_run
36//! use rumqttc::{MqttOptions, AsyncClient, QoS};
37//! use tokio::{task, time};
38//! use std::time::Duration;
39//! use std::error::Error;
40//!
41//! # #[tokio::main(worker_threads = 1)]
42//! # async fn main() {
43//! let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
44//! mqttoptions.set_keep_alive(Duration::from_secs(5));
45//!
46//! let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
47//! client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();
48//!
49//! task::spawn(async move {
50//!     for i in 0..10 {
51//!         client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
52//!         time::sleep(Duration::from_millis(100)).await;
53//!     }
54//! });
55//!
56//! loop {
57//!     let notification = eventloop.poll().await.unwrap();
58//!     println!("Received = {:?}", notification);
59//! }
60//! # }
61//! ```
62//!
63//! Quick overview of features
64//! - Eventloop orchestrates outgoing/incoming packets concurrently and hadles the state
65//! - Pings the broker when necessary and detects client side half open connections as well
66//! - Throttling of outgoing packets (todo)
67//! - Queue size based flow control on outgoing packets
68//! - Automatic reconnections by just continuing the `eventloop.poll()/connection.iter()` loop`
69//! - Natural backpressure to client APIs during bad network
70//! - Immediate cancellation with `client.cancel()`
71//!
72//! In short, everything necessary to maintain a robust connection
73//!
74//! Since the eventloop is externally polled (with `iter()/poll()` in a loop)
75//! out side the library and `Eventloop` is accessible, users can
76//! - Distribute incoming messages based on topics
77//! - Stop it when required
78//! - Access internal state for use cases like graceful shutdown or to modify options before reconnection
79//!
80//! ## Important notes
81//!
82//! - Looping on `connection.iter()`/`eventloop.poll()` is necessary to run the
83//!   event loop and make progress. It yields incoming and outgoing activity
84//!   notifications which allows customization as you see fit.
85//!
86//! - Blocking inside the `connection.iter()`/`eventloop.poll()` loop will block
87//!   connection progress.
88//!
89//! ## FAQ
90//! **Connecting to a broker using raw ip doesn't work**
91//!
92//! You cannot create a TLS connection to a bare IP address with a self-signed
93//! certificate. This is a [limitation of rustls](https://github.com/ctz/rustls/issues/184).
94//! One workaround, which only works under *nix/BSD-like systems, is to add an
95//! entry to wherever your DNS resolver looks (e.g. `/etc/hosts`) for the bare IP
96//! address and use that name in your code.
97#![cfg_attr(docsrs, feature(doc_cfg))]
98
99#[macro_use]
100extern crate log;
101
102use std::fmt::{self, Debug, Formatter};
103#[cfg(feature = "use-rustls")]
104use std::sync::Arc;
105use std::time::Duration;
106
107mod client;
108mod eventloop;
109mod framed;
110pub mod mqttbytes;
111mod state;
112#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
113mod tls;
114
115pub use async_channel::{SendError, Sender, TrySendError};
116pub use client::{AsyncClient, Client, ClientError, Connection};
117pub use eventloop::{ConnectionError, Event, EventLoop};
118pub use mqttbytes::v4::*;
119pub use mqttbytes::*;
120pub use state::{MqttState, StateError};
121#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
122pub use tls::Error as TlsError;
123#[cfg(feature = "use-rustls")]
124pub use tokio_rustls::rustls::ClientConfig;
125
126pub type Incoming = Packet;
127
128/// Current outgoing activity on the eventloop
129#[derive(Debug, Eq, PartialEq, Clone)]
130pub enum Outgoing {
131    /// Publish packet with packet identifier. 0 implies QoS 0
132    Publish(u16, String),
133    /// Subscribe packet with packet identifier
134    Subscribe(u16),
135    /// Unsubscribe packet with packet identifier
136    Unsubscribe(u16),
137    /// PubAck packet
138    PubAck(u16),
139    /// PubRec packet
140    PubRec(u16),
141    /// PubRel packet
142    PubRel(u16),
143    /// PubComp packet
144    PubComp(u16),
145    /// Ping request packet
146    PingReq,
147    /// Ping response packet
148    PingResp,
149    /// Disconnect packet
150    Disconnect,
151    /// Await for an ack for more outgoing progress
152    AwaitAck(u16),
153}
154
155/// Requests by the client to mqtt event loop. Request are
156/// handled one by one.
157#[derive(Clone, Debug, PartialEq)]
158pub enum Request {
159    Publish(Publish),
160    PubAck(PubAck),
161    PubRec(PubRec),
162    PubComp(PubComp),
163    PubRel(PubRel),
164    PingReq,
165    PingResp,
166    Subscribe(Subscribe),
167    SubAck(SubAck),
168    Unsubscribe(Unsubscribe),
169    UnsubAck(UnsubAck),
170    Disconnect,
171}
172
173/// Key type for TLS authentication
174#[derive(Clone, Debug, Eq, PartialEq)]
175pub enum Key {
176    RSA(Vec<u8>),
177    ECC(Vec<u8>),
178}
179
180impl From<Publish> for Request {
181    fn from(publish: Publish) -> Request {
182        Request::Publish(publish)
183    }
184}
185
186impl From<Subscribe> for Request {
187    fn from(subscribe: Subscribe) -> Request {
188        Request::Subscribe(subscribe)
189    }
190}
191
192impl From<Unsubscribe> for Request {
193    fn from(unsubscribe: Unsubscribe) -> Request {
194        Request::Unsubscribe(unsubscribe)
195    }
196}
197
198#[derive(Clone)]
199pub enum Transport {
200    Tcp,
201    #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
202    Tls(TlsConfiguration),
203    #[cfg(unix)]
204    Unix,
205    #[cfg(feature = "websocket")]
206    #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
207    Ws,
208    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
209    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
210    Wss(TlsConfiguration),
211}
212
213impl Default for Transport {
214    fn default() -> Self {
215        Self::tcp()
216    }
217}
218
219impl Transport {
220    /// Use regular tcp as transport (default)
221    pub fn tcp() -> Self {
222        Self::Tcp
223    }
224
225    #[cfg(feature = "use-rustls")]
226    /// Use secure tcp with tls as transport
227    #[cfg(feature = "use-rustls")]
228    pub fn tls(
229        ca: Vec<u8>,
230        client_auth: Option<(Vec<u8>, Key)>,
231        alpn: Option<Vec<Vec<u8>>>,
232    ) -> Self {
233        let config = TlsConfiguration::Simple {
234            ca,
235            alpn,
236            client_auth,
237        };
238
239        Self::tls_with_config(config)
240    }
241
242    #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
243    pub fn tls_with_config(tls_config: TlsConfiguration) -> Self {
244        Self::Tls(tls_config)
245    }
246
247    #[cfg(unix)]
248    pub fn unix() -> Self {
249        Self::Unix
250    }
251
252    /// Use websockets as transport
253    #[cfg(feature = "websocket")]
254    #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
255    pub fn ws() -> Self {
256        Self::Ws
257    }
258
259    /// Use secure websockets with tls as transport
260    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
261    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
262    pub fn wss(
263        ca: Vec<u8>,
264        client_auth: Option<(Vec<u8>, Key)>,
265        alpn: Option<Vec<Vec<u8>>>,
266    ) -> Self {
267        let config = TlsConfiguration::Simple {
268            ca,
269            client_auth,
270            alpn,
271        };
272
273        Self::wss_with_config(config)
274    }
275
276    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
277    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
278    pub fn wss_with_config(tls_config: TlsConfiguration) -> Self {
279        Self::Wss(tls_config)
280    }
281}
282
283#[derive(Clone)]
284#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
285pub enum TlsConfiguration {
286    #[cfg(feature = "use-rustls")]
287    Simple {
288        /// connection method
289        ca: Vec<u8>,
290        /// alpn settings
291        alpn: Option<Vec<Vec<u8>>>,
292        /// tls client_authentication
293        client_auth: Option<(Vec<u8>, Key)>,
294    },
295    #[cfg(feature = "use-rustls")]
296    /// Injected rustls ClientConfig for TLS, to allow more customisation.
297    Rustls(Arc<ClientConfig>),
298    #[cfg(feature = "use-native-tls")]
299    Native,
300}
301
302#[cfg(feature = "use-rustls")]
303impl From<ClientConfig> for TlsConfiguration {
304    fn from(config: ClientConfig) -> Self {
305        TlsConfiguration::Rustls(Arc::new(config))
306    }
307}
308
309// TODO: Should all the options be exposed as public? Drawback
310// would be loosing the ability to panic when the user options
311// are wrong (e.g empty client id) or aggressive (keep alive time)
312/// Options to configure the behaviour of mqtt connection
313#[derive(Clone)]
314pub struct MqttOptions {
315    /// broker address that you want to connect to
316    broker_addr: String,
317    /// broker port
318    port: u16,
319    // What transport protocol to use
320    transport: Transport,
321    /// keep alive time to send pingreq to broker when the connection is idle
322    keep_alive: Duration,
323    /// clean (or) persistent session
324    clean_session: bool,
325    /// client identifier
326    client_id: String,
327    /// username and password
328    credentials: Option<(String, String)>,
329    /// maximum incoming packet size (verifies remaining length of the packet)
330    max_incoming_packet_size: usize,
331    /// Maximum outgoing packet size (only verifies publish payload size)
332    // TODO Verify this with all packets. This can be packet.write but message left in
333    // the state might be a footgun as user has to explicitly clean it. Probably state
334    // has to be moved to network
335    max_outgoing_packet_size: usize,
336    /// request (publish, subscribe) channel capacity
337    request_channel_capacity: usize,
338    /// Max internal request batching
339    max_request_batch: usize,
340    /// Minimum delay time between consecutive outgoing packets
341    /// while retransmitting pending packets
342    pending_throttle: Duration,
343    /// maximum number of outgoing inflight messages
344    inflight: u16,
345    /// Last will that will be issued on unexpected disconnect
346    last_will: Option<LastWill>,
347    /// Connection timeout
348    conn_timeout: u64,
349    /// If set to `true` MQTT acknowledgements are not sent automatically.
350    /// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method.
351    manual_acks: bool,
352}
353
354impl MqttOptions {
355    /// New mqtt options
356    pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
357        let id = id.into();
358        if id.starts_with(' ') || id.is_empty() {
359            panic!("Invalid client id")
360        }
361
362        MqttOptions {
363            broker_addr: host.into(),
364            port,
365            transport: Transport::tcp(),
366            keep_alive: Duration::from_secs(60),
367            clean_session: true,
368            client_id: id,
369            credentials: None,
370            max_incoming_packet_size: 10 * 1024,
371            max_outgoing_packet_size: 10 * 1024,
372            request_channel_capacity: 10,
373            max_request_batch: 0,
374            pending_throttle: Duration::from_micros(0),
375            inflight: 100,
376            last_will: None,
377            conn_timeout: 5,
378            manual_acks: false,
379        }
380    }
381
382    /// Broker address
383    pub fn broker_address(&self) -> (String, u16) {
384        (self.broker_addr.clone(), self.port)
385    }
386
387    pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
388        self.last_will = Some(will);
389        self
390    }
391
392    pub fn last_will(&self) -> Option<LastWill> {
393        self.last_will.clone()
394    }
395
396    pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
397        self.transport = transport;
398        self
399    }
400
401    pub fn transport(&self) -> Transport {
402        self.transport.clone()
403    }
404
405    /// Set number of seconds after which client should ping the broker
406    /// if there is no other data exchange
407    pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
408        if duration.as_secs() < 5 {
409            panic!("Keep alives should be >= 5  secs");
410        }
411
412        self.keep_alive = duration;
413        self
414    }
415
416    /// Keep alive time
417    pub fn keep_alive(&self) -> Duration {
418        self.keep_alive
419    }
420
421    /// Client identifier
422    pub fn client_id(&self) -> String {
423        self.client_id.clone()
424    }
425
426    /// Set packet size limit for outgoing an incoming packets
427    pub fn set_max_packet_size(&mut self, incoming: usize, outgoing: usize) -> &mut Self {
428        self.max_incoming_packet_size = incoming;
429        self.max_outgoing_packet_size = outgoing;
430        self
431    }
432
433    /// Maximum packet size
434    pub fn max_packet_size(&self) -> usize {
435        self.max_incoming_packet_size
436    }
437
438    /// `clean_session = true` removes all the state from queues & instructs the broker
439    /// to clean all the client state when client disconnects.
440    ///
441    /// When set `false`, broker will hold the client state and performs pending
442    /// operations on the client when reconnection with same `client_id`
443    /// happens. Local queue state is also held to retransmit packets after reconnection.
444    pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
445        self.clean_session = clean_session;
446        self
447    }
448
449    /// Clean session
450    pub fn clean_session(&self) -> bool {
451        self.clean_session
452    }
453
454    /// Username and password
455    pub fn set_credentials<U: Into<String>, P: Into<String>>(
456        &mut self,
457        username: U,
458        password: P,
459    ) -> &mut Self {
460        self.credentials = Some((username.into(), password.into()));
461        self
462    }
463
464    /// Security options
465    pub fn credentials(&self) -> Option<(String, String)> {
466        self.credentials.clone()
467    }
468
469    /// Set request channel capacity
470    pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
471        self.request_channel_capacity = capacity;
472        self
473    }
474
475    /// Request channel capacity
476    pub fn request_channel_capacity(&self) -> usize {
477        self.request_channel_capacity
478    }
479
480    /// Enables throttling and sets outoing message rate to the specified 'rate'
481    pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
482        self.pending_throttle = duration;
483        self
484    }
485
486    /// Outgoing message rate
487    pub fn pending_throttle(&self) -> Duration {
488        self.pending_throttle
489    }
490
491    /// Set number of concurrent in flight messages
492    pub fn set_inflight(&mut self, inflight: u16) -> &mut Self {
493        if inflight == 0 {
494            panic!("zero in flight is not allowed")
495        }
496
497        self.inflight = inflight;
498        self
499    }
500
501    /// Number of concurrent in flight messages
502    pub fn inflight(&self) -> u16 {
503        self.inflight
504    }
505
506    /// set connection timeout in secs
507    pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
508        self.conn_timeout = timeout;
509        self
510    }
511
512    /// get timeout in secs
513    pub fn connection_timeout(&self) -> u64 {
514        self.conn_timeout
515    }
516
517    /// set manual acknowledgements
518    pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
519        self.manual_acks = manual_acks;
520        self
521    }
522
523    /// get manual acknowledgements
524    pub fn manual_acks(&self) -> bool {
525        self.manual_acks
526    }
527}
528
529#[cfg(feature = "url")]
530#[derive(Debug, PartialEq, thiserror::Error)]
531pub enum OptionError {
532    #[error("Unsupported URL scheme.")]
533    Scheme,
534
535    #[error("Missing client ID.")]
536    ClientId,
537
538    #[error("Invalid keep-alive value.")]
539    KeepAlive,
540
541    #[error("Invalid clean-session value.")]
542    CleanSession,
543
544    #[error("Invalid max-incoming-packet-size value.")]
545    MaxIncomingPacketSize,
546
547    #[error("Invalid max-outgoing-packet-size value.")]
548    MaxOutgoingPacketSize,
549
550    #[error("Invalid request-channel-capacity value.")]
551    RequestChannelCapacity,
552
553    #[error("Invalid max-request-batch value.")]
554    MaxRequestBatch,
555
556    #[error("Invalid pending-throttle value.")]
557    PendingThrottle,
558
559    #[error("Invalid inflight value.")]
560    Inflight,
561
562    #[error("Invalid conn-timeout value.")]
563    ConnTimeout,
564
565    #[error("Unknown option: {0}")]
566    Unknown(String),
567}
568
569#[cfg(feature = "url")]
570impl std::convert::TryFrom<url::Url> for MqttOptions {
571    type Error = OptionError;
572
573    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
574        use std::collections::HashMap;
575
576        let broker_addr = url.host_str().unwrap_or_default().to_owned();
577
578        let (transport, default_port) = match url.scheme() {
579            // Encrypted connections are supported, but require explicit TLS configuration. We fall
580            // back to the unencrypted transport layer, so that `set_transport` can be used to
581            // configure the encrypted transport layer with the provided TLS configuration.
582            "mqtts" | "ssl" => (Transport::Tcp, 8883),
583            "mqtt" | "tcp" => (Transport::Tcp, 1883),
584            _ => return Err(OptionError::Scheme),
585        };
586
587        let port = url.port().unwrap_or(default_port);
588
589        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();
590
591        let keep_alive = Duration::from_secs(
592            queries
593                .remove("keep_alive_secs")
594                .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
595                .transpose()?
596                .unwrap_or(60),
597        );
598
599        let client_id = queries
600            .remove("client_id")
601            .ok_or(OptionError::ClientId)?
602            .into_owned();
603
604        let clean_session = queries
605            .remove("clean_session")
606            .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanSession))
607            .transpose()?
608            .unwrap_or(true);
609
610        let credentials = {
611            match url.username() {
612                "" => None,
613                username => Some((
614                    username.to_owned(),
615                    url.password().unwrap_or_default().to_owned(),
616                )),
617            }
618        };
619
620        let max_incoming_packet_size = queries
621            .remove("max_incoming_packet_size_bytes")
622            .map(|v| {
623                v.parse::<usize>()
624                    .map_err(|_| OptionError::MaxIncomingPacketSize)
625            })
626            .transpose()?
627            .unwrap_or(10 * 1024);
628
629        let max_outgoing_packet_size = queries
630            .remove("max_outgoing_packet_size_bytes")
631            .map(|v| {
632                v.parse::<usize>()
633                    .map_err(|_| OptionError::MaxOutgoingPacketSize)
634            })
635            .transpose()?
636            .unwrap_or(10 * 1024);
637
638        let request_channel_capacity = queries
639            .remove("request_channel_capacity_num")
640            .map(|v| {
641                v.parse::<usize>()
642                    .map_err(|_| OptionError::RequestChannelCapacity)
643            })
644            .transpose()?
645            .unwrap_or(10);
646
647        let max_request_batch = queries
648            .remove("max_request_batch_num")
649            .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
650            .transpose()?
651            .unwrap_or(0);
652
653        let pending_throttle = Duration::from_micros(
654            queries
655                .remove("pending_throttle_usecs")
656                .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
657                .transpose()?
658                .unwrap_or(0),
659        );
660
661        let inflight = queries
662            .remove("inflight_num")
663            .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
664            .transpose()?
665            .unwrap_or(100);
666
667        let conn_timeout = queries
668            .remove("conn_timeout_secs")
669            .map(|v| v.parse::<u64>().map_err(|_| OptionError::ConnTimeout))
670            .transpose()?
671            .unwrap_or(5);
672
673        if let Some((opt, _)) = queries.into_iter().next() {
674            return Err(OptionError::Unknown(opt.into_owned()));
675        }
676
677        Ok(Self {
678            broker_addr,
679            port,
680            transport,
681            keep_alive,
682            clean_session,
683            client_id,
684            credentials,
685            max_incoming_packet_size,
686            max_outgoing_packet_size,
687            request_channel_capacity,
688            max_request_batch,
689            pending_throttle,
690            inflight,
691            last_will: None,
692            conn_timeout,
693            manual_acks: false,
694        })
695    }
696}
697
698// Implement Debug manually because ClientConfig doesn't implement it, so derive(Debug) doesn't
699// work.
700impl Debug for MqttOptions {
701    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
702        f.debug_struct("MqttOptions")
703            .field("broker_addr", &self.broker_addr)
704            .field("port", &self.port)
705            .field("keep_alive", &self.keep_alive)
706            .field("clean_session", &self.clean_session)
707            .field("client_id", &self.client_id)
708            .field("credentials", &self.credentials)
709            .field("max_packet_size", &self.max_incoming_packet_size)
710            .field("request_channel_capacity", &self.request_channel_capacity)
711            .field("max_request_batch", &self.max_request_batch)
712            .field("pending_throttle", &self.pending_throttle)
713            .field("inflight", &self.inflight)
714            .field("last_will", &self.last_will)
715            .field("conn_timeout", &self.conn_timeout)
716            .field("manual_acks", &self.manual_acks)
717            .finish()
718    }
719}
720
721#[cfg(test)]
722mod test {
723    use super::*;
724
725    #[test]
726    #[should_panic]
727    fn client_id_startswith_space() {
728        let _mqtt_opts = MqttOptions::new(" client_a", "127.0.0.1", 1883).set_clean_session(true);
729    }
730
731    #[test]
732    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
733    fn no_scheme() {
734        let mut _mqtt_opts = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);
735
736        _mqtt_opts.set_transport(crate::Transport::wss(Vec::from("Test CA"), None, None));
737
738        if let crate::Transport::Wss(TlsConfiguration::Simple {
739            ca,
740            client_auth,
741            alpn,
742        }) = _mqtt_opts.transport
743        {
744            assert_eq!(ca, Vec::from("Test CA"));
745            assert_eq!(client_auth, None);
746            assert_eq!(alpn, None);
747        } else {
748            panic!("Unexpected transport!");
749        }
750
751        assert_eq!(_mqtt_opts.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
752    }
753
754    #[test]
755    #[cfg(feature = "url")]
756    fn from_url() {
757        use std::convert::TryInto;
758        use std::str::FromStr;
759
760        fn opt(s: &str) -> Result<MqttOptions, OptionError> {
761            url::Url::from_str(s).expect("valid url").try_into()
762        }
763        fn ok(s: &str) -> MqttOptions {
764            opt(s).expect("valid options")
765        }
766        fn err(s: &str) -> OptionError {
767            opt(s).expect_err("invalid options")
768        }
769
770        let v = ok("mqtt://host:42?client_id=foo");
771        assert_eq!(v.broker_address(), ("host".to_owned(), 42));
772        assert_eq!(v.client_id(), "foo".to_owned());
773
774        let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
775        assert_eq!(v.keep_alive, Duration::from_secs(5));
776
777        assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
778        assert_eq!(
779            err("mqtt://host:42?client_id=foo&foo=bar"),
780            OptionError::Unknown("foo".to_owned())
781        );
782        assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
783        assert_eq!(
784            err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
785            OptionError::KeepAlive
786        );
787        assert_eq!(
788            err("mqtt://host:42?client_id=foo&clean_session=foo"),
789            OptionError::CleanSession
790        );
791        assert_eq!(
792            err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
793            OptionError::MaxIncomingPacketSize
794        );
795        assert_eq!(
796            err("mqtt://host:42?client_id=foo&max_outgoing_packet_size_bytes=foo"),
797            OptionError::MaxOutgoingPacketSize
798        );
799        assert_eq!(
800            err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
801            OptionError::RequestChannelCapacity
802        );
803        assert_eq!(
804            err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
805            OptionError::MaxRequestBatch
806        );
807        assert_eq!(
808            err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
809            OptionError::PendingThrottle
810        );
811        assert_eq!(
812            err("mqtt://host:42?client_id=foo&inflight_num=foo"),
813            OptionError::Inflight
814        );
815        assert_eq!(
816            err("mqtt://host:42?client_id=foo&conn_timeout_secs=foo"),
817            OptionError::ConnTimeout
818        );
819    }
820
821    #[test]
822    #[should_panic]
823    fn no_client_id() {
824        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true);
825    }
826}