Skip to main content

rumqttc/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3
4#[cfg(all(feature = "use-rustls-ring", feature = "use-rustls-aws-lc"))]
5compile_error!(
6    "Features `use-rustls-ring` and `use-rustls-aws-lc` are mutually exclusive. Enable only one rustls provider feature."
7);
8
#[macro_use]
extern crate log;

use bytes::Bytes;
use std::fmt::{self, Debug, Formatter};
use std::io;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::{TcpStream, lookup_host};
use tokio::task::JoinSet;

#[cfg(all(feature = "url", unix))]
use percent_encoding::percent_decode_str;

#[cfg(all(feature = "url", unix))]
use std::{ffi::OsString, os::unix::ffi::OsStringExt};

#[cfg(feature = "websocket")]
use std::{
    future::{Future, IntoFuture},
    pin::Pin,
};
33
34mod auth;
35mod client;
36mod eventloop;
37mod framed;
38pub mod mqttbytes;
39mod notice;
40mod state;
41mod transport;
42
43#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
44mod tls;
45
46#[cfg(feature = "websocket")]
47mod websockets;
48
49#[cfg(feature = "proxy")]
50mod proxy;
51
52pub use client::{
53    AsyncClient, AsyncClientBuilder, Client, ClientBuilder, ClientError, Connection, InvalidTopic,
54    Iter, ManualAck, PublishTopic, RecvError, RecvTimeoutError, TryRecvError, ValidatedTopic,
55};
56pub use eventloop::{ConnectionError, Event, EventLoop};
57pub use mqttbytes::v5::*;
58pub use mqttbytes::*;
59pub use notice::{
60    AuthNotice, AuthNoticeError, NoticeFailureReason, PublishNotice, PublishNoticeError,
61    PublishResult, SubscribeNotice, SubscribeNoticeError, UnsubscribeNotice,
62    UnsubscribeNoticeError,
63};
64pub use rumqttc_core::NetworkOptions;
65#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
66pub use rumqttc_core::TlsConfiguration;
67pub use rumqttc_core::default_socket_connect;
68pub use state::{MqttState, MqttStateBuilder, StateError};
69pub use transport::Transport;
70
/// Policy used for automatic MQTT 5 client-side topic alias assignment.
///
/// The default policy is [`TopicAliasPolicy::Monotonic`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum TopicAliasPolicy {
    /// Assign aliases monotonically until the broker's Topic Alias Maximum is reached.
    #[default]
    Monotonic,
    /// Recycle automatically assigned aliases using least-recently-used eviction.
    Lru,
}
80
81#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
82pub use crate::tls::Error as TlsError;
83
84#[cfg(feature = "proxy")]
85pub use crate::proxy::{Proxy, ProxyAuth, ProxyType};
86
87#[cfg(feature = "use-native-tls")]
88pub use tokio_native_tls;
89#[cfg(feature = "use-rustls-no-provider")]
90pub use tokio_rustls;
91
92pub type Incoming = Packet;
93
/// Current outgoing activity on the eventloop
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outgoing {
    /// Publish packet with packet identifier. 0 implies `QoS` 0
    Publish(u16),
    /// Subscribe packet with packet identifier
    Subscribe(u16),
    /// Unsubscribe packet with packet identifier
    Unsubscribe(u16),
    /// `PubAck` packet
    PubAck(u16),
    /// `PubRec` packet
    PubRec(u16),
    /// `PubRel` packet
    PubRel(u16),
    /// `PubComp` packet
    PubComp(u16),
    /// Ping request packet
    PingReq,
    /// Ping response packet
    PingResp,
    /// Disconnect packet
    Disconnect,
    /// Waiting for an ack before more outgoing progress can be made
    AwaitAck(u16),
    /// Auth packet
    Auth,
}
122
123/// Custom socket connector used to establish the underlying stream before optional proxy/TLS layers.
124pub(crate) type SocketConnector = rumqttc_core::SocketConnector;
125
126const CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(100);
127
128async fn first_success_with_stagger<T, I, F, Fut>(
129    items: I,
130    attempt_delay: Duration,
131    connect_fn: F,
132) -> io::Result<T>
133where
134    T: Send + 'static,
135    I: IntoIterator,
136    I::Item: Send + 'static,
137    F: Fn(I::Item) -> Fut + Send + Sync + Clone + 'static,
138    Fut: std::future::Future<Output = io::Result<T>> + Send + 'static,
139{
140    let mut join_set = JoinSet::new();
141    let mut item_count = 0usize;
142
143    for (index, item) in items.into_iter().enumerate() {
144        item_count += 1;
145        let delay = attempt_delay.saturating_mul(u32::try_from(index).unwrap_or(u32::MAX));
146        let connect_fn = connect_fn.clone();
147        join_set.spawn(async move {
148            tokio::time::sleep(delay).await;
149            connect_fn(item).await
150        });
151    }
152
153    if item_count == 0 {
154        return Err(io::Error::new(
155            io::ErrorKind::InvalidInput,
156            "could not resolve to any address",
157        ));
158    }
159
160    let mut last_err = None;
161
162    while let Some(task_result) = join_set.join_next().await {
163        match task_result {
164            Ok(Ok(stream)) => {
165                join_set.abort_all();
166                return Ok(stream);
167            }
168            Ok(Err(err)) => {
169                last_err = Some(err);
170            }
171            Err(err) => {
172                last_err = Some(io::Error::other(format!(
173                    "concurrent connect task failed: {err}"
174                )));
175            }
176        }
177    }
178
179    Err(last_err.unwrap_or_else(|| {
180        io::Error::new(
181            io::ErrorKind::InvalidInput,
182            "could not resolve to any address",
183        )
184    }))
185}
186
/// Tries `connect_fn` on each item in order and returns the first successful result.
///
/// An attempt is awaited to completion before the next one starts, and no further
/// items are tried once one succeeds.
///
/// # Errors
///
/// - [`io::ErrorKind::InvalidInput`] ("could not resolve to any address") when
///   `items` yields nothing.
/// - Otherwise the error of the last attempt, when every attempt fails.
async fn first_success_sequential<T, I, F, Fut>(items: I, connect_fn: F) -> io::Result<T>
where
    I: IntoIterator,
    F: Fn(I::Item) -> Fut,
    Fut: std::future::Future<Output = io::Result<T>>,
{
    // Stays `None` only when the loop body never ran, i.e. `items` was empty.
    let mut last_err = None;

    for item in items {
        match connect_fn(item).await {
            Ok(stream) => return Ok(stream),
            Err(err) => last_err = Some(err),
        }
    }

    Err(last_err.unwrap_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "could not resolve to any address",
        )
    }))
}
218
219fn should_stagger_connect_attempts(network_options: &NetworkOptions) -> bool {
220    network_options
221        .bind_addr()
222        .is_none_or(|bind_addr| bind_addr.port() == 0)
223}
224
225async fn connect_with_retry_mode<T, I, F, Fut>(
226    items: I,
227    network_options: NetworkOptions,
228    connect_fn: F,
229) -> io::Result<T>
230where
231    T: Send + 'static,
232    I: IntoIterator,
233    I::Item: Send + 'static,
234    F: Fn(I::Item, NetworkOptions) -> Fut + Send + Sync + Clone + 'static,
235    Fut: std::future::Future<Output = io::Result<T>> + Send + 'static,
236{
237    connect_with_retry_mode_and_delay(items, network_options, CONNECTION_ATTEMPT_DELAY, connect_fn)
238        .await
239}
240
241async fn connect_with_retry_mode_and_delay<T, I, F, Fut>(
242    items: I,
243    network_options: NetworkOptions,
244    connection_attempt_delay: Duration,
245    connect_fn: F,
246) -> io::Result<T>
247where
248    T: Send + 'static,
249    I: IntoIterator,
250    I::Item: Send + 'static,
251    F: Fn(I::Item, NetworkOptions) -> Fut + Send + Sync + Clone + 'static,
252    Fut: std::future::Future<Output = io::Result<T>> + Send + 'static,
253{
254    if should_stagger_connect_attempts(&network_options) {
255        first_success_with_stagger(items, connection_attempt_delay, move |item| {
256            let network_options = network_options.clone();
257            let connect_fn = connect_fn.clone();
258            async move { connect_fn(item, network_options).await }
259        })
260        .await
261    } else {
262        first_success_sequential(items, move |item| {
263            let network_options = network_options.clone();
264            let connect_fn = connect_fn.clone();
265            async move { connect_fn(item, network_options).await }
266        })
267        .await
268    }
269}
270
271async fn connect_resolved_addrs_staggered(
272    addrs: Vec<SocketAddr>,
273    network_options: NetworkOptions,
274) -> io::Result<TcpStream> {
275    connect_with_retry_mode(
276        addrs,
277        network_options,
278        move |addr, network_options| async move {
279            rumqttc_core::connect_socket_addr(addr, network_options).await
280        },
281    )
282    .await
283}
284
285async fn default_socket_connect_staggered(
286    host: String,
287    network_options: NetworkOptions,
288) -> io::Result<TcpStream> {
289    let addrs = lookup_host(host).await?.collect::<Vec<_>>();
290    connect_resolved_addrs_staggered(addrs, network_options).await
291}
292
293fn default_socket_connector() -> SocketConnector {
294    Arc::new(|host, network_options| {
295        Box::pin(async move {
296            let tcp = default_socket_connect_staggered(host, network_options).await?;
297            Ok(Box::new(tcp) as Box<dyn crate::framed::AsyncReadWrite>)
298        })
299    })
300}
301
/// Port used when a [`Broker`] is built from a bare host name.
const DEFAULT_BROKER_PORT: u16 = 1883;
303
304/// Broker target used to construct [`MqttOptions`].
305#[derive(Clone, Debug, PartialEq, Eq)]
306pub struct Broker {
307    inner: BrokerInner,
308}
309
/// Private representation behind [`Broker`].
#[derive(Clone, Debug, PartialEq, Eq)]
enum BrokerInner {
    /// Host name (or address) and port reached over TCP.
    Tcp {
        host: String,
        port: u16,
    },
    /// Filesystem path of a Unix socket; only available on Unix targets.
    #[cfg(unix)]
    Unix {
        path: PathBuf,
    },
    /// Websocket URL; only available with the `websocket` feature.
    #[cfg(feature = "websocket")]
    Websocket {
        url: String,
    },
}
325
326impl Broker {
327    #[must_use]
328    pub fn tcp<S: Into<String>>(host: S, port: u16) -> Self {
329        Self {
330            inner: BrokerInner::Tcp {
331                host: host.into(),
332                port,
333            },
334        }
335    }
336
337    #[cfg(unix)]
338    #[must_use]
339    pub fn unix<P: Into<PathBuf>>(path: P) -> Self {
340        Self {
341            inner: BrokerInner::Unix { path: path.into() },
342        }
343    }
344
345    #[cfg(feature = "websocket")]
346    /// # Errors
347    ///
348    /// Returns [`OptionError::WebsocketUrl`] when `url` is not a valid websocket URL or cannot
349    /// be split into broker components, [`OptionError::WssRequiresExplicitTransport`] for `wss://`
350    /// URLs, and [`OptionError::Scheme`] for unsupported schemes.
351    pub fn websocket<S: Into<String>>(url: S) -> Result<Self, OptionError> {
352        let url = url.into();
353        let uri = url
354            .parse::<http::Uri>()
355            .map_err(|_| OptionError::WebsocketUrl)?;
356
357        match uri.scheme_str() {
358            Some("ws") => {
359                rumqttc_core::split_url(&url).map_err(|_| OptionError::WebsocketUrl)?;
360                Ok(Self {
361                    inner: BrokerInner::Websocket { url },
362                })
363            }
364            Some("wss") => Err(OptionError::WssRequiresExplicitTransport),
365            _ => Err(OptionError::Scheme),
366        }
367    }
368
369    #[must_use]
370    pub const fn tcp_address(&self) -> Option<(&str, u16)> {
371        match &self.inner {
372            BrokerInner::Tcp { host, port } => Some((host.as_str(), *port)),
373            #[cfg(unix)]
374            BrokerInner::Unix { .. } => None,
375            #[cfg(feature = "websocket")]
376            BrokerInner::Websocket { .. } => None,
377        }
378    }
379
380    #[cfg(unix)]
381    #[must_use]
382    pub fn unix_path(&self) -> Option<&std::path::Path> {
383        match &self.inner {
384            BrokerInner::Unix { path } => Some(path.as_path()),
385            BrokerInner::Tcp { .. } => None,
386            #[cfg(feature = "websocket")]
387            BrokerInner::Websocket { .. } => None,
388        }
389    }
390
391    #[cfg(feature = "websocket")]
392    #[must_use]
393    pub const fn websocket_url(&self) -> Option<&str> {
394        match &self.inner {
395            BrokerInner::Websocket { url } => Some(url.as_str()),
396            BrokerInner::Tcp { .. } => None,
397            #[cfg(unix)]
398            BrokerInner::Unix { .. } => None,
399        }
400    }
401
402    pub(crate) const fn default_transport(&self) -> Transport {
403        match &self.inner {
404            BrokerInner::Tcp { .. } => Transport::tcp(),
405            #[cfg(unix)]
406            BrokerInner::Unix { .. } => Transport::unix(),
407            #[cfg(feature = "websocket")]
408            BrokerInner::Websocket { .. } => Transport::Ws,
409        }
410    }
411}
412
413impl From<&str> for Broker {
414    fn from(host: &str) -> Self {
415        Self::tcp(host, DEFAULT_BROKER_PORT)
416    }
417}
418
419impl From<String> for Broker {
420    fn from(host: String) -> Self {
421        Self::tcp(host, DEFAULT_BROKER_PORT)
422    }
423}
424
425impl<S: Into<String>> From<(S, u16)> for Broker {
426    fn from((host, port): (S, u16)) -> Self {
427        Self::tcp(host, port)
428    }
429}
430
/// Controls how incoming packet size limits are enforced locally.
///
/// The default is [`IncomingPacketSizeLimit::Default`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum IncomingPacketSizeLimit {
    /// Enforce the default incoming packet size limit.
    #[default]
    Default,
    /// Disable incoming packet size checks.
    Unlimited,
    /// Enforce a user-specified maximum size.
    Bytes(u32),
}
442
/// Identifies which MQTT 5 enhanced-authentication exchange is active.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AuthExchangeKind {
    /// Exchange that is part of the initial connect.
    InitialConnect,
    /// Re-authentication exchange.
    Reauthentication,
}
449
450/// Context supplied to MQTT 5 enhanced-authentication callbacks.
451#[derive(Clone, Copy, Debug, PartialEq, Eq)]
452pub struct AuthContext<'a> {
453    pub kind: AuthExchangeKind,
454    pub method: &'a str,
455}
456
457/// Action returned by an [`Authenticator`] after an AUTH Continue packet.
458#[derive(Clone, Debug, PartialEq, Eq)]
459pub enum AuthAction {
460    Send(AuthProperties),
461    Complete,
462    Fail(String),
463}
464
465/// Errors returned by MQTT 5 enhanced-authentication callbacks.
466#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
467pub enum AuthError {
468    #[error("authentication failed: {0}")]
469    Failed(String),
470}
471
472impl From<String> for AuthError {
473    fn from(value: String) -> Self {
474        Self::Failed(value)
475    }
476}
477
478impl From<&str> for AuthError {
479    fn from(value: &str) -> Self {
480        Self::Failed(value.to_owned())
481    }
482}
483
/// Structured reason emitted when an authentication exchange fails.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AuthFailureReason {
    SessionReset,
    ProtocolError,
    /// Carries a human-readable description of the failure.
    AuthenticationFailed(String),
    ConnectionClosed,
    OverlappingReauth,
    MissingAuthenticationMethod,
    NoticeDropped,
}
495
/// Structured result of a completed MQTT 5 authentication exchange.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AuthOutcome {
    /// The exchange completed successfully.
    Success,
}
501
502/// Structured authentication lifecycle event yielded by the event loop.
503#[derive(Clone, Debug, PartialEq, Eq)]
504pub enum AuthEvent {
505    Started {
506        kind: AuthExchangeKind,
507        method: String,
508    },
509    Continue {
510        kind: AuthExchangeKind,
511        method: String,
512    },
513    Succeeded {
514        kind: AuthExchangeKind,
515        method: String,
516    },
517    Failed {
518        kind: AuthExchangeKind,
519        method: String,
520        reason: AuthFailureReason,
521    },
522}
523
524/// MQTT 5 enhanced-authentication callback interface.
525pub trait Authenticator: std::fmt::Debug + Send {
526    /// Called when an enhanced-authentication exchange starts.
527    ///
528    /// # Errors
529    ///
530    /// Return an error to abort the exchange locally.
531    fn start(&mut self, context: AuthContext<'_>) -> Result<Option<AuthProperties>, AuthError>;
532
533    /// Called when the broker sends AUTH Continue.
534    ///
535    /// # Errors
536    ///
537    /// Return an error to abort the exchange locally.
538    fn continue_auth(
539        &mut self,
540        context: AuthContext<'_>,
541        incoming: Option<AuthProperties>,
542    ) -> Result<AuthAction, AuthError>;
543
544    /// Called when the broker sends AUTH Success.
545    ///
546    /// # Errors
547    ///
548    /// Return an error if the successful server response cannot be accepted by
549    /// the authentication mechanism.
550    fn success(
551        &mut self,
552        context: AuthContext<'_>,
553        incoming: Option<AuthProperties>,
554    ) -> Result<(), AuthError>;
555
556    /// Called when an active exchange fails or is aborted.
557    fn failure(&mut self, context: AuthContext<'_>, error: AuthError);
558}
559
560/// Requests by the client to mqtt event loop. Request are
561/// handled one by one.
562#[derive(Clone, Debug, PartialEq, Eq)]
563pub enum Request {
564    Publish(Publish),
565    PubAck(PubAck),
566    PubRec(PubRec),
567    PubComp(PubComp),
568    PubRel(PubRel),
569    PingReq,
570    PingResp,
571    Subscribe(Subscribe),
572    SubAck(SubAck),
573    Unsubscribe(Unsubscribe),
574    UnsubAck(UnsubAck),
575    Auth(Auth),
576    Disconnect(Disconnect),
577    DisconnectNow(Disconnect),
578    DisconnectWithTimeout(Disconnect, Duration),
579}
580
581impl From<Subscribe> for Request {
582    fn from(subscribe: Subscribe) -> Self {
583        Self::Subscribe(subscribe)
584    }
585}
586
/// Shared, boxed form of an infallible websocket request modifier: takes the HTTP
/// request and resolves to the modified request.
#[cfg(feature = "websocket")]
type RequestModifierFn = Arc<
    dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
        + Send
        + Sync,
>;

/// Type-erased error produced by a fallible websocket request modifier.
#[cfg(feature = "websocket")]
type RequestModifierError = Box<dyn std::error::Error + Send + Sync>;

/// Shared, boxed form of a fallible websocket request modifier: like
/// [`RequestModifierFn`] but resolving to a `Result`.
#[cfg(feature = "websocket")]
type FallibleRequestModifierFn = Arc<
    dyn Fn(
            http::Request<()>,
        )
            -> Pin<Box<dyn Future<Output = Result<http::Request<()>, RequestModifierError>> + Send>>
        + Send
        + Sync,
>;
606
607/// Options to configure the behaviour of MQTT connection
608#[derive(Clone)]
609pub struct MqttOptions {
610    /// broker target that you want to connect to
611    broker: Broker,
612    transport: Transport,
613    /// keep alive time to send pingreq to broker when the connection is idle
614    keep_alive: Duration,
615    /// clean (or) persistent session
616    clean_start: bool,
617    /// client identifier
618    client_id: String,
619    /// CONNECT authentication fields
620    auth: ConnectAuth,
621    /// request (publish, subscribe) channel capacity
622    request_channel_capacity: usize,
623    /// Max internal request batching
624    max_request_batch: usize,
625    /// Maximum number of packets processed in a single network read batch.
626    /// `0` enables adaptive batching.
627    read_batch_size: usize,
628    /// Minimum delay time between consecutive outgoing packets
629    /// while retransmitting pending packets
630    pending_throttle: Duration,
631    /// Last will that will be issued on unexpected disconnect
632    last_will: Option<LastWill>,
633    /// Connection timeout
634    connect_timeout: Duration,
635    /// Default value of for maximum incoming packet size.
636    /// Used when `max_incomming_size` in `connect_properties` is NOT available.
637    default_max_incoming_size: u32,
638    /// Local incoming packet size policy.
639    incoming_packet_size_limit: IncomingPacketSizeLimit,
640    /// Connect Properties
641    connect_properties: Option<ConnectProperties>,
642    /// Automatically assign outgoing MQTT 5 topic aliases when the broker supports them.
643    auto_topic_aliases: bool,
644    /// Policy used when automatically assigning outgoing MQTT 5 topic aliases.
645    topic_alias_policy: TopicAliasPolicy,
646    /// If set to `true` MQTT acknowledgements are not sent automatically.
647    /// Every incoming publish packet must be acknowledged manually with either
648    /// `client.ack(...)` or the `prepare_ack(...)` + `manual_ack(...)` flow.
649    manual_acks: bool,
650    network_options: NetworkOptions,
651    #[cfg(feature = "proxy")]
652    /// Proxy configuration.
653    proxy: Option<Proxy>,
654    /// Upper limit on maximum number of inflight requests.
655    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
656    outgoing_inflight_upper_limit: Option<u16>,
657    #[cfg(feature = "websocket")]
658    request_modifier: Option<RequestModifierFn>,
659    #[cfg(feature = "websocket")]
660    fallible_request_modifier: Option<FallibleRequestModifierFn>,
661    socket_connector: Option<SocketConnector>,
662
663    authenticator: Option<Arc<Mutex<dyn Authenticator>>>,
664}
665
666impl MqttOptions {
667    /// Create an [`MqttOptions`] object that contains default values for all settings other than
668    /// - id: A string to identify the device connecting to a broker
669    /// - broker: The broker target to connect to
670    ///
671    /// ```
672    /// # use rumqttc::MqttOptions;
673    /// let options = MqttOptions::new("123", "localhost");
674    /// ```
675    pub fn new<S: Into<String>, B: Into<Broker>>(id: S, broker: B) -> Self {
676        let broker = broker.into();
677        Self {
678            transport: broker.default_transport(),
679            broker,
680            keep_alive: Duration::from_secs(60),
681            clean_start: true,
682            client_id: id.into(),
683            auth: ConnectAuth::None,
684            request_channel_capacity: 10,
685            max_request_batch: 0,
686            read_batch_size: 0,
687            pending_throttle: Duration::from_micros(0),
688            last_will: None,
689            connect_timeout: Duration::from_secs(5),
690            default_max_incoming_size: 10 * 1024,
691            incoming_packet_size_limit: IncomingPacketSizeLimit::Default,
692            connect_properties: None,
693            auto_topic_aliases: false,
694            topic_alias_policy: TopicAliasPolicy::default(),
695            manual_acks: false,
696            network_options: NetworkOptions::new(),
697            #[cfg(feature = "proxy")]
698            proxy: None,
699            outgoing_inflight_upper_limit: None,
700            #[cfg(feature = "websocket")]
701            request_modifier: None,
702            #[cfg(feature = "websocket")]
703            fallible_request_modifier: None,
704            socket_connector: None,
705            authenticator: None,
706        }
707    }
708
709    /// Create a builder for [`MqttOptions`].
710    ///
711    /// ```
712    /// # use rumqttc::MqttOptions;
713    /// let options = MqttOptions::builder("123", "localhost")
714    ///     .keep_alive(5)
715    ///     .clean_start(true)
716    ///     .build();
717    /// ```
718    #[must_use]
719    pub fn builder<S: Into<String>, B: Into<Broker>>(id: S, broker: B) -> MqttOptionsBuilder {
720        MqttOptionsBuilder::new(id, broker)
721    }
722
723    #[cfg(feature = "url")]
724    /// Creates an [`MqttOptions`] object by parsing provided string with the [url] crate's
725    /// [`Url::parse(url)`](url::Url::parse) method and is only enabled when run using the "url" feature.
726    ///
727    /// ```
728    /// # use rumqttc::MqttOptions;
729    /// let options = MqttOptions::parse_url("mqtt://example.com:1883?client_id=123").unwrap();
730    /// ```
731    ///
732    /// **NOTE:** A url must be prefixed with one of either `tcp://`, `mqtt://` or `ws://` to
733    /// denote the protocol for establishing a connection with the broker. On Unix platforms,
734    /// `unix:///path/to/socket` is also supported.
735    ///
736    /// **NOTE:** Secure transports are configured explicitly with
737    /// [`set_transport`](MqttOptions::set_transport). Secure URL schemes such as `mqtts://`,
738    /// `ssl://`, and `wss://` are rejected.
739    ///
740    /// ```ignore
741    /// # use rumqttc::{MqttOptions, Transport};
742    /// # use tokio_rustls::rustls::ClientConfig;
743    /// # let root_cert_store = rustls::RootCertStore::empty();
744    /// # let client_config = ClientConfig::builder()
745    /// #    .with_root_certificates(root_cert_store)
746    /// #    .with_no_client_auth();
747    /// let mut options = MqttOptions::parse_url("mqtt://example.com?client_id=123").unwrap();
748    /// options.set_transport(Transport::tls_with_config(client_config.into()));
749    /// ```
750    ///
751    /// On Unix platforms, `unix:///tmp/mqtt.sock?client_id=123` is also supported.
752    ///
753    /// # Errors
754    ///
755    /// Returns any [`OptionError`] produced while parsing the URL, validating its scheme,
756    /// interpreting query parameters, or constructing the broker options from it.
757    pub fn parse_url<S: Into<String>>(url: S) -> Result<Self, OptionError> {
758        let url = url::Url::parse(&url.into())?;
759        let options = Self::try_from(url)?;
760
761        Ok(options)
762    }
763
764    /// Broker target
765    pub const fn broker(&self) -> &Broker {
766        &self.broker
767    }
768
769    pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
770        self.last_will = Some(will);
771        self
772    }
773
774    pub fn last_will(&self) -> Option<LastWill> {
775        self.last_will.clone()
776    }
777
778    /// Sets an infallible handler for modifying the websocket HTTP request before it is sent.
779    ///
780    /// Calling this method replaces any previously configured fallible request modifier.
781    #[cfg(feature = "websocket")]
782    pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
783    where
784        F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
785        O: IntoFuture<Output = http::Request<()>> + 'static,
786        O::IntoFuture: Send,
787    {
788        self.request_modifier = Some(Arc::new(move |request| {
789            let request_modifier = request_modifier(request).into_future();
790            Box::pin(request_modifier)
791        }));
792        self.fallible_request_modifier = None;
793        self
794    }
795
796    /// Sets a fallible handler for modifying the websocket HTTP request before it is sent.
797    ///
798    /// Calling this method replaces any previously configured infallible request modifier.
799    /// If the modifier returns an error, the connection fails with
800    /// [`ConnectionError::RequestModifier`].
801    #[cfg(feature = "websocket")]
802    pub fn set_fallible_request_modifier<F, O, E>(&mut self, request_modifier: F) -> &mut Self
803    where
804        F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
805        O: IntoFuture<Output = Result<http::Request<()>, E>> + 'static,
806        O::IntoFuture: Send,
807        E: std::error::Error + Send + Sync + 'static,
808    {
809        self.fallible_request_modifier = Some(Arc::new(move |request| {
810            let request_modifier = request_modifier(request).into_future();
811            Box::pin(async move {
812                request_modifier
813                    .await
814                    .map_err(|error| Box::new(error) as RequestModifierError)
815            })
816        }));
817        self.request_modifier = None;
818        self
819    }
820
821    #[cfg(feature = "websocket")]
822    pub fn request_modifier(&self) -> Option<RequestModifierFn> {
823        self.request_modifier.clone()
824    }
825
826    #[cfg(feature = "websocket")]
827    pub(crate) fn fallible_request_modifier(&self) -> Option<FallibleRequestModifierFn> {
828        self.fallible_request_modifier.clone()
829    }
830
831    /// Sets a custom socket connector, overriding the default TCP socket creation logic.
832    ///
833    /// The connector is used to create the base stream before optional proxy/TLS/WebSocket layers
834    /// managed by `MqttOptions` are applied.
835    ///
836    /// If the connector already performs TLS/proxy work, configure `MqttOptions` transport/proxy
837    /// to avoid layering those concerns twice.
838    ///
839    /// Custom connectors are also responsible for honoring `network_options` themselves. To keep
840    /// `NetworkOptions` behavior such as `set_bind_addr(...)`, forward `network_options` into
841    /// `rumqttc::default_socket_connect(...)` or apply the equivalent socket configuration before
842    /// connecting.
843    ///
844    /// # Example
845    /// ```
846    /// # use rumqttc::MqttOptions;
847    /// # let mut options = MqttOptions::new("test", "localhost");
848    /// options.set_socket_connector(|host, network_options| async move {
849    ///     rumqttc::default_socket_connect(host, network_options).await
850    /// });
851    /// ```
852    #[cfg(not(feature = "websocket"))]
853    pub fn set_socket_connector<F, Fut, S>(&mut self, f: F) -> &mut Self
854    where
855        F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
856        Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
857        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
858    {
859        self.socket_connector = Some(Arc::new(move |host, network_options| {
860            let stream_future = f(host, network_options);
861            let future = async move {
862                let stream = stream_future.await?;
863                Ok(Box::new(stream) as Box<dyn crate::framed::AsyncReadWrite>)
864            };
865            Box::pin(future)
866        }));
867        self
868    }
869
870    /// Sets a custom socket connector, overriding the default TCP socket creation logic.
871    ///
872    /// The connector is used to create the base stream before optional proxy/TLS/WebSocket layers
873    /// managed by `MqttOptions` are applied.
874    ///
875    /// If the connector already performs TLS/proxy work, configure `MqttOptions` transport/proxy
876    /// to avoid layering those concerns twice.
877    ///
878    /// Custom connectors are also responsible for honoring `network_options` themselves. To keep
879    /// `NetworkOptions` behavior such as `set_bind_addr(...)`, forward `network_options` into
880    /// `rumqttc::default_socket_connect(...)` or apply the equivalent socket configuration before
881    /// connecting.
882    ///
883    /// # Example
884    /// ```
885    /// # use rumqttc::MqttOptions;
886    /// # let mut options = MqttOptions::new("test", "localhost");
887    /// options.set_socket_connector(|host, network_options| async move {
888    ///     rumqttc::default_socket_connect(host, network_options).await
889    /// });
890    /// ```
891    #[cfg(feature = "websocket")]
892    pub fn set_socket_connector<F, Fut, S>(&mut self, f: F) -> &mut Self
893    where
894        F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
895        Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
896        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
897    {
898        self.socket_connector = Some(Arc::new(move |host, network_options| {
899            let stream_future = f(host, network_options);
900            let future = async move {
901                let stream = stream_future.await?;
902                Ok(Box::new(stream) as Box<dyn crate::framed::AsyncReadWrite>)
903            };
904            Box::pin(future)
905        }));
906        self
907    }
908
909    /// Returns whether a custom socket connector has been set.
910    pub fn has_socket_connector(&self) -> bool {
911        self.socket_connector.is_some()
912    }
913
914    pub fn set_client_id(&mut self, client_id: String) -> &mut Self {
915        self.client_id = client_id;
916        self
917    }
918
919    #[cfg(not(any(feature = "use-rustls-no-provider", feature = "use-native-tls")))]
920    pub const fn set_transport(&mut self, transport: Transport) -> &mut Self {
921        self.transport = transport;
922        self
923    }
924
925    #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
926    pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
927        self.transport = transport;
928        self
929    }
930
931    /// Returns the configured transport.
932    pub fn transport(&self) -> Transport {
933        self.transport.clone()
934    }
935
936    /// Set number of seconds after which client should ping the broker
937    /// if there is no other data exchange
938    /// Set to `0` to disable automatic keep-alive pings.
939    pub fn set_keep_alive(&mut self, seconds: u16) -> &mut Self {
940        self.keep_alive = Duration::from_secs(u64::from(seconds));
941        self
942    }
943
944    /// Keep alive time
945    pub const fn keep_alive(&self) -> Duration {
946        self.keep_alive
947    }
948
949    /// Client identifier
950    pub fn client_id(&self) -> String {
951        self.client_id.clone()
952    }
953
954    /// `clean_start = true` removes all the state from queues & instructs the broker
955    /// to clean all the client state when client disconnects.
956    ///
957    /// When set `false`, broker will hold the client state and performs pending
958    /// operations on the client when reconnection with same `client_id`
959    /// happens. Local queue state is also held to retransmit packets after reconnection.
960    pub const fn set_clean_start(&mut self, clean_start: bool) -> &mut Self {
961        self.clean_start = clean_start;
962        self
963    }
964
965    /// Clean session
966    pub const fn clean_start(&self) -> bool {
967        self.clean_start
968    }
969
970    /// Replace the current CONNECT authentication state.
971    ///
972    /// ```
973    /// use bytes::Bytes;
974    /// use rumqttc::{ConnectAuth, MqttOptions};
975    ///
976    /// let mut options = MqttOptions::new("client", "localhost");
977    /// options.set_auth(ConnectAuth::UsernamePassword {
978    ///     username: "user".into(),
979    ///     password: Bytes::from_static(b"pw"),
980    /// });
981    /// ```
982    pub fn set_auth(&mut self, auth: ConnectAuth) -> &mut Self {
983        self.auth = auth;
984        self
985    }
986
987    /// Clear CONNECT authentication fields.
988    pub fn clear_auth(&mut self) -> &mut Self {
989        self.auth = ConnectAuth::None;
990        self
991    }
992
993    /// Set only the MQTT username field.
994    ///
995    /// ```
996    /// use rumqttc::{ConnectAuth, MqttOptions};
997    ///
998    /// let mut options = MqttOptions::new("client", "localhost");
999    /// options.set_username("user");
1000    ///
1001    /// assert_eq!(
1002    ///     options.auth(),
1003    ///     &ConnectAuth::Username {
1004    ///         username: "user".into(),
1005    ///     }
1006    /// );
1007    /// ```
1008    pub fn set_username<U: Into<String>>(&mut self, username: U) -> &mut Self {
1009        self.auth = ConnectAuth::Username {
1010            username: username.into(),
1011        };
1012        self
1013    }
1014
1015    /// Set only the MQTT password field.
1016    ///
1017    /// ```
1018    /// use bytes::Bytes;
1019    /// use rumqttc::{ConnectAuth, MqttOptions};
1020    ///
1021    /// let mut options = MqttOptions::new("client", "localhost");
1022    /// options.set_password(Bytes::from_static(b"\x00\xfftoken"));
1023    ///
1024    /// assert_eq!(
1025    ///     options.auth(),
1026    ///     &ConnectAuth::Password {
1027    ///         password: Bytes::from_static(b"\x00\xfftoken"),
1028    ///     }
1029    /// );
1030    /// ```
1031    pub fn set_password<P: Into<Bytes>>(&mut self, password: P) -> &mut Self {
1032        self.auth = ConnectAuth::Password {
1033            password: password.into(),
1034        };
1035        self
1036    }
1037
1038    /// Set both MQTT username and binary password fields.
1039    ///
1040    /// ```
1041    /// use bytes::Bytes;
1042    /// use rumqttc::{ConnectAuth, MqttOptions};
1043    ///
1044    /// let mut options = MqttOptions::new("client", "localhost");
1045    /// options.set_credentials("user", Bytes::from_static(b"\x00\xfftoken"));
1046    ///
1047    /// assert_eq!(
1048    ///     options.auth(),
1049    ///     &ConnectAuth::UsernamePassword {
1050    ///         username: "user".into(),
1051    ///         password: Bytes::from_static(b"\x00\xfftoken"),
1052    ///     }
1053    /// );
1054    /// ```
1055    pub fn set_credentials<U: Into<String>, P: Into<Bytes>>(
1056        &mut self,
1057        username: U,
1058        password: P,
1059    ) -> &mut Self {
1060        self.auth = ConnectAuth::UsernamePassword {
1061            username: username.into(),
1062            password: password.into(),
1063        };
1064        self
1065    }
1066
1067    /// CONNECT authentication fields.
1068    ///
1069    /// ```
1070    /// use rumqttc::{ConnectAuth, MqttOptions};
1071    ///
1072    /// let mut options = MqttOptions::new("client", "localhost");
1073    /// options.set_password("pw");
1074    ///
1075    /// match options.auth() {
1076    ///     ConnectAuth::Password { password } => assert_eq!(password.as_ref(), b"pw"),
1077    ///     auth => panic!("unexpected auth state: {auth:?}"),
1078    /// }
1079    /// ```
1080    pub const fn auth(&self) -> &ConnectAuth {
1081        &self.auth
1082    }
1083
1084    /// Set request channel capacity
1085    pub const fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
1086        self.request_channel_capacity = capacity;
1087        self
1088    }
1089
1090    /// Request channel capacity
1091    pub const fn request_channel_capacity(&self) -> usize {
1092        self.request_channel_capacity
1093    }
1094
1095    /// Set maximum number of requests processed in one eventloop iteration.
1096    ///
1097    /// `0` preserves legacy behavior (effectively processes one request).
1098    pub const fn set_max_request_batch(&mut self, max: usize) -> &mut Self {
1099        self.max_request_batch = max;
1100        self
1101    }
1102
1103    /// Maximum number of requests processed in one eventloop iteration.
1104    pub const fn max_request_batch(&self) -> usize {
1105        self.max_request_batch
1106    }
1107
1108    /// Set maximum number of packets processed in one network read batch.
1109    ///
1110    /// `0` enables adaptive batching.
1111    pub const fn set_read_batch_size(&mut self, size: usize) -> &mut Self {
1112        self.read_batch_size = size;
1113        self
1114    }
1115
1116    /// Maximum number of packets processed in one network read batch.
1117    ///
1118    /// `0` means adaptive batching.
1119    pub const fn read_batch_size(&self) -> usize {
1120        self.read_batch_size
1121    }
1122
1123    /// Enables throttling and sets outoing message rate to the specified 'rate'
1124    pub const fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
1125        self.pending_throttle = duration;
1126        self
1127    }
1128
1129    /// Outgoing message rate
1130    pub const fn pending_throttle(&self) -> Duration {
1131        self.pending_throttle
1132    }
1133
1134    /// set connect timeout
1135    pub const fn set_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
1136        self.connect_timeout = timeout;
1137        self
1138    }
1139
1140    /// get connect timeout
1141    pub const fn connect_timeout(&self) -> Duration {
1142        self.connect_timeout
1143    }
1144
1145    /// set connection properties
1146    pub fn set_connect_properties(&mut self, properties: ConnectProperties) -> &mut Self {
1147        self.incoming_packet_size_limit = properties.max_packet_size.map_or(
1148            IncomingPacketSizeLimit::Default,
1149            IncomingPacketSizeLimit::Bytes,
1150        );
1151        self.connect_properties = Some(properties);
1152        self
1153    }
1154
1155    /// get connection properties
1156    pub fn connect_properties(&self) -> Option<ConnectProperties> {
1157        self.connect_properties.clone()
1158    }
1159
1160    /// set session expiry interval on connection properties
1161    pub fn set_session_expiry_interval(&mut self, interval: Option<u32>) -> &mut Self {
1162        if let Some(conn_props) = &mut self.connect_properties {
1163            conn_props.session_expiry_interval = interval;
1164            self
1165        } else {
1166            let mut conn_props = ConnectProperties::new();
1167            conn_props.session_expiry_interval = interval;
1168            self.set_connect_properties(conn_props)
1169        }
1170    }
1171
1172    /// get session expiry interval on connection properties
1173    pub const fn session_expiry_interval(&self) -> Option<u32> {
1174        if let Some(conn_props) = &self.connect_properties {
1175            conn_props.session_expiry_interval
1176        } else {
1177            None
1178        }
1179    }
1180
1181    /// set receive maximum on connection properties
1182    pub fn set_receive_maximum(&mut self, recv_max: Option<u16>) -> &mut Self {
1183        if let Some(conn_props) = &mut self.connect_properties {
1184            conn_props.receive_maximum = recv_max;
1185            self
1186        } else {
1187            let mut conn_props = ConnectProperties::new();
1188            conn_props.receive_maximum = recv_max;
1189            self.set_connect_properties(conn_props)
1190        }
1191    }
1192
1193    /// get receive maximum from connection properties
1194    pub const fn receive_maximum(&self) -> Option<u16> {
1195        if let Some(conn_props) = &self.connect_properties {
1196            conn_props.receive_maximum
1197        } else {
1198            None
1199        }
1200    }
1201
1202    /// set max packet size on connection properties
1203    pub fn set_max_packet_size(&mut self, max_size: Option<u32>) -> &mut Self {
1204        self.incoming_packet_size_limit = max_size.map_or(
1205            IncomingPacketSizeLimit::Default,
1206            IncomingPacketSizeLimit::Bytes,
1207        );
1208
1209        if let Some(conn_props) = &mut self.connect_properties {
1210            conn_props.max_packet_size = max_size;
1211            self
1212        } else {
1213            let mut conn_props = ConnectProperties::new();
1214            conn_props.max_packet_size = max_size;
1215            self.set_connect_properties(conn_props)
1216        }
1217    }
1218
1219    /// get max packet size from connection properties
1220    pub const fn max_packet_size(&self) -> Option<u32> {
1221        if let Some(conn_props) = &self.connect_properties {
1222            conn_props.max_packet_size
1223        } else {
1224            None
1225        }
1226    }
1227
1228    /// set local incoming packet size policy
1229    ///
1230    /// This controls packet size enforcement in the decoder:
1231    /// - [`IncomingPacketSizeLimit::Default`] uses `default_max_incoming_size`
1232    /// - [`IncomingPacketSizeLimit::Unlimited`] disables incoming size checks
1233    /// - [`IncomingPacketSizeLimit::Bytes`] enforces an explicit limit
1234    pub fn set_incoming_packet_size_limit(&mut self, limit: IncomingPacketSizeLimit) -> &mut Self {
1235        self.incoming_packet_size_limit = limit;
1236
1237        if let Some(conn_props) = &mut self.connect_properties {
1238            conn_props.max_packet_size = match limit {
1239                IncomingPacketSizeLimit::Bytes(max_size) => Some(max_size),
1240                IncomingPacketSizeLimit::Default | IncomingPacketSizeLimit::Unlimited => None,
1241            };
1242            return self;
1243        }
1244
1245        if let IncomingPacketSizeLimit::Bytes(max_size) = limit {
1246            let mut conn_props = ConnectProperties::new();
1247            conn_props.max_packet_size = Some(max_size);
1248            self.set_connect_properties(conn_props)
1249        } else {
1250            self
1251        }
1252    }
1253
1254    /// disable local incoming packet size checks
1255    pub fn set_unlimited_incoming_packet_size(&mut self) -> &mut Self {
1256        self.set_incoming_packet_size_limit(IncomingPacketSizeLimit::Unlimited)
1257    }
1258
1259    /// get local incoming packet size policy
1260    pub const fn incoming_packet_size_limit(&self) -> IncomingPacketSizeLimit {
1261        self.incoming_packet_size_limit
1262    }
1263
1264    pub(crate) const fn max_incoming_packet_size(&self) -> Option<u32> {
1265        match self.incoming_packet_size_limit {
1266            IncomingPacketSizeLimit::Default => Some(self.default_max_incoming_size),
1267            IncomingPacketSizeLimit::Unlimited => None,
1268            IncomingPacketSizeLimit::Bytes(max_size) => Some(max_size),
1269        }
1270    }
1271
1272    /// set max topic alias on connection properties
1273    pub fn set_topic_alias_max(&mut self, topic_alias_max: Option<u16>) -> &mut Self {
1274        if let Some(conn_props) = &mut self.connect_properties {
1275            conn_props.topic_alias_max = topic_alias_max;
1276            self
1277        } else {
1278            let mut conn_props = ConnectProperties::new();
1279            conn_props.topic_alias_max = topic_alias_max;
1280            self.set_connect_properties(conn_props)
1281        }
1282    }
1283
1284    /// get max topic alias from connection properties
1285    pub const fn topic_alias_max(&self) -> Option<u16> {
1286        if let Some(conn_props) = &self.connect_properties {
1287            conn_props.topic_alias_max
1288        } else {
1289            None
1290        }
1291    }
1292
1293    /// Enable or disable automatic outgoing topic alias assignment.
1294    ///
1295    /// When enabled, the event loop assigns topic aliases immediately before
1296    /// sending publishes on connections where the broker advertises a non-zero
1297    /// Topic Alias Maximum. Publishes that already contain a topic alias are
1298    /// left unchanged.
1299    pub const fn set_auto_topic_aliases(&mut self, auto_topic_aliases: bool) -> &mut Self {
1300        self.auto_topic_aliases = auto_topic_aliases;
1301        self
1302    }
1303
1304    /// Returns whether automatic outgoing topic alias assignment is enabled.
1305    pub const fn auto_topic_aliases(&self) -> bool {
1306        self.auto_topic_aliases
1307    }
1308
1309    /// Set the policy used for automatic outgoing topic alias assignment.
1310    ///
1311    /// This only has an effect when automatic topic aliases are enabled with
1312    /// [`MqttOptions::set_auto_topic_aliases`].
1313    pub const fn set_topic_alias_policy(
1314        &mut self,
1315        topic_alias_policy: TopicAliasPolicy,
1316    ) -> &mut Self {
1317        self.topic_alias_policy = topic_alias_policy;
1318        self
1319    }
1320
1321    /// Returns the policy used for automatic outgoing topic alias assignment.
1322    pub const fn topic_alias_policy(&self) -> TopicAliasPolicy {
1323        self.topic_alias_policy
1324    }
1325
1326    /// set request response info on connection properties
1327    pub fn set_request_response_info(&mut self, request_response_info: Option<u8>) -> &mut Self {
1328        if let Some(conn_props) = &mut self.connect_properties {
1329            conn_props.request_response_info = request_response_info;
1330            self
1331        } else {
1332            let mut conn_props = ConnectProperties::new();
1333            conn_props.request_response_info = request_response_info;
1334            self.set_connect_properties(conn_props)
1335        }
1336    }
1337
1338    /// get request response info from connection properties
1339    pub const fn request_response_info(&self) -> Option<u8> {
1340        if let Some(conn_props) = &self.connect_properties {
1341            conn_props.request_response_info
1342        } else {
1343            None
1344        }
1345    }
1346
1347    /// set request problem info on connection properties
1348    pub fn set_request_problem_info(&mut self, request_problem_info: Option<u8>) -> &mut Self {
1349        if let Some(conn_props) = &mut self.connect_properties {
1350            conn_props.request_problem_info = request_problem_info;
1351            self
1352        } else {
1353            let mut conn_props = ConnectProperties::new();
1354            conn_props.request_problem_info = request_problem_info;
1355            self.set_connect_properties(conn_props)
1356        }
1357    }
1358
1359    /// get request problem info from connection properties
1360    pub const fn request_problem_info(&self) -> Option<u8> {
1361        if let Some(conn_props) = &self.connect_properties {
1362            conn_props.request_problem_info
1363        } else {
1364            None
1365        }
1366    }
1367
1368    /// set user properties on connection properties
1369    pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
1370        if let Some(conn_props) = &mut self.connect_properties {
1371            conn_props.user_properties = user_properties;
1372            self
1373        } else {
1374            let mut conn_props = ConnectProperties::new();
1375            conn_props.user_properties = user_properties;
1376            self.set_connect_properties(conn_props)
1377        }
1378    }
1379
1380    /// get user properties from connection properties
1381    pub fn user_properties(&self) -> Vec<(String, String)> {
1382        self.connect_properties
1383            .as_ref()
1384            .map_or_else(Vec::new, |conn_props| conn_props.user_properties.clone())
1385    }
1386
1387    /// set authentication method on connection properties
1388    pub fn set_authentication_method(
1389        &mut self,
1390        authentication_method: Option<String>,
1391    ) -> &mut Self {
1392        if let Some(conn_props) = &mut self.connect_properties {
1393            conn_props.authentication_method = authentication_method;
1394            self
1395        } else {
1396            let mut conn_props = ConnectProperties::new();
1397            conn_props.authentication_method = authentication_method;
1398            self.set_connect_properties(conn_props)
1399        }
1400    }
1401
1402    /// get authentication method from connection properties
1403    pub fn authentication_method(&self) -> Option<String> {
1404        self.connect_properties
1405            .as_ref()
1406            .and_then(|conn_props| conn_props.authentication_method.clone())
1407    }
1408
1409    /// set authentication data on connection properties
1410    pub fn set_authentication_data(&mut self, authentication_data: Option<Bytes>) -> &mut Self {
1411        if let Some(conn_props) = &mut self.connect_properties {
1412            conn_props.authentication_data = authentication_data;
1413            self
1414        } else {
1415            let mut conn_props = ConnectProperties::new();
1416            conn_props.authentication_data = authentication_data;
1417            self.set_connect_properties(conn_props)
1418        }
1419    }
1420
1421    /// get authentication data from connection properties
1422    pub fn authentication_data(&self) -> Option<Bytes> {
1423        self.connect_properties
1424            .as_ref()
1425            .map_or_else(|| None, |conn_props| conn_props.authentication_data.clone())
1426    }
1427
1428    /// Enables or disables manual acknowledgements for incoming publishes.
1429    ///
1430    /// When enabled, incoming `QoS1` and `QoS2` publishes are not acknowledged
1431    /// automatically. The application must acknowledge them explicitly with
1432    /// [`Client::ack`](`crate::Client::ack`), [`AsyncClient::ack`](`crate::AsyncClient::ack`),
1433    /// or by preparing a packet with `prepare_ack(...)` and sending it later
1434    /// with `manual_ack(...)`.
1435    pub const fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
1436        self.manual_acks = manual_acks;
1437        self
1438    }
1439
1440    /// get manual acknowledgements
1441    pub const fn manual_acks(&self) -> bool {
1442        self.manual_acks
1443    }
1444
1445    pub fn network_options(&self) -> NetworkOptions {
1446        self.network_options.clone()
1447    }
1448
1449    pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
1450        self.network_options = network_options;
1451        self
1452    }
1453
1454    #[cfg(feature = "proxy")]
1455    pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
1456        self.proxy = Some(proxy);
1457        self
1458    }
1459
1460    #[cfg(feature = "proxy")]
1461    pub fn proxy(&self) -> Option<Proxy> {
1462        self.proxy.clone()
1463    }
1464
1465    pub(crate) fn effective_socket_connector(&self) -> SocketConnector {
1466        self.socket_connector
1467            .clone()
1468            .unwrap_or_else(default_socket_connector)
1469    }
1470
1471    pub(crate) async fn socket_connect(
1472        &self,
1473        host: String,
1474        network_options: NetworkOptions,
1475    ) -> std::io::Result<Box<dyn crate::framed::AsyncReadWrite>> {
1476        let connector = self.effective_socket_connector();
1477        connector(host, network_options).await
1478    }
1479
1480    /// Get the upper limit on maximum number of inflight outgoing publishes.
1481    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
1482    pub const fn set_outgoing_inflight_upper_limit(&mut self, limit: u16) -> &mut Self {
1483        self.outgoing_inflight_upper_limit = Some(limit);
1484        self
1485    }
1486
1487    /// Set the upper limit on maximum number of inflight outgoing publishes.
1488    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
1489    pub const fn get_outgoing_inflight_upper_limit(&self) -> Option<u16> {
1490        self.outgoing_inflight_upper_limit
1491    }
1492
1493    pub fn set_authenticator(&mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> &mut Self {
1494        self.authenticator = Some(authenticator);
1495        self
1496    }
1497
1498    pub fn authenticator(&self) -> Option<Arc<Mutex<dyn Authenticator>>> {
1499        self.authenticator.as_ref()?;
1500
1501        self.authenticator.clone()
1502    }
1503
1504    pub fn set_auth_manager(&mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> &mut Self {
1505        self.set_authenticator(authenticator)
1506    }
1507
1508    pub fn auth_manager(&self) -> Option<Arc<Mutex<dyn Authenticator>>> {
1509        self.authenticator()
1510    }
1511}
1512
1513/// Builder for [`MqttOptions`].
1514pub struct MqttOptionsBuilder {
1515    options: MqttOptions,
1516}
1517
1518impl MqttOptionsBuilder {
1519    /// Create a new [`MqttOptions`] builder.
1520    #[must_use]
1521    pub fn new<S: Into<String>, B: Into<Broker>>(id: S, broker: B) -> Self {
1522        Self {
1523            options: MqttOptions::new(id, broker),
1524        }
1525    }
1526
1527    /// Build the configured [`MqttOptions`].
1528    #[must_use]
1529    pub fn build(self) -> MqttOptions {
1530        self.options
1531    }
1532
1533    /// Set the last will.
1534    #[must_use]
1535    pub fn last_will(mut self, will: LastWill) -> Self {
1536        self.options.set_last_will(will);
1537        self
1538    }
1539
1540    /// Set an infallible handler for modifying the websocket HTTP request before it is sent.
1541    #[cfg(feature = "websocket")]
1542    #[must_use]
1543    pub fn request_modifier<F, O>(mut self, request_modifier: F) -> Self
1544    where
1545        F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
1546        O: IntoFuture<Output = http::Request<()>> + 'static,
1547        O::IntoFuture: Send,
1548    {
1549        self.options.set_request_modifier(request_modifier);
1550        self
1551    }
1552
1553    /// Set a fallible handler for modifying the websocket HTTP request before it is sent.
1554    #[cfg(feature = "websocket")]
1555    #[must_use]
1556    pub fn fallible_request_modifier<F, O, E>(mut self, request_modifier: F) -> Self
1557    where
1558        F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
1559        O: IntoFuture<Output = Result<http::Request<()>, E>> + 'static,
1560        O::IntoFuture: Send,
1561        E: std::error::Error + Send + Sync + 'static,
1562    {
1563        self.options.set_fallible_request_modifier(request_modifier);
1564        self
1565    }
1566
1567    /// Set a custom socket connector.
1568    #[cfg(not(feature = "websocket"))]
1569    #[must_use]
1570    pub fn socket_connector<F, Fut, S>(mut self, f: F) -> Self
1571    where
1572        F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
1573        Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
1574        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
1575    {
1576        self.options.set_socket_connector(f);
1577        self
1578    }
1579
1580    /// Set a custom socket connector.
1581    #[cfg(feature = "websocket")]
1582    #[must_use]
1583    pub fn socket_connector<F, Fut, S>(mut self, f: F) -> Self
1584    where
1585        F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
1586        Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
1587        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
1588    {
1589        self.options.set_socket_connector(f);
1590        self
1591    }
1592
1593    /// Set the client identifier.
1594    #[must_use]
1595    pub fn client_id(mut self, client_id: String) -> Self {
1596        self.options.set_client_id(client_id);
1597        self
1598    }
1599
1600    /// Set the transport.
1601    #[cfg(not(any(feature = "use-rustls-no-provider", feature = "use-native-tls")))]
1602    #[must_use]
1603    pub const fn transport(mut self, transport: Transport) -> Self {
1604        self.options.set_transport(transport);
1605        self
1606    }
1607
1608    /// Set the transport.
1609    #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
1610    #[must_use]
1611    pub fn transport(mut self, transport: Transport) -> Self {
1612        self.options.set_transport(transport);
1613        self
1614    }
1615
1616    /// Set number of seconds after which client should ping the broker if there is no other data exchange.
1617    #[must_use]
1618    pub fn keep_alive(mut self, seconds: u16) -> Self {
1619        self.options.set_keep_alive(seconds);
1620        self
1621    }
1622
1623    /// Set whether the broker should start a clean session.
1624    #[must_use]
1625    pub const fn clean_start(mut self, clean_start: bool) -> Self {
1626        self.options.set_clean_start(clean_start);
1627        self
1628    }
1629
1630    /// Replace the current CONNECT authentication state.
1631    #[must_use]
1632    pub fn auth(mut self, auth: ConnectAuth) -> Self {
1633        self.options.set_auth(auth);
1634        self
1635    }
1636
1637    /// Clear CONNECT authentication fields.
1638    #[must_use]
1639    pub fn clear_auth(mut self) -> Self {
1640        self.options.clear_auth();
1641        self
1642    }
1643
1644    /// Set only the MQTT username field.
1645    #[must_use]
1646    pub fn username<U: Into<String>>(mut self, username: U) -> Self {
1647        self.options.set_username(username);
1648        self
1649    }
1650
1651    /// Set only the MQTT password field.
1652    #[must_use]
1653    pub fn password<P: Into<Bytes>>(mut self, password: P) -> Self {
1654        self.options.set_password(password);
1655        self
1656    }
1657
1658    /// Set both MQTT username and binary password fields.
1659    #[must_use]
1660    pub fn credentials<U: Into<String>, P: Into<Bytes>>(
1661        mut self,
1662        username: U,
1663        password: P,
1664    ) -> Self {
1665        self.options.set_credentials(username, password);
1666        self
1667    }
1668
1669    /// Set request channel capacity.
1670    #[must_use]
1671    pub const fn request_channel_capacity(mut self, capacity: usize) -> Self {
1672        self.options.set_request_channel_capacity(capacity);
1673        self
1674    }
1675
1676    /// Set maximum number of requests processed in one eventloop iteration.
1677    #[must_use]
1678    pub const fn max_request_batch(mut self, max: usize) -> Self {
1679        self.options.set_max_request_batch(max);
1680        self
1681    }
1682
1683    /// Set maximum number of packets processed in one network read batch.
1684    #[must_use]
1685    pub const fn read_batch_size(mut self, size: usize) -> Self {
1686        self.options.set_read_batch_size(size);
1687        self
1688    }
1689
1690    /// Set the minimum delay between retransmitted outgoing packets.
1691    #[must_use]
1692    pub const fn pending_throttle(mut self, duration: Duration) -> Self {
1693        self.options.set_pending_throttle(duration);
1694        self
1695    }
1696
1697    /// Set connect timeout.
1698    #[must_use]
1699    pub const fn connect_timeout(mut self, timeout: Duration) -> Self {
1700        self.options.set_connect_timeout(timeout);
1701        self
1702    }
1703
1704    /// Set connection properties.
1705    #[must_use]
1706    pub fn connect_properties(mut self, properties: ConnectProperties) -> Self {
1707        self.options.set_connect_properties(properties);
1708        self
1709    }
1710
1711    /// Set session expiry interval on connection properties.
1712    #[must_use]
1713    pub fn session_expiry_interval(mut self, interval: Option<u32>) -> Self {
1714        self.options.set_session_expiry_interval(interval);
1715        self
1716    }
1717
1718    /// Set receive maximum on connection properties.
1719    #[must_use]
1720    pub fn receive_maximum(mut self, recv_max: Option<u16>) -> Self {
1721        self.options.set_receive_maximum(recv_max);
1722        self
1723    }
1724
1725    /// Set max packet size on connection properties.
1726    #[must_use]
1727    pub fn max_packet_size(mut self, max_size: Option<u32>) -> Self {
1728        self.options.set_max_packet_size(max_size);
1729        self
1730    }
1731
1732    /// Set local incoming packet size policy.
1733    #[must_use]
1734    pub fn incoming_packet_size_limit(mut self, limit: IncomingPacketSizeLimit) -> Self {
1735        self.options.set_incoming_packet_size_limit(limit);
1736        self
1737    }
1738
1739    /// Disable local incoming packet size checks.
1740    #[must_use]
1741    pub fn unlimited_incoming_packet_size(mut self) -> Self {
1742        self.options.set_unlimited_incoming_packet_size();
1743        self
1744    }
1745
1746    /// Set max topic alias on connection properties.
1747    #[must_use]
1748    pub fn topic_alias_max(mut self, topic_alias_max: Option<u16>) -> Self {
1749        self.options.set_topic_alias_max(topic_alias_max);
1750        self
1751    }
1752
1753    /// Enable or disable automatic outgoing topic alias assignment.
1754    #[must_use]
1755    pub const fn auto_topic_aliases(mut self, auto_topic_aliases: bool) -> Self {
1756        self.options.set_auto_topic_aliases(auto_topic_aliases);
1757        self
1758    }
1759
1760    /// Set the policy used for automatic outgoing topic alias assignment.
1761    #[must_use]
1762    pub const fn topic_alias_policy(mut self, topic_alias_policy: TopicAliasPolicy) -> Self {
1763        self.options.set_topic_alias_policy(topic_alias_policy);
1764        self
1765    }
1766
1767    /// Set request response info on connection properties.
1768    #[must_use]
1769    pub fn request_response_info(mut self, request_response_info: Option<u8>) -> Self {
1770        self.options
1771            .set_request_response_info(request_response_info);
1772        self
1773    }
1774
1775    /// Set request problem info on connection properties.
1776    #[must_use]
1777    pub fn request_problem_info(mut self, request_problem_info: Option<u8>) -> Self {
1778        self.options.set_request_problem_info(request_problem_info);
1779        self
1780    }
1781
1782    /// Set user properties on connection properties.
1783    #[must_use]
1784    pub fn user_properties(mut self, user_properties: Vec<(String, String)>) -> Self {
1785        self.options.set_user_properties(user_properties);
1786        self
1787    }
1788
1789    /// Set authentication method on connection properties.
1790    #[must_use]
1791    pub fn authentication_method(mut self, authentication_method: Option<String>) -> Self {
1792        self.options
1793            .set_authentication_method(authentication_method);
1794        self
1795    }
1796
1797    /// Set authentication data on connection properties.
1798    #[must_use]
1799    pub fn authentication_data(mut self, authentication_data: Option<Bytes>) -> Self {
1800        self.options.set_authentication_data(authentication_data);
1801        self
1802    }
1803
1804    /// Enable or disable manual acknowledgements.
1805    #[must_use]
1806    pub const fn manual_acks(mut self, manual_acks: bool) -> Self {
1807        self.options.set_manual_acks(manual_acks);
1808        self
1809    }
1810
1811    /// Set network options.
1812    #[must_use]
1813    pub fn network_options(mut self, network_options: NetworkOptions) -> Self {
1814        self.options.set_network_options(network_options);
1815        self
1816    }
1817
1818    /// Set proxy configuration.
1819    #[cfg(feature = "proxy")]
1820    #[must_use]
1821    pub fn proxy(mut self, proxy: Proxy) -> Self {
1822        self.options.set_proxy(proxy);
1823        self
1824    }
1825
1826    /// Set the upper limit on maximum number of inflight outgoing publishes.
1827    #[must_use]
1828    pub const fn outgoing_inflight_upper_limit(mut self, limit: u16) -> Self {
1829        self.options.set_outgoing_inflight_upper_limit(limit);
1830        self
1831    }
1832
1833    /// Set the authentication callback.
1834    #[must_use]
1835    pub fn authenticator(mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> Self {
1836        self.options.set_authenticator(authenticator);
1837        self
1838    }
1839
1840    /// Set the authentication callback.
1841    #[must_use]
1842    pub fn auth_manager(mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> Self {
1843        self.options.set_authenticator(authenticator);
1844        self
1845    }
1846}
1847
1848#[derive(Debug, PartialEq, Eq, thiserror::Error)]
1849pub enum OptionError {
1850    #[error("Unsupported URL scheme.")]
1851    Scheme,
1852
1853    #[error(
1854        "Secure MQTT URL schemes require explicit TLS transport configuration via MqttOptions::set_transport(...)."
1855    )]
1856    SecureUrlRequiresExplicitTransport,
1857
1858    #[error("Missing client ID.")]
1859    ClientId,
1860
1861    #[error("Invalid Unix socket path.")]
1862    UnixSocketPath,
1863
1864    #[cfg(feature = "websocket")]
1865    #[error("Invalid websocket url.")]
1866    WebsocketUrl,
1867
1868    #[cfg(feature = "websocket")]
1869    #[error(
1870        "Secure websocket URLs require Broker::websocket(\"ws://...\") plus MqttOptions::set_transport(Transport::wss_with_config(...))."
1871    )]
1872    WssRequiresExplicitTransport,
1873
1874    #[error("Invalid keep-alive value.")]
1875    KeepAlive,
1876
1877    #[error("Invalid clean-start value.")]
1878    CleanStart,
1879
1880    #[error("Invalid max-incoming-packet-size value.")]
1881    MaxIncomingPacketSize,
1882
1883    #[error("Invalid max-outgoing-packet-size value.")]
1884    MaxOutgoingPacketSize,
1885
1886    #[error("Invalid request-channel-capacity value.")]
1887    RequestChannelCapacity,
1888
1889    #[error("Invalid max-request-batch value.")]
1890    MaxRequestBatch,
1891
1892    #[error("Invalid read-batch-size value.")]
1893    ReadBatchSize,
1894
1895    #[error("Invalid pending-throttle value.")]
1896    PendingThrottle,
1897
1898    #[error("Invalid inflight value.")]
1899    Inflight,
1900
1901    #[error("Invalid conn-timeout value.")]
1902    ConnTimeout,
1903
1904    #[error("Unknown option: {0}")]
1905    Unknown(String),
1906
1907    #[cfg(feature = "url")]
1908    #[error("Couldn't parse option from url: {0}")]
1909    Parse(#[from] url::ParseError),
1910}
1911
#[cfg(feature = "url")]
impl std::convert::TryFrom<url::Url> for MqttOptions {
    type Error = OptionError;

