1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3
4#[cfg(all(feature = "use-rustls-ring", feature = "use-rustls-aws-lc"))]
5compile_error!(
6 "Features `use-rustls-ring` and `use-rustls-aws-lc` are mutually exclusive. Enable only one rustls provider feature."
7);
8
9#[macro_use]
10extern crate log;
11
12use bytes::Bytes;
13use std::fmt::{self, Debug, Formatter};
14use std::io;
15use std::net::SocketAddr;
16use std::path::PathBuf;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19use tokio::net::{TcpStream, lookup_host};
20use tokio::task::JoinSet;
21
22#[cfg(all(feature = "url", unix))]
23use percent_encoding::percent_decode_str;
24
25#[cfg(all(feature = "url", unix))]
26use std::{ffi::OsString, os::unix::ffi::OsStringExt};
27
28#[cfg(feature = "websocket")]
29use std::{
30 future::{Future, IntoFuture},
31 pin::Pin,
32};
33
34mod auth;
35mod client;
36mod eventloop;
37mod framed;
38pub mod mqttbytes;
39mod notice;
40mod state;
41mod transport;
42
43#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
44mod tls;
45
46#[cfg(feature = "websocket")]
47mod websockets;
48
49#[cfg(feature = "proxy")]
50mod proxy;
51
52pub use client::{
53 AsyncClient, AsyncClientBuilder, Client, ClientBuilder, ClientError, Connection, InvalidTopic,
54 Iter, ManualAck, PublishTopic, RecvError, RecvTimeoutError, TryRecvError, ValidatedTopic,
55};
56pub use eventloop::{ConnectionError, Event, EventLoop};
57pub use mqttbytes::v5::*;
58pub use mqttbytes::*;
59pub use notice::{
60 AuthNotice, AuthNoticeError, NoticeFailureReason, PublishNotice, PublishNoticeError,
61 PublishResult, SubscribeNotice, SubscribeNoticeError, UnsubscribeNotice,
62 UnsubscribeNoticeError,
63};
64pub use rumqttc_core::NetworkOptions;
65#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
66pub use rumqttc_core::TlsConfiguration;
67pub use rumqttc_core::default_socket_connect;
68pub use state::{MqttState, MqttStateBuilder, StateError};
69pub use transport::Transport;
70
/// Policy stored in [`MqttOptions`] for managing topic aliases.
///
/// NOTE(review): the allocation/eviction behaviour of each variant is
/// implemented outside this file chunk — confirm there before relying on the
/// variant descriptions below.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum TopicAliasPolicy {
    /// The default policy.
    #[default]
    Monotonic,
    /// Presumably least-recently-used replacement — TODO confirm.
    Lru,
}
80
81#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
82pub use crate::tls::Error as TlsError;
83
84#[cfg(feature = "proxy")]
85pub use crate::proxy::{Proxy, ProxyAuth, ProxyType};
86
87#[cfg(feature = "use-native-tls")]
88pub use tokio_native_tls;
89#[cfg(feature = "use-rustls-no-provider")]
90pub use tokio_rustls;
91
/// Alias for [`Packet`], used to name packets on the incoming side.
pub type Incoming = Packet;
93
/// Compact description of a packet or action on the outgoing side.
///
/// NOTE(review): the `u16` payloads are presumably MQTT packet identifiers —
/// confirm against the code that constructs these values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outgoing {
    /// Publish, tagged with a `u16`.
    Publish(u16),
    /// Subscribe, tagged with a `u16`.
    Subscribe(u16),
    /// Unsubscribe, tagged with a `u16`.
    Unsubscribe(u16),
    /// PubAck, tagged with a `u16`.
    PubAck(u16),
    /// PubRec, tagged with a `u16`.
    PubRec(u16),
    /// PubRel, tagged with a `u16`.
    PubRel(u16),
    /// PubComp, tagged with a `u16`.
    PubComp(u16),
    /// Ping request.
    PingReq,
    /// Ping response.
    PingResp,
    /// Disconnect.
    Disconnect,
    /// Presumably "waiting for the acknowledgement of this id" — TODO confirm.
    AwaitAck(u16),
    /// Auth.
    Auth,
}
122
/// Crate-local alias for [`rumqttc_core::SocketConnector`], the type-erased
/// connector callback stored in [`MqttOptions`].
pub(crate) type SocketConnector = rumqttc_core::SocketConnector;
125
/// Delay inserted between the start of consecutive staggered connection
/// attempts (used by `connect_with_retry_mode`).
const CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(100);
127
128async fn first_success_with_stagger<T, I, F, Fut>(
129 items: I,
130 attempt_delay: Duration,
131 connect_fn: F,
132) -> io::Result<T>
133where
134 T: Send + 'static,
135 I: IntoIterator,
136 I::Item: Send + 'static,
137 F: Fn(I::Item) -> Fut + Send + Sync + Clone + 'static,
138 Fut: std::future::Future<Output = io::Result<T>> + Send + 'static,
139{
140 let mut join_set = JoinSet::new();
141 let mut item_count = 0usize;
142
143 for (index, item) in items.into_iter().enumerate() {
144 item_count += 1;
145 let delay = attempt_delay.saturating_mul(u32::try_from(index).unwrap_or(u32::MAX));
146 let connect_fn = connect_fn.clone();
147 join_set.spawn(async move {
148 tokio::time::sleep(delay).await;
149 connect_fn(item).await
150 });
151 }
152
153 if item_count == 0 {
154 return Err(io::Error::new(
155 io::ErrorKind::InvalidInput,
156 "could not resolve to any address",
157 ));
158 }
159
160 let mut last_err = None;
161
162 while let Some(task_result) = join_set.join_next().await {
163 match task_result {
164 Ok(Ok(stream)) => {
165 join_set.abort_all();
166 return Ok(stream);
167 }
168 Ok(Err(err)) => {
169 last_err = Some(err);
170 }
171 Err(err) => {
172 last_err = Some(io::Error::other(format!(
173 "concurrent connect task failed: {err}"
174 )));
175 }
176 }
177 }
178
179 Err(last_err.unwrap_or_else(|| {
180 io::Error::new(
181 io::ErrorKind::InvalidInput,
182 "could not resolve to any address",
183 )
184 }))
185}
186
/// Tries `connect_fn` on each item in order, one at a time, and returns the
/// first successful result.
///
/// # Errors
///
/// Returns the error of the last failed attempt, or `InvalidInput`
/// ("could not resolve to any address") when `items` is empty.
async fn first_success_sequential<T, I, F, Fut>(items: I, connect_fn: F) -> io::Result<T>
where
    I: IntoIterator,
    F: Fn(I::Item) -> Fut,
    Fut: std::future::Future<Output = io::Result<T>>,
{
    let mut last_failure: Option<io::Error> = None;

    for candidate in items {
        match connect_fn(candidate).await {
            Ok(stream) => return Ok(stream),
            Err(err) => last_failure = Some(err),
        }
    }

    // `last_failure` is still `None` only when `items` yielded nothing at all.
    match last_failure {
        Some(err) => Err(err),
        None => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "could not resolve to any address",
        )),
    }
}
218
219fn should_stagger_connect_attempts(network_options: &NetworkOptions) -> bool {
220 network_options
221 .bind_addr()
222 .is_none_or(|bind_addr| bind_addr.port() == 0)
223}
224
225async fn connect_with_retry_mode<T, I, F, Fut>(
226 items: I,
227 network_options: NetworkOptions,
228 connect_fn: F,
229) -> io::Result<T>
230where
231 T: Send + 'static,
232 I: IntoIterator,
233 I::Item: Send + 'static,
234 F: Fn(I::Item, NetworkOptions) -> Fut + Send + Sync + Clone + 'static,
235 Fut: std::future::Future<Output = io::Result<T>> + Send + 'static,
236{
237 connect_with_retry_mode_and_delay(items, network_options, CONNECTION_ATTEMPT_DELAY, connect_fn)
238 .await
239}
240
241async fn connect_with_retry_mode_and_delay<T, I, F, Fut>(
242 items: I,
243 network_options: NetworkOptions,
244 connection_attempt_delay: Duration,
245 connect_fn: F,
246) -> io::Result<T>
247where
248 T: Send + 'static,
249 I: IntoIterator,
250 I::Item: Send + 'static,
251 F: Fn(I::Item, NetworkOptions) -> Fut + Send + Sync + Clone + 'static,
252 Fut: std::future::Future<Output = io::Result<T>> + Send + 'static,
253{
254 if should_stagger_connect_attempts(&network_options) {
255 first_success_with_stagger(items, connection_attempt_delay, move |item| {
256 let network_options = network_options.clone();
257 let connect_fn = connect_fn.clone();
258 async move { connect_fn(item, network_options).await }
259 })
260 .await
261 } else {
262 first_success_sequential(items, move |item| {
263 let network_options = network_options.clone();
264 let connect_fn = connect_fn.clone();
265 async move { connect_fn(item, network_options).await }
266 })
267 .await
268 }
269}
270
271async fn connect_resolved_addrs_staggered(
272 addrs: Vec<SocketAddr>,
273 network_options: NetworkOptions,
274) -> io::Result<TcpStream> {
275 connect_with_retry_mode(
276 addrs,
277 network_options,
278 move |addr, network_options| async move {
279 rumqttc_core::connect_socket_addr(addr, network_options).await
280 },
281 )
282 .await
283}
284
285async fn default_socket_connect_staggered(
286 host: String,
287 network_options: NetworkOptions,
288) -> io::Result<TcpStream> {
289 let addrs = lookup_host(host).await?.collect::<Vec<_>>();
290 connect_resolved_addrs_staggered(addrs, network_options).await
291}
292
293fn default_socket_connector() -> SocketConnector {
294 Arc::new(|host, network_options| {
295 Box::pin(async move {
296 let tcp = default_socket_connect_staggered(host, network_options).await?;
297 Ok(Box::new(tcp) as Box<dyn crate::framed::AsyncReadWrite>)
298 })
299 })
300}
301
/// Port used when a [`Broker`] is built from a bare host (`&str` / `String`).
const DEFAULT_BROKER_PORT: u16 = 1883;
303
/// Address of the broker to connect to.
///
/// Built with [`Broker::tcp`], the cfg-gated `Broker::unix` /
/// `Broker::websocket` constructors, or the `From` conversions from a host or
/// a `(host, port)` pair.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Broker {
    /// Concrete endpoint kind; kept private so variants can stay cfg-gated.
    inner: BrokerInner,
}
309
/// Private representation behind [`Broker`].
#[derive(Clone, Debug, PartialEq, Eq)]
enum BrokerInner {
    /// TCP endpoint given as host and port.
    Tcp {
        host: String,
        port: u16,
    },
    /// Unix domain socket path (Unix targets only).
    #[cfg(unix)]
    Unix {
        path: PathBuf,
    },
    /// WebSocket URL (requires the `websocket` feature).
    #[cfg(feature = "websocket")]
    Websocket {
        url: String,
    },
}
325
326impl Broker {
327 #[must_use]
328 pub fn tcp<S: Into<String>>(host: S, port: u16) -> Self {
329 Self {
330 inner: BrokerInner::Tcp {
331 host: host.into(),
332 port,
333 },
334 }
335 }
336
337 #[cfg(unix)]
338 #[must_use]
339 pub fn unix<P: Into<PathBuf>>(path: P) -> Self {
340 Self {
341 inner: BrokerInner::Unix { path: path.into() },
342 }
343 }
344
345 #[cfg(feature = "websocket")]
346 pub fn websocket<S: Into<String>>(url: S) -> Result<Self, OptionError> {
352 let url = url.into();
353 let uri = url
354 .parse::<http::Uri>()
355 .map_err(|_| OptionError::WebsocketUrl)?;
356
357 match uri.scheme_str() {
358 Some("ws") => {
359 rumqttc_core::split_url(&url).map_err(|_| OptionError::WebsocketUrl)?;
360 Ok(Self {
361 inner: BrokerInner::Websocket { url },
362 })
363 }
364 Some("wss") => Err(OptionError::WssRequiresExplicitTransport),
365 _ => Err(OptionError::Scheme),
366 }
367 }
368
369 #[must_use]
370 pub const fn tcp_address(&self) -> Option<(&str, u16)> {
371 match &self.inner {
372 BrokerInner::Tcp { host, port } => Some((host.as_str(), *port)),
373 #[cfg(unix)]
374 BrokerInner::Unix { .. } => None,
375 #[cfg(feature = "websocket")]
376 BrokerInner::Websocket { .. } => None,
377 }
378 }
379
380 #[cfg(unix)]
381 #[must_use]
382 pub fn unix_path(&self) -> Option<&std::path::Path> {
383 match &self.inner {
384 BrokerInner::Unix { path } => Some(path.as_path()),
385 BrokerInner::Tcp { .. } => None,
386 #[cfg(feature = "websocket")]
387 BrokerInner::Websocket { .. } => None,
388 }
389 }
390
391 #[cfg(feature = "websocket")]
392 #[must_use]
393 pub const fn websocket_url(&self) -> Option<&str> {
394 match &self.inner {
395 BrokerInner::Websocket { url } => Some(url.as_str()),
396 BrokerInner::Tcp { .. } => None,
397 #[cfg(unix)]
398 BrokerInner::Unix { .. } => None,
399 }
400 }
401
402 pub(crate) const fn default_transport(&self) -> Transport {
403 match &self.inner {
404 BrokerInner::Tcp { .. } => Transport::tcp(),
405 #[cfg(unix)]
406 BrokerInner::Unix { .. } => Transport::unix(),
407 #[cfg(feature = "websocket")]
408 BrokerInner::Websocket { .. } => Transport::Ws,
409 }
410 }
411}
412
413impl From<&str> for Broker {
414 fn from(host: &str) -> Self {
415 Self::tcp(host, DEFAULT_BROKER_PORT)
416 }
417}
418
419impl From<String> for Broker {
420 fn from(host: String) -> Self {
421 Self::tcp(host, DEFAULT_BROKER_PORT)
422 }
423}
424
425impl<S: Into<String>> From<(S, u16)> for Broker {
426 fn from((host, port): (S, u16)) -> Self {
427 Self::tcp(host, port)
428 }
429}
430
/// Limit applied to the size of incoming packets.
///
/// `MqttOptions::max_incoming_packet_size` resolves this to a concrete bound:
/// `Default` uses the options' built-in default size, `Unlimited` yields no
/// bound, and `Bytes(n)` yields `n`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum IncomingPacketSizeLimit {
    /// Use the built-in default maximum.
    #[default]
    Default,
    /// Do not bound incoming packet size.
    Unlimited,
    /// Bound incoming packets to this many bytes.
    Bytes(u32),
}
442
/// Identifies which kind of authentication exchange is in progress.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AuthExchangeKind {
    /// Exchange belonging to the initial connect.
    InitialConnect,
    /// Re-authentication exchange.
    Reauthentication,
}
449
/// Context passed to every [`Authenticator`] callback.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct AuthContext<'a> {
    /// Kind of exchange the callback is being invoked for.
    pub kind: AuthExchangeKind,
    /// Authentication method name (borrowed for the duration of the call).
    pub method: &'a str,
}
456
/// Decision returned by [`Authenticator::continue_auth`].
///
/// NOTE(review): how each variant is acted upon is implemented outside this
/// chunk — the descriptions below follow the variant names; confirm there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AuthAction {
    /// Continue the exchange by sending these properties.
    Send(AuthProperties),
    /// The authenticator considers the exchange complete.
    Complete,
    /// Abort the exchange with the given message.
    Fail(String),
}
464
/// Error reported by an [`Authenticator`].
///
/// Convertible from `String` and `&str`, both of which produce
/// [`AuthError::Failed`].
#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
pub enum AuthError {
    /// Authentication failed; the payload is the human-readable reason.
    #[error("authentication failed: {0}")]
    Failed(String),
}
471
472impl From<String> for AuthError {
473 fn from(value: String) -> Self {
474 Self::Failed(value)
475 }
476}
477
478impl From<&str> for AuthError {
479 fn from(value: &str) -> Self {
480 Self::Failed(value.to_owned())
481 }
482}
483
/// Reason attached to [`AuthEvent::Failed`].
///
/// NOTE(review): the conditions that produce each variant are decided outside
/// this chunk; the descriptions below restate the variant names only.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AuthFailureReason {
    /// The session was reset.
    SessionReset,
    /// A protocol error occurred.
    ProtocolError,
    /// Authentication failed, with a message.
    AuthenticationFailed(String),
    /// The connection was closed.
    ConnectionClosed,
    /// A re-authentication overlapped with another one.
    OverlappingReauth,
    /// No authentication method was available.
    MissingAuthenticationMethod,
    /// The associated notice was dropped.
    NoticeDropped,
}
495
/// Successful result of an authentication exchange.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AuthOutcome {
    /// The exchange succeeded.
    Success,
}
501
/// Progress event for an authentication exchange.
///
/// Every variant carries the exchange `kind` and the authentication `method`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AuthEvent {
    /// The exchange started.
    Started {
        kind: AuthExchangeKind,
        method: String,
    },
    /// The exchange is continuing.
    Continue {
        kind: AuthExchangeKind,
        method: String,
    },
    /// The exchange succeeded.
    Succeeded {
        kind: AuthExchangeKind,
        method: String,
    },
    /// The exchange failed for `reason`.
    Failed {
        kind: AuthExchangeKind,
        method: String,
        reason: AuthFailureReason,
    },
}
523
/// Callbacks that drive an authentication exchange.
///
/// Implementations must be `Debug + Send`; [`MqttOptions`] stores them behind
/// `Arc<Mutex<dyn Authenticator>>`.
///
/// NOTE(review): the exact points at which each callback is invoked are
/// decided by code outside this chunk — confirm there.
pub trait Authenticator: std::fmt::Debug + Send {
    /// Called for the start of an exchange described by `context`.
    ///
    /// Returns optional properties to send, or an error to abort.
    fn start(&mut self, context: AuthContext<'_>) -> Result<Option<AuthProperties>, AuthError>;

    /// Called to continue an exchange, with the `incoming` properties if any.
    ///
    /// Returns the next [`AuthAction`], or an error to abort.
    fn continue_auth(
        &mut self,
        context: AuthContext<'_>,
        incoming: Option<AuthProperties>,
    ) -> Result<AuthAction, AuthError>;

    /// Called on success of an exchange, with the `incoming` properties if any.
    ///
    /// Returning an error rejects the outcome.
    fn success(
        &mut self,
        context: AuthContext<'_>,
        incoming: Option<AuthProperties>,
    ) -> Result<(), AuthError>;

    /// Called when an exchange failed with `error`.
    fn failure(&mut self, context: AuthContext<'_>, error: AuthError);
}
559
/// A request carrying one packet or control action.
///
/// NOTE(review): presumably these are the messages sent from the client
/// handles to the event loop — confirm against `client` / `eventloop`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Request {
    Publish(Publish),
    PubAck(PubAck),
    PubRec(PubRec),
    PubComp(PubComp),
    PubRel(PubRel),
    PingReq,
    PingResp,
    Subscribe(Subscribe),
    SubAck(SubAck),
    Unsubscribe(Unsubscribe),
    UnsubAck(UnsubAck),
    Auth(Auth),
    /// Disconnect carrying the given packet.
    Disconnect(Disconnect),
    /// Disconnect variant named "now"; presumably skips waiting — TODO confirm.
    DisconnectNow(Disconnect),
    /// Disconnect variant carrying a timeout alongside the packet.
    DisconnectWithTimeout(Disconnect, Duration),
}
580
581impl From<Subscribe> for Request {
582 fn from(subscribe: Subscribe) -> Self {
583 Self::Subscribe(subscribe)
584 }
585}
586
/// Shared, type-erased infallible request modifier: takes an
/// `http::Request<()>` and asynchronously yields the request to use instead.
#[cfg(feature = "websocket")]
type RequestModifierFn = Arc<
    dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
        + Send
        + Sync,
>;
593
/// Boxed error type produced by a fallible request modifier.
#[cfg(feature = "websocket")]
type RequestModifierError = Box<dyn std::error::Error + Send + Sync>;
596
/// Shared, type-erased fallible request modifier: like `RequestModifierFn`,
/// but the future resolves to a `Result` whose error is `RequestModifierError`.
#[cfg(feature = "websocket")]
type FallibleRequestModifierFn = Arc<
    dyn Fn(
            http::Request<()>,
        )
            -> Pin<Box<dyn Future<Output = Result<http::Request<()>, RequestModifierError>> + Send>>
        + Send
        + Sync,
>;
606
/// Options used to configure an MQTT client connection.
///
/// Create with [`MqttOptions::new`] or [`MqttOptions::builder`], then adjust
/// through the `set_*` methods.
#[derive(Clone)]
pub struct MqttOptions {
    /// Broker endpoint to connect to.
    broker: Broker,
    /// Transport used to reach the broker.
    transport: Transport,
    /// Keep-alive interval.
    keep_alive: Duration,
    /// Clean-start flag.
    clean_start: bool,
    /// Client identifier.
    client_id: String,
    /// Credentials configuration for the connect.
    auth: ConnectAuth,
    /// Capacity of the request channel.
    request_channel_capacity: usize,
    /// Maximum request batch size.
    max_request_batch: usize,
    /// Read batch size.
    read_batch_size: usize,
    /// Throttle duration for pending requests.
    pending_throttle: Duration,
    /// Optional last will.
    last_will: Option<LastWill>,
    /// Connection timeout.
    connect_timeout: Duration,
    /// Incoming size bound used when the limit is `IncomingPacketSizeLimit::Default`.
    default_max_incoming_size: u32,
    /// Configured incoming packet size limit.
    incoming_packet_size_limit: IncomingPacketSizeLimit,
    /// Optional connect properties.
    connect_properties: Option<ConnectProperties>,
    /// Whether automatic topic aliases are enabled.
    auto_topic_aliases: bool,
    /// Topic alias policy.
    topic_alias_policy: TopicAliasPolicy,
    /// Whether acknowledgements are manual.
    manual_acks: bool,
    /// Low-level network options.
    network_options: NetworkOptions,
    /// Optional proxy configuration.
    #[cfg(feature = "proxy")]
    proxy: Option<Proxy>,
    /// Optional upper limit on outgoing inflight messages.
    outgoing_inflight_upper_limit: Option<u16>,
    /// Optional infallible request modifier (mutually exclusive with the
    /// fallible one: each setter clears the other).
    #[cfg(feature = "websocket")]
    request_modifier: Option<RequestModifierFn>,
    /// Optional fallible request modifier.
    #[cfg(feature = "websocket")]
    fallible_request_modifier: Option<FallibleRequestModifierFn>,
    /// Optional custom socket connector; a built-in TCP connector is used when
    /// this is `None`.
    socket_connector: Option<SocketConnector>,

