lmc/
options.rs

1use std::net::SocketAddr;
2use std::time::Duration;
3
4use tokio::net::TcpStream;
5
6use crate::QoS;
7use crate::transport::{Transport, TcpTransport};
8use crate::errors::ConnectError;
9
10/// [`LastWill`] describes a message to be published if the client disconnects unexpectedly,
11/// e.g. if the socket gets closed before a `DISCONNECT` packet could be sent by the client.
12pub struct LastWill<'a>
13{
14    /// The topic to publish in
15    pub topic: &'a str,
16
17    /// The message (aka payload) to publish
18    pub message: &'a [u8],
19
20    /// Whether the last will message should be retained
21    pub retain: bool,
22
23    /// The quality of service of the last will message
24    pub qos: QoS
25}
26
/// A bitfield used to represent what versions of the Internet Protocol (IP) to enable.
///
/// The set is initially constructed with all versions enabled (IPv4 and IPv6) using
/// [`IpVersionSet::all()`]. Versions are then removed with
/// [`IpVersionSet::without_v4()`] and [`IpVersionSet::without_v6()`], both of which
/// return a modified copy and leave the original set untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IpVersionSet(u8);

impl IpVersionSet
{
    /// Bit set in the inner value when IPv4 is enabled.
    const V4_BIT: u8 = 1;

    /// Bit set in the inner value when IPv6 is enabled.
    const V6_BIT: u8 = 2;

    /// Constructs an [`IpVersionSet`] with both IPv4 and IPv6 enabled.
    pub fn all() -> Self
    {
        Self(Self::V4_BIT | Self::V6_BIT)
    }

    /// Creates a copy of this set that excludes IPv4.
    ///
    /// The receiver is taken by value and is not modified, so the returned set
    /// must be used for the exclusion to have any effect.
    #[must_use = "this returns a modified copy and does not change the original set"]
    pub fn without_v4(self) -> Self
    {
        Self(self.0 & !Self::V4_BIT)
    }

    /// Creates a copy of this set that excludes IPv6.
    ///
    /// The receiver is taken by value and is not modified, so the returned set
    /// must be used for the exclusion to have any effect.
    #[must_use = "this returns a modified copy and does not change the original set"]
    pub fn without_v6(self) -> Self
    {
        Self(self.0 & !Self::V6_BIT)
    }