    /// Builds [`MqttOptions`] from a broker URL.
    ///
    /// The scheme selects the broker (`mqtt`/`tcp`, `unix` on Unix targets, `ws` with the
    /// `websocket` feature). The URL user-info supplies credentials, and the query string
    /// supplies `client_id` (required) plus the optional settings `keep_alive_secs`,
    /// `clean_start`, `max_incoming_packet_size_bytes`, `request_channel_capacity_num`,
    /// `max_request_batch_num`, `read_batch_size_num`, `pending_throttle_usecs`,
    /// `inflight_num` and `conn_timeout_secs`.
    ///
    /// # Errors
    ///
    /// * [`OptionError::SecureUrlRequiresExplicitTransport`] for `mqtts` / `ssl`, and
    ///   [`OptionError::WssRequiresExplicitTransport`] for `wss` (with `websocket`).
    /// * [`OptionError::Scheme`] for any other unrecognised scheme.
    /// * [`OptionError::ClientId`] when `client_id` is missing.
    /// * The option-specific variant when a value fails to parse.
    /// * [`OptionError::Unknown`] naming the first unrecognised query parameter, in the
    ///   order the parameters appear in the URL.
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        use std::borrow::Cow;
        use std::collections::HashMap;
        use std::str::FromStr;

        // Removes `key` from `queries` and parses its value as `T`, mapping any parse
        // failure to `invalid`. Yields `Ok(None)` when the key is absent.
        fn take_parsed<T: FromStr>(
            queries: &mut HashMap<Cow<'_, str>, Cow<'_, str>>,
            key: &str,
            invalid: OptionError,
        ) -> Result<Option<T>, OptionError> {
            queries
                .remove(key)
                .map(|raw| raw.parse::<T>().map_err(|_| invalid))
                .transpose()
        }

