1#![cfg_attr(docsrs, feature(doc_cfg))]
98
99#[macro_use]
100extern crate log;
101
102use std::fmt::{self, Debug, Formatter};
103#[cfg(feature = "use-rustls")]
104use std::sync::Arc;
105use std::time::Duration;
106
107mod client;
108mod eventloop;
109mod framed;
110pub mod mqttbytes;
111mod state;
112#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
113mod tls;
114
115pub use async_channel::{SendError, Sender, TrySendError};
116pub use client::{AsyncClient, Client, ClientError, Connection};
117pub use eventloop::{ConnectionError, Event, EventLoop};
118pub use mqttbytes::v4::*;
119pub use mqttbytes::*;
120pub use state::{MqttState, StateError};
121#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
122pub use tls::Error as TlsError;
123#[cfg(feature = "use-rustls")]
124pub use tokio_rustls::rustls::ClientConfig;
125
126pub type Incoming = Packet;
127
128#[derive(Debug, Eq, PartialEq, Clone)]
130pub enum Outgoing {
131 Publish(u16, String),
133 Subscribe(u16),
135 Unsubscribe(u16),
137 PubAck(u16),
139 PubRec(u16),
141 PubRel(u16),
143 PubComp(u16),
145 PingReq,
147 PingResp,
149 Disconnect,
151 AwaitAck(u16),
153}
154
155#[derive(Clone, Debug, PartialEq)]
158pub enum Request {
159 Publish(Publish),
160 PubAck(PubAck),
161 PubRec(PubRec),
162 PubComp(PubComp),
163 PubRel(PubRel),
164 PingReq,
165 PingResp,
166 Subscribe(Subscribe),
167 SubAck(SubAck),
168 Unsubscribe(Unsubscribe),
169 UnsubAck(UnsubAck),
170 Disconnect,
171}
172
173#[derive(Clone, Debug, Eq, PartialEq)]
175pub enum Key {
176 RSA(Vec<u8>),
177 ECC(Vec<u8>),
178}
179
180impl From<Publish> for Request {
181 fn from(publish: Publish) -> Request {
182 Request::Publish(publish)
183 }
184}
185
186impl From<Subscribe> for Request {
187 fn from(subscribe: Subscribe) -> Request {
188 Request::Subscribe(subscribe)
189 }
190}
191
192impl From<Unsubscribe> for Request {
193 fn from(unsubscribe: Unsubscribe) -> Request {
194 Request::Unsubscribe(unsubscribe)
195 }
196}
197
198#[derive(Clone)]
199pub enum Transport {
200 Tcp,
201 #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
202 Tls(TlsConfiguration),
203 #[cfg(unix)]
204 Unix,
205 #[cfg(feature = "websocket")]
206 #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
207 Ws,
208 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
209 #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
210 Wss(TlsConfiguration),
211}
212
213impl Default for Transport {
214 fn default() -> Self {
215 Self::tcp()
216 }
217}
218
219impl Transport {
220 pub fn tcp() -> Self {
222 Self::Tcp
223 }
224
225 #[cfg(feature = "use-rustls")]
226 #[cfg(feature = "use-rustls")]
228 pub fn tls(
229 ca: Vec<u8>,
230 client_auth: Option<(Vec<u8>, Key)>,
231 alpn: Option<Vec<Vec<u8>>>,
232 ) -> Self {
233 let config = TlsConfiguration::Simple {
234 ca,
235 alpn,
236 client_auth,
237 };
238
239 Self::tls_with_config(config)
240 }
241
242 #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
243 pub fn tls_with_config(tls_config: TlsConfiguration) -> Self {
244 Self::Tls(tls_config)
245 }
246
247 #[cfg(unix)]
248 pub fn unix() -> Self {
249 Self::Unix
250 }
251
252 #[cfg(feature = "websocket")]
254 #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
255 pub fn ws() -> Self {
256 Self::Ws
257 }
258
259 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
261 #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
262 pub fn wss(
263 ca: Vec<u8>,
264 client_auth: Option<(Vec<u8>, Key)>,
265 alpn: Option<Vec<Vec<u8>>>,
266 ) -> Self {
267 let config = TlsConfiguration::Simple {
268 ca,
269 client_auth,
270 alpn,
271 };
272
273 Self::wss_with_config(config)
274 }
275
276 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
277 #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
278 pub fn wss_with_config(tls_config: TlsConfiguration) -> Self {
279 Self::Wss(tls_config)
280 }
281}
282
283#[derive(Clone)]
284#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
285pub enum TlsConfiguration {
286 #[cfg(feature = "use-rustls")]
287 Simple {
288 ca: Vec<u8>,
290 alpn: Option<Vec<Vec<u8>>>,
292 client_auth: Option<(Vec<u8>, Key)>,
294 },
295 #[cfg(feature = "use-rustls")]
296 Rustls(Arc<ClientConfig>),
298 #[cfg(feature = "use-native-tls")]
299 Native,
300}
301
302#[cfg(feature = "use-rustls")]
303impl From<ClientConfig> for TlsConfiguration {
304 fn from(config: ClientConfig) -> Self {
305 TlsConfiguration::Rustls(Arc::new(config))
306 }
307}
308
309#[derive(Clone)]
314pub struct MqttOptions {
315 broker_addr: String,
317 port: u16,
319 transport: Transport,
321 keep_alive: Duration,
323 clean_session: bool,
325 client_id: String,
327 credentials: Option<(String, String)>,
329 max_incoming_packet_size: usize,
331 max_outgoing_packet_size: usize,
336 request_channel_capacity: usize,
338 max_request_batch: usize,
340 pending_throttle: Duration,
343 inflight: u16,
345 last_will: Option<LastWill>,
347 conn_timeout: u64,
349 manual_acks: bool,
352}
353
354impl MqttOptions {
355 pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
357 let id = id.into();
358 if id.starts_with(' ') || id.is_empty() {
359 panic!("Invalid client id")
360 }
361
362 MqttOptions {
363 broker_addr: host.into(),
364 port,
365 transport: Transport::tcp(),
366 keep_alive: Duration::from_secs(60),
367 clean_session: true,
368 client_id: id,
369 credentials: None,
370 max_incoming_packet_size: 10 * 1024,
371 max_outgoing_packet_size: 10 * 1024,
372 request_channel_capacity: 10,
373 max_request_batch: 0,
374 pending_throttle: Duration::from_micros(0),
375 inflight: 100,
376 last_will: None,
377 conn_timeout: 5,
378 manual_acks: false,
379 }
380 }
381
382 pub fn broker_address(&self) -> (String, u16) {
384 (self.broker_addr.clone(), self.port)
385 }
386
387 pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
388 self.last_will = Some(will);
389 self
390 }
391
392 pub fn last_will(&self) -> Option<LastWill> {
393 self.last_will.clone()
394 }
395
396 pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
397 self.transport = transport;
398 self
399 }
400
401 pub fn transport(&self) -> Transport {
402 self.transport.clone()
403 }
404
405 pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
408 if duration.as_secs() < 5 {
409 panic!("Keep alives should be >= 5 secs");
410 }
411
412 self.keep_alive = duration;
413 self
414 }
415
416 pub fn keep_alive(&self) -> Duration {
418 self.keep_alive
419 }
420
421 pub fn client_id(&self) -> String {
423 self.client_id.clone()
424 }
425
426 pub fn set_max_packet_size(&mut self, incoming: usize, outgoing: usize) -> &mut Self {
428 self.max_incoming_packet_size = incoming;
429 self.max_outgoing_packet_size = outgoing;
430 self
431 }
432
433 pub fn max_packet_size(&self) -> usize {
435 self.max_incoming_packet_size
436 }
437
438 pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
445 self.clean_session = clean_session;
446 self
447 }
448
449 pub fn clean_session(&self) -> bool {
451 self.clean_session
452 }
453
454 pub fn set_credentials<U: Into<String>, P: Into<String>>(
456 &mut self,
457 username: U,
458 password: P,
459 ) -> &mut Self {
460 self.credentials = Some((username.into(), password.into()));
461 self
462 }
463
464 pub fn credentials(&self) -> Option<(String, String)> {
466 self.credentials.clone()
467 }
468
469 pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
471 self.request_channel_capacity = capacity;
472 self
473 }
474
475 pub fn request_channel_capacity(&self) -> usize {
477 self.request_channel_capacity
478 }
479
480 pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
482 self.pending_throttle = duration;
483 self
484 }
485
486 pub fn pending_throttle(&self) -> Duration {
488 self.pending_throttle
489 }
490
491 pub fn set_inflight(&mut self, inflight: u16) -> &mut Self {
493 if inflight == 0 {
494 panic!("zero in flight is not allowed")
495 }
496
497 self.inflight = inflight;
498 self
499 }
500
501 pub fn inflight(&self) -> u16 {
503 self.inflight
504 }
505
506 pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
508 self.conn_timeout = timeout;
509 self
510 }
511
512 pub fn connection_timeout(&self) -> u64 {
514 self.conn_timeout
515 }
516
517 pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
519 self.manual_acks = manual_acks;
520 self
521 }
522
523 pub fn manual_acks(&self) -> bool {
525 self.manual_acks
526 }
527}
528
529#[cfg(feature = "url")]
530#[derive(Debug, PartialEq, thiserror::Error)]
531pub enum OptionError {
532 #[error("Unsupported URL scheme.")]
533 Scheme,
534
535 #[error("Missing client ID.")]
536 ClientId,
537
538 #[error("Invalid keep-alive value.")]
539 KeepAlive,
540
541 #[error("Invalid clean-session value.")]
542 CleanSession,
543
544 #[error("Invalid max-incoming-packet-size value.")]
545 MaxIncomingPacketSize,
546
547 #[error("Invalid max-outgoing-packet-size value.")]
548 MaxOutgoingPacketSize,
549
550 #[error("Invalid request-channel-capacity value.")]
551 RequestChannelCapacity,
552
553 #[error("Invalid max-request-batch value.")]
554 MaxRequestBatch,
555
556 #[error("Invalid pending-throttle value.")]
557 PendingThrottle,
558
559 #[error("Invalid inflight value.")]
560 Inflight,
561
562 #[error("Invalid conn-timeout value.")]
563 ConnTimeout,
564
565 #[error("Unknown option: {0}")]
566 Unknown(String),
567}
568
569#[cfg(feature = "url")]
570impl std::convert::TryFrom<url::Url> for MqttOptions {
571 type Error = OptionError;
572
573 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
574 use std::collections::HashMap;
575
576 let broker_addr = url.host_str().unwrap_or_default().to_owned();
577
578 let (transport, default_port) = match url.scheme() {
579 "mqtts" | "ssl" => (Transport::Tcp, 8883),
583 "mqtt" | "tcp" => (Transport::Tcp, 1883),
584 _ => return Err(OptionError::Scheme),
585 };
586
587 let port = url.port().unwrap_or(default_port);
588
589 let mut queries = url.query_pairs().collect::<HashMap<_, _>>();
590
591 let keep_alive = Duration::from_secs(
592 queries
593 .remove("keep_alive_secs")
594 .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
595 .transpose()?
596 .unwrap_or(60),
597 );
598
599 let client_id = queries
600 .remove("client_id")
601 .ok_or(OptionError::ClientId)?
602 .into_owned();
603
604 let clean_session = queries
605 .remove("clean_session")
606 .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanSession))
607 .transpose()?
608 .unwrap_or(true);
609
610 let credentials = {
611 match url.username() {
612 "" => None,
613 username => Some((
614 username.to_owned(),
615 url.password().unwrap_or_default().to_owned(),
616 )),
617 }
618 };
619
620 let max_incoming_packet_size = queries
621 .remove("max_incoming_packet_size_bytes")
622 .map(|v| {
623 v.parse::<usize>()
624 .map_err(|_| OptionError::MaxIncomingPacketSize)
625 })
626 .transpose()?
627 .unwrap_or(10 * 1024);
628
629 let max_outgoing_packet_size = queries
630 .remove("max_outgoing_packet_size_bytes")
631 .map(|v| {
632 v.parse::<usize>()
633 .map_err(|_| OptionError::MaxOutgoingPacketSize)
634 })
635 .transpose()?
636 .unwrap_or(10 * 1024);
637
638 let request_channel_capacity = queries
639 .remove("request_channel_capacity_num")
640 .map(|v| {
641 v.parse::<usize>()
642 .map_err(|_| OptionError::RequestChannelCapacity)
643 })
644 .transpose()?
645 .unwrap_or(10);
646
647 let max_request_batch = queries
648 .remove("max_request_batch_num")
649 .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
650 .transpose()?
651 .unwrap_or(0);
652
653 let pending_throttle = Duration::from_micros(
654 queries
655 .remove("pending_throttle_usecs")
656 .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
657 .transpose()?
658 .unwrap_or(0),
659 );
660
661 let inflight = queries
662 .remove("inflight_num")
663 .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
664 .transpose()?
665 .unwrap_or(100);
666
667 let conn_timeout = queries
668 .remove("conn_timeout_secs")
669 .map(|v| v.parse::<u64>().map_err(|_| OptionError::ConnTimeout))
670 .transpose()?
671 .unwrap_or(5);
672
673 if let Some((opt, _)) = queries.into_iter().next() {
674 return Err(OptionError::Unknown(opt.into_owned()));
675 }
676
677 Ok(Self {
678 broker_addr,
679 port,
680 transport,
681 keep_alive,
682 clean_session,
683 client_id,
684 credentials,
685 max_incoming_packet_size,
686 max_outgoing_packet_size,
687 request_channel_capacity,
688 max_request_batch,
689 pending_throttle,
690 inflight,
691 last_will: None,
692 conn_timeout,
693 manual_acks: false,
694 })
695 }
696}
697
698impl Debug for MqttOptions {
701 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
702 f.debug_struct("MqttOptions")
703 .field("broker_addr", &self.broker_addr)
704 .field("port", &self.port)
705 .field("keep_alive", &self.keep_alive)
706 .field("clean_session", &self.clean_session)
707 .field("client_id", &self.client_id)
708 .field("credentials", &self.credentials)
709 .field("max_packet_size", &self.max_incoming_packet_size)
710 .field("request_channel_capacity", &self.request_channel_capacity)
711 .field("max_request_batch", &self.max_request_batch)
712 .field("pending_throttle", &self.pending_throttle)
713 .field("inflight", &self.inflight)
714 .field("last_will", &self.last_will)
715 .field("conn_timeout", &self.conn_timeout)
716 .field("manual_acks", &self.manual_acks)
717 .finish()
718 }
719}
720
721#[cfg(test)]
722mod test {
723 use super::*;
724
725 #[test]
726 #[should_panic]
727 fn client_id_startswith_space() {
728 let _mqtt_opts = MqttOptions::new(" client_a", "127.0.0.1", 1883).set_clean_session(true);
729 }
730
731 #[test]
732 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
733 fn no_scheme() {
734 let mut _mqtt_opts = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);
735
736 _mqtt_opts.set_transport(crate::Transport::wss(Vec::from("Test CA"), None, None));
737
738 if let crate::Transport::Wss(TlsConfiguration::Simple {
739 ca,
740 client_auth,
741 alpn,
742 }) = _mqtt_opts.transport
743 {
744 assert_eq!(ca, Vec::from("Test CA"));
745 assert_eq!(client_auth, None);
746 assert_eq!(alpn, None);
747 } else {
748 panic!("Unexpected transport!");
749 }
750
751 assert_eq!(_mqtt_opts.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
752 }
753
754 #[test]
755 #[cfg(feature = "url")]
756 fn from_url() {
757 use std::convert::TryInto;
758 use std::str::FromStr;
759
760 fn opt(s: &str) -> Result<MqttOptions, OptionError> {
761 url::Url::from_str(s).expect("valid url").try_into()
762 }
763 fn ok(s: &str) -> MqttOptions {
764 opt(s).expect("valid options")
765 }
766 fn err(s: &str) -> OptionError {
767 opt(s).expect_err("invalid options")
768 }
769
770 let v = ok("mqtt://host:42?client_id=foo");
771 assert_eq!(v.broker_address(), ("host".to_owned(), 42));
772 assert_eq!(v.client_id(), "foo".to_owned());
773
774 let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
775 assert_eq!(v.keep_alive, Duration::from_secs(5));
776
777 assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
778 assert_eq!(
779 err("mqtt://host:42?client_id=foo&foo=bar"),
780 OptionError::Unknown("foo".to_owned())
781 );
782 assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
783 assert_eq!(
784 err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
785 OptionError::KeepAlive
786 );
787 assert_eq!(
788 err("mqtt://host:42?client_id=foo&clean_session=foo"),
789 OptionError::CleanSession
790 );
791 assert_eq!(
792 err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
793 OptionError::MaxIncomingPacketSize
794 );
795 assert_eq!(
796 err("mqtt://host:42?client_id=foo&max_outgoing_packet_size_bytes=foo"),
797 OptionError::MaxOutgoingPacketSize
798 );
799 assert_eq!(
800 err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
801 OptionError::RequestChannelCapacity
802 );
803 assert_eq!(
804 err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
805 OptionError::MaxRequestBatch
806 );
807 assert_eq!(
808 err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
809 OptionError::PendingThrottle
810 );
811 assert_eq!(
812 err("mqtt://host:42?client_id=foo&inflight_num=foo"),
813 OptionError::Inflight
814 );
815 assert_eq!(
816 err("mqtt://host:42?client_id=foo&conn_timeout_secs=foo"),
817 OptionError::ConnTimeout
818 );
819 }
820
821 #[test]
822 #[should_panic]
823 fn no_client_id() {
824 let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true);
825 }
826}