1use bytes::Bytes;
2use std::fmt::{self, Debug, Formatter};
3use std::time::Duration;
4#[cfg(feature = "websocket")]
5use std::{
6 future::{Future, IntoFuture},
7 pin::Pin,
8 sync::Arc,
9};
10
11mod client;
12mod eventloop;
13mod framed;
14pub mod mqttbytes;
15mod state;
16
17use crate::tokens::Resolver;
18use crate::{NetworkOptions, Outgoing, Transport};
19
20use mqttbytes::v5::*;
21
22pub use client::{
23 AsyncClient, Client, ClientError, Connection, Iter, RecvError, RecvTimeoutError, TryRecvError,
24};
25pub use eventloop::{ConnectionError, Event, EventLoop};
26pub use state::{MqttState, StateError};
27
28#[cfg(feature = "use-rustls-no-provider")]
29pub use crate::tls::Error as TlsError;
30
31#[cfg(feature = "proxy")]
32pub use crate::proxy::{Proxy, ProxyAuth, ProxyType};
33
/// Alias for the v5 [`Packet`] type.
///
/// NOTE(review): the name suggests it denotes packets received from the
/// broker; confirm against the event loop, which is not visible here.
pub type Incoming = Packet;
35
/// Acknowledgement value associated with publish-side requests.
///
/// This is the type parameter of the `Resolver` carried by
/// [`Request::Publish`] and [`Request::PubRel`].
#[derive(Debug)]
pub enum AckOfPub {
    /// Carries a `PubAck` packet.
    PubAck(PubAck),
    /// Carries a `PubComp` packet.
    PubComp(PubComp),
    /// No acknowledgement packet is attached.
    // NOTE(review): presumably used when no broker ack is expected
    // (e.g. QoS 0) — confirm against the state machine.
    None,
}
43
/// Acknowledgement value associated with ack-side requests.
///
/// This is the type parameter of the `Resolver` carried by
/// [`Request::PubAck`] and [`Request::PubRec`].
#[derive(Debug)]
pub enum AckOfAck {
    /// No follow-up packet is attached.
    None,
    /// Carries a `PubRel` packet.
    PubRel(PubRel),
}
50
/// Requests handled by this module, each (except [`Request::PingReq`])
/// paired with a `Resolver` typed by the value the request completes with.
// NOTE(review): `Resolver` lives in `crate::tokens` and is not visible here;
// it presumably notifies the requester once the exchange finishes — confirm.
#[derive(Debug)]
pub enum Request {
    /// A `Publish` packet; completes with an [`AckOfPub`].
    Publish(Publish, Resolver<AckOfPub>),
    /// A `PubAck` packet; completes with an [`AckOfAck`].
    PubAck(PubAck, Resolver<AckOfAck>),
    /// A `PubRec` packet; completes with an [`AckOfAck`].
    PubRec(PubRec, Resolver<AckOfAck>),
    /// A `PubRel` packet; completes with an [`AckOfPub`].
    PubRel(PubRel, Resolver<AckOfPub>),
    /// A `Subscribe` packet; completes with a `SubAck`.
    Subscribe(Subscribe, Resolver<SubAck>),
    /// An `Unsubscribe` packet; completes with an `UnsubAck`.
    Unsubscribe(Unsubscribe, Resolver<UnsubAck>),
    /// A disconnect request; completes with `()`.
    Disconnect(Resolver<()>),
    /// A ping request; carries no resolver.
    PingReq,
}
64
/// Shared, thread-safe callback that receives an `http::Request<()>` and
/// returns a boxed `Send` future resolving to an `http::Request<()>`.
///
/// Only available with the `websocket` feature.
// NOTE(review): presumably applied to the websocket handshake request before
// it is sent — confirm against the connection code.
#[cfg(feature = "websocket")]
type RequestModifierFn = Arc<
    dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
        + Send
        + Sync,