    /// Optional shared authenticator.
    authenticator: Option<Arc<Mutex<dyn Authenticator>>>,
}
665
666impl MqttOptions {
667 pub fn new<S: Into<String>, B: Into<Broker>>(id: S, broker: B) -> Self {
676 let broker = broker.into();
677 Self {
678 transport: broker.default_transport(),
679 broker,
680 keep_alive: Duration::from_secs(60),
681 clean_start: true,
682 client_id: id.into(),
683 auth: ConnectAuth::None,
684 request_channel_capacity: 10,
685 max_request_batch: 0,
686 read_batch_size: 0,
687 pending_throttle: Duration::from_micros(0),
688 last_will: None,
689 connect_timeout: Duration::from_secs(5),
690 default_max_incoming_size: 10 * 1024,
691 incoming_packet_size_limit: IncomingPacketSizeLimit::Default,
692 connect_properties: None,
693 auto_topic_aliases: false,
694 topic_alias_policy: TopicAliasPolicy::default(),
695 manual_acks: false,
696 network_options: NetworkOptions::new(),
697 #[cfg(feature = "proxy")]
698 proxy: None,
699 outgoing_inflight_upper_limit: None,
700 #[cfg(feature = "websocket")]
701 request_modifier: None,
702 #[cfg(feature = "websocket")]
703 fallible_request_modifier: None,
704 socket_connector: None,
705 authenticator: None,
706 }
707 }
708
709 #[must_use]
719 pub fn builder<S: Into<String>, B: Into<Broker>>(id: S, broker: B) -> MqttOptionsBuilder {
720 MqttOptionsBuilder::new(id, broker)
721 }
722
723 #[cfg(feature = "url")]
724 pub fn parse_url<S: Into<String>>(url: S) -> Result<Self, OptionError> {
758 let url = url::Url::parse(&url.into())?;
759 let options = Self::try_from(url)?;
760
761 Ok(options)
762 }
763
764 pub const fn broker(&self) -> &Broker {
766 &self.broker
767 }
768
769 pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
770 self.last_will = Some(will);
771 self
772 }
773
774 pub fn last_will(&self) -> Option<LastWill> {
775 self.last_will.clone()
776 }
777
778 #[cfg(feature = "websocket")]
782 pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
783 where
784 F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
785 O: IntoFuture<Output = http::Request<()>> + 'static,
786 O::IntoFuture: Send,
787 {
788 self.request_modifier = Some(Arc::new(move |request| {
789 let request_modifier = request_modifier(request).into_future();
790 Box::pin(request_modifier)
791 }));
792 self.fallible_request_modifier = None;
793 self
794 }
795
796 #[cfg(feature = "websocket")]
802 pub fn set_fallible_request_modifier<F, O, E>(&mut self, request_modifier: F) -> &mut Self
803 where
804 F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
805 O: IntoFuture<Output = Result<http::Request<()>, E>> + 'static,
806 O::IntoFuture: Send,
807 E: std::error::Error + Send + Sync + 'static,
808 {
809 self.fallible_request_modifier = Some(Arc::new(move |request| {
810 let request_modifier = request_modifier(request).into_future();
811 Box::pin(async move {
812 request_modifier
813 .await
814 .map_err(|error| Box::new(error) as RequestModifierError)
815 })
816 }));
817 self.request_modifier = None;
818 self
819 }
820
821 #[cfg(feature = "websocket")]
822 pub fn request_modifier(&self) -> Option<RequestModifierFn> {
823 self.request_modifier.clone()
824 }
825
826 #[cfg(feature = "websocket")]
827 pub(crate) fn fallible_request_modifier(&self) -> Option<FallibleRequestModifierFn> {
828 self.fallible_request_modifier.clone()
829 }
830
831 #[cfg(not(feature = "websocket"))]
853 pub fn set_socket_connector<F, Fut, S>(&mut self, f: F) -> &mut Self
854 where
855 F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
856 Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
857 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
858 {
859 self.socket_connector = Some(Arc::new(move |host, network_options| {
860 let stream_future = f(host, network_options);
861 let future = async move {
862 let stream = stream_future.await?;
863 Ok(Box::new(stream) as Box<dyn crate::framed::AsyncReadWrite>)
864 };
865 Box::pin(future)
866 }));
867 self
868 }
869
870 #[cfg(feature = "websocket")]
892 pub fn set_socket_connector<F, Fut, S>(&mut self, f: F) -> &mut Self
893 where
894 F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
895 Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
896 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
897 {
898 self.socket_connector = Some(Arc::new(move |host, network_options| {
899 let stream_future = f(host, network_options);
900 let future = async move {
901 let stream = stream_future.await?;
902 Ok(Box::new(stream) as Box<dyn crate::framed::AsyncReadWrite>)
903 };
904 Box::pin(future)
905 }));
906 self
907 }
908
909 pub fn has_socket_connector(&self) -> bool {
911 self.socket_connector.is_some()
912 }
913
914 pub fn set_client_id(&mut self, client_id: String) -> &mut Self {
915 self.client_id = client_id;
916 self
917 }
918
919 #[cfg(not(any(feature = "use-rustls-no-provider", feature = "use-native-tls")))]
920 pub const fn set_transport(&mut self, transport: Transport) -> &mut Self {
921 self.transport = transport;
922 self
923 }
924
925 #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
926 pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
927 self.transport = transport;
928 self
929 }
930
931 pub fn transport(&self) -> Transport {
933 self.transport.clone()
934 }
935
936 pub fn set_keep_alive(&mut self, seconds: u16) -> &mut Self {
940 self.keep_alive = Duration::from_secs(u64::from(seconds));
941 self
942 }
943
944 pub const fn keep_alive(&self) -> Duration {
946 self.keep_alive
947 }
948
949 pub fn client_id(&self) -> String {
951 self.client_id.clone()
952 }
953
954 pub const fn set_clean_start(&mut self, clean_start: bool) -> &mut Self {
961 self.clean_start = clean_start;
962 self
963 }
964
965 pub const fn clean_start(&self) -> bool {
967 self.clean_start
968 }
969
970 pub fn set_auth(&mut self, auth: ConnectAuth) -> &mut Self {
983 self.auth = auth;
984 self
985 }
986
987 pub fn clear_auth(&mut self) -> &mut Self {
989 self.auth = ConnectAuth::None;
990 self
991 }
992
993 pub fn set_username<U: Into<String>>(&mut self, username: U) -> &mut Self {
1009 self.auth = ConnectAuth::Username {
1010 username: username.into(),
1011 };
1012 self
1013 }
1014
1015 pub fn set_password<P: Into<Bytes>>(&mut self, password: P) -> &mut Self {
1032 self.auth = ConnectAuth::Password {
1033 password: password.into(),
1034 };
1035 self
1036 }
1037
1038 pub fn set_credentials<U: Into<String>, P: Into<Bytes>>(
1056 &mut self,
1057 username: U,
1058 password: P,
1059 ) -> &mut Self {
1060 self.auth = ConnectAuth::UsernamePassword {
1061 username: username.into(),
1062 password: password.into(),
1063 };
1064 self
1065 }
1066
1067 pub const fn auth(&self) -> &ConnectAuth {
1081 &self.auth
1082 }
1083
1084 pub const fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
1086 self.request_channel_capacity = capacity;
1087 self
1088 }
1089
1090 pub const fn request_channel_capacity(&self) -> usize {
1092 self.request_channel_capacity
1093 }
1094
1095 pub const fn set_max_request_batch(&mut self, max: usize) -> &mut Self {
1099 self.max_request_batch = max;
1100 self
1101 }
1102
1103 pub const fn max_request_batch(&self) -> usize {
1105 self.max_request_batch
1106 }
1107
1108 pub const fn set_read_batch_size(&mut self, size: usize) -> &mut Self {
1112 self.read_batch_size = size;
1113 self
1114 }
1115
1116 pub const fn read_batch_size(&self) -> usize {
1120 self.read_batch_size
1121 }
1122
1123 pub const fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
1125 self.pending_throttle = duration;
1126 self
1127 }
1128
1129 pub const fn pending_throttle(&self) -> Duration {
1131 self.pending_throttle
1132 }
1133
1134 pub const fn set_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
1136 self.connect_timeout = timeout;
1137 self
1138 }
1139
1140 pub const fn connect_timeout(&self) -> Duration {
1142 self.connect_timeout
1143 }
1144
1145 pub fn set_connect_properties(&mut self, properties: ConnectProperties) -> &mut Self {
1147 self.incoming_packet_size_limit = properties.max_packet_size.map_or(
1148 IncomingPacketSizeLimit::Default,
1149 IncomingPacketSizeLimit::Bytes,
1150 );
1151 self.connect_properties = Some(properties);
1152 self
1153 }
1154
1155 pub fn connect_properties(&self) -> Option<ConnectProperties> {
1157 self.connect_properties.clone()
1158 }
1159
1160 pub fn set_session_expiry_interval(&mut self, interval: Option<u32>) -> &mut Self {
1162 if let Some(conn_props) = &mut self.connect_properties {
1163 conn_props.session_expiry_interval = interval;
1164 self
1165 } else {
1166 let mut conn_props = ConnectProperties::new();
1167 conn_props.session_expiry_interval = interval;
1168 self.set_connect_properties(conn_props)
1169 }
1170 }
1171
1172 pub const fn session_expiry_interval(&self) -> Option<u32> {
1174 if let Some(conn_props) = &self.connect_properties {
1175 conn_props.session_expiry_interval
1176 } else {
1177 None
1178 }
1179 }
1180
1181 pub fn set_receive_maximum(&mut self, recv_max: Option<u16>) -> &mut Self {
1183 if let Some(conn_props) = &mut self.connect_properties {
1184 conn_props.receive_maximum = recv_max;
1185 self
1186 } else {
1187 let mut conn_props = ConnectProperties::new();
1188 conn_props.receive_maximum = recv_max;
1189 self.set_connect_properties(conn_props)
1190 }
1191 }
1192
1193 pub const fn receive_maximum(&self) -> Option<u16> {
1195 if let Some(conn_props) = &self.connect_properties {
1196 conn_props.receive_maximum
1197 } else {
1198 None
1199 }
1200 }
1201
1202 pub fn set_max_packet_size(&mut self, max_size: Option<u32>) -> &mut Self {
1204 self.incoming_packet_size_limit = max_size.map_or(
1205 IncomingPacketSizeLimit::Default,
1206 IncomingPacketSizeLimit::Bytes,
1207 );
1208
1209 if let Some(conn_props) = &mut self.connect_properties {
1210 conn_props.max_packet_size = max_size;
1211 self
1212 } else {
1213 let mut conn_props = ConnectProperties::new();
1214 conn_props.max_packet_size = max_size;
1215 self.set_connect_properties(conn_props)
1216 }
1217 }
1218
1219 pub const fn max_packet_size(&self) -> Option<u32> {
1221 if let Some(conn_props) = &self.connect_properties {
1222 conn_props.max_packet_size
1223 } else {
1224 None
1225 }
1226 }
1227
1228 pub fn set_incoming_packet_size_limit(&mut self, limit: IncomingPacketSizeLimit) -> &mut Self {
1235 self.incoming_packet_size_limit = limit;
1236
1237 if let Some(conn_props) = &mut self.connect_properties {
1238 conn_props.max_packet_size = match limit {
1239 IncomingPacketSizeLimit::Bytes(max_size) => Some(max_size),
1240 IncomingPacketSizeLimit::Default | IncomingPacketSizeLimit::Unlimited => None,
1241 };
1242 return self;
1243 }
1244
1245 if let IncomingPacketSizeLimit::Bytes(max_size) = limit {
1246 let mut conn_props = ConnectProperties::new();
1247 conn_props.max_packet_size = Some(max_size);
1248 self.set_connect_properties(conn_props)
1249 } else {
1250 self
1251 }
1252 }
1253
1254 pub fn set_unlimited_incoming_packet_size(&mut self) -> &mut Self {
1256 self.set_incoming_packet_size_limit(IncomingPacketSizeLimit::Unlimited)
1257 }
1258
1259 pub const fn incoming_packet_size_limit(&self) -> IncomingPacketSizeLimit {
1261 self.incoming_packet_size_limit
1262 }
1263
1264 pub(crate) const fn max_incoming_packet_size(&self) -> Option<u32> {
1265 match self.incoming_packet_size_limit {
1266 IncomingPacketSizeLimit::Default => Some(self.default_max_incoming_size),
1267 IncomingPacketSizeLimit::Unlimited => None,
1268 IncomingPacketSizeLimit::Bytes(max_size) => Some(max_size),
1269 }
1270 }
1271
1272 pub fn set_topic_alias_max(&mut self, topic_alias_max: Option<u16>) -> &mut Self {
1274 if let Some(conn_props) = &mut self.connect_properties {
1275 conn_props.topic_alias_max = topic_alias_max;
1276 self
1277 } else {
1278 let mut conn_props = ConnectProperties::new();
1279 conn_props.topic_alias_max = topic_alias_max;
1280 self.set_connect_properties(conn_props)
1281 }
1282 }
1283
1284 pub const fn topic_alias_max(&self) -> Option<u16> {
1286 if let Some(conn_props) = &self.connect_properties {
1287 conn_props.topic_alias_max
1288 } else {
1289 None
1290 }
1291 }
1292
1293 pub const fn set_auto_topic_aliases(&mut self, auto_topic_aliases: bool) -> &mut Self {
1300 self.auto_topic_aliases = auto_topic_aliases;
1301 self
1302 }
1303
1304 pub const fn auto_topic_aliases(&self) -> bool {
1306 self.auto_topic_aliases
1307 }
1308
1309 pub const fn set_topic_alias_policy(
1314 &mut self,
1315 topic_alias_policy: TopicAliasPolicy,
1316 ) -> &mut Self {
1317 self.topic_alias_policy = topic_alias_policy;
1318 self
1319 }
1320
1321 pub const fn topic_alias_policy(&self) -> TopicAliasPolicy {
1323 self.topic_alias_policy
1324 }
1325
1326 pub fn set_request_response_info(&mut self, request_response_info: Option<u8>) -> &mut Self {
1328 if let Some(conn_props) = &mut self.connect_properties {
1329 conn_props.request_response_info = request_response_info;
1330 self
1331 } else {
1332 let mut conn_props = ConnectProperties::new();
1333 conn_props.request_response_info = request_response_info;
1334 self.set_connect_properties(conn_props)
1335 }
1336 }
1337
1338 pub const fn request_response_info(&self) -> Option<u8> {
1340 if let Some(conn_props) = &self.connect_properties {
1341 conn_props.request_response_info
1342 } else {
1343 None
1344 }
1345 }
1346
1347 pub fn set_request_problem_info(&mut self, request_problem_info: Option<u8>) -> &mut Self {
1349 if let Some(conn_props) = &mut self.connect_properties {
1350 conn_props.request_problem_info = request_problem_info;
1351 self
1352 } else {
1353 let mut conn_props = ConnectProperties::new();
1354 conn_props.request_problem_info = request_problem_info;
1355 self.set_connect_properties(conn_props)
1356 }
1357 }
1358
1359 pub const fn request_problem_info(&self) -> Option<u8> {
1361 if let Some(conn_props) = &self.connect_properties {
1362 conn_props.request_problem_info
1363 } else {
1364 None
1365 }
1366 }
1367
1368 pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
1370 if let Some(conn_props) = &mut self.connect_properties {
1371 conn_props.user_properties = user_properties;
1372 self
1373 } else {
1374 let mut conn_props = ConnectProperties::new();
1375 conn_props.user_properties = user_properties;
1376 self.set_connect_properties(conn_props)
1377 }
1378 }
1379
1380 pub fn user_properties(&self) -> Vec<(String, String)> {
1382 self.connect_properties
1383 .as_ref()
1384 .map_or_else(Vec::new, |conn_props| conn_props.user_properties.clone())
1385 }
1386
1387 pub fn set_authentication_method(
1389 &mut self,
1390 authentication_method: Option<String>,
1391 ) -> &mut Self {
1392 if let Some(conn_props) = &mut self.connect_properties {
1393 conn_props.authentication_method = authentication_method;
1394 self
1395 } else {
1396 let mut conn_props = ConnectProperties::new();
1397 conn_props.authentication_method = authentication_method;
1398 self.set_connect_properties(conn_props)
1399 }
1400 }
1401
1402 pub fn authentication_method(&self) -> Option<String> {
1404 self.connect_properties
1405 .as_ref()
1406 .and_then(|conn_props| conn_props.authentication_method.clone())
1407 }
1408
1409 pub fn set_authentication_data(&mut self, authentication_data: Option<Bytes>) -> &mut Self {
1411 if let Some(conn_props) = &mut self.connect_properties {
1412 conn_props.authentication_data = authentication_data;
1413 self
1414 } else {
1415 let mut conn_props = ConnectProperties::new();
1416 conn_props.authentication_data = authentication_data;
1417 self.set_connect_properties(conn_props)
1418 }
1419 }
1420
1421 pub fn authentication_data(&self) -> Option<Bytes> {
1423 self.connect_properties
1424 .as_ref()
1425 .map_or_else(|| None, |conn_props| conn_props.authentication_data.clone())
1426 }
1427
1428 pub const fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
1436 self.manual_acks = manual_acks;
1437 self
1438 }
1439
1440 pub const fn manual_acks(&self) -> bool {
1442 self.manual_acks
1443 }
1444
1445 pub fn network_options(&self) -> NetworkOptions {
1446 self.network_options.clone()
1447 }
1448
1449 pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
1450 self.network_options = network_options;
1451 self
1452 }
1453
1454 #[cfg(feature = "proxy")]
1455 pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
1456 self.proxy = Some(proxy);
1457 self
1458 }
1459
1460 #[cfg(feature = "proxy")]
1461 pub fn proxy(&self) -> Option<Proxy> {
1462 self.proxy.clone()
1463 }
1464
1465 pub(crate) fn effective_socket_connector(&self) -> SocketConnector {
1466 self.socket_connector
1467 .clone()
1468 .unwrap_or_else(default_socket_connector)
1469 }
1470
1471 pub(crate) async fn socket_connect(
1472 &self,
1473 host: String,
1474 network_options: NetworkOptions,
1475 ) -> std::io::Result<Box<dyn crate::framed::AsyncReadWrite>> {
1476 let connector = self.effective_socket_connector();
1477 connector(host, network_options).await
1478 }
1479
1480 pub const fn set_outgoing_inflight_upper_limit(&mut self, limit: u16) -> &mut Self {
1483 self.outgoing_inflight_upper_limit = Some(limit);
1484 self
1485 }
1486
1487 pub const fn get_outgoing_inflight_upper_limit(&self) -> Option<u16> {
1490 self.outgoing_inflight_upper_limit
1491 }
1492
1493 pub fn set_authenticator(&mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> &mut Self {
1494 self.authenticator = Some(authenticator);
1495 self
1496 }
1497
1498 pub fn authenticator(&self) -> Option<Arc<Mutex<dyn Authenticator>>> {
1499 self.authenticator.as_ref()?;
1500
1501 self.authenticator.clone()
1502 }
1503
1504 pub fn set_auth_manager(&mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> &mut Self {
1505 self.set_authenticator(authenticator)
1506 }
1507
1508 pub fn auth_manager(&self) -> Option<Arc<Mutex<dyn Authenticator>>> {
1509 self.authenticator()
1510 }
1511}
1512
1513pub struct MqttOptionsBuilder {
1515 options: MqttOptions,
1516}
1517
1518impl MqttOptionsBuilder {
1519 #[must_use]
1521 pub fn new<S: Into<String>, B: Into<Broker>>(id: S, broker: B) -> Self {
1522 Self {
1523 options: MqttOptions::new(id, broker),
1524 }
1525 }
1526
1527 #[must_use]
1529 pub fn build(self) -> MqttOptions {
1530 self.options
1531 }
1532
1533 #[must_use]
1535 pub fn last_will(mut self, will: LastWill) -> Self {
1536 self.options.set_last_will(will);
1537 self
1538 }
1539
1540 #[cfg(feature = "websocket")]
1542 #[must_use]
1543 pub fn request_modifier<F, O>(mut self, request_modifier: F) -> Self
1544 where
1545 F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
1546 O: IntoFuture<Output = http::Request<()>> + 'static,
1547 O::IntoFuture: Send,
1548 {
1549 self.options.set_request_modifier(request_modifier);
1550 self
1551 }
1552
1553 #[cfg(feature = "websocket")]
1555 #[must_use]
1556 pub fn fallible_request_modifier<F, O, E>(mut self, request_modifier: F) -> Self
1557 where
1558 F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
1559 O: IntoFuture<Output = Result<http::Request<()>, E>> + 'static,
1560 O::IntoFuture: Send,
1561 E: std::error::Error + Send + Sync + 'static,
1562 {
1563 self.options.set_fallible_request_modifier(request_modifier);
1564 self
1565 }
1566
1567 #[cfg(not(feature = "websocket"))]
1569 #[must_use]
1570 pub fn socket_connector<F, Fut, S>(mut self, f: F) -> Self
1571 where
1572 F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
1573 Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
1574 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
1575 {
1576 self.options.set_socket_connector(f);
1577 self
1578 }
1579
1580 #[cfg(feature = "websocket")]
1582 #[must_use]
1583 pub fn socket_connector<F, Fut, S>(mut self, f: F) -> Self
1584 where
1585 F: Fn(String, NetworkOptions) -> Fut + Send + Sync + 'static,
1586 Fut: std::future::Future<Output = Result<S, std::io::Error>> + Send + 'static,
1587 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
1588 {
1589 self.options.set_socket_connector(f);
1590 self
1591 }
1592
1593 #[must_use]
1595 pub fn client_id(mut self, client_id: String) -> Self {
1596 self.options.set_client_id(client_id);
1597 self
1598 }
1599
1600 #[cfg(not(any(feature = "use-rustls-no-provider", feature = "use-native-tls")))]
1602 #[must_use]
1603 pub const fn transport(mut self, transport: Transport) -> Self {
1604 self.options.set_transport(transport);
1605 self
1606 }
1607
1608 #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
1610 #[must_use]
1611 pub fn transport(mut self, transport: Transport) -> Self {
1612 self.options.set_transport(transport);
1613 self
1614 }
1615
1616 #[must_use]
1618 pub fn keep_alive(mut self, seconds: u16) -> Self {
1619 self.options.set_keep_alive(seconds);
1620 self
1621 }
1622
1623 #[must_use]
1625 pub const fn clean_start(mut self, clean_start: bool) -> Self {
1626 self.options.set_clean_start(clean_start);
1627 self
1628 }
1629
1630 #[must_use]
1632 pub fn auth(mut self, auth: ConnectAuth) -> Self {
1633 self.options.set_auth(auth);
1634 self
1635 }
1636
1637 #[must_use]
1639 pub fn clear_auth(mut self) -> Self {
1640 self.options.clear_auth();
1641 self
1642 }
1643
1644 #[must_use]
1646 pub fn username<U: Into<String>>(mut self, username: U) -> Self {
1647 self.options.set_username(username);
1648 self
1649 }
1650
1651 #[must_use]
1653 pub fn password<P: Into<Bytes>>(mut self, password: P) -> Self {
1654 self.options.set_password(password);
1655 self
1656 }
1657
1658 #[must_use]
1660 pub fn credentials<U: Into<String>, P: Into<Bytes>>(
1661 mut self,
1662 username: U,
1663 password: P,
1664 ) -> Self {
1665 self.options.set_credentials(username, password);
1666 self
1667 }
1668
1669 #[must_use]
1671 pub const fn request_channel_capacity(mut self, capacity: usize) -> Self {
1672 self.options.set_request_channel_capacity(capacity);
1673 self
1674 }
1675
1676 #[must_use]
1678 pub const fn max_request_batch(mut self, max: usize) -> Self {
1679 self.options.set_max_request_batch(max);
1680 self
1681 }
1682
1683 #[must_use]
1685 pub const fn read_batch_size(mut self, size: usize) -> Self {
1686 self.options.set_read_batch_size(size);
1687 self
1688 }
1689
1690 #[must_use]
1692 pub const fn pending_throttle(mut self, duration: Duration) -> Self {
1693 self.options.set_pending_throttle(duration);
1694 self
1695 }
1696
1697 #[must_use]
1699 pub const fn connect_timeout(mut self, timeout: Duration) -> Self {
1700 self.options.set_connect_timeout(timeout);
1701 self
1702 }
1703
1704 #[must_use]
1706 pub fn connect_properties(mut self, properties: ConnectProperties) -> Self {
1707 self.options.set_connect_properties(properties);
1708 self
1709 }
1710
1711 #[must_use]
1713 pub fn session_expiry_interval(mut self, interval: Option<u32>) -> Self {
1714 self.options.set_session_expiry_interval(interval);
1715 self
1716 }
1717
1718 #[must_use]
1720 pub fn receive_maximum(mut self, recv_max: Option<u16>) -> Self {
1721 self.options.set_receive_maximum(recv_max);
1722 self
1723 }
1724
1725 #[must_use]
1727 pub fn max_packet_size(mut self, max_size: Option<u32>) -> Self {
1728 self.options.set_max_packet_size(max_size);
1729 self
1730 }
1731
1732 #[must_use]
1734 pub fn incoming_packet_size_limit(mut self, limit: IncomingPacketSizeLimit) -> Self {
1735 self.options.set_incoming_packet_size_limit(limit);
1736 self
1737 }
1738
1739 #[must_use]
1741 pub fn unlimited_incoming_packet_size(mut self) -> Self {
1742 self.options.set_unlimited_incoming_packet_size();
1743 self
1744 }
1745
1746 #[must_use]
1748 pub fn topic_alias_max(mut self, topic_alias_max: Option<u16>) -> Self {
1749 self.options.set_topic_alias_max(topic_alias_max);
1750 self
1751 }
1752
1753 #[must_use]
1755 pub const fn auto_topic_aliases(mut self, auto_topic_aliases: bool) -> Self {
1756 self.options.set_auto_topic_aliases(auto_topic_aliases);
1757 self
1758 }
1759
1760 #[must_use]
1762 pub const fn topic_alias_policy(mut self, topic_alias_policy: TopicAliasPolicy) -> Self {
1763 self.options.set_topic_alias_policy(topic_alias_policy);
1764 self
1765 }
1766
1767 #[must_use]
1769 pub fn request_response_info(mut self, request_response_info: Option<u8>) -> Self {
1770 self.options
1771 .set_request_response_info(request_response_info);
1772 self
1773 }
1774
1775 #[must_use]
1777 pub fn request_problem_info(mut self, request_problem_info: Option<u8>) -> Self {
1778 self.options.set_request_problem_info(request_problem_info);
1779 self
1780 }
1781
1782 #[must_use]
1784 pub fn user_properties(mut self, user_properties: Vec<(String, String)>) -> Self {
1785 self.options.set_user_properties(user_properties);
1786 self
1787 }
1788
1789 #[must_use]
1791 pub fn authentication_method(mut self, authentication_method: Option<String>) -> Self {
1792 self.options
1793 .set_authentication_method(authentication_method);
1794 self
1795 }
1796
1797 #[must_use]
1799 pub fn authentication_data(mut self, authentication_data: Option<Bytes>) -> Self {
1800 self.options.set_authentication_data(authentication_data);
1801 self
1802 }
1803
1804 #[must_use]
1806 pub const fn manual_acks(mut self, manual_acks: bool) -> Self {
1807 self.options.set_manual_acks(manual_acks);
1808 self
1809 }
1810
1811 #[must_use]
1813 pub fn network_options(mut self, network_options: NetworkOptions) -> Self {
1814 self.options.set_network_options(network_options);
1815 self
1816 }
1817
1818 #[cfg(feature = "proxy")]
1820 #[must_use]
1821 pub fn proxy(mut self, proxy: Proxy) -> Self {
1822 self.options.set_proxy(proxy);
1823 self
1824 }
1825
1826 #[must_use]
1828 pub const fn outgoing_inflight_upper_limit(mut self, limit: u16) -> Self {
1829 self.options.set_outgoing_inflight_upper_limit(limit);
1830 self
1831 }
1832
1833 #[must_use]
1835 pub fn authenticator(mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> Self {
1836 self.options.set_authenticator(authenticator);
1837 self
1838 }
1839
1840 #[must_use]
1842 pub fn auth_manager(mut self, authenticator: Arc<Mutex<dyn Authenticator>>) -> Self {
1843 self.options.set_authenticator(authenticator);
1844 self
1845 }
1846}
1847
1848#[derive(Debug, PartialEq, Eq, thiserror::Error)]
1849pub enum OptionError {
1850 #[error("Unsupported URL scheme.")]
1851 Scheme,
1852
1853 #[error(
1854 "Secure MQTT URL schemes require explicit TLS transport configuration via MqttOptions::set_transport(...)."
1855 )]
1856 SecureUrlRequiresExplicitTransport,
1857
1858 #[error("Missing client ID.")]
1859 ClientId,
1860
1861 #[error("Invalid Unix socket path.")]
1862 UnixSocketPath,
1863
1864 #[cfg(feature = "websocket")]
1865 #[error("Invalid websocket url.")]
1866 WebsocketUrl,
1867
1868 #[cfg(feature = "websocket")]
1869 #[error(
1870 "Secure websocket URLs require Broker::websocket(\"ws://...\") plus MqttOptions::set_transport(Transport::wss_with_config(...))."
1871 )]
1872 WssRequiresExplicitTransport,
1873
1874 #[error("Invalid keep-alive value.")]
1875 KeepAlive,
1876
1877 #[error("Invalid clean-start value.")]
1878 CleanStart,
1879
1880 #[error("Invalid max-incoming-packet-size value.")]
1881 MaxIncomingPacketSize,
1882
1883 #[error("Invalid max-outgoing-packet-size value.")]
1884 MaxOutgoingPacketSize,
1885
1886 #[error("Invalid request-channel-capacity value.")]
1887 RequestChannelCapacity,
1888
1889 #[error("Invalid max-request-batch value.")]
1890 MaxRequestBatch,
1891
1892 #[error("Invalid read-batch-size value.")]
1893 ReadBatchSize,
1894
1895 #[error("Invalid pending-throttle value.")]
1896 PendingThrottle,
1897
1898 #[error("Invalid inflight value.")]
1899 Inflight,
1900
1901 #[error("Invalid conn-timeout value.")]
1902 ConnTimeout,
1903
1904 #[error("Unknown option: {0}")]
1905 Unknown(String),
1906
1907 #[cfg(feature = "url")]
1908 #[error("Couldn't parse option from url: {0}")]
1909 Parse(#[from] url::ParseError),
1910}
1911
#[cfg(feature = "url")]
impl std::convert::TryFrom<url::Url> for MqttOptions {
    type Error = OptionError;

