lmc/options.rs
1use std::net::SocketAddr;
2use std::time::Duration;
3
4use tokio::net::TcpStream;
5
6use crate::QoS;
7use crate::transport::{Transport, TcpTransport};
8use crate::errors::ConnectError;
9
/// [`LastWill`] describes a message to be published if the client disconnects unexpectedly,
/// e.g. if the socket gets closed before a `DISCONNECT` packet could be sent by the client.
///
/// Both the topic and the payload are borrowed for the lifetime `'a`; the struct
/// does not own any of its data.
pub struct LastWill<'a>
{
    /// The topic to publish in
    pub topic: &'a str,

    /// The message (aka payload) to publish
    pub message: &'a [u8],

    /// Whether the last will message should be retained
    pub retain: bool,

    /// The quality of service of the last will message
    pub qos: QoS
}
26
/// A bitfield used to represent what versions of the Internet Protocol (IP) to enable.
///
/// Start from [`IpVersionSet::all()`], which has every version enabled (IPv4 and
/// IPv6), then narrow the set down with [`IpVersionSet::without_v4()`] and/or
/// [`IpVersionSet::without_v6()`].
#[derive(Debug, Clone, Copy)]
pub struct IpVersionSet(u8);

impl IpVersionSet
{
    /// Bit set when IPv4 is enabled.
    const V4_BIT: u8 = 1;

    /// Bit set when IPv6 is enabled.
    const V6_BIT: u8 = 2;

    /// Returns a copy of this set with the given bit cleared.
    fn without_bit(self, bit: u8) -> Self
    {
        let Self(bits) = self;
        Self(bits & !bit)
    }

    /// Tells whether the given bit is set.
    fn has_bit(self, bit: u8) -> bool
    {
        let Self(bits) = self;
        (bits & bit) != 0
    }

    /// Builds an [`IpVersionSet`] in which both IPv4 and IPv6 are enabled.
    pub fn all() -> Self
    {
        Self(Self::V4_BIT | Self::V6_BIT)
    }

    /// Returns a copy of this set in which IPv4 is disabled.
    pub fn without_v4(self) -> Self
    {
        self.without_bit(Self::V4_BIT)
    }

    /// Returns a copy of this set in which IPv6 is disabled.
    pub fn without_v6(self) -> Self
    {
        self.without_bit(Self::V6_BIT)
    }

    /// Tells whether the IP version of the given [`SocketAddr`] is enabled
    /// in this set.
    pub fn supports(self, addr: &SocketAddr) -> bool
    {
        let version_bit = match addr {
            SocketAddr::V4(_) => Self::V4_BIT,
            SocketAddr::V6(_) => Self::V6_BIT
        };

        self.has_bit(version_bit)
    }
}
67
/// A trait to wrap the creation of transports. Mainly used for TLS purposes.
///
/// The type parameter `T` is the intermediate "connection" value: it is produced
/// by [`ConnectionConfig::create_connection()`] and handed back, together with
/// the TCP stream, to [`ConnectionConfig::create_transport()`].
pub trait ConnectionConfig<T>
{
    /// Creates a "connection" tied to the specified hostname. Does nothing
    /// for non-TLS connections.
    ///
    /// Consumes the configuration. Failures are reported as a [`ConnectError`].
    fn create_connection(self, host: &str) -> Result<T, ConnectError>;

    /// Creates the transport based on a [`TcpStream`] and the previously-created
    /// connection.
    ///
    /// This is an associated function: it does not take the configuration itself,
    /// only the stream and the connection value.
    fn create_transport(stream: TcpStream, connection: T) -> Box<dyn Transport>;
}
79
80impl ConnectionConfig<()> for ()
81{
82 fn create_connection(self, _host: &str) -> Result<(), ConnectError>
83 {
84 Ok(())
85 }
86
87 fn create_transport(stream: TcpStream, _connection: ()) -> Box<dyn Transport>
88 {
89 Box::new(TcpTransport::new(stream))
90 }
91}
92
/// A struct containing all the MQTT protocol & implementation settings. To create this
/// structure with its default values, use [`Options::new()`](OptionsT::new()).
///
/// The type parameter `T` is the type of the [`Self::connection_cfg`] field.
///
/// # Example
///
#[cfg_attr(feature = "tls", doc = r##"```
use lmc::{Options, QoS};

let mut opts = Options::new("client_id")
    .enable_tls()
    .expect("Failed to load native system TLS certificates");