    /// Checks whether the IP version associated with the specified [`SocketAddr`] is
    /// included in this set.
    ///
    /// Returns `true` if `addr` is an IPv4 address and IPv4 is enabled, or if it is
    /// an IPv6 address and IPv6 is enabled; `false` otherwise.
    pub fn supports(self, addr: &SocketAddr) -> bool
    {
        match addr {
            SocketAddr::V4(_) => (self.0 & Self::V4_BIT) != 0,
            SocketAddr::V6(_) => (self.0 & Self::V6_BIT) != 0
        }
    }
}
67
68/// A trait to wrap the creation of transports. Mainly used for TLS purposes.
69pub trait ConnectionConfig<T>
70{
71    /// Creates a "connection" tied to the specified hostname. Does nothing
72    /// for non-TLS connections.
73    fn create_connection(self, host: &str) -> Result<T, ConnectError>;
74
75    /// Creates the transport based on a [`TcpStream`] and the previously-created
76    /// connection.
77    fn create_transport(stream: TcpStream, connection: T) -> Box<dyn Transport>;
78}
79
80impl ConnectionConfig<()> for ()
81{
82    fn create_connection(self, _host: &str) -> Result<(), ConnectError>
83    {
84        Ok(())
85    }
86
87    fn create_transport(stream: TcpStream, _connection: ()) -> Box<dyn Transport>
88    {
89        Box::new(TcpTransport::new(stream))
90    }
91}
92
93/// A struct containing all the MQTT protocol & implementation settings. To create this
94/// structure with its default values, use [`Options::new()`](OptionsT::new()).
95/// 
96/// # Example
97/// 
98#[cfg_attr(feature = "tls", doc = r##"```
99use lmc::{Options, QoS};
100
101let mut opts = Options::new("client_id")
102    .enable_tls()
103    .expect("Failed to load native system TLS certificates");
104    
105opts.set_last_will("status", b"unexpected_disconnect", true, QoS::AtLeastOnce)
106    .set_keep_alive(10)
107    .set_clean_session()
108    .set_no_delay();
109```"##)]
110pub struct OptionsT<'a, T>
111{
112    /// Whether to establish a persistent session or not. If this is set to `true`,
113    /// then the broker will drop any pre-existing session information associated
114    /// with the specified [`Self::client_id`]. This means that when subscribing to
115    /// topics, any retained messages will be (re-)transmitted to this client. In
116    /// addition, the broker will not store any information about this session.
117    /// 
118    /// Default is `false`.
119    pub clean_session: bool,
120
121    /// Time interval (in seconds) between any packet and a `PING` requests.
122    /// 
123    /// Default is 30 seconds.
124    pub keep_alive: u16,
125
126    /// A string identifying this client. The broker may choose to use soemthing
127    /// different.
128    /// 
129    /// It is also used by the broker to establish a persistent session (should
130    /// [`Self::clean_session`] be `false`).
131    pub client_id: &'a str,
132
133    /// The [`LastWill`] is used to publish a message if the client connection ends
134    /// in an unexpected manner. This is optional.
135    /// 
136    /// Default is `None`.
137    pub last_will: Option<LastWill<'a>>,
138
139    /// An optional username used to authenticate the client to the broker.
140    /// 
141    /// Default is `None`.
142    pub username: Option<&'a str>,
143
144    /// An optional password used to authenticate the client to the broker.
145    /// 
146    /// Default is `None`.
147    pub password: Option<&'a [u8]>,
148
149    /// Sets the "no delay" flag to the TCP connection.
150    /// 
151    /// Default is `false`.
152    /// 
153    /// See [`std::net::TcpStream::set_nodelay()`]
154    pub no_delay: bool,
155
156    /// A set specifying which Internet Protocol versions to use (IPv4 and/or
157    /// IPv6) to establish the TCP connection.
158    /// 
159    /// By default, both IP versions are enabled.
160    pub enabled_ip_versions: IpVersionSet,
161
162    /// The TLS configuration used to establish the connection. By default,
163    /// this field is empty. Enabling TLS can be done by first enabling the
164    /// `tls` feature in the crate and then using one of the dedicated
165    /// functions of [`OptionsT`], such as [`OptionsT::enable_tls()`].
166    pub connection_cfg: T,
167
168    /// The maximum amount of time that can be spent looking up the broker's
169    /// hostname.
170    /// 
171    /// By default, this is set to 3 seconds.
172    pub dns_timeout: Duration,
173
174    /// The maximum amount of time that can be spent establishing the TCP
175    /// connection.
176    /// 
177    /// By default, this is set to 10 seconds.
178    pub tcp_connect_timeout: Duration,
179
180    /// The maximum amount of time waiting for the broker's `CONNACK` packet.
181    /// 
182    /// By default, this is set to 3 seconds.
183    pub mqtt_connect_timeout: Duration,
184
185    /// How long the client should wait before re-sending a packet if no
186    /// acknowledment packet is received. This delay has a low precision.
187    /// 
188    /// Default (and minimum) is 1 second.
189    pub packets_resend_delay: Duration,
190
191    /// Default port to use if none is specified in the hostname string.
192    /// This field is not accessible. Use [`OptionsT::set_default_port()`]
193    /// to change it and [`OptionsT::default_port()`] to access it.
194    /// 
195    /// Default is 1883 when TLS is **not** used and 8883 when TLS is **in use**.
196    default_port: u16,
197
198    /// True if [`Self::default_port`] has been changed through a call to
199    /// [`OptionsT::set_default_port()`]. This is used to switch the default
200    /// port to 8883 when enabling TLS, if the developer didn't change it
201    /// beforehand.
202    default_port_changed: bool
203}
204
205/// Options with TLS disabled. See [`OptionsT`] for documentation.
206pub type Options<'a> = OptionsT<'a, ()>;
207
208impl<'a> Options<'a>
209{
210    /// Creates new options with the default values and TLS disabled. TLS
211    /// can be enabled later on using one of the functions of [`OptionsT`]
212    /// such as [`OptionsT::enable_tls()`] (available only if the `tls`
213    /// feature of the crate is enabled).
214    pub fn new(client_id: &'a str) -> Self
215    {
216        Self {
217            clean_session: false,
218            keep_alive: 30,
219            client_id: client_id.into(),
220            last_will: None,
221            username: None,
222            password: None,
223            no_delay: false,
224            enabled_ip_versions: IpVersionSet::all(),
225            connection_cfg: (),
226            dns_timeout: Duration::from_secs(3),
227            tcp_connect_timeout: Duration::from_secs(10),
228            mqtt_connect_timeout: Duration::from_secs(3),
229            packets_resend_delay: Duration::from_secs(1),
230            default_port: 1883,
231            default_port_changed: false
232        }
233    }
234}
235
236impl<'a, T> OptionsT<'a, T>
237{
238    /// Changes the type of the [`Self::tls_config`] field.
239    pub(super) fn map_connection_cfg<O, F>(self, f: F) -> OptionsT<'a, O>
240    where F: FnOnce(T) -> O
241    {
242        //Need `type_changing_struct_update` to be stable before we can use `..self`
243
244        OptionsT {
245            clean_session: self.clean_session,
246            keep_alive: self.keep_alive,
247            client_id: self.client_id,
248            last_will: self.last_will,
249            username: self.username,
250            password: self.password,
251            no_delay: self.no_delay,
252            enabled_ip_versions: self.enabled_ip_versions,
253            connection_cfg: f(self.connection_cfg),
254            dns_timeout: self.dns_timeout,
255            tcp_connect_timeout: self.tcp_connect_timeout,
256            mqtt_connect_timeout: self.mqtt_connect_timeout,
257            packets_resend_delay: self.packets_resend_delay,
258            default_port: self.default_port,
259            default_port_changed: self.default_port_changed
260        }
261    }
262
263    /// Sets the [`Self::clean_session`] flag to `true`. See the corresponding
264    /// field's documentation for more information.
265    pub fn set_clean_session(&mut self) -> &mut Self
266    {
267        self.clean_session = true;
268        self
269    }
270
271    /// Changes [`Self::keep_alive`] to the specified value, in seconds. See
272    /// the corresponding field's documentation for more information.
273    pub fn set_keep_alive(&mut self, keep_alive: u16) -> &mut Self
274    {
275        self.keep_alive = keep_alive;
276        self
277    }
278
279    /// Enables MQTT's last will functionality with the specified settings.
280    /// See [`Self::last_will`] for more information.
281    pub fn set_last_will_ex(&mut self, last_will: LastWill<'a>) -> &mut Self
282    {
283        self.last_will = Some(last_will);
284        self
285    }
286
287    /// Enables MQTT's last will functionality with the specified settings.
288    /// See [`Self::last_will`] for more information.
289    pub fn set_last_will(&mut self, topic: &'a str, message: &'a [u8], retain: bool, qos: QoS) -> &mut Self
290    {
291        self.set_last_will_ex(LastWill { topic, message, retain, qos })
292    }
293
294    /// Changes [`Self::username`] to the specified value. See the
295    /// corresponding field's documentation for more information.
296    pub fn set_username(&mut self, username: &'a str) -> &mut Self
297    {
298        self.username = Some(username);
299        self
300    }
301
302    /// Changes [`Self::password`] to the specified value. See the
303    /// corresponding field's documentation for more information.
304    pub fn set_password(&mut self, password: &'a [u8]) -> &mut Self
305    {
306        self.password = Some(password);
307        self
308    }
309
310    /// Sets [`Self::no_delay`] to `true`. See the corresponding field's
311    /// documentation for more information.
312    pub fn set_no_delay(&mut self) -> &mut Self
313    {
314        self.no_delay = true;
315        self
316    }
317
318    /// Removes IPv4 from the [`Self::enabled_ip_versions`] set. See the
319    /// corresponding field's documentation for more information.
320    pub fn disable_ipv4(&mut self) -> &mut Self
321    {
322        self.enabled_ip_versions = self.enabled_ip_versions.without_v4();
323        self
324    }
325
326    /// Removes IPv6 from the [`Self::enabled_ip_versions`] set. See the
327    /// corresponding field's documentation for more information.
328    pub fn disable_ipv6(&mut self) -> &mut Self
329    {
330        self.enabled_ip_versions = self.enabled_ip_versions.without_v6();
331        self
332    }
333
334    /// Changes [`Self::dns_timeout`] to the specified value. See the
335    /// corresponding field's documentation for more information.
336    pub fn set_dns_timeout(&mut self, to: Duration) -> &mut Self
337    {
338        self.dns_timeout = to;
339        self
340    }
341
342    /// Changes [`Self::tcp_connect_timeout`] to the specified value. See
343    /// the corresponding field's documentation for more information.
344    pub fn set_tcp_connect_timeout(&mut self, to: Duration) -> &mut Self
345    {
346        self.tcp_connect_timeout = to;
347        self
348    }
349
350    /// Changes [`Self::mqtt_connect_timeout`] to the specified value. See
351    /// the corresponding field's documentation for more information.
352    pub fn set_mqtt_connect_timeout(&mut self, to: Duration) -> &mut Self
353    {
354        self.mqtt_connect_timeout = to;
355        self
356    }
357
358    /// Changes [`Self::dns_timeout`], [`Self::tcp_connect_timeout`], and
359    /// [`Self::mqtt_connect_timeout`] to the specified values. See their
360    /// respective documentation for more information.
361    pub fn set_all_timeouts(&mut self, dns: Duration, tcp_connect: Duration, mqtt_connect: Duration) -> &mut Self
362    {
363        self.dns_timeout = dns;
364        self.tcp_connect_timeout = tcp_connect;
365        self.mqtt_connect_timeout = mqtt_connect;
366
367        self
368    }
369
370    /// Sets [`Self::dns_timeout`], [`Self::tcp_connect_timeout`], and
371    /// [`Self::mqtt_connect_timeout`] to the same specified value. See their
372    /// respective documentation for more information.
373    pub fn set_all_timeouts_to(&mut self, to: Duration) -> &mut Self
374    {
375        self.dns_timeout = to;
376        self.tcp_connect_timeout = to;
377        self.mqtt_connect_timeout = to;
378
379        self
380    }
381
382    /// Changes [`Self::packets_resend_delay`] to the specified value. See
383    /// the corresponding field's documentation for more information.
384    pub fn set_packets_resend_delay(&mut self, delay: Duration) -> &mut Self
385    {
386        self.packets_resend_delay = delay;
387        self
388    }
389
390    /// Changes the default port (the port used if no port is explicitly
391    /// specified in the hostname passed to [`super::Client::connect()`])
392    /// to the specified value.
393    /// 
394    /// Default is 1883 when TLS is **not** used and 8883 when TLS is **in use**.
395    pub fn set_default_port(&mut self, port: u16) -> &mut Self
396    {
397        self.default_port = port;
398        self.default_port_changed = true;
399
400        self
401    }
402
403    /// Accesses the default port (the port used if no port is explicitly
404    /// specified in the hostname passed to [`super::Client::connect()`]).
405    /// 
406    /// Default is 1883 when TLS is **not** used and 8883 when TLS is **in use**.
407    pub fn default_port(&self) -> u16
408    {
409        self.default_port
410    }
411
412    pub(super) fn separate_connection_cfg(self) -> (OptionsT<'a, ()>, T)
413    {
414        let mut opt = self.map_connection_cfg(Some);
415        let conn_cfg = opt.connection_cfg.take().unwrap();
416
417        (opt.map_connection_cfg(|_| ()), conn_cfg)
418    }
419}
420
#[cfg(feature = "tls")]
mod tls {
    use std::sync::Arc;
    use std::io::{self, BufReader};

    use rustls::{ClientConfig, ClientConnection, RootCertStore, OwnedTrustAnchor, ServerName};
    use webpki::TrustAnchor;
    use tokio::net::TcpStream;

    use crate::tls::{OptionsWithTls, CryptoBytes, CryptoError, Transport as TlsTransport};
    use crate::errors::ConnectError;
    use crate::transport::Transport;

    /// Builds a TLS client configuration with safe defaults and no client
    /// authentication, which verifies servers against the specified root store.
    fn client_config_with_roots(roots: RootCertStore) -> ClientConfig {
        ClientConfig::builder()
            .with_safe_defaults()
            .with_root_certificates(roots)
            .with_no_client_auth()
    }

    impl super::ConnectionConfig<ClientConnection> for ClientConfig {
        /// Creates the TLS client connection for the specified hostname.
        ///
        /// Fails with [`ConnectError::InvalidHostname`] if `host` cannot be turned
        /// into a server name, or with [`ConnectError::TlsError`] if the client
        /// connection cannot be created.
        fn create_connection(self, host: &str) -> Result<ClientConnection, ConnectError> {
            let server_name = match ServerName::try_from(host) {
                Ok(name) => name,
                Err(_) => return Err(ConnectError::InvalidHostname),
            };

            let shared_config = Arc::new(self);
            ClientConnection::new(shared_config, server_name).map_err(ConnectError::TlsError)
        }

        /// Wraps the stream and the TLS client connection into a boxed TLS transport.
        fn create_transport(stream: TcpStream, connection: ClientConnection) -> Box<dyn Transport> {
            let transport = TlsTransport::new(stream, connection);
            Box::new(transport)
        }
    }

    impl<'a> super::Options<'a> {
        /// Turns this set of options into options with TLS support, using the
        /// specified TLS configuration. This function is meant for advanced uses
        /// and requires the `rustls` crate.
        ///
        /// Unless the default port was changed beforehand, it is switched to 8883.
        ///
        /// For simple uses, see [`Self::enable_tls()`] and [`Self::enable_tls_custom_ca_cert()`].
        pub fn enable_tls_ex(self, tls_config: ClientConfig) -> OptionsWithTls<'a> {
            let keep_current_port = self.default_port_changed;
            let mut tls_opts = self.map_connection_cfg(|_| tls_config);

            if !keep_current_port {
                tls_opts.default_port = 8883;
            }

            tls_opts
        }

        /// Turns this set of options into options with TLS support. However,
        /// this specific function will **SKIP ALL SERVER AUTHENTICITY CHECKS**,
        /// meaning that the connection WILL **NOT** BE SECURE.
        ///
        /// This is a **dangerous operation**, which should only be used for
        /// debug purposes by developers who know what they are doing. This is
        /// why it is disabled by default, unless the `dangerous_tls` feature of
        /// the crate is enabled.
        ///
        /// Please use [`Self::enable_tls()`] and [`Self::enable_tls_custom_ca_cert()`]
        /// instead.
        #[cfg(feature = "dangerous_tls")]
        pub fn enable_dangerous_non_verified_tls(self) -> OptionsWithTls<'a> {
            use crate::tls::dangerous;

            let verifier = Arc::new(dangerous::SkipServerCertVerification);
            let tls_config = ClientConfig::builder()
                .with_safe_defaults()
                .with_custom_certificate_verifier(verifier)
                .with_no_client_auth();

            self.enable_tls_ex(tls_config)
        }

        /// Turns this set of options into options with TLS support. This
        /// specific flavour loads the system's certificates to verify the
        /// server's identity.
        ///
        /// Fails if the system's certificates cannot be loaded.
        ///
        /// To provide your own CA certificate instead, use the
        /// [`Self::enable_tls_custom_ca_cert()`] function.
        pub fn enable_tls(self) -> io::Result<OptionsWithTls<'a>> {
            let native_certs = rustls_native_certs::load_native_certs()?;
            let der_certs: Vec<_> = native_certs.into_iter().map(|cert| cert.0).collect();

            // The counts returned by `add_parsable_certificates` are not checked here
            let mut roots = RootCertStore::empty();
            roots.add_parsable_certificates(&der_certs);

            Ok(self.enable_tls_ex(client_config_with_roots(roots)))
        }

        /// Turns this set of options into options with TLS support. This
        /// specific flavour lets the developer provide their own CA
        /// certificate to verify the server's identity.
        ///
        /// To rely on your OS's certificate store instead, use
        /// [`Self::enable_tls()`].
        ///
        /// # Parameters
        ///
        /// `ca_cert_bytes` should contain a reference to the raw bytes of
        /// your certificate, either in DER or PEM format. See [`CryptoBytes`]
        /// for more information.
        ///
        /// # Return value
        ///
        /// Because the bytes specified in `ca_cert_bytes` can be invalid,
        /// this function can fail. See [`CryptoError`] for more information.
        ///
        /// # Example
        ///
        #[cfg_attr(feature = "tls", doc = r##"```