    /// Builds [`MqttOptions`] from a parsed URL.
    ///
    /// The scheme selects the broker: `mqtt`/`tcp` (host plus port, falling
    /// back to `DEFAULT_BROKER_PORT`), `unix` on Unix targets, and `ws` with
    /// the `websocket` feature. The mandatory `client_id` query parameter
    /// supplies the client id. User name and password are taken from the
    /// URL's user-info part.
    ///
    /// Recognised optional query parameters: `keep_alive_secs`,
    /// `clean_start`, `max_incoming_packet_size_bytes`,
    /// `request_channel_capacity_num`, `max_request_batch_num`,
    /// `read_batch_size_num`, `pending_throttle_usecs`, `inflight_num` and
    /// `conn_timeout_secs`.
    ///
    /// # Errors
    ///
    /// * [`OptionError::SecureUrlRequiresExplicitTransport`] for `mqtts`/`ssl`.
    /// * [`OptionError::WssRequiresExplicitTransport`] for `wss` (websocket
    ///   feature).
    /// * [`OptionError::Scheme`] for any other unsupported scheme.
    /// * [`OptionError::ClientId`] when `client_id` is missing.
    /// * The option-specific variant when a recognised parameter fails to
    ///   parse.
    /// * [`OptionError::Unknown`] naming the first unrecognised parameter in
    ///   URL order.
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        use std::borrow::Cow;
        use std::collections::HashMap;
        use std::str::FromStr;