        let broker = match url.scheme() {
            "mqtts" | "ssl" => return Err(OptionError::SecureUrlRequiresExplicitTransport),
            "mqtt" | "tcp" => Broker::tcp(
                url.host_str().unwrap_or_default(),
                url.port().unwrap_or(DEFAULT_BROKER_PORT),
            ),
            #[cfg(unix)]
            "unix" => Broker::unix(parse_unix_socket_path(&url)?),
            #[cfg(feature = "websocket")]
            "ws" => Broker::websocket(url.as_str().to_owned())?,
            #[cfg(feature = "websocket")]
            "wss" => return Err(OptionError::WssRequiresExplicitTransport),
            _ => return Err(OptionError::Scheme),
        };

        // Recognised options are removed one by one; whatever is left at the end is
        // unknown. When a key is repeated, the last occurrence wins.
        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();

        let id = queries
            .remove("client_id")
            .ok_or(OptionError::ClientId)?
            .into_owned();

        let mut options = Self::new(id, broker);
        let mut connect_props = ConnectProperties::new();

        if let Some(keep_alive) =
            take_parsed::<u16>(&mut queries, "keep_alive_secs", OptionError::KeepAlive)?
        {
            options.set_keep_alive(keep_alive);
        }

        if let Some(clean_start) =
            take_parsed::<bool>(&mut queries, "clean_start", OptionError::CleanStart)?
        {
            options.set_clean_start(clean_start);
        }