use std::fs;
use lmc::Options;
use lmc::tls::CryptoBytes;

let cert_bytes = fs::read("test_data/ca.pem").expect("Failed to load CA certificate bytes");

let opts = Options::new("client_id")
    .enable_tls_custom_ca_cert(CryptoBytes::Pem(&cert_bytes))
    .expect("Failed to load the specified TLS certificate");
```"##)]
        pub fn enable_tls_custom_ca_cert(self, ca_cert_bytes: CryptoBytes) -> Result<OptionsWithTls<'a>, CryptoError> {
            let mut roots = RootCertStore::empty();

            match ca_cert_bytes {
                CryptoBytes::Der(der) => {
                    let anchor = TrustAnchor::try_from_cert_der(der).map_err(CryptoError::BadCert)?;

                    roots.roots.push(OwnedTrustAnchor::from_subject_spki_name_constraints(
                        anchor.subject,
                        anchor.spki,
                        anchor.name_constraints,
                    ));
                }
                CryptoBytes::Pem(pem) => {
                    let mut reader = BufReader::new(pem);
                    let certs = rustls_pemfile::certs(&mut reader).map_err(CryptoError::IoError)?;

                    // A PEM input from which no certificate could be added is an error
                    let (added, _ignored) = roots.add_parsable_certificates(&certs);
                    if added == 0 {
                        return Err(CryptoError::NoValidItemsInPem);
                    }
                }
            }

            Ok(self.enable_tls_ex(client_config_with_roots(roots)))
        }
    }
}