mqtt_async_client/client/
builder.rs1use crate::{
2 client::{
3 Client,
4 ClientOptions,
5 client::ConnectionMode,
6 KeepAlive,
7 },
8 Error, Result,
9 util::{
10 TokioRuntime,
11 }
12};
13
14use url::Url;
15
16#[cfg(any(feature = "tls", feature = "websocket"))]
17use ::rustls;
18#[cfg(feature = "tls")]
19use std::sync::Arc;
20use tokio::time::Duration;
21
22#[derive(Default)]
35pub struct ClientBuilder {
36 url: Option<Url>,
37 username: Option<String>,
38 password: Option<Vec<u8>>,
39 keep_alive: Option<KeepAlive>,
40 runtime: TokioRuntime,
41 client_id: Option<String>,
42 packet_buffer_len: Option<usize>,
43 max_packet_len: Option<usize>,
44 operation_timeout: Option<Duration>,
45 connection_mode: ConnectionMode,
46 automatic_connect: Option<bool>,
47 connect_retry_delay: Option<Duration>,
48}
49
50impl ClientBuilder {
51 pub fn build(&mut self) -> Result<Client> {
53 Client::new(ClientOptions {
54 url: self
55 .url
56 .clone()
57 .ok_or(Error::from("You must set a url for the client"))?,
58 username: self.username.clone(),
59 password: self.password.clone(),
60 keep_alive: self.keep_alive.unwrap_or(KeepAlive::from_secs(30)),
61 runtime: self.runtime.clone(),
62 client_id: self.client_id.clone(),
63 packet_buffer_len: self.packet_buffer_len.unwrap_or(100),
64 max_packet_len: self.max_packet_len.unwrap_or(64 * 1024),
65 operation_timeout: self.operation_timeout.unwrap_or(Duration::from_secs(20)),
66 connection_mode: self.connection_mode.clone(),
67 automatic_connect: self.automatic_connect.unwrap_or(true),
68 connect_retry_delay: self.connect_retry_delay.unwrap_or(Duration::from_secs(30)),
69 })
70 }
71
72 pub fn set_url_string(&mut self, url: &str) -> Result<&mut Self> {
77 use std::convert::TryFrom;
78 let url = Url::try_from(url).map_err(|e| Error::StdError(Box::new(e)))?;
79 self.set_url(url)
80 }
81
82 pub fn set_url(&mut self, url: Url) -> Result<&mut Self> {
93 #[cfg(any(feature = "tls", feature = "websocket"))]
94 let rustls_config = match &self.connection_mode {
95 #[cfg(feature = "tls")]
96 ConnectionMode::Tls(config) => config.clone(),
97 #[cfg(feature = "websocket")]
98 ConnectionMode::WebsocketSecure(config) => config.clone(),
99 _ => Arc::new(rustls::ClientConfig::new()),
100 };
101 self.connection_mode = match url.scheme() {
102 "mqtt" => ConnectionMode::Tcp,
103 #[cfg(feature = "tls")]
104 "mqtts" => ConnectionMode::Tls(rustls_config),
105 #[cfg(feature = "websocket")]
106 "ws" => ConnectionMode::Websocket,
107 #[cfg(feature = "websocket")]
108 "wss" => ConnectionMode::WebsocketSecure(rustls_config),
109 scheme => return Err(Error::String(format!("Unsupported scheme: {}", scheme))),
110 };
111 self.url = Some(url);
112 Ok(self)
113 }
114
115 pub fn set_username(&mut self, username: Option<String>) -> &mut Self {
119 self.username = username;
120 self
121 }
122
123 pub fn set_password(&mut self, password: Option<Vec<u8>>) -> &mut Self {
127 self.password = password;
128 self
129 }
130
131 pub fn set_keep_alive(&mut self, keep_alive: KeepAlive) -> &mut Self {
138 self.keep_alive = Some(keep_alive);
139 self
140 }
141
142 pub fn set_tokio_runtime(&mut self, rt: TokioRuntime) -> &mut Self {
146 self.runtime = rt;
147 self
148 }
149
150 pub fn set_client_id(&mut self, client_id: Option<String>) -> &mut Self {
152 self.client_id = client_id;
153 self
154 }
155
156 pub fn set_packet_buffer_len(&mut self, packet_buffer_len: usize) -> &mut Self {
160 self.packet_buffer_len = Some(packet_buffer_len);
161 self
162 }
163
164 pub fn set_max_packet_len(&mut self, max_packet_len: usize) -> &mut Self {
168 self.max_packet_len = Some(max_packet_len);
169 self
170 }
171
172 pub fn set_operation_timeout(&mut self, operation_timeout: Duration) -> &mut Self {
176 self.operation_timeout = Some(operation_timeout);
177 self
178 }
179
180 #[cfg(feature = "tls")]
184 pub fn set_tls_client_config(&mut self, tls_client_config: rustls::ClientConfig) -> &mut Self {
185 match self.connection_mode {
186 ref mut mode @ ConnectionMode::Tcp => {
187 let _ = self.url.as_mut().map(|url| url.set_scheme("mqtts"));
188 *mode = ConnectionMode::Tls(Arc::new(tls_client_config))
189 }
190 ConnectionMode::Tls(ref mut config) => *config = Arc::new(tls_client_config),
191 #[cfg(feature = "websocket")]
192 ref mut mode @ ConnectionMode::Websocket => {
193 *mode = ConnectionMode::WebsocketSecure(Arc::new(tls_client_config))
194 }
195 #[cfg(feature = "websocket")]
196 ConnectionMode::WebsocketSecure(ref mut config) => {
197 let _ = self.url.as_mut().map(|url| url.set_scheme("https"));
198 *config = Arc::new(tls_client_config)
199 }
200 }
201 self
202 }
203
204 #[cfg(feature = "websocket")]
205 pub fn set_websocket(&mut self) -> &mut Self {
207 self.connection_mode = ConnectionMode::Websocket;
208 self
209 }
210
211 pub fn set_connection_mode(&mut self, mode: ConnectionMode) -> &mut Self {
215 self.connection_mode = mode;
216 self
217 }
218
219 pub fn set_automatic_connect(&mut self, automatic_connect: bool) -> &mut Self {
223 self.automatic_connect = Some(automatic_connect);
224 self
225 }
226
227 pub fn set_connect_retry_delay(&mut self, connect_retry_delay: Duration) -> &mut Self {
231 self.connect_retry_delay = Some(connect_retry_delay);
232 self
233 }
234}