        // Removes `key` from `queries` and parses its value as `T`.
        // A missing key yields `Ok(None)`; a value that does not parse
        // yields `Err(invalid)`.
        fn take_parsed<T: FromStr>(
            queries: &mut HashMap<Cow<'_, str>, Cow<'_, str>>,
            key: &str,
            invalid: OptionError,
        ) -> Result<Option<T>, OptionError> {
            queries
                .remove(key)
                .map(|raw| raw.parse::<T>().map_err(|_| invalid))
                .transpose()
        }

        let broker = match url.scheme() {
            "mqtts" | "ssl" => return Err(OptionError::SecureUrlRequiresExplicitTransport),
            "mqtt" | "tcp" => Broker::tcp(
                url.host_str().unwrap_or_default(),
                url.port().unwrap_or(DEFAULT_BROKER_PORT),
            ),
            #[cfg(unix)]
            "unix" => Broker::unix(parse_unix_socket_path(&url)?),
            #[cfg(feature = "websocket")]
            "ws" => Broker::websocket(url.as_str().to_owned())?,
            #[cfg(feature = "websocket")]
            "wss" => return Err(OptionError::WssRequiresExplicitTransport),
            _ => return Err(OptionError::Scheme),
        };

        // Every recognised parameter is removed from this map as it is
        // consumed, so whatever remains at the end is unknown.
        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();

