rumqttc_dev_patched/v5/
mod.rs

1use bytes::Bytes;
2use std::fmt::{self, Debug, Formatter};
3use std::time::Duration;
4#[cfg(feature = "websocket")]
5use std::{
6    future::{Future, IntoFuture},
7    pin::Pin,
8    sync::Arc,
9};
10
11mod client;
12mod eventloop;
13mod framed;
14pub mod mqttbytes;
15mod state;
16
17use crate::tokens::Resolver;
18use crate::{NetworkOptions, Outgoing, Transport};
19
20use mqttbytes::v5::*;
21
22pub use client::{
23    AsyncClient, Client, ClientError, Connection, Iter, RecvError, RecvTimeoutError, TryRecvError,
24};
25pub use eventloop::{ConnectionError, Event, EventLoop};
26pub use state::{MqttState, StateError};
27
28#[cfg(feature = "use-rustls-no-provider")]
29pub use crate::tls::Error as TlsError;
30
31#[cfg(feature = "proxy")]
32pub use crate::proxy::{Proxy, ProxyAuth, ProxyType};
33
34pub type Incoming = Packet;
35
36/// Used to encapsulate all publish acknowledgents in v5
37#[derive(Debug)]
38pub enum AckOfPub {
39    PubAck(PubAck),
40    PubComp(PubComp),
41    None,
42}
43
44/// Used to encapsulate all ack/pubrel acknowledgements in v5
45#[derive(Debug)]
46pub enum AckOfAck {
47    None,
48    PubRel(PubRel),
49}
50
51/// Requests by the client to mqtt event loop. Request are
52/// handled one by one.
53#[derive(Debug)]
54pub enum Request {
55    Publish(Publish, Resolver<AckOfPub>),
56    PubAck(PubAck, Resolver<AckOfAck>),
57    PubRec(PubRec, Resolver<AckOfAck>),
58    PubRel(PubRel, Resolver<AckOfPub>),
59    Subscribe(Subscribe, Resolver<SubAck>),
60    Unsubscribe(Unsubscribe, Resolver<UnsubAck>),
61    Disconnect(Resolver<()>),
62    PingReq,
63}
64
65#[cfg(feature = "websocket")]
66type RequestModifierFn = Arc<
67    dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
68        + Send
69        + Sync,
70>;
71
72// TODO: Should all the options be exposed as public? Drawback
73// would be loosing the ability to panic when the user options
74// are wrong (e.g empty client id) or aggressive (keep alive time)
75/// Options to configure the behaviour of MQTT connection
76#[derive(Clone)]
77pub struct MqttOptions {
78    /// broker address that you want to connect to
79    broker_addr: String,
80    /// broker port
81    port: u16,
82    // What transport protocol to use
83    transport: Transport,
84    /// keep alive time to send pingreq to broker when the connection is idle
85    keep_alive: Duration,
86    /// clean (or) persistent session
87    clean_start: bool,
88    /// client identifier
89    client_id: String,
90    /// username and password
91    credentials: Option<Login>,
92    /// request (publish, subscribe) channel capacity
93    request_channel_capacity: usize,
94    /// Max internal request batching
95    max_request_batch: usize,
96    /// Minimum delay time between consecutive outgoing packets
97    /// while retransmitting pending packets
98    pending_throttle: Duration,
99    /// Last will that will be issued on unexpected disconnect
100    last_will: Option<LastWill>,
101    /// Connection timeout
102    conn_timeout: u64,
103    /// Default value of for maximum incoming packet size.
104    /// Used when `max_incomming_size` in `connect_properties` is NOT available.
105    default_max_incoming_size: u32,
106    /// Connect Properties
107    connect_properties: Option<ConnectProperties>,
108    /// If set to `true` MQTT acknowledgements are not sent automatically.
109    /// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method.
110    manual_acks: bool,
111    network_options: NetworkOptions,
112    #[cfg(feature = "proxy")]
113    /// Proxy configuration.
114    proxy: Option<Proxy>,
115    /// Upper limit on maximum number of inflight requests.
116    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
117    outgoing_inflight_upper_limit: Option<u16>,
118    #[cfg(feature = "websocket")]
119    request_modifier: Option<RequestModifierFn>,
120}
121
122impl MqttOptions {
123    /// Create an [`MqttOptions`] object that contains default values for all settings other than
124    /// - id: A string to identify the device connecting to a broker
125    /// - host: The broker's domain name or IP address
126    /// - port: The port number on which broker must be listening for incoming connections
127    ///
128    /// ```
129    /// # use rumqttc_dev_patched::v5::MqttOptions;
130    /// let options = MqttOptions::new("123", "localhost", 1883);
131    /// ```
132    pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
133        MqttOptions {
134            broker_addr: host.into(),
135            port,
136            transport: Transport::tcp(),
137            keep_alive: Duration::from_secs(60),
138            clean_start: true,
139            client_id: id.into(),
140            credentials: None,
141            request_channel_capacity: 10,
142            max_request_batch: 0,
143            pending_throttle: Duration::from_micros(0),
144            last_will: None,
145            conn_timeout: 5,
146            default_max_incoming_size: 10 * 1024,
147            connect_properties: None,
148            manual_acks: false,
149            network_options: NetworkOptions::new(),
150            #[cfg(feature = "proxy")]
151            proxy: None,
152            outgoing_inflight_upper_limit: None,
153            #[cfg(feature = "websocket")]
154            request_modifier: None,
155        }
156    }
157
158    #[cfg(feature = "url")]
159    /// Creates an [`MqttOptions`] object by parsing provided string with the [url] crate's
160    /// [`Url::parse(url)`](url::Url::parse) method and is only enabled when run using the "url" feature.
161    ///
162    /// ```
163    /// # use rumqttc_dev_patched::MqttOptions;
164    /// let options = MqttOptions::parse_url("mqtt://example.com:1883?client_id=123").unwrap();
165    /// ```
166    ///
167    /// **NOTE:** A url must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`,
168    /// `ws://` or `wss://` to denote the protocol for establishing a connection with the broker.
169    ///
170    /// **NOTE:** Encrypted connections(i.e. `mqtts://`, `ssl://`, `wss://`) by default use the
171    /// system's root certificates. To configure with custom certificates, one may use the
172    /// [`set_transport`](MqttOptions::set_transport) method.
173    ///
174    /// ```ignore
175    /// # use rumqttc_dev_patched::{MqttOptions, Transport};
176    /// # use tokio_rustls::rustls::ClientConfig;
177    /// # let root_cert_store = rustls::RootCertStore::empty();
178    /// # let client_config = ClientConfig::builder()
179    /// #    .with_root_certificates(root_cert_store)
180    /// #    .with_no_client_auth();
181    /// let mut options = MqttOptions::parse_url("mqtts://example.com?client_id=123").unwrap();
182    /// options.set_transport(Transport::tls_with_config(client_config.into()));
183    /// ```
184    pub fn parse_url<S: Into<String>>(url: S) -> Result<MqttOptions, OptionError> {
185        let url = url::Url::parse(&url.into())?;
186        let options = MqttOptions::try_from(url)?;
187
188        Ok(options)
189    }
190
191    /// Broker address
192    pub fn broker_address(&self) -> (String, u16) {
193        (self.broker_addr.clone(), self.port)
194    }
195
196    pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
197        self.last_will = Some(will);
198        self
199    }
200
201    pub fn last_will(&self) -> Option<LastWill> {
202        self.last_will.clone()
203    }
204
205    #[cfg(feature = "websocket")]
206    pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
207    where
208        F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
209        O: IntoFuture<Output = http::Request<()>> + 'static,
210        O::IntoFuture: Send,
211    {
212        self.request_modifier = Some(Arc::new(move |request| {
213            let request_modifier = request_modifier(request).into_future();
214            Box::pin(request_modifier)
215        }));
216
217        self
218    }
219
220    #[cfg(feature = "websocket")]
221    pub fn request_modifier(&self) -> Option<RequestModifierFn> {
222        self.request_modifier.clone()
223    }
224
225    pub fn set_client_id(&mut self, client_id: String) -> &mut Self {
226        self.client_id = client_id;
227        self
228    }
229
230    pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
231        self.transport = transport;
232        self
233    }
234
235    pub fn transport(&self) -> Transport {
236        self.transport.clone()
237    }
238
239    /// Set number of seconds after which client should ping the broker
240    /// if there is no other data exchange
241    pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
242        assert!(duration.as_secs() >= 5, "Keep alives should be >= 5 secs");
243
244        self.keep_alive = duration;
245        self
246    }
247
248    /// Keep alive time
249    pub fn keep_alive(&self) -> Duration {
250        self.keep_alive
251    }
252
253    /// Client identifier
254    pub fn client_id(&self) -> String {
255        self.client_id.clone()
256    }
257
258    /// `clean_start = true` removes all the state from queues & instructs the broker
259    /// to clean all the client state when client disconnects.
260    ///
261    /// When set `false`, broker will hold the client state and performs pending
262    /// operations on the client when reconnection with same `client_id`
263    /// happens. Local queue state is also held to retransmit packets after reconnection.
264    pub fn set_clean_start(&mut self, clean_start: bool) -> &mut Self {
265        self.clean_start = clean_start;
266        self
267    }
268
269    /// Clean session
270    pub fn clean_start(&self) -> bool {
271        self.clean_start
272    }
273
274    /// Username and password
275    pub fn set_credentials<U: Into<String>, P: Into<String>>(
276        &mut self,
277        username: U,
278        password: P,
279    ) -> &mut Self {
280        self.credentials = Some(Login::new(username, password));
281        self
282    }
283
284    /// Security options
285    pub fn credentials(&self) -> Option<Login> {
286        self.credentials.clone()
287    }
288
289    /// Set request channel capacity
290    pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
291        self.request_channel_capacity = capacity;
292        self
293    }
294
295    /// Request channel capacity
296    pub fn request_channel_capacity(&self) -> usize {
297        self.request_channel_capacity
298    }
299
300    /// Enables throttling and sets outoing message rate to the specified 'rate'
301    pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
302        self.pending_throttle = duration;
303        self
304    }
305
306    /// Outgoing message rate
307    pub fn pending_throttle(&self) -> Duration {
308        self.pending_throttle
309    }
310
311    /// set connection timeout in secs
312    pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
313        self.conn_timeout = timeout;
314        self
315    }
316
317    /// get timeout in secs
318    pub fn connection_timeout(&self) -> u64 {
319        self.conn_timeout
320    }
321
322    /// set connection properties
323    pub fn set_connect_properties(&mut self, properties: ConnectProperties) -> &mut Self {
324        self.connect_properties = Some(properties);
325        self
326    }
327
328    /// get connection properties
329    pub fn connect_properties(&self) -> Option<ConnectProperties> {
330        self.connect_properties.clone()
331    }
332
333    /// set session expiry interval on connection properties
334    pub fn set_session_expiry_interval(&mut self, interval: Option<u32>) -> &mut Self {
335        if let Some(conn_props) = &mut self.connect_properties {
336            conn_props.session_expiry_interval = interval;
337            self
338        } else {
339            let mut conn_props = ConnectProperties::new();
340            conn_props.session_expiry_interval = interval;
341            self.set_connect_properties(conn_props)
342        }
343    }
344
345    /// get session expiry interval on connection properties
346    pub fn session_expiry_interval(&self) -> Option<u32> {
347        if let Some(conn_props) = &self.connect_properties {
348            conn_props.session_expiry_interval
349        } else {
350            None
351        }
352    }
353
354    /// set receive maximum on connection properties
355    pub fn set_receive_maximum(&mut self, recv_max: Option<u16>) -> &mut Self {
356        if let Some(conn_props) = &mut self.connect_properties {
357            conn_props.receive_maximum = recv_max;
358            self
359        } else {
360            let mut conn_props = ConnectProperties::new();
361            conn_props.receive_maximum = recv_max;
362            self.set_connect_properties(conn_props)
363        }
364    }
365
366    /// get receive maximum from connection properties
367    pub fn receive_maximum(&self) -> Option<u16> {
368        if let Some(conn_props) = &self.connect_properties {
369            conn_props.receive_maximum
370        } else {
371            None
372        }
373    }
374
375    /// set max packet size on connection properties
376    pub fn set_max_packet_size(&mut self, max_size: Option<u32>) -> &mut Self {
377        if let Some(conn_props) = &mut self.connect_properties {
378            conn_props.max_packet_size = max_size;
379            self
380        } else {
381            let mut conn_props = ConnectProperties::new();
382            conn_props.max_packet_size = max_size;
383            self.set_connect_properties(conn_props)
384        }
385    }
386
387    /// get max packet size from connection properties
388    pub fn max_packet_size(&self) -> Option<u32> {
389        if let Some(conn_props) = &self.connect_properties {
390            conn_props.max_packet_size
391        } else {
392            None
393        }
394    }
395
396    /// set max topic alias on connection properties
397    pub fn set_topic_alias_max(&mut self, topic_alias_max: Option<u16>) -> &mut Self {
398        if let Some(conn_props) = &mut self.connect_properties {
399            conn_props.topic_alias_max = topic_alias_max;
400            self
401        } else {
402            let mut conn_props = ConnectProperties::new();
403            conn_props.topic_alias_max = topic_alias_max;
404            self.set_connect_properties(conn_props)
405        }
406    }
407
408    /// get max topic alias from connection properties
409    pub fn topic_alias_max(&self) -> Option<u16> {
410        if let Some(conn_props) = &self.connect_properties {
411            conn_props.topic_alias_max
412        } else {
413            None
414        }
415    }
416
417    /// set request response info on connection properties
418    pub fn set_request_response_info(&mut self, request_response_info: Option<u8>) -> &mut Self {
419        if let Some(conn_props) = &mut self.connect_properties {
420            conn_props.request_response_info = request_response_info;
421            self
422        } else {
423            let mut conn_props = ConnectProperties::new();
424            conn_props.request_response_info = request_response_info;
425            self.set_connect_properties(conn_props)
426        }
427    }
428
429    /// get request response info from connection properties
430    pub fn request_response_info(&self) -> Option<u8> {
431        if let Some(conn_props) = &self.connect_properties {
432            conn_props.request_response_info
433        } else {
434            None
435        }
436    }
437
438    /// set request problem info on connection properties
439    pub fn set_request_problem_info(&mut self, request_problem_info: Option<u8>) -> &mut Self {
440        if let Some(conn_props) = &mut self.connect_properties {
441            conn_props.request_problem_info = request_problem_info;
442            self
443        } else {
444            let mut conn_props = ConnectProperties::new();
445            conn_props.request_problem_info = request_problem_info;
446            self.set_connect_properties(conn_props)
447        }
448    }
449
450    /// get request problem info from connection properties
451    pub fn request_problem_info(&self) -> Option<u8> {
452        if let Some(conn_props) = &self.connect_properties {
453            conn_props.request_problem_info
454        } else {
455            None
456        }
457    }
458
459    /// set user properties on connection properties
460    pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
461        if let Some(conn_props) = &mut self.connect_properties {
462            conn_props.user_properties = user_properties;
463            self
464        } else {
465            let mut conn_props = ConnectProperties::new();
466            conn_props.user_properties = user_properties;
467            self.set_connect_properties(conn_props)
468        }
469    }
470
471    /// get user properties from connection properties
472    pub fn user_properties(&self) -> Vec<(String, String)> {
473        if let Some(conn_props) = &self.connect_properties {
474            conn_props.user_properties.clone()
475        } else {
476            Vec::new()
477        }
478    }
479
480    /// set authentication method on connection properties
481    pub fn set_authentication_method(
482        &mut self,
483        authentication_method: Option<String>,
484    ) -> &mut Self {
485        if let Some(conn_props) = &mut self.connect_properties {
486            conn_props.authentication_method = authentication_method;
487            self
488        } else {
489            let mut conn_props = ConnectProperties::new();
490            conn_props.authentication_method = authentication_method;
491            self.set_connect_properties(conn_props)
492        }
493    }
494
495    /// get authentication method from connection properties
496    pub fn authentication_method(&self) -> Option<String> {
497        if let Some(conn_props) = &self.connect_properties {
498            conn_props.authentication_method.clone()
499        } else {
500            None
501        }
502    }
503
504    /// set authentication data on connection properties
505    pub fn set_authentication_data(&mut self, authentication_data: Option<Bytes>) -> &mut Self {
506        if let Some(conn_props) = &mut self.connect_properties {
507            conn_props.authentication_data = authentication_data;
508            self
509        } else {
510            let mut conn_props = ConnectProperties::new();
511            conn_props.authentication_data = authentication_data;
512            self.set_connect_properties(conn_props)
513        }
514    }
515
516    /// get authentication data from connection properties
517    pub fn authentication_data(&self) -> Option<Bytes> {
518        if let Some(conn_props) = &self.connect_properties {
519            conn_props.authentication_data.clone()
520        } else {
521            None
522        }
523    }
524
525    /// set manual acknowledgements
526    pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
527        self.manual_acks = manual_acks;
528        self
529    }
530
531    /// get manual acknowledgements
532    pub fn manual_acks(&self) -> bool {
533        self.manual_acks
534    }
535
536    pub fn network_options(&self) -> NetworkOptions {
537        self.network_options.clone()
538    }
539
540    pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
541        self.network_options = network_options;
542        self
543    }
544
545    #[cfg(feature = "proxy")]
546    pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
547        self.proxy = Some(proxy);
548        self
549    }
550
551    #[cfg(feature = "proxy")]
552    pub fn proxy(&self) -> Option<Proxy> {
553        self.proxy.clone()
554    }
555
556    /// Get the upper limit on maximum number of inflight outgoing publishes.
557    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
558    pub fn set_outgoing_inflight_upper_limit(&mut self, limit: u16) -> &mut Self {
559        self.outgoing_inflight_upper_limit = Some(limit);
560        self
561    }
562
563    /// Set the upper limit on maximum number of inflight outgoing publishes.
564    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
565    pub fn get_outgoing_inflight_upper_limit(&self) -> Option<u16> {
566        self.outgoing_inflight_upper_limit
567    }
568}
569
570#[cfg(feature = "url")]
571#[derive(Debug, PartialEq, Eq, thiserror::Error)]
572pub enum OptionError {
573    #[error("Unsupported URL scheme.")]
574    Scheme,
575
576    #[error("Missing client ID.")]
577    ClientId,
578
579    #[error("Invalid keep-alive value.")]
580    KeepAlive,
581
582    #[error("Invalid clean-start value.")]
583    CleanStart,
584
585    #[error("Invalid max-incoming-packet-size value.")]
586    MaxIncomingPacketSize,
587
588    #[error("Invalid max-outgoing-packet-size value.")]
589    MaxOutgoingPacketSize,
590
591    #[error("Invalid request-channel-capacity value.")]
592    RequestChannelCapacity,
593
594    #[error("Invalid max-request-batch value.")]
595    MaxRequestBatch,
596
597    #[error("Invalid pending-throttle value.")]
598    PendingThrottle,
599
600    #[error("Invalid inflight value.")]
601    Inflight,
602
603    #[error("Invalid conn-timeout value.")]
604    ConnTimeout,
605
606    #[error("Unknown option: {0}")]
607    Unknown(String),
608
609    #[error("Couldn't parse option from url: {0}")]
610    Parse(#[from] url::ParseError),
611}
612
613#[cfg(feature = "url")]
614impl std::convert::TryFrom<url::Url> for MqttOptions {
615    type Error = OptionError;
616
617    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
618        use std::collections::HashMap;
619
620        let host = url.host_str().unwrap_or_default().to_owned();
621
622        let (transport, default_port) = match url.scheme() {
623            // Encrypted connections are supported, but require explicit TLS configuration. We fall
624            // back to the unencrypted transport layer, so that `set_transport` can be used to
625            // configure the encrypted transport layer with the provided TLS configuration.
626            #[cfg(feature = "use-rustls-no-provider")]
627            "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883),
628            "mqtt" | "tcp" => (Transport::Tcp, 1883),
629            #[cfg(feature = "websocket")]
630            "ws" => (Transport::Ws, 8000),
631            #[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
632            "wss" => (Transport::wss_with_default_config(), 8000),
633            _ => return Err(OptionError::Scheme),
634        };
635
636        let port = url.port().unwrap_or(default_port);
637
638        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();
639
640        let id = queries
641            .remove("client_id")
642            .ok_or(OptionError::ClientId)?
643            .into_owned();
644
645        let mut options = MqttOptions::new(id, host, port);
646        let mut connect_props = ConnectProperties::new();
647        options.set_transport(transport);
648
649        if let Some(keep_alive) = queries
650            .remove("keep_alive_secs")
651            .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
652            .transpose()?
653        {
654            options.set_keep_alive(Duration::from_secs(keep_alive));
655        }
656
657        if let Some(clean_start) = queries
658            .remove("clean_start")
659            .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanStart))
660            .transpose()?
661        {
662            options.set_clean_start(clean_start);
663        }
664
665        if let Some((username, password)) = {
666            match url.username() {
667                "" => None,
668                username => Some((
669                    username.to_owned(),
670                    url.password().unwrap_or_default().to_owned(),
671                )),
672            }
673        } {
674            options.set_credentials(username, password);
675        }
676
677        connect_props.max_packet_size = queries
678            .remove("max_incoming_packet_size_bytes")
679            .map(|v| {
680                v.parse::<u32>()
681                    .map_err(|_| OptionError::MaxIncomingPacketSize)
682            })
683            .transpose()?;
684
685        if let Some(request_channel_capacity) = queries
686            .remove("request_channel_capacity_num")
687            .map(|v| {
688                v.parse::<usize>()
689                    .map_err(|_| OptionError::RequestChannelCapacity)
690            })
691            .transpose()?
692        {
693            options.request_channel_capacity = request_channel_capacity;
694        }
695
696        if let Some(max_request_batch) = queries
697            .remove("max_request_batch_num")
698            .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
699            .transpose()?
700        {
701            options.max_request_batch = max_request_batch;
702        }
703
704        if let Some(pending_throttle) = queries
705            .remove("pending_throttle_usecs")
706            .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
707            .transpose()?
708        {
709            options.set_pending_throttle(Duration::from_micros(pending_throttle));
710        }
711
712        connect_props.receive_maximum = queries
713            .remove("inflight_num")
714            .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
715            .transpose()?;
716
717        if let Some(conn_timeout) = queries
718            .remove("conn_timeout_secs")
719            .map(|v| v.parse::<u64>().map_err(|_| OptionError::ConnTimeout))
720            .transpose()?
721        {
722            options.set_connection_timeout(conn_timeout);
723        }
724
725        if let Some((opt, _)) = queries.into_iter().next() {
726            return Err(OptionError::Unknown(opt.into_owned()));
727        }
728
729        options.connect_properties = Some(connect_props);
730        Ok(options)
731    }
732}
733
734// Implement Debug manually because ClientConfig doesn't implement it, so derive(Debug) doesn't
735// work.
736impl Debug for MqttOptions {
737    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
738        f.debug_struct("MqttOptions")
739            .field("broker_addr", &self.broker_addr)
740            .field("port", &self.port)
741            .field("keep_alive", &self.keep_alive)
742            .field("clean_start", &self.clean_start)
743            .field("client_id", &self.client_id)
744            .field("credentials", &self.credentials)
745            .field("request_channel_capacity", &self.request_channel_capacity)
746            .field("max_request_batch", &self.max_request_batch)
747            .field("pending_throttle", &self.pending_throttle)
748            .field("last_will", &self.last_will)
749            .field("conn_timeout", &self.conn_timeout)
750            .field("manual_acks", &self.manual_acks)
751            .field("connect properties", &self.connect_properties)
752            .finish()
753    }
754}
755
756#[cfg(test)]
757mod test {
758    use super::*;
759
760    #[test]
761    #[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
762    fn no_scheme() {
763        use crate::{TlsConfiguration, Transport};
764        let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);
765
766        mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));
767
768        if let Transport::Wss(TlsConfiguration::Simple {
769            ca,
770            client_auth,
771            alpn,
772        }) = mqttoptions.transport
773        {
774            assert_eq!(ca, Vec::from("Test CA"));
775            assert_eq!(client_auth, None);
776            assert_eq!(alpn, None);
777        } else {
778            panic!("Unexpected transport!");
779        }
780
781        assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
782    }
783
784    #[test]
785    #[cfg(feature = "url")]
786    fn from_url() {
787        fn opt(s: &str) -> Result<MqttOptions, OptionError> {
788            MqttOptions::parse_url(s)
789        }
790        fn ok(s: &str) -> MqttOptions {
791            opt(s).expect("valid options")
792        }
793        fn err(s: &str) -> OptionError {
794            opt(s).expect_err("invalid options")
795        }
796
797        let v = ok("mqtt://host:42?client_id=foo");
798        assert_eq!(v.broker_address(), ("host".to_owned(), 42));
799        assert_eq!(v.client_id(), "foo".to_owned());
800
801        let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
802        assert_eq!(v.keep_alive, Duration::from_secs(5));
803
804        assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
805        assert_eq!(
806            err("mqtt://host:42?client_id=foo&foo=bar"),
807            OptionError::Unknown("foo".to_owned())
808        );
809        assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
810        assert_eq!(
811            err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
812            OptionError::KeepAlive
813        );
814        assert_eq!(
815            err("mqtt://host:42?client_id=foo&clean_start=foo"),
816            OptionError::CleanStart
817        );
818        assert_eq!(
819            err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
820            OptionError::MaxIncomingPacketSize
821        );
822        assert_eq!(
823            err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
824            OptionError::RequestChannelCapacity
825        );
826        assert_eq!(
827            err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
828            OptionError::MaxRequestBatch
829        );
830        assert_eq!(
831            err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
832            OptionError::PendingThrottle
833        );
834        assert_eq!(
835            err("mqtt://host:42?client_id=foo&inflight_num=foo"),
836            OptionError::Inflight
837        );
838        assert_eq!(
839            err("mqtt://host:42?client_id=foo&conn_timeout_secs=foo"),
840            OptionError::ConnTimeout
841        );
842    }
843
844    #[test]
845    fn allow_empty_client_id() {
846        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_start(true);
847    }
848}