        // NOTE(review): `Url::username` / `Url::password` return percent-encoded text, so
        // credentials containing reserved characters are forwarded still encoded —
        // confirm whether decoding is intended here.
        let username = url.username();
        if let Some(password) = url.password() {
            options.set_credentials(username, password.to_owned());
        } else if !username.is_empty() {
            options.set_username(username);
        }

        connect_props.max_packet_size = take_parsed::<u32>(
            &mut queries,
            "max_incoming_packet_size_bytes",
            OptionError::MaxIncomingPacketSize,
        )?;

        if let Some(request_channel_capacity) = take_parsed::<usize>(
            &mut queries,
            "request_channel_capacity_num",
            OptionError::RequestChannelCapacity,
        )? {
            options.request_channel_capacity = request_channel_capacity;
        }

        if let Some(max_request_batch) = take_parsed::<usize>(
            &mut queries,
            "max_request_batch_num",
            OptionError::MaxRequestBatch,
        )? {
            options.max_request_batch = max_request_batch;
        }

        if let Some(read_batch_size) = take_parsed::<usize>(
            &mut queries,
            "read_batch_size_num",
            OptionError::ReadBatchSize,
        )? {
            options.read_batch_size = read_batch_size;
        }

        if let Some(pending_throttle) = take_parsed::<u64>(
            &mut queries,
            "pending_throttle_usecs",
            OptionError::PendingThrottle,
        )? {
            options.set_pending_throttle(Duration::from_micros(pending_throttle));
        }