        let id = queries
            .remove("client_id")
            .ok_or(OptionError::ClientId)?
            .into_owned();

        let mut options = Self::new(id, broker);
        let mut connect_props = ConnectProperties::new();

        if let Some(keep_alive) =
            take_parsed::<u16>(&mut queries, "keep_alive_secs", OptionError::KeepAlive)?
        {
            options.set_keep_alive(keep_alive);
        }

        if let Some(clean_start) =
            take_parsed::<bool>(&mut queries, "clean_start", OptionError::CleanStart)?
        {
            options.set_clean_start(clean_start);
        }

        // NOTE(review): `Url::username`/`Url::password` return the
        // percent-encoded text and it is forwarded without decoding here;
        // confirm whether credentials with reserved characters should be
        // decoded first.
        let username = url.username();
        if let Some(password) = url.password() {
            options.set_credentials(username, password.to_owned());
        } else if !username.is_empty() {
            options.set_username(username);
        }

        connect_props.max_packet_size = take_parsed::<u32>(
            &mut queries,
            "max_incoming_packet_size_bytes",
            OptionError::MaxIncomingPacketSize,
        )?;

        if let Some(request_channel_capacity) = take_parsed::<usize>(
            &mut queries,
            "request_channel_capacity_num",
            OptionError::RequestChannelCapacity,
        )? {
            options.request_channel_capacity = request_channel_capacity;
        }

