mqtt_async_client/client/
builder.rs

1use crate::{
2    client::{
3        Client,
4        ClientOptions,
5        client::ConnectionMode,
6        KeepAlive,
7    },
8    Error, Result,
9    util::{
10        TokioRuntime,
11    }
12};
13
14use url::Url;
15
16#[cfg(any(feature = "tls", feature = "websocket"))]
17use ::rustls;
18#[cfg(feature = "tls")]
19use std::sync::Arc;
20use tokio::time::Duration;
21
22/// A fluent builder interface to configure a Client.
23///
24/// Note that you must call `.set_url()` or `.set_url_string()` to
25/// configure a host to connect to before you call `.build()`.
26///
27/// To configure TLS options, pass an appropriate
28/// `rustls::ClientConfig` instance to
29/// `ClientBuilder.set_tls_client_config()`. Check out
30/// `rustls::ClientConfig.set_single_client_cert()` to set a TLS
31/// client certificate and key, and `rustls::ClientConfig.root_store`
32/// to set accepted CA roots for the server certificate. See the
33/// example `mqttc` in this repository for uses of all of these.
34#[derive(Default)]
35pub struct ClientBuilder {
36    url: Option<Url>,
37    username: Option<String>,
38    password: Option<Vec<u8>>,
39    keep_alive: Option<KeepAlive>,
40    runtime: TokioRuntime,
41    client_id: Option<String>,
42    packet_buffer_len: Option<usize>,
43    max_packet_len: Option<usize>,
44    operation_timeout: Option<Duration>,
45    connection_mode: ConnectionMode,
46    automatic_connect: Option<bool>,
47    connect_retry_delay: Option<Duration>,
48}
49
50impl ClientBuilder {
51    /// Build a new `Client` with this configuration.
52    pub fn build(&mut self) -> Result<Client> {
53        Client::new(ClientOptions {
54            url: self
55                .url
56                .clone()
57                .ok_or(Error::from("You must set a url for the client"))?,
58            username: self.username.clone(),
59            password: self.password.clone(),
60            keep_alive: self.keep_alive.unwrap_or(KeepAlive::from_secs(30)),
61            runtime: self.runtime.clone(),
62            client_id: self.client_id.clone(),
63            packet_buffer_len: self.packet_buffer_len.unwrap_or(100),
64            max_packet_len: self.max_packet_len.unwrap_or(64 * 1024),
65            operation_timeout: self.operation_timeout.unwrap_or(Duration::from_secs(20)),
66            connection_mode: self.connection_mode.clone(),
67            automatic_connect: self.automatic_connect.unwrap_or(true),
68            connect_retry_delay: self.connect_retry_delay.unwrap_or(Duration::from_secs(30)),
69        })
70    }
71
72    /// Set the destination url for this mqtt connection to the given string (returning an error if
73    /// the provided string is not a valid URL).
74    ///
75    /// See [Self::set_url] for more details
76    pub fn set_url_string(&mut self, url: &str) -> Result<&mut Self> {
77        use std::convert::TryFrom;
78        let url = Url::try_from(url).map_err(|e| Error::StdError(Box::new(e)))?;
79        self.set_url(url)
80    }
81
82    /// Set the destination url for this mqtt connection.
83    ///
84    /// Supported schema are:
85    ///   - mqtt: An mqtt session over tcp (default TCP port 1883)
86    ///   - mqtts: An mqtt session over tls (default TCP port 8883)
87    ///   - ws: An mqtt session over a websocket (default TCP port 80, requires cargo feature "websocket")
88    ///   - wss: An mqtt session over a secure websocket (default TCP port 443, requires cargo feature "websocket")
89    ///
90    /// If the selected scheme is mqtts or wss, then it will preserve the previously provided tls
91    /// configuration, if one has been given, or make a new default one otherwise.
92    pub fn set_url(&mut self, url: Url) -> Result<&mut Self> {
93        #[cfg(any(feature = "tls", feature = "websocket"))]
94        let rustls_config = match &self.connection_mode {
95            #[cfg(feature = "tls")]
96            ConnectionMode::Tls(config) => config.clone(),
97            #[cfg(feature = "websocket")]
98            ConnectionMode::WebsocketSecure(config) => config.clone(),
99            _ => Arc::new(rustls::ClientConfig::new()),
100        };
101        self.connection_mode = match url.scheme() {
102            "mqtt" => ConnectionMode::Tcp,
103            #[cfg(feature = "tls")]
104            "mqtts" => ConnectionMode::Tls(rustls_config),
105            #[cfg(feature = "websocket")]
106            "ws" => ConnectionMode::Websocket,
107            #[cfg(feature = "websocket")]
108            "wss" => ConnectionMode::WebsocketSecure(rustls_config),
109            scheme => return Err(Error::String(format!("Unsupported scheme: {}", scheme))),
110        };
111        self.url = Some(url);
112        Ok(self)
113    }
114
115    /// Set username to authenticate with.
116    ///
117    /// The default value is no username.
118    pub fn set_username(&mut self, username: Option<String>) -> &mut Self {
119        self.username = username;
120        self
121    }
122
123    /// Set password to authenticate with.
124    ///
125    /// The default is no password.
126    pub fn set_password(&mut self, password: Option<Vec<u8>>) -> &mut Self {
127        self.password = password;
128        self
129    }
130
131    /// Set keep alive time.
132    ///
133    /// This controls how often ping requests are sent when the connection is idle.
134    /// See [MQTT 3.1.1 specification section 3.1.2.10](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/csprd02/mqtt-v3.1.1-csprd02.html#_Keep_Alive)
135    ///
136    /// The default value is 30 seconds.
137    pub fn set_keep_alive(&mut self, keep_alive: KeepAlive) -> &mut Self {
138        self.keep_alive = Some(keep_alive);
139        self
140    }
141
142    /// Set the tokio runtime to spawn background tasks onto.
143    ///
144    /// The default is to use the default tokio runtime, i.e. `tokio::spawn()`.
145    pub fn set_tokio_runtime(&mut self, rt: TokioRuntime) -> &mut Self {
146        self.runtime = rt;
147        self
148    }
149
150    /// Set the ClientId to connect with.
151    pub fn set_client_id(&mut self, client_id: Option<String>) -> &mut Self {
152        self.client_id = client_id;
153        self
154    }
155
156    /// Set the inbound and outbound packet buffer length.
157    ///
158    /// The default is 100.
159    pub fn set_packet_buffer_len(&mut self, packet_buffer_len: usize) -> &mut Self {
160        self.packet_buffer_len = Some(packet_buffer_len);
161        self
162    }
163
164    /// Set the maximum packet length.
165    ///
166    /// The default is 64 * 1024 bytes.
167    pub fn set_max_packet_len(&mut self, max_packet_len: usize) -> &mut Self {
168        self.max_packet_len = Some(max_packet_len);
169        self
170    }
171
172    /// Set the timeout for operations.
173    ///
174    /// The default is 20 seconds.
175    pub fn set_operation_timeout(&mut self, operation_timeout: Duration) -> &mut Self {
176        self.operation_timeout = Some(operation_timeout);
177        self
178    }
179
180    /// Set the TLS ClientConfig for the client-server connection.
181    ///
182    /// Enables TLS. By default TLS is disabled.
183    #[cfg(feature = "tls")]
184    pub fn set_tls_client_config(&mut self, tls_client_config: rustls::ClientConfig) -> &mut Self {
185        match self.connection_mode {
186            ref mut mode @ ConnectionMode::Tcp => {
187                let _ = self.url.as_mut().map(|url| url.set_scheme("mqtts"));
188                *mode = ConnectionMode::Tls(Arc::new(tls_client_config))
189            }
190            ConnectionMode::Tls(ref mut config) => *config = Arc::new(tls_client_config),
191            #[cfg(feature = "websocket")]
192            ref mut mode @ ConnectionMode::Websocket => {
193                *mode = ConnectionMode::WebsocketSecure(Arc::new(tls_client_config))
194            }
195            #[cfg(feature = "websocket")]
196            ConnectionMode::WebsocketSecure(ref mut config) => {
197                let _ = self.url.as_mut().map(|url| url.set_scheme("https"));
198                *config = Arc::new(tls_client_config)
199            }
200        }
201        self
202    }
203
204    #[cfg(feature = "websocket")]
205    /// Set the connection to use a websocket
206    pub fn set_websocket(&mut self) -> &mut Self {
207        self.connection_mode = ConnectionMode::Websocket;
208        self
209    }
210
211    /// Sets the connection mode to the given value
212    ///
213    /// The default is to use Tcp
214    pub fn set_connection_mode(&mut self, mode: ConnectionMode) -> &mut Self {
215        self.connection_mode = mode;
216        self
217    }
218
219    /// Set whether to automatically connect and reconnect.
220    ///
221    /// The default is true.
222    pub fn set_automatic_connect(&mut self, automatic_connect: bool) -> &mut Self {
223        self.automatic_connect = Some(automatic_connect);
224        self
225    }
226
227    /// Set the delay between connect retries.
228    ///
229    /// The default is 30s.
230    pub fn set_connect_retry_delay(&mut self, connect_retry_delay: Duration) -> &mut Self {
231        self.connect_retry_delay = Some(connect_retry_delay);
232        self
233    }
234}