>;
71
/// Configuration for an MQTT v5 client connection.
///
/// Construct with [`MqttOptions::new`] (or [`MqttOptions::parse_url`] when the
/// `url` feature is enabled) and adjust with the `set_*` methods.
#[derive(Clone)]
pub struct MqttOptions {
    /// Broker host, as passed to `new` (stored verbatim).
    broker_addr: String,
    /// Broker port.
    port: u16,
    /// Transport used for the connection; `new` defaults to `Transport::tcp()`.
    transport: Transport,
    /// Keep-alive interval; defaults to 60 seconds. `set_keep_alive` rejects
    /// values below 5 seconds.
    keep_alive: Duration,
    /// Clean-start flag; defaults to `true`.
    clean_start: bool,
    /// Client identifier.
    client_id: String,
    /// Optional login credentials; `None` by default.
    credentials: Option<Login>,
    /// Request channel capacity; defaults to 10.
    request_channel_capacity: usize,
    /// Maximum request batch size; defaults to 0.
    // NOTE(review): semantics of 0 are not visible in this file — confirm.
    max_request_batch: usize,
    /// Pending throttle duration; defaults to zero.
    // NOTE(review): presumably the minimum gap between retransmitted pending
    // requests — confirm against the event loop.
    pending_throttle: Duration,
    /// Optional last-will message; `None` by default.
    last_will: Option<LastWill>,
    /// Connection timeout; defaults to 5. The URL parser reads it from
    /// `conn_timeout_secs`, so the unit is seconds.
    conn_timeout: u64,
    /// Default maximum incoming size; defaults to `10 * 1024`.
    // NOTE(review): presumably bytes — confirm.
    default_max_incoming_size: u32,
    /// Optional CONNECT properties; `None` until one of the property setters
    /// or `set_connect_properties` is called.
    connect_properties: Option<ConnectProperties>,
    /// Whether acknowledgements are handled manually; defaults to `false`.
    manual_acks: bool,
    /// Low-level network options; defaults to `NetworkOptions::new()`.
    network_options: NetworkOptions,
    /// Optional proxy configuration (requires the `proxy` feature).
    #[cfg(feature = "proxy")]
    proxy: Option<Proxy>,
    /// Optional upper limit on outgoing in-flight messages; `None` by default.
    outgoing_inflight_upper_limit: Option<u16>,
    /// Optional request-modifier callback (requires the `websocket` feature).
    #[cfg(feature = "websocket")]
    request_modifier: Option<RequestModifierFn>,
}
121
122impl MqttOptions {
123 pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
133 MqttOptions {
134 broker_addr: host.into(),
135 port,
136 transport: Transport::tcp(),
137 keep_alive: Duration::from_secs(60),
138 clean_start: true,
139 client_id: id.into(),
140 credentials: None,
141 request_channel_capacity: 10,
142 max_request_batch: 0,
143 pending_throttle: Duration::from_micros(0),
144 last_will: None,
145 conn_timeout: 5,
146 default_max_incoming_size: 10 * 1024,
147 connect_properties: None,
148 manual_acks: false,
149 network_options: NetworkOptions::new(),
150 #[cfg(feature = "proxy")]
151 proxy: None,
152 outgoing_inflight_upper_limit: None,
153 #[cfg(feature = "websocket")]
154 request_modifier: None,
155 }
156 }
157
158 #[cfg(feature = "url")]
159 pub fn parse_url<S: Into<String>>(url: S) -> Result<MqttOptions, OptionError> {
185 let url = url::Url::parse(&url.into())?;
186 let options = MqttOptions::try_from(url)?;
187
188 Ok(options)
189 }
190
191 pub fn broker_address(&self) -> (String, u16) {
193 (self.broker_addr.clone(), self.port)
194 }
195
196 pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
197 self.last_will = Some(will);
198 self
199 }
200
201 pub fn last_will(&self) -> Option<LastWill> {
202 self.last_will.clone()
203 }
204
205 #[cfg(feature = "websocket")]
206 pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
207 where
208 F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
209 O: IntoFuture<Output = http::Request<()>> + 'static,
210 O::IntoFuture: Send,
211 {
212 self.request_modifier = Some(Arc::new(move |request| {
213 let request_modifier = request_modifier(request).into_future();
214 Box::pin(request_modifier)
215 }));
216
217 self
218 }
219
220 #[cfg(feature = "websocket")]
221 pub fn request_modifier(&self) -> Option<RequestModifierFn> {
222 self.request_modifier.clone()
223 }
224
225 pub fn set_client_id(&mut self, client_id: String) -> &mut Self {
226 self.client_id = client_id;
227 self
228 }
229
230 pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
231 self.transport = transport;
232 self
233 }
234
235 pub fn transport(&self) -> Transport {
236 self.transport.clone()
237 }
238
239 pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
242 assert!(duration.as_secs() >= 5, "Keep alives should be >= 5 secs");
243
244 self.keep_alive = duration;
245 self
246 }
247
248 pub fn keep_alive(&self) -> Duration {
250 self.keep_alive
251 }
252
253 pub fn client_id(&self) -> String {
255 self.client_id.clone()
256 }
257
258 pub fn set_clean_start(&mut self, clean_start: bool) -> &mut Self {
265 self.clean_start = clean_start;
266 self
267 }
268
269 pub fn clean_start(&self) -> bool {
271 self.clean_start
272 }
273
274 pub fn set_credentials<U: Into<String>, P: Into<String>>(
276 &mut self,
277 username: U,
278 password: P,
279 ) -> &mut Self {
280 self.credentials = Some(Login::new(username, password));
281 self
282 }
283
284 pub fn credentials(&self) -> Option<Login> {
286 self.credentials.clone()
287 }
288
289 pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
291 self.request_channel_capacity = capacity;
292 self
293 }
294
295 pub fn request_channel_capacity(&self) -> usize {
297 self.request_channel_capacity
298 }
299
300 pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
302 self.pending_throttle = duration;
303 self
304 }
305
306 pub fn pending_throttle(&self) -> Duration {
308 self.pending_throttle
309 }
310
311 pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
313 self.conn_timeout = timeout;
314 self
315 }
316
317 pub fn connection_timeout(&self) -> u64 {
319 self.conn_timeout
320 }
321
322 pub fn set_connect_properties(&mut self, properties: ConnectProperties) -> &mut Self {
324 self.connect_properties = Some(properties);
325 self
326 }
327
328 pub fn connect_properties(&self) -> Option<ConnectProperties> {
330 self.connect_properties.clone()
331 }
332
333 pub fn set_session_expiry_interval(&mut self, interval: Option<u32>) -> &mut Self {
335 if let Some(conn_props) = &mut self.connect_properties {
336 conn_props.session_expiry_interval = interval;
337 self
338 } else {
339 let mut conn_props = ConnectProperties::new();
340 conn_props.session_expiry_interval = interval;
341 self.set_connect_properties(conn_props)
342 }
343 }
344
345 pub fn session_expiry_interval(&self) -> Option<u32> {
347 if let Some(conn_props) = &self.connect_properties {
348 conn_props.session_expiry_interval
349 } else {
350 None
351 }
352 }
353
354 pub fn set_receive_maximum(&mut self, recv_max: Option<u16>) -> &mut Self {
356 if let Some(conn_props) = &mut self.connect_properties {
357 conn_props.receive_maximum = recv_max;
358 self
359 } else {
360 let mut conn_props = ConnectProperties::new();
361 conn_props.receive_maximum = recv_max;
362 self.set_connect_properties(conn_props)
363 }
364 }
365
366 pub fn receive_maximum(&self) -> Option<u16> {
368 if let Some(conn_props) = &self.connect_properties {
369 conn_props.receive_maximum
370 } else {
371 None
372 }
373 }
374
375 pub fn set_max_packet_size(&mut self, max_size: Option<u32>) -> &mut Self {
377 if let Some(conn_props) = &mut self.connect_properties {
378 conn_props.max_packet_size = max_size;
379 self
380 } else {
381 let mut conn_props = ConnectProperties::new();
382 conn_props.max_packet_size = max_size;
383 self.set_connect_properties(conn_props)
384 }
385 }
386
387 pub fn max_packet_size(&self) -> Option<u32> {
389 if let Some(conn_props) = &self.connect_properties {
390 conn_props.max_packet_size
391 } else {
392 None
393 }
394 }
395
396 pub fn set_topic_alias_max(&mut self, topic_alias_max: Option<u16>) -> &mut Self {
398 if let Some(conn_props) = &mut self.connect_properties {
399 conn_props.topic_alias_max = topic_alias_max;
400 self
401 } else {
402 let mut conn_props = ConnectProperties::new();
403 conn_props.topic_alias_max = topic_alias_max;
404 self.set_connect_properties(conn_props)
405 }
406 }
407
408 pub fn topic_alias_max(&self) -> Option<u16> {
410 if let Some(conn_props) = &self.connect_properties {
411 conn_props.topic_alias_max
412 } else {
413 None
414 }
415 }
416
417 pub fn set_request_response_info(&mut self, request_response_info: Option<u8>) -> &mut Self {
419 if let Some(conn_props) = &mut self.connect_properties {
420 conn_props.request_response_info = request_response_info;
421 self
422 } else {
423 let mut conn_props = ConnectProperties::new();
424 conn_props.request_response_info = request_response_info;
425 self.set_connect_properties(conn_props)
426 }
427 }
428
429 pub fn request_response_info(&self) -> Option<u8> {
431 if let Some(conn_props) = &self.connect_properties {
432 conn_props.request_response_info
433 } else {
434 None
435 }
436 }
437
438 pub fn set_request_problem_info(&mut self, request_problem_info: Option<u8>) -> &mut Self {
440 if let Some(conn_props) = &mut self.connect_properties {
441 conn_props.request_problem_info = request_problem_info;
442 self
443 } else {
444 let mut conn_props = ConnectProperties::new();
445 conn_props.request_problem_info = request_problem_info;
446 self.set_connect_properties(conn_props)
447 }
448 }
449
450 pub fn request_problem_info(&self) -> Option<u8> {
452 if let Some(conn_props) = &self.connect_properties {
453 conn_props.request_problem_info
454 } else {
455 None
456 }
457 }
458
459 pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
461 if let Some(conn_props) = &mut self.connect_properties {
462 conn_props.user_properties = user_properties;
463 self
464 } else {
465 let mut conn_props = ConnectProperties::new();
466 conn_props.user_properties = user_properties;
467 self.set_connect_properties(conn_props)
468 }
469 }
470
471 pub fn user_properties(&self) -> Vec<(String, String)> {
473 if let Some(conn_props) = &self.connect_properties {
474 conn_props.user_properties.clone()
475 } else {
476 Vec::new()
477 }
478 }
479
480 pub fn set_authentication_method(
482 &mut self,
483 authentication_method: Option<String>,
484 ) -> &mut Self {
485 if let Some(conn_props) = &mut self.connect_properties {
486 conn_props.authentication_method = authentication_method;
487 self
488 } else {
489 let mut conn_props = ConnectProperties::new();
490 conn_props.authentication_method = authentication_method;
491 self.set_connect_properties(conn_props)
492 }
493 }
494
495 pub fn authentication_method(&self) -> Option<String> {
497 if let Some(conn_props) = &self.connect_properties {
498 conn_props.authentication_method.clone()
499 } else {
500 None
501 }
502 }
503
504 pub fn set_authentication_data(&mut self, authentication_data: Option<Bytes>) -> &mut Self {
506 if let Some(conn_props) = &mut self.connect_properties {
507 conn_props.authentication_data = authentication_data;
508 self
509 } else {
510 let mut conn_props = ConnectProperties::new();
511 conn_props.authentication_data = authentication_data;
512 self.set_connect_properties(conn_props)
513 }
514 }
515
516 pub fn authentication_data(&self) -> Option<Bytes> {
518 if let Some(conn_props) = &self.connect_properties {
519 conn_props.authentication_data.clone()
520 } else {
521 None
522 }
523 }
524
525 pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
527 self.manual_acks = manual_acks;
528 self
529 }
530
531 pub fn manual_acks(&self) -> bool {
533 self.manual_acks
534 }
535
536 pub fn network_options(&self) -> NetworkOptions {
537 self.network_options.clone()
538 }
539
540 pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
541 self.network_options = network_options;
542 self
543 }
544
545 #[cfg(feature = "proxy")]
546 pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
547 self.proxy = Some(proxy);
548 self
549 }
550
551 #[cfg(feature = "proxy")]
552 pub fn proxy(&self) -> Option<Proxy> {
553 self.proxy.clone()
554 }
555
556 pub fn set_outgoing_inflight_upper_limit(&mut self, limit: u16) -> &mut Self {
559 self.outgoing_inflight_upper_limit = Some(limit);
560 self
561 }
562
563 pub fn get_outgoing_inflight_upper_limit(&self) -> Option<u16> {
566 self.outgoing_inflight_upper_limit
567 }
568}
569
/// Errors reported when building [`MqttOptions`] from a URL.
#[cfg(feature = "url")]
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum OptionError {
    /// The URL scheme is not one of the supported transports.
    #[error("Unsupported URL scheme.")]
    Scheme,

    /// The `client_id` query parameter is missing.
    #[error("Missing client ID.")]
    ClientId,

    /// The `keep_alive_secs` query parameter is not a valid value.
    #[error("Invalid keep-alive value.")]
    KeepAlive,

    /// The `clean_start` query parameter is not a valid `bool`.
    #[error("Invalid clean-start value.")]
    CleanStart,

    /// The `max_incoming_packet_size_bytes` query parameter is not a valid `u32`.
    #[error("Invalid max-incoming-packet-size value.")]
    MaxIncomingPacketSize,

    /// Invalid max-outgoing-packet-size value.
    // NOTE(review): not produced by the URL conversion visible in this file.
    #[error("Invalid max-outgoing-packet-size value.")]
    MaxOutgoingPacketSize,

    /// The `request_channel_capacity_num` query parameter is not a valid `usize`.
    #[error("Invalid request-channel-capacity value.")]
    RequestChannelCapacity,

    /// The `max_request_batch_num` query parameter is not a valid `usize`.
    #[error("Invalid max-request-batch value.")]
    MaxRequestBatch,

    /// The `pending_throttle_usecs` query parameter is not a valid `u64`.
    #[error("Invalid pending-throttle value.")]
    PendingThrottle,

    /// The `inflight_num` query parameter is not a valid `u16`.
    #[error("Invalid inflight value.")]
    Inflight,

    /// The `conn_timeout_secs` query parameter is not a valid `u64`.
    #[error("Invalid conn-timeout value.")]
    ConnTimeout,

    /// The URL contains a query parameter that is not recognised; the
    /// payload is the parameter name.
    #[error("Unknown option: {0}")]
    Unknown(String),

    /// The URL string itself could not be parsed.
    #[error("Couldn't parse option from url: {0}")]
    Parse(#[from] url::ParseError),
}
612
#[cfg(feature = "url")]
impl std::convert::TryFrom<url::Url> for MqttOptions {
    type Error = OptionError;

    /// Builds [`MqttOptions`] from an already parsed URL.
    ///
    /// The scheme selects the transport and the default port used when the
    /// URL has none:
    ///
    /// * `mqtt` / `tcp` — TCP, port 1883
    /// * `mqtts` / `ssl` — TLS with the default config, port 8883
    ///   (requires `use-rustls-no-provider`)
    /// * `ws` — websocket, port 8000 (requires `websocket`)
    /// * `wss` — secure websocket with the default config, port 8000
    ///   (requires both features)
    ///
    /// Username and password are taken from the URL's user-info part; a
    /// missing password becomes the empty string. Recognised query
    /// parameters are `client_id` (mandatory), `keep_alive_secs`,
    /// `clean_start`, `max_incoming_packet_size_bytes`,
    /// `request_channel_capacity_num`, `max_request_batch_num`,
    /// `pending_throttle_usecs`, `inflight_num` and `conn_timeout_secs`.
    ///
    /// # Errors
    ///
    /// Returns the matching [`OptionError`] variant when the scheme is
    /// unsupported, `client_id` is missing, a parameter fails to parse,
    /// `keep_alive_secs` is below 5, or an unrecognised parameter is present.
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        use std::collections::HashMap;

        let host = url.host_str().unwrap_or_default().to_owned();

        let (transport, default_port) = match url.scheme() {
            #[cfg(feature = "use-rustls-no-provider")]
            "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883),
            "mqtt" | "tcp" => (Transport::Tcp, 1883),
            #[cfg(feature = "websocket")]
            "ws" => (Transport::Ws, 8000),
            #[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
            "wss" => (Transport::wss_with_default_config(), 8000),
            _ => return Err(OptionError::Scheme),
        };

        let port = url.port().unwrap_or(default_port);

        // Recognised parameters are removed as they are consumed, so whatever
        // is left at the end is an unknown option.
        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();

        let id = queries
            .remove("client_id")
            .ok_or(OptionError::ClientId)?
            .into_owned();

        let mut options = MqttOptions::new(id, host, port);
        let mut connect_props = ConnectProperties::new();
        options.set_transport(transport);

        if let Some(keep_alive) = queries
            .remove("keep_alive_secs")
            .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
            .transpose()?
        {
            // `set_keep_alive` asserts that the interval is at least 5
            // seconds. The value here comes from the URL, so report an error
            // instead of letting that assertion panic.
            if keep_alive < 5 {
                return Err(OptionError::KeepAlive);
            }
            options.set_keep_alive(Duration::from_secs(keep_alive));
        }

        if let Some(clean_start) = queries
            .remove("clean_start")
            .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanStart))
            .transpose()?
        {
            options.set_clean_start(clean_start);
        }

        // An empty username means the URL carries no credentials.
        let username = url.username();
        if !username.is_empty() {
            options.set_credentials(username, url.password().unwrap_or_default());
        }

        connect_props.max_packet_size = queries
            .remove("max_incoming_packet_size_bytes")
            .map(|v| {
                v.parse::<u32>()
                    .map_err(|_| OptionError::MaxIncomingPacketSize)
            })
            .transpose()?;

        if let Some(request_channel_capacity) = queries
            .remove("request_channel_capacity_num")
            .map(|v| {
                v.parse::<usize>()
                    .map_err(|_| OptionError::RequestChannelCapacity)
            })
            .transpose()?
        {
            options.request_channel_capacity = request_channel_capacity;
        }

        if let Some(max_request_batch) = queries
            .remove("max_request_batch_num")
            .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
            .transpose()?
        {
            options.max_request_batch = max_request_batch;
        }

        if let Some(pending_throttle) = queries
            .remove("pending_throttle_usecs")
            .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
            .transpose()?
        {
            options.set_pending_throttle(Duration::from_micros(pending_throttle));
        }

        connect_props.receive_maximum = queries
            .remove("inflight_num")
            .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
            .transpose()?;

        if let Some(conn_timeout) = queries
            .remove("conn_timeout_secs")
            .map(|v| v.parse::<u64>().map_err(|_| OptionError::ConnTimeout))
            .transpose()?
        {
            options.set_connection_timeout(conn_timeout);
        }

        if let Some((opt, _)) = queries.into_iter().next() {
            return Err(OptionError::Unknown(opt.into_owned()));
        }

        options.connect_properties = Some(connect_props);
        Ok(options)
    }
}
733
734impl Debug for MqttOptions {
737 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
738 f.debug_struct("MqttOptions")
739 .field("broker_addr", &self.broker_addr)
740 .field("port", &self.port)
741 .field("keep_alive", &self.keep_alive)
742 .field("clean_start", &self.clean_start)
743 .field("client_id", &self.client_id)
744 .field("credentials", &self.credentials)
745 .field("request_channel_capacity", &self.request_channel_capacity)
746 .field("max_request_batch", &self.max_request_batch)
747 .field("pending_throttle", &self.pending_throttle)
748 .field("last_will", &self.last_will)
749 .field("conn_timeout", &self.conn_timeout)
750 .field("manual_acks", &self.manual_acks)
751 .field("connect properties", &self.connect_properties)
752 .finish()
753 }
754}
755
#[cfg(test)]
mod test {
    use super::*;