        if let Some(max_request_batch) = take_parsed::<usize>(
            &mut queries,
            "max_request_batch_num",
            OptionError::MaxRequestBatch,
        )? {
            options.max_request_batch = max_request_batch;
        }

        if let Some(read_batch_size) = take_parsed::<usize>(
            &mut queries,
            "read_batch_size_num",
            OptionError::ReadBatchSize,
        )? {
            options.read_batch_size = read_batch_size;
        }

        if let Some(pending_throttle) = take_parsed::<u64>(
            &mut queries,
            "pending_throttle_usecs",
            OptionError::PendingThrottle,
        )? {
            options.set_pending_throttle(Duration::from_micros(pending_throttle));
        }

        connect_props.receive_maximum =
            take_parsed::<u16>(&mut queries, "inflight_num", OptionError::Inflight)?;

        if let Some(conn_timeout) =
            take_parsed::<u64>(&mut queries, "conn_timeout_secs", OptionError::ConnTimeout)?
        {
            options.set_connect_timeout(Duration::from_secs(conn_timeout));
        }

        // Walk the URL again so the reported name is the first unknown
        // option as written, not whichever one the hash map happens to
        // iterate first.
        if let Some(unknown) = url
            .query_pairs()
            .map(|(name, _)| name)
            .find(|name| queries.contains_key(&**name))
        {
            return Err(OptionError::Unknown(unknown.into_owned()));
        }

        options.set_connect_properties(connect_props);
        Ok(options)
    }
}
2031
/// Extracts the socket path from a `unix` URL.
///
/// The URL's path is percent-decoded into raw bytes and turned into a
/// [`PathBuf`] without any UTF-8 requirement.
///
/// # Errors
///
/// Returns [`OptionError::UnixSocketPath`] when the URL has a host, or when
/// the decoded path is empty or exactly `/`.
#[cfg(all(feature = "url", unix))]
fn parse_unix_socket_path(url: &url::Url) -> Result<PathBuf, OptionError> {
    let has_host = url.host_str().is_some();
    if has_host {
        return Err(OptionError::UnixSocketPath);
    }

    let decoded: Vec<u8> = percent_decode_str(url.path()).collect();
    let unusable = matches!(decoded.as_slice(), [] | [b'/']);
    if unusable {
        return Err(OptionError::UnixSocketPath);
    }

    let raw_path = OsString::from_vec(decoded);
    Ok(PathBuf::from(raw_path))
}
2045
2046impl Debug for MqttOptions {
2049 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2050 f.debug_struct("MqttOptions")
2051 .field("broker", &self.broker)
2052 .field("keep_alive", &self.keep_alive)
2053 .field("clean_start", &self.clean_start)
2054 .field("client_id", &self.client_id)
2055 .field("auth", &self.auth)
2056 .field("request_channel_capacity", &self.request_channel_capacity)
2057 .field("max_request_batch", &self.max_request_batch)
2058 .field("read_batch_size", &self.read_batch_size)
2059 .field("pending_throttle", &self.pending_throttle)
2060 .field("last_will", &self.last_will)
2061 .field("connect_timeout", &self.connect_timeout)
2062 .field("auto_topic_aliases", &self.auto_topic_aliases)
2063 .field("topic_alias_policy", &self.topic_alias_policy)
2064 .field("manual_acks", &self.manual_acks)
2065 .field("connect properties", &self.connect_properties)
2066 .finish_non_exhaustive()
2067 }
2068}
2069
2070#[cfg(test)]
2071mod test {
2072 use super::*;
2073 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2074 use std::sync::Arc;
2075 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2076 use tokio::net::{TcpListener, TcpSocket};
2077 use tokio::runtime::Builder;
2078 use tokio::sync::Notify;
2079
2080 fn runtime() -> tokio::runtime::Runtime {
2081 Builder::new_current_thread().enable_all().build().unwrap()
2082 }
2083
/// A slow, failing first attempt must not stop a later attempt from being
/// started and from returning its successful result early.
#[test]
fn staggered_attempts_allow_later_success_to_win() {
    runtime().block_on(async {
        let attempts_started = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&attempts_started);
        let clock = std::time::Instant::now();

        let winner = first_success_with_stagger(
            [0_u8, 1_u8],
            Duration::from_millis(10),
            move |attempt| {
                let counter = Arc::clone(&counter);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    if attempt == 0 {
                        // First candidate: fail, but only after a long delay.
                        tokio::time::sleep(Duration::from_millis(200)).await;
                        Err(std::io::Error::other("slow failure"))
                    } else {
                        Ok(42_u8)
                    }
                }
            },
        )
        .await
        .unwrap();

        assert_eq!(winner, 42);
        assert_eq!(attempts_started.load(Ordering::SeqCst), 2);
        // Finishing well before the 200 ms sleep shows the second attempt won.
        assert!(clock.elapsed() < Duration::from_millis(150));
    });
}
2115
/// Dialing with no candidate addresses must fail with `InvalidInput` and the
/// expected message.
#[test]
fn staggered_connect_returns_invalid_input_for_empty_candidates() {
    runtime().block_on(async {
        let no_candidates = Vec::new();
        let outcome = connect_resolved_addrs_staggered(no_candidates, NetworkOptions::new()).await;
        let failure = outcome.unwrap_err();

        assert_eq!(failure.kind(), io::ErrorKind::InvalidInput);
        assert_eq!(failure.to_string(), "could not resolve to any address");
    });
}
2127
/// When the first candidate address is not listening, the connection must be
/// made to the later, listening candidate.
#[test]
fn staggered_connect_tries_later_candidates() {
    runtime().block_on(async {
        let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
        let good_addr = listener.local_addr().unwrap();

        // Bind and immediately release a second listener to obtain an
        // address nobody is listening on.
        let bad_addr = {
            let throwaway = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
            throwaway.local_addr().unwrap()
        };

        let accept_task = tokio::spawn(async move {
            let (_stream, _) = listener.accept().await.unwrap();
        });

        let candidates = vec![bad_addr, good_addr];
        let stream = connect_resolved_addrs_staggered(candidates, NetworkOptions::new())
            .await
            .unwrap();
        assert_eq!(stream.peer_addr().unwrap(), good_addr);

        accept_task.await.unwrap();
    });
}
2151
/// With a fixed local bind port, a first attempt that never completes must
/// neither be cut short after the stagger delay nor be overtaken by a second
/// attempt.
#[test]
fn fixed_bind_port_retry_mode_keeps_slow_first_candidate_alive() {
    runtime().block_on(async {
        // Reserve a free local port, then release it so the first attempt
        // can bind it.
        let bind_port = {
            let reserved = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
            reserved.local_addr().unwrap().port()
        };

        let fixed_local = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, bind_port));
        let mut network_options = NetworkOptions::new();
        network_options.set_bind_addr(fixed_local);

        let first_attempt_started = Arc::new(Notify::new());
        let second_attempt_started = Arc::new(AtomicBool::new(false));

        let dial = {
            let first_signal = Arc::clone(&first_attempt_started);
            let second_flag = Arc::clone(&second_attempt_started);
            let network_options = network_options.clone();
            async move {
                connect_with_retry_mode_and_delay(
                    [0_u8, 1_u8],
                    network_options,
                    Duration::from_millis(10),
                    move |attempt, attempt_options| {
                        let first_signal = Arc::clone(&first_signal);
                        let second_flag = Arc::clone(&second_flag);
                        async move {
                            if attempt == 0 {
                                // Occupy the fixed port, announce the start,
                                // then hang forever while keeping the socket.
                                let bind_addr = attempt_options.bind_addr().unwrap();
                                let socket = if bind_addr.is_ipv4() {
                                    TcpSocket::new_v4()?
                                } else {
                                    TcpSocket::new_v6()?
                                };
                                socket.bind(bind_addr)?;
                                first_signal.notify_one();
                                std::future::pending::<io::Result<()>>().await
                            } else {
                                second_flag.store(true, Ordering::SeqCst);
                                let _ = attempt_options;
                                Ok(())
                            }
                        }
                    },
                )
                .await
            }
        };
        let mut connect_task = tokio::spawn(dial);

        first_attempt_started.notified().await;

        let waited = tokio::time::timeout(Duration::from_millis(50), &mut connect_task).await;
        assert!(
            waited.is_err(),
            "fixed-port dialing should keep the first slow candidate alive instead of capping it to the stagger delay"
        );
        assert!(
            !second_attempt_started.load(Ordering::SeqCst),
            "fixed-port dialing should not start later same-family candidates while the first is still pending"
        );
        connect_task.abort();
    });
}
2217
/// With a fixed local bind port, a refused first candidate must still be
/// followed by the later candidate, and the connection must originate from
/// the configured local port.
#[test]
fn fixed_bind_port_resolved_addrs_try_later_candidates() {
    runtime().block_on(async {
        let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
        let good_addr = listener.local_addr().unwrap();

        // An address that was bound once and released again.
        let bad_addr = {
            let throwaway = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
            throwaway.local_addr().unwrap()
        };

        // A free local port to use as the fixed source port.
        let bind_port = {
            let reserved = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
            reserved.local_addr().unwrap().port()
        };

        let fixed_local = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, bind_port));
        let mut network_options = NetworkOptions::new();
        network_options.set_bind_addr(fixed_local);

        let accept_task = tokio::spawn(async move {
            let (stream, peer_addr) = listener.accept().await.unwrap();
            drop(stream);
            peer_addr
        });

        let candidates = vec![bad_addr, good_addr];
        let stream = connect_resolved_addrs_staggered(candidates, network_options)
            .await
            .unwrap();
        assert_eq!(stream.peer_addr().unwrap(), good_addr);
        drop(stream);

        let seen_peer = accept_task.await.unwrap();
        assert_eq!(seen_peer.port(), bind_port);
        assert!(seen_peer.ip().is_loopback());
    });
}
2256
/// A custom socket connector must be used instead of the default one: the
/// host passed to `socket_connect` is ignored by the custom connector, which
/// dials a local listener and counts its invocations.
#[test]
fn socket_connect_uses_custom_connector_over_default() {
    runtime().block_on(async {
        let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
        let good_addr = listener.local_addr().unwrap();
        let custom_calls = Arc::new(AtomicUsize::new(0));
        let calls_for_connector = Arc::clone(&custom_calls);

        let accept_task = tokio::spawn(async move {
            let (_stream, _) = listener.accept().await.unwrap();
        });

        let mut options = MqttOptions::new("test-client", "localhost");
        options.set_socket_connector(move |_host, _network_options| {
            let calls = Arc::clone(&calls_for_connector);
            async move {
                calls.fetch_add(1, Ordering::SeqCst);
                TcpStream::connect(good_addr).await
            }
        });
        assert!(options.has_socket_connector());

        let unresolvable_host = "invalid.invalid:1883".to_owned();
        options
            .socket_connect(unresolvable_host, NetworkOptions::new())
            .await
            .unwrap();

        assert_eq!(custom_calls.load(Ordering::SeqCst), 1);
        accept_task.await.unwrap();
    });
}
2288
/// Tests for the mutually exclusive infallible and fallible websocket
/// request modifiers.
#[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
mod request_modifier_tests {
    use super::{Broker, MqttOptions};