        connect_props.receive_maximum =
            take_parsed::<u16>(&mut queries, "inflight_num", OptionError::Inflight)?;

        if let Some(conn_timeout) =
            take_parsed::<u64>(&mut queries, "conn_timeout_secs", OptionError::ConnTimeout)?
        {
            options.set_connect_timeout(Duration::from_secs(conn_timeout));
        }

        // Walk the query string in URL order rather than iterating the map, whose order is
        // arbitrary, so the reported option is stable when several are unknown.
        if let Some((opt, _)) = url
            .query_pairs()
            .find(|(name, _)| queries.contains_key(&**name))
        {
            return Err(OptionError::Unknown(opt.into_owned()));
        }

        options.set_connect_properties(connect_props);
        Ok(options)
    }
}
2031
/// Extracts the filesystem path of a Unix socket from `url`.
///
/// The URL path is percent-decoded into raw bytes and returned as a [`PathBuf`].
///
/// # Errors
///
/// Returns [`OptionError::UnixSocketPath`] when the URL carries a host component, or when
/// the decoded path is empty or just `/`.
#[cfg(all(feature = "url", unix))]
fn parse_unix_socket_path(url: &url::Url) -> Result<PathBuf, OptionError> {
    let decoded: Vec<u8> = match url.host_str() {
        Some(_) => return Err(OptionError::UnixSocketPath),
        None => percent_decode_str(url.path()).collect(),
    };

    let names_no_socket = matches!(decoded.as_slice(), [] | [b'/']);
    if names_no_socket {
        Err(OptionError::UnixSocketPath)
    } else {
        Ok(PathBuf::from(OsString::from_vec(decoded)))
    }
}
2045
2046// Implement Debug manually because ClientConfig doesn't implement it, so derive(Debug) doesn't
2047// work.
2048impl Debug for MqttOptions {
2049    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2050        f.debug_struct("MqttOptions")
2051            .field("broker", &self.broker)
2052            .field("keep_alive", &self.keep_alive)
2053            .field("clean_start", &self.clean_start)
2054            .field("client_id", &self.client_id)
2055            .field("auth", &self.auth)
2056            .field("request_channel_capacity", &self.request_channel_capacity)
2057            .field("max_request_batch", &self.max_request_batch)
2058            .field("read_batch_size", &self.read_batch_size)
2059            .field("pending_throttle", &self.pending_throttle)
2060            .field("last_will", &self.last_will)
2061            .field("connect_timeout", &self.connect_timeout)
2062            .field("auto_topic_aliases", &self.auto_topic_aliases)
2063            .field("topic_alias_policy", &self.topic_alias_policy)
2064            .field("manual_acks", &self.manual_acks)
2065            .field("connect properties", &self.connect_properties)
2066            .finish_non_exhaustive()
2067    }
2068}
2069
2070#[cfg(test)]
2071mod test {
2072    use super::*;
2073    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2074    use std::sync::Arc;
2075    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2076    use tokio::net::{TcpListener, TcpSocket};
2077    use tokio::runtime::Builder;
2078    use tokio::sync::Notify;
2079
2080    fn runtime() -> tokio::runtime::Runtime {
2081        Builder::new_current_thread().enable_all().build().unwrap()
2082    }
2083
2084    #[test]
2085    fn staggered_attempts_allow_later_success_to_win() {
2086        runtime().block_on(async {
2087            let started = Arc::new(AtomicUsize::new(0));
2088            let started_for_connect = Arc::clone(&started);
2089            let begin = std::time::Instant::now();
2090
2091            let result = first_success_with_stagger(
2092                [0_u8, 1_u8],
2093                std::time::Duration::from_millis(10),
2094                move |attempt| {
2095                    let started = Arc::clone(&started_for_connect);
2096                    async move {
2097                        started.fetch_add(1, Ordering::SeqCst);
2098                        if attempt == 0 {
2099                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2100                            Err(std::io::Error::other("slow failure"))
2101                        } else {
2102                            Ok(42_u8)
2103                        }
2104                    }
2105                },
2106            )
2107            .await
2108            .unwrap();
2109
2110            assert_eq!(result, 42);
2111            assert_eq!(started.load(Ordering::SeqCst), 2);
2112            assert!(begin.elapsed() < std::time::Duration::from_millis(150));
2113        });
2114    }
2115
2116    #[test]
2117    fn staggered_connect_returns_invalid_input_for_empty_candidates() {
2118        runtime().block_on(async {
2119            let err = connect_resolved_addrs_staggered(Vec::new(), NetworkOptions::new())
2120                .await
2121                .unwrap_err();
2122
2123            assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
2124            assert_eq!(err.to_string(), "could not resolve to any address");
2125        });
2126    }
2127
2128    #[test]
2129    fn staggered_connect_tries_later_candidates() {
2130        runtime().block_on(async {
2131            let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2132            let good_addr = listener.local_addr().unwrap();
2133
2134            let unused_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2135            let bad_addr = unused_listener.local_addr().unwrap();
2136            drop(unused_listener);
2137
2138            let accept_task = tokio::spawn(async move {
2139                let (_stream, _) = listener.accept().await.unwrap();
2140            });
2141
2142            let stream =
2143                connect_resolved_addrs_staggered(vec![bad_addr, good_addr], NetworkOptions::new())
2144                    .await
2145                    .unwrap();
2146            assert_eq!(stream.peer_addr().unwrap(), good_addr);
2147
2148            accept_task.await.unwrap();
2149        });
2150    }
2151
2152    #[test]
2153    fn fixed_bind_port_retry_mode_keeps_slow_first_candidate_alive() {
2154        runtime().block_on(async {
2155            let reserved = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2156            let bind_port = reserved.local_addr().unwrap().port();
2157            drop(reserved);
2158
2159            let mut network_options = NetworkOptions::new();
2160            network_options.set_bind_addr(SocketAddr::V4(SocketAddrV4::new(
2161                Ipv4Addr::LOCALHOST,
2162                bind_port,
2163            )));
2164
2165            let first_attempt_started = Arc::new(Notify::new());
2166            let second_attempt_started = Arc::new(AtomicBool::new(false));
2167
2168            let mut connect_task = tokio::spawn({
2169                let first_attempt_started = Arc::clone(&first_attempt_started);
2170                let second_attempt_started = Arc::clone(&second_attempt_started);
2171                let network_options = network_options.clone();
2172                async move {
2173                    connect_with_retry_mode_and_delay(
2174                        [0_u8, 1_u8],
2175                        network_options,
2176                        Duration::from_millis(10),
2177                        move |attempt, network_options| {
2178                            let first_attempt_started = Arc::clone(&first_attempt_started);
2179                            let second_attempt_started = Arc::clone(&second_attempt_started);
2180                            async move {
2181                                if attempt == 0 {
2182                                    let bind_addr = network_options.bind_addr().unwrap();
2183                                    let socket = match bind_addr {
2184                                        SocketAddr::V4(_) => TcpSocket::new_v4()?,
2185                                        SocketAddr::V6(_) => TcpSocket::new_v6()?,
2186                                    };
2187                                    socket.bind(bind_addr)?;
2188                                    first_attempt_started.notify_one();
2189                                    std::future::pending::<io::Result<()>>().await
2190                                } else {
2191                                    second_attempt_started.store(true, Ordering::SeqCst);
2192                                    let _ = network_options;
2193                                    Ok(())
2194                                }
2195                            }
2196                        },
2197                    )
2198                    .await
2199                }
2200            });
2201
2202            first_attempt_started.notified().await;
2203
2204            assert!(
2205                tokio::time::timeout(Duration::from_millis(50), &mut connect_task)
2206                    .await
2207                    .is_err(),
2208                "fixed-port dialing should keep the first slow candidate alive instead of capping it to the stagger delay"
2209            );
2210            assert!(
2211                !second_attempt_started.load(Ordering::SeqCst),
2212                "fixed-port dialing should not start later same-family candidates while the first is still pending"
2213            );
2214            connect_task.abort();
2215        });
2216    }
2217
2218    #[test]
2219    fn fixed_bind_port_resolved_addrs_try_later_candidates() {
2220        runtime().block_on(async {
2221            let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2222            let good_addr = listener.local_addr().unwrap();
2223
2224            let unused_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2225            let bad_addr = unused_listener.local_addr().unwrap();
2226            drop(unused_listener);
2227
2228            let reserved = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2229            let bind_port = reserved.local_addr().unwrap().port();
2230            drop(reserved);
2231
2232            let mut network_options = NetworkOptions::new();
2233            network_options.set_bind_addr(SocketAddr::V4(SocketAddrV4::new(
2234                Ipv4Addr::LOCALHOST,
2235                bind_port,
2236            )));
2237
2238            let accept_task = tokio::spawn(async move {
2239                let (stream, peer_addr) = listener.accept().await.unwrap();
2240                drop(stream);
2241                peer_addr
2242            });
2243
2244            let stream =
2245                connect_resolved_addrs_staggered(vec![bad_addr, good_addr], network_options)
2246                    .await
2247                    .unwrap();
2248            assert_eq!(stream.peer_addr().unwrap(), good_addr);
2249            drop(stream);
2250
2251            let peer_addr = accept_task.await.unwrap();
2252            assert_eq!(peer_addr.port(), bind_port);
2253            assert!(peer_addr.ip().is_loopback());
2254        });
2255    }
2256
2257    #[test]
2258    fn socket_connect_uses_custom_connector_over_default() {
2259        runtime().block_on(async {
2260            let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2261            let good_addr = listener.local_addr().unwrap();
2262            let used_custom = Arc::new(AtomicUsize::new(0));
2263            let used_custom_for_connector = Arc::clone(&used_custom);
2264
2265            let accept_task = tokio::spawn(async move {
2266                let (_stream, _) = listener.accept().await.unwrap();
2267            });
2268
2269            let mut options = MqttOptions::new("test-client", "localhost");
2270            options.set_socket_connector(move |_host, _network_options| {
2271                let used_custom = Arc::clone(&used_custom_for_connector);
2272                async move {
2273                    used_custom.fetch_add(1, Ordering::SeqCst);
2274                    TcpStream::connect(good_addr).await
2275                }
2276            });
2277
2278            assert!(options.has_socket_connector());
2279            options
2280                .socket_connect("invalid.invalid:1883".to_owned(), NetworkOptions::new())
2281                .await
2282                .unwrap();
2283
2284            assert_eq!(used_custom.load(Ordering::SeqCst), 1);
2285            accept_task.await.unwrap();
2286        });
2287    }
2288
2289    #[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
2290    mod request_modifier_tests {
2291        use super::{Broker, MqttOptions};
2292
2293        #[derive(Debug)]
2294        struct TestError;
2295
2296        impl std::fmt::Display for TestError {
2297            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2298                write!(f, "test error")
2299            }
2300        }
2301
2302        impl std::error::Error for TestError {}
2303
2304        #[test]
2305        fn infallible_modifier_is_set() {
2306            let mut options = MqttOptions::new(
2307                "test",
2308                Broker::websocket("ws://localhost:8080").expect("valid websocket broker"),
2309            );
2310            options.set_request_modifier(|req| async move { req });
2311            assert!(options.request_modifier().is_some());
2312            assert!(options.fallible_request_modifier().is_none());
2313        }
2314
2315        #[test]
2316        fn fallible_modifier_is_set() {
2317            let mut options = MqttOptions::new(
2318                "test",
2319                Broker::websocket("ws://localhost:8080").expect("valid websocket broker"),
2320            );
2321            options.set_fallible_request_modifier(|req| async move { Ok::<_, TestError>(req) });
2322            assert!(options.request_modifier().is_none());
2323            assert!(options.fallible_request_modifier().is_some());
2324        }
2325
2326        #[test]
2327        fn last_setter_call_wins() {
2328            let mut options = MqttOptions::new(
2329                "test",
2330                Broker::websocket("ws://localhost:8080").expect("valid websocket broker"),
2331            );
2332
2333            options
2334                .set_fallible_request_modifier(|req| async move { Ok::<_, TestError>(req) })
2335                .set_request_modifier(|req| async move { req });
2336            assert!(options.request_modifier().is_some());
2337            assert!(options.fallible_request_modifier().is_none());
2338
2339            options
2340                .set_request_modifier(|req| async move { req })
2341                .set_fallible_request_modifier(|req| async move { Ok::<_, TestError>(req) });
2342            assert!(options.request_modifier().is_none());
2343            assert!(options.fallible_request_modifier().is_some());
2344        }
2345    }
2346
2347    #[test]
2348    fn incoming_packet_size_limit_defaults_to_default_policy() {
2349        let mqtt_opts = MqttOptions::new("client", "127.0.0.1");
2350        assert_eq!(
2351            mqtt_opts.incoming_packet_size_limit(),
2352            IncomingPacketSizeLimit::Default
2353        );
2354        assert_eq!(
2355            mqtt_opts.max_incoming_packet_size(),
2356            Some(mqtt_opts.default_max_incoming_size)
2357        );
2358    }
2359
2360    #[test]
2361    fn set_max_packet_size_remains_backward_compatible() {
2362        let mut mqtt_opts = MqttOptions::new("client", "127.0.0.1");
2363
2364        mqtt_opts.set_max_packet_size(Some(2048));
2365        assert_eq!(
2366            mqtt_opts.incoming_packet_size_limit(),
2367            IncomingPacketSizeLimit::Bytes(2048)
2368        );
2369        assert_eq!(mqtt_opts.max_packet_size(), Some(2048));
2370        assert_eq!(mqtt_opts.max_incoming_packet_size(), Some(2048));
2371
2372        mqtt_opts.set_max_packet_size(None);
2373        assert_eq!(
2374            mqtt_opts.incoming_packet_size_limit(),
2375            IncomingPacketSizeLimit::Default
2376        );
2377        assert_eq!(mqtt_opts.max_packet_size(), None);
2378        assert_eq!(
2379            mqtt_opts.max_incoming_packet_size(),
2380            Some(mqtt_opts.default_max_incoming_size)
2381        );
2382    }
2383
2384    #[test]
2385    fn incoming_packet_size_limit_unlimited_disables_local_check() {
2386        let mut mqtt_opts = MqttOptions::new("client", "127.0.0.1");
2387        mqtt_opts.set_unlimited_incoming_packet_size();
2388
2389        assert_eq!(
2390            mqtt_opts.incoming_packet_size_limit(),
2391            IncomingPacketSizeLimit::Unlimited
2392        );
2393        assert_eq!(mqtt_opts.max_incoming_packet_size(), None);
2394        assert_eq!(mqtt_opts.max_packet_size(), None);
2395        assert!(mqtt_opts.connect_properties.is_none());
2396    }
2397
2398    #[test]
2399    #[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
2400    fn websocket_transport_can_be_explicitly_upgraded_to_wss() {
2401        use crate::{TlsConfiguration, Transport};
2402        let broker = Broker::websocket(
2403            "ws://a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host",
2404        )
2405        .expect("valid websocket broker");
2406        let mut mqttoptions = MqttOptions::new("client_a", broker);
2407
2408        assert!(matches!(mqttoptions.transport(), Transport::Ws));
2409        mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));
2410
2411        if let Transport::Wss(TlsConfiguration::Simple {
2412            ca,
2413            client_auth,
2414            alpn,
2415        }) = mqttoptions.transport()
2416        {
2417            assert_eq!(ca.as_slice(), b"Test CA");
2418            assert_eq!(client_auth, None);
2419            assert_eq!(alpn, None);
2420        } else {
2421            panic!("Unexpected transport!");
2422        }
2423
2424        assert_eq!(
2425            mqttoptions.broker().websocket_url(),
2426            Some(
2427                "ws://a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host"
2428            )
2429        );
2430    }
2431
2432    #[test]
2433    #[cfg(feature = "websocket")]
2434    fn wss_websocket_urls_require_explicit_transport() {
2435        assert_eq!(
2436            Broker::websocket("wss://example.com/mqtt"),
2437            Err(OptionError::WssRequiresExplicitTransport)
2438        );
2439    }
2440
2441    #[test]
2442    #[cfg(all(
2443        feature = "url",
2444        feature = "use-rustls-no-provider",
2445        feature = "websocket"
2446    ))]
2447    fn parse_url_ws_transport_can_be_explicitly_upgraded_to_wss() {
2448        use crate::{TlsConfiguration, Transport};
2449        let mut mqttoptions =
2450            MqttOptions::parse_url("ws://example.com:443/mqtt?client_id=client_a")
2451                .expect("valid websocket options");
2452
2453        assert!(matches!(mqttoptions.transport(), Transport::Ws));
2454        mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));
2455
2456        if let Transport::Wss(TlsConfiguration::Simple {
2457            ca,
2458            client_auth,
2459            alpn,
2460        }) = mqttoptions.transport()
2461        {
2462            assert_eq!(ca.as_slice(), b"Test CA");
2463            assert_eq!(client_auth, None);
2464            assert_eq!(alpn, None);
2465        } else {
2466            panic!("Unexpected transport!");
2467        }
2468    }
2469
2470    #[test]
2471    #[cfg(all(feature = "url", feature = "use-rustls-no-provider"))]
2472    fn parse_url_mqtt_transport_can_be_explicitly_upgraded_to_tls() {
2473        use crate::{TlsConfiguration, Transport};
2474        let mut mqttoptions = MqttOptions::parse_url("mqtt://example.com:8883?client_id=client_a")
2475            .expect("valid tls options");
2476
2477        assert!(matches!(mqttoptions.transport(), Transport::Tcp));
2478        mqttoptions.set_transport(Transport::tls(Vec::from("Test CA"), None, None));
2479
2480        if let Transport::Tls(TlsConfiguration::Simple {
2481            ca,
2482            client_auth,
2483            alpn,
2484        }) = mqttoptions.transport()
2485        {
2486            assert_eq!(ca.as_slice(), b"Test CA");
2487            assert_eq!(client_auth, None);
2488            assert_eq!(alpn, None);
2489        } else {
2490            panic!("Unexpected transport!");
2491        }
2492    }
2493
2494    #[test]
2495    #[cfg(feature = "url")]
2496    fn parse_url_rejects_secure_url_schemes() {
2497        assert!(matches!(
2498            MqttOptions::parse_url("mqtts://example.com:8883?client_id=client_a"),
2499            Err(OptionError::SecureUrlRequiresExplicitTransport)
2500        ));
2501        assert!(matches!(
2502            MqttOptions::parse_url("ssl://example.com:8883?client_id=client_a"),
2503            Err(OptionError::SecureUrlRequiresExplicitTransport)
2504        ));
2505
2506        #[cfg(feature = "websocket")]
2507        assert!(matches!(
2508            MqttOptions::parse_url("wss://example.com:443/mqtt?client_id=client_a"),
2509            Err(OptionError::WssRequiresExplicitTransport)
2510        ));
2511    }
2512
/// Exercises `MqttOptions::parse_url` on `mqtt://` URLs: accepted query
/// options, the credential forms allowed in the authority, and the error
/// reported for each malformed or unknown option.
#[test]
#[cfg(feature = "url")]
fn from_url() {
    // Parses a URL that is expected to be accepted.
    fn accepted(url: &str) -> MqttOptions {
        MqttOptions::parse_url(url).expect("valid options")
    }
    // Parses a URL that is expected to be rejected and returns the error.
    fn rejected(url: &str) -> OptionError {
        MqttOptions::parse_url(url).expect_err("invalid options")
    }

    let options = accepted("mqtt://host:42?client_id=foo");
    assert_eq!(options.broker().tcp_address(), Some(("host", 42)));
    assert_eq!(options.client_id(), "foo".to_owned());

    // A keep-alive of zero is accepted alongside a positive value.
    let keep_alive_cases = [
        ("mqtt://host:42?client_id=foo&keep_alive_secs=5", 5),
        ("mqtt://host:42?client_id=foo&keep_alive_secs=0", 0),
    ];
    for (url, secs) in keep_alive_cases {
        let options = accepted(url);
        assert_eq!(options.keep_alive, Duration::from_secs(secs));
    }

    let batched = accepted("mqtt://host:42?client_id=foo&read_batch_size_num=32");
    assert_eq!(batched.read_batch_size(), 32);
    let timed = accepted("mqtt://host:42?client_id=foo&conn_timeout_secs=7");
    assert_eq!(timed.connect_timeout(), Duration::from_secs(7));

    // Credentials carried in the URL authority.
    let username_only = accepted("mqtt://user@host:42?client_id=foo");
    assert_eq!(
        username_only.auth(),
        &ConnectAuth::Username {
            username: "user".to_owned(),
        }
    );
    let username_and_password = accepted("mqtt://user:pw@host:42?client_id=foo");
    assert_eq!(
        username_and_password.auth(),
        &ConnectAuth::UsernamePassword {
            username: "user".to_owned(),
            password: Bytes::from_static(b"pw"),
        }
    );
    let password_only = accepted("mqtt://:pw@host:42?client_id=foo");
    assert_eq!(
        password_only.auth(),
        &ConnectAuth::UsernamePassword {
            username: String::new(),
            password: Bytes::from_static(b"pw"),
        }
    );

    // Each rejected URL paired with the error it must produce.
    let rejections = [
        ("mqtt://host:42", OptionError::ClientId),
        (
            "mqtt://host:42?client_id=foo&foo=bar",
            OptionError::Unknown("foo".to_owned()),
        ),
        ("mqt://host:42?client_id=foo", OptionError::Scheme),
        (
            "mqtt://host:42?client_id=foo&keep_alive_secs=foo",
            OptionError::KeepAlive,
        ),
        (
            "mqtt://host:42?client_id=foo&keep_alive_secs=65536",
            OptionError::KeepAlive,
        ),
        (
            "mqtt://host:42?client_id=foo&clean_start=foo",
            OptionError::CleanStart,
        ),
        (
            "mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo",
            OptionError::MaxIncomingPacketSize,
        ),
        (
            "mqtt://host:42?client_id=foo&request_channel_capacity_num=foo",
            OptionError::RequestChannelCapacity,
        ),
        (
            "mqtt://host:42?client_id=foo&max_request_batch_num=foo",
            OptionError::MaxRequestBatch,
        ),
        (
            "mqtt://host:42?client_id=foo&read_batch_size_num=foo",
            OptionError::ReadBatchSize,
        ),
        (
            "mqtt://host:42?client_id=foo&pending_throttle_usecs=foo",
            OptionError::PendingThrottle,
        ),
        (
            "mqtt://host:42?client_id=foo&inflight_num=foo",
            OptionError::Inflight,
        ),
        (
            "mqtt://host:42?client_id=foo&conn_timeout_secs=foo",
            OptionError::ConnTimeout,
        ),
    ];
    for (url, expected) in rejections {
        assert_eq!(rejected(url), expected, "url: {url}");
    }
}
2609
/// Options built from a Unix-socket broker must select `Transport::Unix`,
/// expose the socket path, and otherwise carry the same values as options
/// built from a TCP host.
#[test]
#[cfg(unix)]
fn unix_broker_sets_unix_transport_and_preserves_defaults() {
    // Compares the listed fields of two option sets, one `assert_eq!` each.
    macro_rules! assert_same_fields {
        ($lhs:ident, $rhs:ident: $($field:ident),+ $(,)?) => {
            $(assert_eq!($lhs.$field, $rhs.$field);)+
        };
    }

    let unix_options = MqttOptions::new("client", Broker::unix("/tmp/mqtt.sock"));
    let tcp_defaults = MqttOptions::new("client", "127.0.0.1");

    assert!(matches!(unix_options.transport(), Transport::Unix));
    let socket_path = std::path::Path::new("/tmp/mqtt.sock");
    assert_eq!(unix_options.broker().unix_path(), Some(socket_path));

    assert_same_fields!(
        unix_options, tcp_defaults:
        keep_alive,
        clean_start,
        client_id,
        request_channel_capacity,
        max_request_batch,
        read_batch_size,
        pending_throttle,
        connect_timeout,
        default_max_incoming_size,
        incoming_packet_size_limit,
        manual_acks,
        outgoing_inflight_upper_limit,
    );
    assert!(unix_options.authenticator.is_none());
}
2647
/// A `unix://` URL yields the Unix transport and socket path, and its query
/// options are applied.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_supports_unix_socket_paths() {
    let url = "unix:///tmp/mqtt.sock?client_id=foo&keep_alive_secs=5&read_batch_size_num=32";
    let parsed = MqttOptions::parse_url(url).expect("valid unix socket options");

    assert!(matches!(parsed.transport(), Transport::Unix));
    let socket_path = std::path::Path::new("/tmp/mqtt.sock");
    assert_eq!(parsed.broker().unix_path(), Some(socket_path));

    // Query options parsed from the same URL.
    assert_eq!(parsed.client_id(), "foo");
    assert_eq!(parsed.keep_alive, Duration::from_secs(5));
    assert_eq!(parsed.read_batch_size(), 32);
}
2665
/// A percent-escaped space (`%20`) in a `unix://` URL path comes back as a
/// literal space in the socket path.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_decodes_percent_escaped_unix_socket_paths() {
    let parsed = MqttOptions::parse_url("unix:///tmp/mqtt%20broker.sock?client_id=foo").unwrap();
    let decoded_path = std::path::Path::new("/tmp/mqtt broker.sock");

    assert_eq!(parsed.broker().unix_path(), Some(decoded_path));
}
2677
/// A percent-escaped `%FF` in a `unix://` URL path comes back as the raw
/// byte `0xFF` in the socket path.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_preserves_percent_decoded_unix_socket_bytes() {
    use std::os::unix::ffi::OsStrExt;

    let parsed = MqttOptions::parse_url("unix:///tmp/mqtt%FF.sock?client_id=foo").unwrap();
    let socket_path = parsed.broker().unix_path().unwrap();

    // Compare raw bytes: the expected path contains the byte 0xFF.
    assert_eq!(socket_path.as_os_str().as_bytes(), b"/tmp/mqtt\xff.sock");
}
2690
/// `unix://` URLs without a client id, with a host component, or with a bare
/// `/` path are rejected with the matching `OptionError`.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_rejects_invalid_unix_socket_paths() {
    let rejected =
        |url: &str| MqttOptions::parse_url(url).expect_err("invalid unix socket url");

    // Missing client id.
    assert_eq!(rejected("unix:///tmp/mqtt.sock"), OptionError::ClientId);
    // A host component in front of the path.
    assert_eq!(
        rejected("unix://localhost/tmp/mqtt.sock?client_id=foo"),
        OptionError::UnixSocketPath
    );
    // A path that is only the root separator.
    assert_eq!(
        rejected("unix:///?client_id=foo"),
        OptionError::UnixSocketPath
    );
}
2705
/// Constructing options with an empty client id and enabling clean start
/// must not panic.
#[test]
fn allow_empty_client_id() {
    let mut options = MqttOptions::new("", "127.0.0.1");
    options.set_clean_start(true);
}
2710
/// Configures the same settings once through the `set_*` methods and once
/// through `MqttOptions::builder`, then checks that every getter reports the
/// same value on both.
#[test]
fn mqtt_options_builder_matches_setter_configuration() {
    // Values shared by both configuration paths.
    let will = LastWill::new("hello/world", "good bye", QoS::AtLeastOnce, false, None);
    let throttle = Duration::from_micros(250);
    let connect_timeout = Duration::from_secs(7);
    let user_properties = vec![("k".to_owned(), "v".to_owned())];

    let mut via_setters = MqttOptions::new("client", ("localhost", 1884));
    via_setters
        .set_keep_alive(5)
        .set_last_will(will.clone())
        .set_clean_start(false)
        .set_credentials("user", Bytes::from_static(b"password"))
        .set_request_channel_capacity(16)
        .set_max_request_batch(8)
        .set_read_batch_size(32)
        .set_pending_throttle(throttle)
        .set_connect_timeout(connect_timeout)
        .set_session_expiry_interval(Some(120))
        .set_receive_maximum(Some(10))
        .set_topic_alias_max(Some(4))
        .set_request_response_info(Some(1))
        .set_request_problem_info(Some(0))
        .set_user_properties(user_properties.clone())
        .set_authentication_method(Some("SCRAM-SHA-256".to_owned()))
        .set_authentication_data(Some(Bytes::from_static(b"auth")))
        .set_auto_topic_aliases(true)
        .set_topic_alias_policy(TopicAliasPolicy::Lru)
        .set_manual_acks(true)
        .set_outgoing_inflight_upper_limit(4);

    let via_builder = MqttOptions::builder("client", ("localhost", 1884))
        .keep_alive(5)
        .last_will(will)
        .clean_start(false)
        .credentials("user", Bytes::from_static(b"password"))
        .request_channel_capacity(16)
        .max_request_batch(8)
        .read_batch_size(32)
        .pending_throttle(throttle)
        .connect_timeout(connect_timeout)
        .session_expiry_interval(Some(120))
        .receive_maximum(Some(10))
        .topic_alias_max(Some(4))
        .request_response_info(Some(1))
        .request_problem_info(Some(0))
        .user_properties(user_properties)
        .authentication_method(Some("SCRAM-SHA-256".to_owned()))
        .authentication_data(Some(Bytes::from_static(b"auth")))
        .auto_topic_aliases(true)
        .topic_alias_policy(TopicAliasPolicy::Lru)
        .manual_acks(true)
        .outgoing_inflight_upper_limit(4)
        .build();

    // Broker address.
    assert_eq!(
        via_builder.broker().tcp_address(),
        via_setters.broker().tcp_address()
    );

    // Session and credential settings.
    assert_eq!(via_builder.keep_alive(), via_setters.keep_alive());
    assert_eq!(via_builder.last_will(), via_setters.last_will());
    assert_eq!(via_builder.clean_start(), via_setters.clean_start());
    assert_eq!(via_builder.auth(), via_setters.auth());

    // Request queueing, batching and timing.
    assert_eq!(
        via_builder.request_channel_capacity(),
        via_setters.request_channel_capacity()
    );
    assert_eq!(
        via_builder.max_request_batch(),
        via_setters.max_request_batch()
    );
    assert_eq!(via_builder.read_batch_size(), via_setters.read_batch_size());
    assert_eq!(
        via_builder.pending_throttle(),
        via_setters.pending_throttle()
    );
    assert_eq!(via_builder.connect_timeout(), via_setters.connect_timeout());

    // Connect properties and the remaining settings.
    assert_eq!(
        via_builder.connect_properties(),
        via_setters.connect_properties()
    );
    assert_eq!(
        via_builder.auto_topic_aliases(),
        via_setters.auto_topic_aliases()
    );
    assert_eq!(
        via_builder.topic_alias_policy(),
        via_setters.topic_alias_policy()
    );
    assert_eq!(via_builder.manual_acks(), via_setters.manual_acks());
    assert_eq!(
        via_builder.get_outgoing_inflight_upper_limit(),
        via_setters.get_outgoing_inflight_upper_limit()
    );
}
2787
/// Checks the incoming packet size limit the builder reports in two cases:
/// connect properties carrying `max_packet_size`, and
/// `unlimited_incoming_packet_size()` called after `max_packet_size(..)`.
#[test]
fn mqtt_options_builder_configures_packet_size_policies() {
    let mut properties = ConnectProperties::new();
    properties.max_packet_size = Some(2048);

    let with_properties = MqttOptions::builder("client", "localhost")
        .connect_properties(properties)
        .build();
    let limit = with_properties.incoming_packet_size_limit();
    assert_eq!(limit, IncomingPacketSizeLimit::Bytes(2048));

    // The later `unlimited_incoming_packet_size()` call is expected to win
    // over the earlier explicit size.
    let without_limit = MqttOptions::builder("client", "localhost")
        .max_packet_size(Some(1024))
        .unlimited_incoming_packet_size()
        .build();
    let limit = without_limit.incoming_packet_size_limit();
    assert_eq!(limit, IncomingPacketSizeLimit::Unlimited);
    assert_eq!(without_limit.max_packet_size(), None);
}
2813
/// After `password(..)`, `clear_auth()` and `auth(..)` are chained on the
/// builder, the built options must report exactly the auth passed last.
#[test]
fn mqtt_options_builder_can_replace_and_clear_auth() {
    // Fresh copy of the auth value that should end up in the options.
    let next_user = || ConnectAuth::Username {
        username: "next".to_owned(),
    };

    let built = MqttOptions::builder("client", "localhost")
        .password("password")
        .clear_auth()
        .auth(next_user())
        .build();

    assert_eq!(built.auth(), &next_user());
}
2831
/// With a request channel capacity of one set on the options, the first
/// `try_publish` must succeed and the second must fail with
/// `ClientError::TryRequest` carrying the publish request.
#[test]
fn mqtt_options_builder_request_capacity_feeds_client_builder_default() {
    let options = MqttOptions::builder("test-1", "localhost")
        .request_channel_capacity(1)
        .build();
    // `_eventloop` stays bound for the whole test and is never polled.
    let (client, _eventloop) = AsyncClient::builder(options).build();

    let first = client.try_publish("hello/world", QoS::AtMostOnce, false, "one");
    first.expect("first request should fit configured capacity");

    let second = client.try_publish("hello/world", QoS::AtMostOnce, false, "two");
    assert!(matches!(
        second,
        Err(ClientError::TryRequest(request)) if matches!(*request, Request::Publish(_))
    ));
}
2847
/// Freshly constructed options report a read batch size of zero.
#[test]
fn read_batch_size_defaults_to_adaptive() {
    let defaults = MqttOptions::new("client", "127.0.0.1");
    let batch_size = defaults.read_batch_size();
    assert_eq!(batch_size, 0);
}
2853
/// A read batch size stored through the setter is returned by the getter.
#[test]
fn set_read_batch_size() {
    let mut configured = MqttOptions::new("client", "127.0.0.1");
    configured.set_read_batch_size(48);

    let batch_size = configured.read_batch_size();
    assert_eq!(batch_size, 48);
}
2860
/// A URL without an incoming packet size option leaves the limit at
/// `IncomingPacketSizeLimit::Default`, and the effective maximum equals the
/// options' `default_max_incoming_size`.
#[test]
#[cfg(feature = "url")]
fn from_url_uses_default_incoming_limit_when_unspecified() {
    let parsed = MqttOptions::parse_url("mqtt://host:42?client_id=foo").unwrap();

    let limit = parsed.incoming_packet_size_limit();
    assert_eq!(limit, IncomingPacketSizeLimit::Default);

    let effective_max = parsed.max_incoming_packet_size();
    assert_eq!(effective_max, Some(parsed.default_max_incoming_size));
}
2874}