opts.set_last_will("status", b"unexpected_disconnect", true, QoS::AtLeastOnce)
    .set_keep_alive(10)
    .set_clean_session()
    .set_no_delay();
```"##)]
pub struct OptionsT<'a, T>
{
    /// Whether to establish a persistent session or not. If this is set to `true`,
    /// then the broker will drop any pre-existing session information associated
    /// with the specified [`Self::client_id`]. This means that when subscribing to
    /// topics, any retained messages will be (re-)transmitted to this client. In
    /// addition, the broker will not store any information about this session.
    ///
    /// Default is `false`.
    pub clean_session: bool,

    /// Time interval (in seconds) between any packet and a `PING` request.
    ///
    /// Default is 30 seconds.
    pub keep_alive: u16,

    /// A string identifying this client. The broker may choose to use something
    /// different.
    ///
    /// It is also used by the broker to establish a persistent session (should
    /// [`Self::clean_session`] be `false`).
    pub client_id: &'a str,

    /// The [`LastWill`] is used to publish a message if the client connection ends
    /// in an unexpected manner. This is optional.
    ///
    /// Default is `None`.
    pub last_will: Option<LastWill<'a>>,

    /// An optional username used to authenticate the client to the broker.
    ///
    /// Default is `None`.
    pub username: Option<&'a str>,

    /// An optional password used to authenticate the client to the broker.
    ///
    /// Default is `None`.
    pub password: Option<&'a [u8]>,

    /// Sets the "no delay" flag to the TCP connection.
    ///
    /// Default is `false`.
    ///
    /// See [`std::net::TcpStream::set_nodelay()`]
    pub no_delay: bool,

    /// A set specifying which Internet Protocol versions to use (IPv4 and/or
    /// IPv6) to establish the TCP connection.
    ///
    /// By default, both IP versions are enabled.
    pub enabled_ip_versions: IpVersionSet,

    /// The TLS configuration used to establish the connection. By default,
    /// this field is empty. Enabling TLS can be done by first enabling the
    /// `tls` feature in the crate and then using one of the dedicated
    /// functions of [`OptionsT`], such as [`OptionsT::enable_tls()`].
    pub connection_cfg: T,

    /// The maximum amount of time that can be spent looking up the broker's
    /// hostname.
    ///
    /// By default, this is set to 3 seconds.
    pub dns_timeout: Duration,

    /// The maximum amount of time that can be spent establishing the TCP
    /// connection.
    ///
    /// By default, this is set to 10 seconds.
    pub tcp_connect_timeout: Duration,

    /// The maximum amount of time waiting for the broker's `CONNACK` packet.
    ///
    /// By default, this is set to 3 seconds.
    pub mqtt_connect_timeout: Duration,

    /// How long the client should wait before re-sending a packet if no
    /// acknowledgment packet is received. This delay has a low precision.
    ///
    /// Default (and minimum) is 1 second.
    pub packets_resend_delay: Duration,

    /// Default port to use if none is specified in the hostname string.
    /// This field is not accessible. Use [`OptionsT::set_default_port()`]
    /// to change it and [`OptionsT::default_port()`] to access it.
    ///
    /// Default is 1883 when TLS is **not** used and 8883 when TLS is **in use**.
    default_port: u16,

    /// True if [`Self::default_port`] has been changed through a call to
    /// [`OptionsT::set_default_port()`]. This is used to switch the default
    /// port to 8883 when enabling TLS, if the developer didn't change it
    /// beforehand.
    default_port_changed: bool
}
204
/// Options with TLS disabled. See [`OptionsT`] for documentation.
///
/// This is [`OptionsT`] with the unit type `()` as its connection configuration.
pub type Options<'a> = OptionsT<'a, ()>;
207
208impl<'a> Options<'a>
209{
210 /// Creates new options with the default values and TLS disabled. TLS
211 /// can be enabled later on using one of the functions of [`OptionsT`]
212 /// such as [`OptionsT::enable_tls()`] (available only if the `tls`
213 /// feature of the crate is enabled).
214 pub fn new(client_id: &'a str) -> Self
215 {
216 Self {
217 clean_session: false,
218 keep_alive: 30,
219 client_id: client_id.into(),
220 last_will: None,
221 username: None,
222 password: None,
223 no_delay: false,
224 enabled_ip_versions: IpVersionSet::all(),
225 connection_cfg: (),
226 dns_timeout: Duration::from_secs(3),
227 tcp_connect_timeout: Duration::from_secs(10),
228 mqtt_connect_timeout: Duration::from_secs(3),
229 packets_resend_delay: Duration::from_secs(1),
230 default_port: 1883,
231 default_port_changed: false
232 }
233 }
234}
235
236impl<'a, T> OptionsT<'a, T>
237{
238 /// Changes the type of the [`Self::tls_config`] field.
239 pub(super) fn map_connection_cfg<O, F>(self, f: F) -> OptionsT<'a, O>
240 where F: FnOnce(T) -> O
241 {
242 //Need `type_changing_struct_update` to be stable before we can use `..self`
243
244 OptionsT {
245 clean_session: self.clean_session,
246 keep_alive: self.keep_alive,
247 client_id: self.client_id,
248 last_will: self.last_will,
249 username: self.username,
250 password: self.password,
251 no_delay: self.no_delay,
252 enabled_ip_versions: self.enabled_ip_versions,
253 connection_cfg: f(self.connection_cfg),
254 dns_timeout: self.dns_timeout,
255 tcp_connect_timeout: self.tcp_connect_timeout,
256 mqtt_connect_timeout: self.mqtt_connect_timeout,
257 packets_resend_delay: self.packets_resend_delay,
258 default_port: self.default_port,
259 default_port_changed: self.default_port_changed
260 }
261 }
262
263 /// Sets the [`Self::clean_session`] flag to `true`. See the corresponding
264 /// field's documentation for more information.
265 pub fn set_clean_session(&mut self) -> &mut Self
266 {
267 self.clean_session = true;
268 self
269 }
270
271 /// Changes [`Self::keep_alive`] to the specified value, in seconds. See
272 /// the corresponding field's documentation for more information.
273 pub fn set_keep_alive(&mut self, keep_alive: u16) -> &mut Self
274 {
275 self.keep_alive = keep_alive;
276 self
277 }
278
279 /// Enables MQTT's last will functionality with the specified settings.
280 /// See [`Self::last_will`] for more information.
281 pub fn set_last_will_ex(&mut self, last_will: LastWill<'a>) -> &mut Self
282 {
283 self.last_will = Some(last_will);
284 self
285 }
286
287 /// Enables MQTT's last will functionality with the specified settings.
288 /// See [`Self::last_will`] for more information.
289 pub fn set_last_will(&mut self, topic: &'a str, message: &'a [u8], retain: bool, qos: QoS) -> &mut Self
290 {
291 self.set_last_will_ex(LastWill { topic, message, retain, qos })
292 }
293
294 /// Changes [`Self::username`] to the specified value. See the
295 /// corresponding field's documentation for more information.
296 pub fn set_username(&mut self, username: &'a str) -> &mut Self
297 {
298 self.username = Some(username);
299 self
300 }
301
302 /// Changes [`Self::password`] to the specified value. See the
303 /// corresponding field's documentation for more information.
304 pub fn set_password(&mut self, password: &'a [u8]) -> &mut Self
305 {
306 self.password = Some(password);
307 self
308 }
309
310 /// Sets [`Self::no_delay`] to `true`. See the corresponding field's
311 /// documentation for more information.
312 pub fn set_no_delay(&mut self) -> &mut Self
313 {
314 self.no_delay = true;
315 self
316 }
317
318 /// Removes IPv4 from the [`Self::enabled_ip_versions`] set. See the
319 /// corresponding field's documentation for more information.
320 pub fn disable_ipv4(&mut self) -> &mut Self
321 {
322 self.enabled_ip_versions = self.enabled_ip_versions.without_v4();
323 self
324 }
325
326 /// Removes IPv6 from the [`Self::enabled_ip_versions`] set. See the
327 /// corresponding field's documentation for more information.
328 pub fn disable_ipv6(&mut self) -> &mut Self
329 {
330 self.enabled_ip_versions = self.enabled_ip_versions.without_v6();
331 self
332 }
333
334 /// Changes [`Self::dns_timeout`] to the specified value. See the
335 /// corresponding field's documentation for more information.
336 pub fn set_dns_timeout(&mut self, to: Duration) -> &mut Self
337 {
338 self.dns_timeout = to;
339 self
340 }
341
342 /// Changes [`Self::tcp_connect_timeout`] to the specified value. See
343 /// the corresponding field's documentation for more information.
344 pub fn set_tcp_connect_timeout(&mut self, to: Duration) -> &mut Self
345 {
346 self.tcp_connect_timeout = to;
347 self
348 }
349
350 /// Changes [`Self::mqtt_connect_timeout`] to the specified value. See
351 /// the corresponding field's documentation for more information.
352 pub fn set_mqtt_connect_timeout(&mut self, to: Duration) -> &mut Self
353 {
354 self.mqtt_connect_timeout = to;
355 self
356 }
357
358 /// Changes [`Self::dns_timeout`], [`Self::tcp_connect_timeout`], and
359 /// [`Self::mqtt_connect_timeout`] to the specified values. See their
360 /// respective documentation for more information.
361 pub fn set_all_timeouts(&mut self, dns: Duration, tcp_connect: Duration, mqtt_connect: Duration) -> &mut Self
362 {
363 self.dns_timeout = dns;
364 self.tcp_connect_timeout = tcp_connect;
365 self.mqtt_connect_timeout = mqtt_connect;
366
367 self
368 }
369
370 /// Sets [`Self::dns_timeout`], [`Self::tcp_connect_timeout`], and
371 /// [`Self::mqtt_connect_timeout`] to the same specified value. See their
372 /// respective documentation for more information.
373 pub fn set_all_timeouts_to(&mut self, to: Duration) -> &mut Self
374 {
375 self.dns_timeout = to;
376 self.tcp_connect_timeout = to;
377 self.mqtt_connect_timeout = to;
378
379 self
380 }
381
382 /// Changes [`Self::packets_resend_delay`] to the specified value. See
383 /// the corresponding field's documentation for more information.
384 pub fn set_packets_resend_delay(&mut self, delay: Duration) -> &mut Self
385 {
386 self.packets_resend_delay = delay;
387 self
388 }
389
390 /// Changes the default port (the port used if no port is explicitly
391 /// specified in the hostname passed to [`super::Client::connect()`])
392 /// to the specified value.
393 ///
394 /// Default is 1883 when TLS is **not** used and 8883 when TLS is **in use**.
395 pub fn set_default_port(&mut self, port: u16) -> &mut Self
396 {
397 self.default_port = port;
398 self.default_port_changed = true;
399
400 self
401 }
402
403 /// Accesses the default port (the port used if no port is explicitly
404 /// specified in the hostname passed to [`super::Client::connect()`]).
405 ///
406 /// Default is 1883 when TLS is **not** used and 8883 when TLS is **in use**.
407 pub fn default_port(&self) -> u16
408 {
409 self.default_port
410 }
411
412 pub(super) fn separate_connection_cfg(self) -> (OptionsT<'a, ()>, T)
413 {
414 let mut opt = self.map_connection_cfg(Some);
415 let conn_cfg = opt.connection_cfg.take().unwrap();
416
417 (opt.map_connection_cfg(|_| ()), conn_cfg)
418 }
419}
420
421#[cfg(feature = "tls")]
422mod tls {
423 use std::sync::Arc;
424 use std::io::{self, BufReader};
425
426 use rustls::{ClientConfig, ClientConnection, RootCertStore, OwnedTrustAnchor, ServerName};
427 use webpki::TrustAnchor;
428 use tokio::net::TcpStream;
429
430 use crate::tls::{OptionsWithTls, CryptoBytes, CryptoError, Transport as TlsTransport};
431 use crate::errors::ConnectError;
432 use crate::transport::Transport;
433
434 impl super::ConnectionConfig<ClientConnection> for ClientConfig
435 {
436 fn create_connection(self, host: &str) -> Result<ClientConnection, ConnectError>
437 {
438 let server_name = ServerName::try_from(host).map_err(|_| ConnectError::InvalidHostname)?;
439 ClientConnection::new(Arc::new(self), server_name).map_err(ConnectError::TlsError)
440 }
441
442 fn create_transport(stream: TcpStream, connection: ClientConnection) -> Box<dyn Transport>
443 {
444 Box::new(TlsTransport::new(stream, connection))
445 }
446 }
447
448 impl<'a> super::Options<'a>
449 {
450 /// Converts this set of options to options with TLS support using the
451 /// specified TLS configuration. This specific function is for advanced uses
452 /// and requires the `rustls` crate.
453 ///
454 /// For simple uses, see [`Self::enable_tls()`] and [`Self::enable_tls_custom_ca_cert()`].
455 pub fn enable_tls_ex(self, tls_config: ClientConfig) -> OptionsWithTls<'a>
456 {
457 let mut ret = self.map_connection_cfg(|_| tls_config);
458
459 if !ret.default_port_changed {
460 ret.default_port = 8883;
461 }
462
463 ret
464 }
465
466 /// Converts this set of options to options with TLS support. However,
467 /// this specific function will **SKIP ALL SERVER AUTHENTICITY CHECKS**,
468 /// meaning that the connection WILL **NOT** BE SECURE.
469 ///
470 /// This is a **dangerous operation** and you should only use it for
471 /// debug purposes if you know what you're doing. As a result, it is
472 /// disabled by default, unless the `dangerous_tls` feature is enabled
473 /// in the crate.
474 ///
475 /// Please use [`Self::enable_tls()`] and [`Self::enable_tls_custom_ca_cert()`]
476 /// instead.
477 #[cfg(feature = "dangerous_tls")]
478 pub fn enable_dangerous_non_verified_tls(self) -> OptionsWithTls<'a>
479 {
480 use crate::tls::dangerous;
481
482 let tls_config = ClientConfig::builder()
483 .with_safe_defaults()
484 .with_custom_certificate_verifier(Arc::new(dangerous::SkipServerCertVerification))
485 .with_no_client_auth();
486
487 self.enable_tls_ex(tls_config)
488 }
489
490 /// Converts this set of options to options with TLS support. This
491 /// specific flavour loads the system's certificates to verify the
492 /// server's identity.
493 ///
494 /// If you would like to provide your own CA certificate, you can
495 /// use the [`Self::enable_tls_custom_ca_cert()`] function.
496 pub fn enable_tls(self) -> io::Result<OptionsWithTls<'a>>
497 {
498 let certs = rustls_native_certs::load_native_certs()?;
499 let mut store = RootCertStore::empty();
500 store.add_parsable_certificates(&certs.into_iter().map(|c| c.0).collect::<Vec<_>>());
501
502 let tls_config = ClientConfig::builder()
503 .with_safe_defaults()
504 .with_root_certificates(store)
505 .with_no_client_auth();
506
507 Ok(self.enable_tls_ex(tls_config))
508 }
509
510 /// Converts this set of options to options with TLS support. This
511 /// specific flavour enables the developer to provide their own
512 /// CA certificate to verify the server's identity.
513 ///
514 /// If you would like to use your OS's certificate store instead,
515 /// use [`Self::enable_tls()`] instead.
516 ///
517 /// # Parameters
518 ///
519 /// `ca_cert_bytes` should contain a reference to the raw bytes of
520 /// your certificate either in DER or PEM format. See [`CryptoBytes`]
521 /// for more information.
522 ///
523 /// # Return value
524 ///
525 /// Because the bytes specified in `ca_cert_bytes` can be invalid,
526 /// this function can fail. See [`CryptoError`] for more information.
527 ///
528 /// # Example
529 ///
530 #[cfg_attr(feature = "tls", doc = r##"```
531use std::fs;
532use lmc::Options;
533use lmc::tls::CryptoBytes;
534
535let cert_bytes = fs::read("test_data/ca.pem").expect("Failed to load CA certificate bytes");
536
537let opts = Options::new("client_id")
538 .enable_tls_custom_ca_cert(CryptoBytes::Pem(&cert_bytes))
539 .expect("Failed to load the specified TLS certificate");
540```"##)]
541 pub fn enable_tls_custom_ca_cert(self, ca_cert_bytes: CryptoBytes) -> Result<OptionsWithTls<'a>, CryptoError>
542 {
543 let mut store = RootCertStore::empty();
544
545 match ca_cert_bytes {
546 CryptoBytes::Der(bytes) => {
547 let ca = TrustAnchor::try_from_cert_der(bytes).map_err(CryptoError::BadCert)?;
548 let owned_ca = OwnedTrustAnchor::from_subject_spki_name_constraints(
549 ca.subject,
550 ca.spki,
551 ca.name_constraints,
552 );
553
554 store.roots.push(owned_ca);
555 },
556 CryptoBytes::Pem(bytes) => {
557 let mut rd = BufReader::new(bytes);
558 let certs = rustls_pemfile::certs(&mut rd).map_err(CryptoError::IoError)?;
559
560 if store.add_parsable_certificates(&certs).0 < 1 {
561 return Err(CryptoError::NoValidItemsInPem);
562 }
563 }
564 }
565
566 let tls_config = ClientConfig::builder()
567 .with_safe_defaults()
568 .with_root_certificates(store)
569 .with_no_client_auth();
570
571 Ok(self.enable_tls_ex(tls_config))
572 }
573 }
574}