    /// Minimal error type for the fallible modifier's `Result`.
    #[derive(Debug)]
    struct TestError;

    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("test error")
        }
    }

    impl std::error::Error for TestError {}

    /// Options pointing at a plain websocket broker on localhost.
    fn websocket_options() -> MqttOptions {
        let broker = Broker::websocket("ws://localhost:8080").expect("valid websocket broker");
        MqttOptions::new("test", broker)
    }

    #[test]
    fn infallible_modifier_is_set() {
        let mut options = websocket_options();
        options.set_request_modifier(|req| async move { req });

        assert!(options.request_modifier().is_some());
        assert!(options.fallible_request_modifier().is_none());
    }

    #[test]
    fn fallible_modifier_is_set() {
        let mut options = websocket_options();
        options.set_fallible_request_modifier(|req| async move { Ok::<_, TestError>(req) });

        assert!(options.request_modifier().is_none());
        assert!(options.fallible_request_modifier().is_some());
    }

    #[test]
    fn last_setter_call_wins() {
        let mut options = websocket_options();

        // Fallible first, infallible last: only the infallible one remains.
        options
            .set_fallible_request_modifier(|req| async move { Ok::<_, TestError>(req) })
            .set_request_modifier(|req| async move { req });
        assert!(options.request_modifier().is_some());
        assert!(options.fallible_request_modifier().is_none());

        // Infallible first, fallible last: only the fallible one remains.
        options
            .set_request_modifier(|req| async move { req })
            .set_fallible_request_modifier(|req| async move { Ok::<_, TestError>(req) });
        assert!(options.request_modifier().is_none());
        assert!(options.fallible_request_modifier().is_some());
    }
}
2346
/// Fresh options use the `Default` incoming packet size policy, which maps
/// to the built-in default maximum.
#[test]
fn incoming_packet_size_limit_defaults_to_default_policy() {
    let options = MqttOptions::new("client", "127.0.0.1");

    let policy = options.incoming_packet_size_limit();
    assert_eq!(policy, IncomingPacketSizeLimit::Default);

    let effective_max = options.max_incoming_packet_size();
    assert_eq!(effective_max, Some(options.default_max_incoming_size));
}
2359
/// `set_max_packet_size` keeps driving the incoming packet size policy:
/// `Some(n)` selects `Bytes(n)`, `None` goes back to `Default`.
#[test]
fn set_max_packet_size_remains_backward_compatible() {
    let mut options = MqttOptions::new("client", "127.0.0.1");

    // An explicit size becomes a byte limit everywhere.
    options.set_max_packet_size(Some(2048));
    let policy = options.incoming_packet_size_limit();
    assert_eq!(policy, IncomingPacketSizeLimit::Bytes(2048));
    assert_eq!(options.max_packet_size(), Some(2048));
    assert_eq!(options.max_incoming_packet_size(), Some(2048));

    // Clearing the size restores the default policy.
    options.set_max_packet_size(None);
    let policy = options.incoming_packet_size_limit();
    assert_eq!(policy, IncomingPacketSizeLimit::Default);
    assert_eq!(options.max_packet_size(), None);
    let effective_max = options.max_incoming_packet_size();
    assert_eq!(effective_max, Some(options.default_max_incoming_size));
}
2383
/// Selecting the unlimited policy removes the local maximum and does not
/// create connect properties.
#[test]
fn incoming_packet_size_limit_unlimited_disables_local_check() {
    let mut options = MqttOptions::new("client", "127.0.0.1");
    options.set_unlimited_incoming_packet_size();

    let policy = options.incoming_packet_size_limit();
    assert_eq!(policy, IncomingPacketSizeLimit::Unlimited);
    assert_eq!(options.max_incoming_packet_size(), None);
    assert_eq!(options.max_packet_size(), None);
    assert!(options.connect_properties.is_none());
}
2397
/// A `ws://` broker starts with the plain websocket transport, can be
/// switched to `Wss` explicitly, and keeps its original URL afterwards.
#[test]
#[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
fn websocket_transport_can_be_explicitly_upgraded_to_wss() {
    use crate::{TlsConfiguration, Transport};

    let signed_url = "ws://a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host";

    let broker = Broker::websocket(signed_url).expect("valid websocket broker");
    let mut mqttoptions = MqttOptions::new("client_a", broker);

    assert!(matches!(mqttoptions.transport(), Transport::Ws));
    mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));

    let Transport::Wss(TlsConfiguration::Simple {
        ca,
        client_auth,
        alpn,
    }) = mqttoptions.transport()
    else {
        panic!("Unexpected transport!");
    };
    assert_eq!(ca.as_slice(), b"Test CA");
    assert_eq!(client_auth, None);
    assert_eq!(alpn, None);

    // Changing the transport must leave the broker URL untouched.
    assert_eq!(mqttoptions.broker().websocket_url(), Some(signed_url));
}
2431
/// `Broker::websocket` rejects `wss://` URLs with the dedicated error.
#[test]
#[cfg(feature = "websocket")]
fn wss_websocket_urls_require_explicit_transport() {
    let outcome = Broker::websocket("wss://example.com/mqtt");
    let expected = Err(OptionError::WssRequiresExplicitTransport);
    assert_eq!(outcome, expected);
}
2440
/// Options parsed from a `ws://` URL start with the plain websocket
/// transport and can be switched to `Wss` explicitly.
#[test]
#[cfg(all(
    feature = "url",
    feature = "use-rustls-no-provider",
    feature = "websocket"
))]
fn parse_url_ws_transport_can_be_explicitly_upgraded_to_wss() {
    use crate::{TlsConfiguration, Transport};

    let parsed = MqttOptions::parse_url("ws://example.com:443/mqtt?client_id=client_a");
    let mut mqttoptions = parsed.expect("valid websocket options");

    assert!(matches!(mqttoptions.transport(), Transport::Ws));
    mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));

    let Transport::Wss(TlsConfiguration::Simple {
        ca,
        client_auth,
        alpn,
    }) = mqttoptions.transport()
    else {
        panic!("Unexpected transport!");
    };
    assert_eq!(ca.as_slice(), b"Test CA");
    assert_eq!(client_auth, None);
    assert_eq!(alpn, None);
}
2469
/// Options parsed from an `mqtt://` URL start with the TCP transport and can
/// be switched to `Tls` explicitly.
#[test]
#[cfg(all(feature = "url", feature = "use-rustls-no-provider"))]
fn parse_url_mqtt_transport_can_be_explicitly_upgraded_to_tls() {
    use crate::{TlsConfiguration, Transport};

    let parsed = MqttOptions::parse_url("mqtt://example.com:8883?client_id=client_a");
    let mut mqttoptions = parsed.expect("valid tls options");

    assert!(matches!(mqttoptions.transport(), Transport::Tcp));
    mqttoptions.set_transport(Transport::tls(Vec::from("Test CA"), None, None));

    let Transport::Tls(TlsConfiguration::Simple {
        ca,
        client_auth,
        alpn,
    }) = mqttoptions.transport()
    else {
        panic!("Unexpected transport!");
    };
    assert_eq!(ca.as_slice(), b"Test CA");
    assert_eq!(client_auth, None);
    assert_eq!(alpn, None);
}
2493
/// Secure URL schemes are rejected by `parse_url` with the error that asks
/// for explicit transport configuration.
#[test]
#[cfg(feature = "url")]
fn parse_url_rejects_secure_url_schemes() {
    let secure_mqtt_urls = [
        "mqtts://example.com:8883?client_id=client_a",
        "ssl://example.com:8883?client_id=client_a",
    ];
    for secure_url in secure_mqtt_urls {
        assert!(matches!(
            MqttOptions::parse_url(secure_url),
            Err(OptionError::SecureUrlRequiresExplicitTransport)
        ));
    }

    #[cfg(feature = "websocket")]
    assert!(matches!(
        MqttOptions::parse_url("wss://example.com:443/mqtt?client_id=client_a"),
        Err(OptionError::WssRequiresExplicitTransport)
    ));
}
2512
/// Exercises `MqttOptions::parse_url`: accepted URLs with their resulting
/// settings, followed by every rejected input with its expected error.
#[test]
#[cfg(feature = "url")]
fn from_url() {
    /// Parses `s`, panicking when it is rejected.
    fn ok(s: &str) -> MqttOptions {
        MqttOptions::parse_url(s).expect("valid options")
    }
    /// Parses `s`, panicking when it is accepted.
    fn err(s: &str) -> OptionError {
        MqttOptions::parse_url(s).expect_err("invalid options")
    }

    // Broker address and client id.
    let parsed = ok("mqtt://host:42?client_id=foo");
    assert_eq!(parsed.broker().tcp_address(), Some(("host", 42)));
    assert_eq!(parsed.client_id(), "foo".to_owned());

    // Optional numeric parameters.
    let parsed = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
    assert_eq!(parsed.keep_alive, Duration::from_secs(5));
    let parsed = ok("mqtt://host:42?client_id=foo&keep_alive_secs=0");
    assert_eq!(parsed.keep_alive, Duration::from_secs(0));
    let parsed = ok("mqtt://host:42?client_id=foo&read_batch_size_num=32");
    assert_eq!(parsed.read_batch_size(), 32);
    let parsed = ok("mqtt://host:42?client_id=foo&conn_timeout_secs=7");
    assert_eq!(parsed.connect_timeout(), Duration::from_secs(7));

    // Credentials from the user-info part of the URL.
    let parsed = ok("mqtt://user@host:42?client_id=foo");
    let user_only = ConnectAuth::Username {
        username: "user".to_owned(),
    };
    assert_eq!(parsed.auth(), &user_only);

    let parsed = ok("mqtt://user:pw@host:42?client_id=foo");
    let user_and_password = ConnectAuth::UsernamePassword {
        username: "user".to_owned(),
        password: Bytes::from_static(b"pw"),
    };
    assert_eq!(parsed.auth(), &user_and_password);

    let parsed = ok("mqtt://:pw@host:42?client_id=foo");
    let password_only = ConnectAuth::UsernamePassword {
        username: String::new(),
        password: Bytes::from_static(b"pw"),
    };
    assert_eq!(parsed.auth(), &password_only);

    // Rejected inputs, checked in order.
    let rejected = [
        ("mqtt://host:42", OptionError::ClientId),
        (
            "mqtt://host:42?client_id=foo&foo=bar",
            OptionError::Unknown("foo".to_owned()),
        ),
        ("mqt://host:42?client_id=foo", OptionError::Scheme),
        (
            "mqtt://host:42?client_id=foo&keep_alive_secs=foo",
            OptionError::KeepAlive,
        ),
        (
            "mqtt://host:42?client_id=foo&keep_alive_secs=65536",
            OptionError::KeepAlive,
        ),
        (
            "mqtt://host:42?client_id=foo&clean_start=foo",
            OptionError::CleanStart,
        ),
        (
            "mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo",
            OptionError::MaxIncomingPacketSize,
        ),
        (
            "mqtt://host:42?client_id=foo&request_channel_capacity_num=foo",
            OptionError::RequestChannelCapacity,
        ),
        (
            "mqtt://host:42?client_id=foo&max_request_batch_num=foo",
            OptionError::MaxRequestBatch,
        ),
        (
            "mqtt://host:42?client_id=foo&read_batch_size_num=foo",
            OptionError::ReadBatchSize,
        ),
        (
            "mqtt://host:42?client_id=foo&pending_throttle_usecs=foo",
            OptionError::PendingThrottle,
        ),
        (
            "mqtt://host:42?client_id=foo&inflight_num=foo",
            OptionError::Inflight,
        ),
        (
            "mqtt://host:42?client_id=foo&conn_timeout_secs=foo",
            OptionError::ConnTimeout,
        ),
    ];
    for (input, expected) in rejected {
        assert_eq!(err(input), expected);
    }
}
2609
// Options built with `Broker::unix` must report `Transport::Unix` and the socket path,
// while every field compared below equals the one on options created for `127.0.0.1`
// through the same `MqttOptions::new` constructor.
#[test]
#[cfg(unix)]
fn unix_broker_sets_unix_transport_and_preserves_defaults() {
    // Emits one `assert_eq!` per listed field, labelled with the field name.
    macro_rules! assert_same_field {
        ($lhs:ident, $rhs:ident: $($field:ident),+ $(,)?) => {
            $( assert_eq!($lhs.$field, $rhs.$field, "{}", stringify!($field)); )+
        };
    }

    let unix_opts = MqttOptions::new("client", Broker::unix("/tmp/mqtt.sock"));
    let reference = MqttOptions::new("client", "127.0.0.1");
    let socket_path = std::path::Path::new("/tmp/mqtt.sock");

    assert!(matches!(unix_opts.transport(), Transport::Unix));
    assert_eq!(unix_opts.broker().unix_path(), Some(socket_path));

    assert_same_field!(
        unix_opts, reference:
        keep_alive,
        clean_start,
        client_id,
        request_channel_capacity,
        max_request_batch,
        read_batch_size,
        pending_throttle,
        connect_timeout,
        default_max_incoming_size,
        incoming_packet_size_limit,
        manual_acks,
        outgoing_inflight_upper_limit,
    );

    assert!(unix_opts.authenticator.is_none());
}
2647
// A `unix://` URL must yield the Unix transport with the socket path taken from the URL
// path, and its query options must be applied as well.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_supports_unix_socket_paths() {
    let url = "unix:///tmp/mqtt.sock?client_id=foo&keep_alive_secs=5&read_batch_size_num=32";
    let parsed = MqttOptions::parse_url(url).expect("valid unix socket options");
    let socket_path = std::path::Path::new("/tmp/mqtt.sock");

    assert!(matches!(parsed.transport(), Transport::Unix));
    assert_eq!(parsed.broker().unix_path(), Some(socket_path));

    // Query options.
    assert_eq!(parsed.client_id(), "foo");
    assert_eq!(parsed.keep_alive, Duration::from_secs(5));
    assert_eq!(parsed.read_batch_size(), 32);
}
2665
// `%20` in the URL path must come back as a literal space in the socket path.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_decodes_percent_escaped_unix_socket_paths() {
    let parsed = MqttOptions::parse_url("unix:///tmp/mqtt%20broker.sock?client_id=foo").unwrap();
    let decoded_path = std::path::Path::new("/tmp/mqtt broker.sock");

    assert_eq!(parsed.broker().unix_path(), Some(decoded_path));
}
2677
// `%FF` must decode to the raw byte 0xFF in the socket path, so the check is made on
// the path's bytes rather than on a string.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_preserves_percent_decoded_unix_socket_bytes() {
    use std::os::unix::ffi::OsStrExt;

    let parsed = MqttOptions::parse_url("unix:///tmp/mqtt%FF.sock?client_id=foo").unwrap();
    let broker = parsed.broker();
    let raw_path = broker.unix_path().unwrap().as_os_str().as_bytes();

    assert_eq!(raw_path, b"/tmp/mqtt\xff.sock");
}
2690
// Each `unix://` URL below must be rejected with the paired error: no client id,
// a host component in front of the path, and a path consisting only of `/`.
#[test]
#[cfg(all(feature = "url", unix))]
fn from_url_rejects_invalid_unix_socket_paths() {
    let invalid = [
        ("unix:///tmp/mqtt.sock", OptionError::ClientId),
        (
            "unix://localhost/tmp/mqtt.sock?client_id=foo",
            OptionError::UnixSocketPath,
        ),
        ("unix:///?client_id=foo", OptionError::UnixSocketPath),
    ];

    for (url, expected) in invalid {
        let actual = MqttOptions::parse_url(url).expect_err("invalid unix socket url");
        assert_eq!(actual, expected, "{url}");
    }
}
2705
// Constructing options with an empty client id and enabling clean start must simply
// run to completion; the test has no assertion beyond not panicking.
#[test]
fn allow_empty_client_id() {
    let mut options = MqttOptions::new("", "127.0.0.1");
    let _ = options.set_clean_start(true);
}
2710
// Configures one `MqttOptions` through its `set_*` methods and another through
// `MqttOptions::builder` with the same values, then checks that every getter compared
// below returns equal results for both.
#[test]
fn mqtt_options_builder_matches_setter_configuration() {
    // Emits one `assert_eq!` per listed getter, labelled with the getter name.
    macro_rules! assert_same_getter {
        ($lhs:ident, $rhs:ident: $($getter:ident),+ $(,)?) => {
            $( assert_eq!($lhs.$getter(), $rhs.$getter(), "{}", stringify!($getter)); )+
        };
    }

    let will = LastWill::new("hello/world", "good bye", QoS::AtLeastOnce, false, None);

    // Reference configuration, assembled with the mutable setters.
    let mut via_setters = MqttOptions::new("client", ("localhost", 1884));
    via_setters
        .set_keep_alive(5)
        .set_last_will(will.clone())
        .set_clean_start(false)
        .set_credentials("user", Bytes::from_static(b"password"))
        .set_request_channel_capacity(16)
        .set_max_request_batch(8)
        .set_read_batch_size(32)
        .set_pending_throttle(Duration::from_micros(250))
        .set_connect_timeout(Duration::from_secs(7))
        .set_session_expiry_interval(Some(120))
        .set_receive_maximum(Some(10))
        .set_topic_alias_max(Some(4))
        .set_request_response_info(Some(1))
        .set_request_problem_info(Some(0))
        .set_user_properties(vec![("k".to_owned(), "v".to_owned())])
        .set_authentication_method(Some("SCRAM-SHA-256".to_owned()))
        .set_authentication_data(Some(Bytes::from_static(b"auth")))
        .set_auto_topic_aliases(true)
        .set_topic_alias_policy(TopicAliasPolicy::Lru)
        .set_manual_acks(true)
        .set_outgoing_inflight_upper_limit(4);

    // Same values, supplied through the builder in the same order.
    let via_builder = MqttOptions::builder("client", ("localhost", 1884))
        .keep_alive(5)
        .last_will(will)
        .clean_start(false)
        .credentials("user", Bytes::from_static(b"password"))
        .request_channel_capacity(16)
        .max_request_batch(8)
        .read_batch_size(32)
        .pending_throttle(Duration::from_micros(250))
        .connect_timeout(Duration::from_secs(7))
        .session_expiry_interval(Some(120))
        .receive_maximum(Some(10))
        .topic_alias_max(Some(4))
        .request_response_info(Some(1))
        .request_problem_info(Some(0))
        .user_properties(vec![("k".to_owned(), "v".to_owned())])
        .authentication_method(Some("SCRAM-SHA-256".to_owned()))
        .authentication_data(Some(Bytes::from_static(b"auth")))
        .auto_topic_aliases(true)
        .topic_alias_policy(TopicAliasPolicy::Lru)
        .manual_acks(true)
        .outgoing_inflight_upper_limit(4)
        .build();

    assert_eq!(
        via_builder.broker().tcp_address(),
        via_setters.broker().tcp_address()
    );
    assert_same_getter!(
        via_builder, via_setters:
        keep_alive,
        last_will,
        clean_start,
        auth,
        request_channel_capacity,
        max_request_batch,
        read_batch_size,
        pending_throttle,
        connect_timeout,
        connect_properties,
        auto_topic_aliases,
        topic_alias_policy,
        manual_acks,
        get_outgoing_inflight_upper_limit,
    );
}
2787
// Two builder paths for the incoming packet size limit: `max_packet_size` supplied via
// connect properties becomes a byte limit, and `unlimited_incoming_packet_size` called
// after `max_packet_size` yields the unlimited policy with no max packet size reported.
#[test]
fn mqtt_options_builder_configures_packet_size_policies() {
    let mut properties = ConnectProperties::new();
    properties.max_packet_size = Some(2048);

    let limited = MqttOptions::builder("client", "localhost")
        .connect_properties(properties)
        .build();
    let limited_policy = limited.incoming_packet_size_limit();
    assert_eq!(limited_policy, IncomingPacketSizeLimit::Bytes(2048));

    let unbounded = MqttOptions::builder("client", "localhost")
        .max_packet_size(Some(1024))
        .unlimited_incoming_packet_size()
        .build();
    let unbounded_policy = unbounded.incoming_packet_size_limit();
    assert_eq!(unbounded_policy, IncomingPacketSizeLimit::Unlimited);
    assert_eq!(unbounded.max_packet_size(), None);
}
2813
// After `password`, `clear_auth` and then `auth` on the builder, the built options must
// report exactly the auth value passed to the final `auth` call.
#[test]
fn mqtt_options_builder_can_replace_and_clear_auth() {
    let expected_auth = ConnectAuth::Username {
        username: "next".to_owned(),
    };

    let built = MqttOptions::builder("client", "localhost")
        .password("password")
        .clear_auth()
        .auth(ConnectAuth::Username {
            username: "next".to_owned(),
        })
        .build();

    assert_eq!(built.auth(), &expected_auth);
}
2831
// With `request_channel_capacity(1)` on the options and no event loop polling, the
// first `try_publish` must succeed and the second must fail with
// `ClientError::TryRequest` carrying the rejected publish request.
#[test]
fn mqtt_options_builder_request_capacity_feeds_client_builder_default() {
    let single_slot_options = MqttOptions::builder("test-1", "localhost")
        .request_channel_capacity(1)
        .build();
    // The event loop is bound to a named variable so it lives until the end of the test.
    let (client, _event_loop) = AsyncClient::builder(single_slot_options).build();

    let first = client.try_publish("hello/world", QoS::AtMostOnce, false, "one");
    first.expect("first request should fit configured capacity");

    let second = client.try_publish("hello/world", QoS::AtMostOnce, false, "two");
    assert!(matches!(
        second,
        Err(ClientError::TryRequest(rejected)) if matches!(*rejected, Request::Publish(_))
    ));
}
2847
// Freshly constructed options must report a read batch size of 0.
#[test]
fn read_batch_size_defaults_to_adaptive() {
    let default_batch_size = MqttOptions::new("client", "127.0.0.1").read_batch_size();
    assert_eq!(default_batch_size, 0);
}
2853
// A value stored with `set_read_batch_size` must be returned by `read_batch_size`.
#[test]
fn set_read_batch_size() {
    let mut opts = MqttOptions::new("client", "127.0.0.1");
    let configured = opts.set_read_batch_size(48).read_batch_size();
    assert_eq!(configured, 48);
}
2860
// A URL with no incoming packet size option must leave the limit policy at `Default`,
// and the effective maximum must equal the options' `default_max_incoming_size`.
#[test]
#[cfg(feature = "url")]
fn from_url_uses_default_incoming_limit_when_unspecified() {
    let parsed = MqttOptions::parse_url("mqtt://host:42?client_id=foo").unwrap();

    let policy = parsed.incoming_packet_size_limit();
    assert_eq!(policy, IncomingPacketSizeLimit::Default);

    let effective_limit = parsed.max_incoming_packet_size();
    assert_eq!(effective_limit, Some(parsed.default_max_incoming_size));
}
2874}