    // The host argument of `MqttOptions::new` is stored verbatim, even when
    // it carries a path and query string, and a transport set afterwards is
    // retained unchanged.
    #[test]
    #[cfg(all(feature = "use-rustls-no-provider", feature = "websocket"))]
    fn no_scheme() {
        use crate::{TlsConfiguration, Transport};
        let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);

        mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));

        if let Transport::Wss(TlsConfiguration::Simple {
            ca,
            client_auth,
            alpn,
        }) = mqttoptions.transport
        {
            assert_eq!(ca, Vec::from("Test CA"));
            assert_eq!(client_auth, None);
            assert_eq!(alpn, None);
        } else {
            panic!("Unexpected transport!");
        }

        assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
    }

    // Exercises URL parsing: the happy path plus one failing URL for each
    // error variant the conversion can report for a malformed parameter.
    #[test]
    #[cfg(feature = "url")]
    fn from_url() {
        // Small helpers so each case below reads as a single expression.
        fn opt(s: &str) -> Result<MqttOptions, OptionError> {
            MqttOptions::parse_url(s)
        }
        fn ok(s: &str) -> MqttOptions {
            opt(s).expect("valid options")
        }
        fn err(s: &str) -> OptionError {
            opt(s).expect_err("invalid options")
        }

        let v = ok("mqtt://host:42?client_id=foo");
        assert_eq!(v.broker_address(), ("host".to_owned(), 42));
        assert_eq!(v.client_id(), "foo".to_owned());

        let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
        assert_eq!(v.keep_alive, Duration::from_secs(5));

        assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
        assert_eq!(
            err("mqtt://host:42?client_id=foo&foo=bar"),
            OptionError::Unknown("foo".to_owned())
        );
        assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
        assert_eq!(
            err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
            OptionError::KeepAlive
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&clean_start=foo"),
            OptionError::CleanStart
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
            OptionError::MaxIncomingPacketSize
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
            OptionError::RequestChannelCapacity
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
            OptionError::MaxRequestBatch
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
            OptionError::PendingThrottle
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&inflight_num=foo"),
            OptionError::Inflight
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&conn_timeout_secs=foo"),
            OptionError::ConnTimeout
        );
    }

    // Constructing options with an empty client id must not panic.
    #[test]
    fn allow_empty_client_id() {
        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_start(true);
    }
}