async_nats/options.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17#[cfg(feature = "nkeys")]
18use base64::engine::general_purpose::URL_SAFE_NO_PAD;
19#[cfg(feature = "nkeys")]
20use base64::engine::Engine;
21use futures_util::Future;
22use std::fmt::Formatter;
23use std::net::SocketAddr;
24#[cfg(feature = "nkeys")]
25use std::path::Path;
26use std::{fmt, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
27#[cfg(feature = "nkeys")]
28use tokio::io;
29use tokio_rustls::rustls;
30
/// Connect options. Used to connect with NATS when custom config is needed.
///
/// Values are set through the builder-style methods of this type and the
/// finished set of options is consumed by [`ConnectOptions::connect`].
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let client = async_nats::ConnectOptions::new()
///     .require_tls(true)
///     .ping_interval(std::time::Duration::from_secs(10))
///     .connect("demo.nats.io")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct ConnectOptions {
    // Client name, set through `name()`; `None` by default.
    pub(crate) name: Option<String>,
    // When set, messages published from this connection are not delivered back to it.
    pub(crate) no_echo: bool,
    // Limit of consecutive reconnect attempts; `None` means no limit.
    pub(crate) max_reconnects: Option<usize>,
    // Timeout for the whole connection establishment and handshake (5 seconds by default).
    pub(crate) connection_timeout: Duration,
    // Credentials filled in by the token / user+password / nkey / jwt builders.
    pub(crate) auth: Auth,
    // Whether a TLS connection is required.
    pub(crate) tls_required: bool,
    // Whether TLS is established before the server info is received.
    pub(crate) tls_first: bool,
    // Paths to root certificates.
    pub(crate) certificates: Vec<PathBuf>,
    // Paths to the client certificate and its key; set together by `add_client_certificate()`.
    pub(crate) client_cert: Option<PathBuf>,
    pub(crate) client_key: Option<PathBuf>,
    // Custom rustls configuration supplied through `tls_client_config()`.
    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
    // How often the client sends PING to the server (60 seconds by default).
    pub(crate) ping_interval: Duration,
    // Capacity for subscribers (64 * 1024 messages by default).
    pub(crate) subscription_capacity: usize,
    // Capacity set through `client_capacity()` (2048 by default).
    pub(crate) sender_capacity: usize,
    // Asynchronous callback invoked with connection `Event`s.
    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
    // Inbox prefix (`_INBOX` by default).
    pub(crate) inbox_prefix: String,
    // Timeout for `Client::request` (10 seconds by default).
    pub(crate) request_timeout: Option<Duration>,
    // When set, the initial connection is established in the background.
    pub(crate) retry_on_initial_connect: bool,
    // When set, servers advertised by the cluster are ignored.
    pub(crate) ignore_discovered_servers: bool,
    // When set, servers are tried in the order they were passed instead of at random.
    pub(crate) retain_servers_order: bool,
    // Initial capacity of the read buffer used to gather partial protocol messages.
    pub(crate) read_buffer_capacity: u16,
    // Maps the number of reconnect attempts to the delay before the next attempt.
    pub(crate) reconnect_delay_callback: Arc<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
    // Custom authentication callback: receives the nonce and returns `Auth`.
    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
    // When set, publish subjects are not validated.
    pub(crate) skip_subject_validation: bool,
    // Local socket address the client binds to when connecting, if any.
    pub(crate) local_address: Option<SocketAddr>,
}
72
73impl fmt::Debug for ConnectOptions {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
75 f.debug_map()
76 .entry(&"name", &self.name)
77 .entry(&"no_echo", &self.no_echo)
78 .entry(&"max_reconnects", &self.max_reconnects)
79 .entry(&"connection_timeout", &self.connection_timeout)
80 .entry(&"tls_required", &self.tls_required)
81 .entry(&"certificates", &self.certificates)
82 .entry(&"client_cert", &self.client_cert)
83 .entry(&"client_key", &self.client_key)
84 .entry(&"tls_client_config", &"XXXXXXXX")
85 .entry(&"tls_first", &self.tls_first)
86 .entry(&"ping_interval", &self.ping_interval)
87 .entry(&"sender_capacity", &self.sender_capacity)
88 .entry(&"inbox_prefix", &self.inbox_prefix)
89 .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
90 .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
91 .entry(&"skip_subject_validation", &self.skip_subject_validation)
92 .finish()
93 }
94}
95
96impl Default for ConnectOptions {
97 fn default() -> ConnectOptions {
98 ConnectOptions {
99 name: None,
100 no_echo: false,
101 max_reconnects: None,
102 connection_timeout: Duration::from_secs(5),
103 tls_required: false,
104 tls_first: false,
105 certificates: Vec::new(),
106 client_cert: None,
107 client_key: None,
108 tls_client_config: None,
109 ping_interval: Duration::from_secs(60),
110 sender_capacity: 2048,
111 subscription_capacity: 1024 * 64,
112 event_callback: None,
113 inbox_prefix: "_INBOX".to_string(),
114 request_timeout: Some(Duration::from_secs(10)),
115 retry_on_initial_connect: false,
116 ignore_discovered_servers: false,
117 retain_servers_order: false,
118 read_buffer_capacity: 65535,
119 reconnect_delay_callback: Arc::new(|attempts| {
120 connector::reconnect_delay_callback_default(attempts)
121 }),
122 auth: Default::default(),
123 auth_callback: None,
124 skip_subject_validation: false,
125 local_address: None,
126 }
127 }
128}
129
130impl ConnectOptions {
131 /// Enables customization of NATS connection.
132 ///
133 /// # Examples
134 /// ```no_run
135 /// # #[tokio::main]
136 /// # async fn main() -> Result<(), async_nats::ConnectError> {
137 /// let mut options = async_nats::ConnectOptions::new()
138 /// .require_tls(true)
139 /// .ping_interval(std::time::Duration::from_secs(10))
140 /// .connect("demo.nats.io")
141 /// .await?;
142 /// # Ok(())
143 /// # }
144 /// ```
145 pub fn new() -> ConnectOptions {
146 ConnectOptions::default()
147 }
148
149 /// Connect to the NATS Server leveraging all passed options.
150 ///
151 /// # Examples
152 /// ```no_run
153 /// # #[tokio::main]
154 /// # async fn main() -> Result<(), async_nats::ConnectError> {
155 /// let nc = async_nats::ConnectOptions::new()
156 /// .require_tls(true)
157 /// .connect("demo.nats.io")
158 /// .await?;
159 /// # Ok(())
160 /// # }
161 /// ```
162 ///
163 /// ## Pass multiple URLs.
164 /// ```no_run
165 /// #[tokio::main]
166 /// # async fn main() -> Result<(), async_nats::Error> {
167 /// use async_nats::ServerAddr;
168 /// let client = async_nats::connect(vec![
169 /// "demo.nats.io".parse::<ServerAddr>()?,
170 /// "other.nats.io".parse::<ServerAddr>()?,
171 /// ])
172 /// .await
173 /// .unwrap();
174 /// # Ok(())
175 /// # }
176 /// ```
177 pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
178 crate::connect_with_options(addrs, self).await
179 }
180
181 /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
182 /// Requires an asynchronous function that accepts nonce and returns [Auth].
183 /// It will overwrite all other auth methods used.
184 ///
185 ///
186 /// # Example
187 /// ```no_run
188 /// # #[tokio::main]
189 /// # async fn main() -> Result<(), async_nats::ConnectError> {
190 /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
191 /// let mut auth = async_nats::Auth::new();
192 /// auth.username = Some("derek".to_string());
193 /// auth.password = Some("s3cr3t".to_string());
194 /// Ok(auth)
195 /// })
196 /// .connect("demo.nats.io")
197 /// .await?;
198 /// # Ok(())
199 /// # }
200 /// ```
201 pub fn with_auth_callback<F, Fut>(callback: F) -> Self
202 where
203 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
204 Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
205 {
206 let mut options = ConnectOptions::new();
207 options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Arc::new(
208 move |nonce| Box::pin(callback(nonce)),
209 )));
210 options
211 }
212
213 /// Authenticate against NATS Server with the provided token.
214 ///
215 /// # Examples
216 /// ```no_run
217 /// # #[tokio::main]
218 /// # async fn main() -> Result<(), async_nats::ConnectError> {
219 /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
220 /// .connect("demo.nats.io")
221 /// .await?;
222 /// # Ok(())
223 /// # }
224 /// ```
225 pub fn with_token(token: String) -> Self {
226 ConnectOptions::default().token(token)
227 }
228
229 /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
230 /// This can be used as a way to mix authentication methods.
231 ///
232 /// # Examples
233 /// ```no_run
234 /// # #[tokio::main]
235 /// # async fn main() -> Result<(), async_nats::ConnectError> {
236 /// let nc = async_nats::ConnectOptions::new()
237 /// .token("t0k3n!".into())
238 /// .connect("demo.nats.io")
239 /// .await?;
240 /// # Ok(())
241 /// # }
242 /// ```
243 pub fn token(mut self, token: String) -> Self {
244 self.auth.token = Some(token);
245 self
246 }
247
248 /// Authenticate against NATS Server with the provided username and password.
249 ///
250 /// # Examples
251 /// ```no_run
252 /// # #[tokio::main]
253 /// # async fn main() -> Result<(), async_nats::ConnectError> {
254 /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
255 /// .connect("demo.nats.io")
256 /// .await?;
257 /// # Ok(())
258 /// # }
259 /// ```
260 pub fn with_user_and_password(user: String, pass: String) -> Self {
261 ConnectOptions::default().user_and_password(user, pass)
262 }
263
264 /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
265 /// This can be used as a way to mix authentication methods.
266 ///
267 /// # Examples
268 /// ```no_run
269 /// # #[tokio::main]
270 /// # async fn main() -> Result<(), async_nats::ConnectError> {
271 /// let nc = async_nats::ConnectOptions::new()
272 /// .user_and_password("derek".into(), "s3cr3t!".into())
273 /// .connect("demo.nats.io")
274 /// .await?;
275 /// # Ok(())
276 /// # }
277 /// ```
278 pub fn user_and_password(mut self, user: String, pass: String) -> Self {
279 self.auth.username = Some(user);
280 self.auth.password = Some(pass);
281 self
282 }
283
/// Creates default options that authenticate with an NKey.
/// Requires an NKey Seed secret.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
/// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
///     .connect("localhost")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn with_nkey(seed: String) -> Self {
    Self::new().nkey(seed)
}
302
/// Sets the NKey used when authenticating against the NATS Server.
/// Requires an NKey Seed Secret.
///
/// Being a builder method, it can be combined with other authentication methods.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
/// let nc = async_nats::ConnectOptions::new()
///     .nkey(seed.into())
///     .connect("localhost")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn nkey(self, seed: String) -> Self {
    let mut options = self;
    options.auth.nkey = Some(seed);
    options
}
325
/// Creates default options that authenticate with a JWT.
///
/// Requires an asynchronous function that signs the server nonce.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
/// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
/// // load jwt from creds file or other secure source
/// async fn load_jwt() -> std::io::Result<String> {
///     todo!();
/// }
/// let jwt = load_jwt().await?;
/// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
///     let key_pair = key_pair.clone();
///     async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
/// })
/// .connect("localhost")
/// .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
where
    F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
{
    Self::new().jwt(jwt, sign_cb)
}
358
/// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
/// Requires an asynchronous function to sign the server nonce.
/// This can be used as a way to mix authentication methods.
///
/// The signature returned by `sign_cb` is encoded as URL-safe base64 without
/// padding. An error returned by `sign_cb` is passed on unchanged.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
/// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
/// // load jwt from creds file or other secure source
/// async fn load_jwt() -> std::io::Result<String> {
///     todo!();
/// }
/// let jwt = load_jwt().await?;
/// let nc = async_nats::ConnectOptions::new()
///     .jwt(jwt, move |nonce| {
///         let key_pair = key_pair.clone();
///         async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
///     })
///     .connect("localhost")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
where
    F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
{
    // Shared so each invocation can move its own handle into the returned future.
    let sign_cb = Arc::new(sign_cb);

    let jwt_sign_callback = CallbackArg1(Arc::new(move |nonce: String| {
        let sign_cb = sign_cb.clone();
        Box::pin(async move {
            // The nonce is owned, so its buffer is handed over without a copy.
            // `sign_cb` already yields `AuthError`; it is propagated as is
            // rather than re-wrapped into `AuthError(AuthError(..))`.
            let sig = sign_cb(nonce.into_bytes()).await?;
            Ok(URL_SAFE_NO_PAD.encode(sig))
        })
    }));

    self.auth.jwt = Some(jwt);
    self.auth.signature_callback = Some(jwt_sign_callback);
    self
}
408
/// Creates default options that authenticate with NATS using a `.creds` file.
///
/// The file at `path` is opened and its credentials are loaded and applied.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
///     .await?
///     .connect("connect.ngs.global")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
    let creds = crate::auth_utils::load_creds(path.as_ref()).await?;
    Self::default().credentials(&creds)
}
430
/// Loads credentials from a credentials file and applies them to this builder.
///
/// The file at `path` is opened and its credentials are loaded.
/// Being a builder method, it can be combined with other authentication methods.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let nc = async_nats::ConnectOptions::new()
///     .credentials_file("path/to/my.creds")
///     .await?
///     .connect("connect.ngs.global")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
    let creds = crate::auth_utils::load_creds(path.as_ref()).await?;
    let options = self.credentials(&creds)?;
    Ok(options)
}
453
/// Creates default options that authenticate with NATS using a credential
/// string in the creds file format.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let creds = "-----BEGIN NATS USER JWT-----
/// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
/// ------END NATS USER JWT------
///
/// ************************* IMPORTANT *************************
/// NKEY Seed printed below can be used sign and prove identity.
/// NKEYs are sensitive and should be treated as secrets.
///
/// -----BEGIN USER NKEY SEED-----
/// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
/// ------END USER NKEY SEED------
/// ";
///
/// let nc = async_nats::ConnectOptions::with_credentials(creds)
///     .expect("failed to parse static creds")
///     .connect("connect.ngs.global")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn with_credentials(creds: &str) -> io::Result<Self> {
    Self::new().credentials(creds)
}
485
/// Applies a credentials string to this builder, to be used when
/// authenticating against the NATS Server.
///
/// The string should be in the credentials file format: the JWT and the key
/// pair are parsed out of it, and the key pair is used to sign the nonce.
/// Being a builder method, it can be combined with other authentication methods.
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let creds = "-----BEGIN NATS USER JWT-----
/// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
/// ------END NATS USER JWT------
///
/// ************************* IMPORTANT *************************
/// NKEY Seed printed below can be used sign and prove identity.
/// NKEYs are sensitive and should be treated as secrets.
///
/// -----BEGIN USER NKEY SEED-----
/// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
/// ------END USER NKEY SEED------
/// ";
///
/// let nc = async_nats::ConnectOptions::new()
///     .credentials(creds)
///     .expect("failed to parse static creds")
///     .connect("connect.ngs.global")
///     .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn credentials(self, creds: &str) -> io::Result<Self> {
    let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
    // Shared so that each signing future can own a handle to the key pair.
    let signer = Arc::new(key_pair);

    let sign_nonce = move |nonce: Vec<u8>| {
        let signer = signer.clone();
        async move { signer.sign(&nonce).map_err(AuthError::new) }
    };

    Ok(self.jwt(jwt.to_owned(), sign_nonce))
}
526
527 /// Loads root certificates by providing the path to them.
528 ///
529 /// # Examples
530 /// ```no_run
531 /// # #[tokio::main]
532 /// # async fn main() -> Result<(), async_nats::ConnectError> {
533 /// let nc = async_nats::ConnectOptions::new()
534 /// .add_root_certificates("mycerts.pem".into())
535 /// .connect("demo.nats.io")
536 /// .await?;
537 /// # Ok(())
538 /// # }
539 /// ```
540 pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
541 self.certificates = vec![path];
542 self
543 }
544
545 /// Loads client certificate by providing the path to it.
546 ///
547 /// # Examples
548 /// ```no_run
549 /// # #[tokio::main]
550 /// # async fn main() -> Result<(), async_nats::ConnectError> {
551 /// let nc = async_nats::ConnectOptions::new()
552 /// .add_client_certificate("cert.pem".into(), "key.pem".into())
553 /// .connect("demo.nats.io")
554 /// .await?;
555 /// # Ok(())
556 /// # }
557 /// ```
558 pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
559 self.client_cert = Some(cert);
560 self.client_key = Some(key);
561 self
562 }
563
564 /// Sets or disables TLS requirement. If TLS connection is impossible while `options.require_tls(true)` connection will return error.
565 ///
566 /// # Examples
567 /// ```no_run
568 /// # #[tokio::main]
569 /// # async fn main() -> Result<(), async_nats::ConnectError> {
570 /// let nc = async_nats::ConnectOptions::new()
571 /// .require_tls(true)
572 /// .connect("demo.nats.io")
573 /// .await?;
574 /// # Ok(())
575 /// # }
576 /// ```
577 pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
578 self.tls_required = is_required;
579 self
580 }
581
582 /// Changes how tls connection is established. If `tls_first` is set,
583 /// client will try to establish tls before getting info from the server.
584 /// That requires the server to enable `handshake_first` option in the config.
585 pub fn tls_first(mut self) -> ConnectOptions {
586 self.tls_first = true;
587 self.tls_required = true;
588 self
589 }
590
591 /// Sets how often Client sends PING message to the server.
592 ///
593 /// # Examples
594 /// ```no_run
595 /// # use tokio::time::Duration;
596 /// # #[tokio::main]
597 /// # async fn main() -> Result<(), async_nats::ConnectError> {
598 /// async_nats::ConnectOptions::new()
599 /// .ping_interval(Duration::from_secs(24))
600 /// .connect("demo.nats.io")
601 /// .await?;
602 /// # Ok(())
603 /// # }
604 /// ```
605 pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
606 self.ping_interval = ping_interval;
607 self
608 }
609
610 /// Sets `no_echo` option which disables delivering messages that were published from the same
611 /// connection.
612 ///
613 /// # Examples
614 /// ```no_run
615 /// # #[tokio::main]
616 /// # async fn main() -> Result<(), async_nats::ConnectError> {
617 /// async_nats::ConnectOptions::new()
618 /// .no_echo()
619 /// .connect("demo.nats.io")
620 /// .await?;
621 /// # Ok(())
622 /// # }
623 /// ```
624 pub fn no_echo(mut self) -> ConnectOptions {
625 self.no_echo = true;
626 self
627 }
628
629 /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
630 /// callback and drop messages.
631 /// Default is set to 65536 messages buffer.
632 ///
633 /// # Examples
634 /// ```no_run
635 /// # #[tokio::main]
636 /// # async fn main() -> Result<(), async_nats::ConnectError> {
637 /// async_nats::ConnectOptions::new()
638 /// .subscription_capacity(1024)
639 /// .connect("demo.nats.io")
640 /// .await?;
641 /// # Ok(())
642 /// # }
643 /// ```
644 pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
645 self.subscription_capacity = capacity;
646 self
647 }
648
649 /// Sets a timeout for the full connection establishment and handshake to avoid
650 /// hangs and deadlocks. This includes TCP/WebSocket connection, TLS setup,
651 /// waiting for the server INFO message, sending CONNECT/PING, and receiving
652 /// the initial server PONG response. Default is set to 5 seconds.
653 ///
654 /// # Examples
655 /// ```no_run
656 /// # #[tokio::main]
657 /// # async fn main() -> Result<(), async_nats::ConnectError> {
658 /// async_nats::ConnectOptions::new()
659 /// .connection_timeout(tokio::time::Duration::from_secs(5))
660 /// .connect("demo.nats.io")
661 /// .await?;
662 /// # Ok(())
663 /// # }
664 /// ```
665 pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
666 self.connection_timeout = timeout;
667 self
668 }
669
670 /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
671 ///
672 /// # Examples
673 /// ```no_run
674 /// # #[tokio::main]
675 /// # async fn main() -> Result<(), async_nats::ConnectError> {
676 /// async_nats::ConnectOptions::new()
677 /// .request_timeout(Some(std::time::Duration::from_secs(3)))
678 /// .connect("demo.nats.io")
679 /// .await?;
680 /// # Ok(())
681 /// # }
682 /// ```
683 pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
684 self.request_timeout = timeout;
685 self
686 }
687
688 /// Registers an asynchronous callback for errors that are received over the wire from the server.
689 ///
690 /// # Examples
691 /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
692 /// work around this
693 ///
694 /// ## Basic
695 /// If you don't need to move anything into the closure, simple signature can be used:
696 ///
697 /// ```no_run
698 /// # #[tokio::main]
699 /// # async fn main() -> Result<(), async_nats::ConnectError> {
700 /// async_nats::ConnectOptions::new()
701 /// .event_callback(|event| async move {
702 /// println!("event occurred: {}", event);
703 /// })
704 /// .connect("demo.nats.io")
705 /// .await?;
706 /// # Ok(())
707 /// # }
708 /// ```
709 ///
710 /// ## Listening to specific event kind
711 /// ```no_run
712 /// # #[tokio::main]
713 /// # async fn main() -> Result<(), async_nats::ConnectError> {
714 /// async_nats::ConnectOptions::new()
715 /// .event_callback(|event| async move {
716 /// match event {
717 /// async_nats::Event::Disconnected => println!("disconnected"),
718 /// async_nats::Event::Connected => println!("reconnected"),
719 /// async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
720 /// other => println!("other event happened: {}", other),
721 /// }
722 /// })
723 /// .connect("demo.nats.io")
724 /// .await?;
725 /// # Ok(())
726 /// # }
727 /// ```
728 ///
729 /// ## Advanced
730 /// If you need to move something into the closure, here's an example how to do that
731 ///
732 /// ```no_run
733 /// # #[tokio::main]
734 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
735 /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
736 /// async_nats::ConnectOptions::new()
737 /// .event_callback(move |event| {
738 /// let tx = tx.clone();
739 /// async move {
740 /// tx.send(event).await.unwrap();
741 /// }
742 /// })
743 /// .connect("demo.nats.io")
744 /// .await?;
745 /// # Ok(())
746 /// # }
747 /// ```
748 pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
749 where
750 F: Fn(Event) -> Fut + Send + Sync + 'static,
751 Fut: Future<Output = ()> + 'static + Send + Sync,
752 {
753 self.event_callback = Some(CallbackArg1::<Event, ()>(Arc::new(move |event| {
754 Box::pin(cb(event))
755 })));
756 self
757 }
758
759 /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
760 ///
761 /// # Examples
762 /// ```no_run
763 /// # #[tokio::main]
764 /// # async fn main() -> Result<(), async_nats::ConnectError> {
765 /// async_nats::ConnectOptions::new()
766 /// .reconnect_delay_callback(|attempts| {
767 /// println!("no of attempts: {attempts}");
768 /// std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
769 /// })
770 /// .connect("demo.nats.io")
771 /// .await?;
772 /// # Ok(())
773 /// # }
774 /// ```
775 pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
776 where
777 F: Fn(usize) -> Duration + Send + Sync + 'static,
778 {
779 self.reconnect_delay_callback = Arc::new(cb);
780 self
781 }
782
783 /// By default, Client dispatches op's to the Client onto the channel with capacity of 128.
784 /// This option enables overriding it.
785 ///
786 /// # Examples
787 /// ```
788 /// # #[tokio::main]
789 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
790 /// async_nats::ConnectOptions::new()
791 /// .client_capacity(256)
792 /// .connect("demo.nats.io")
793 /// .await?;
794 /// # Ok(())
795 /// # }
796 /// ```
797 pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
798 self.sender_capacity = capacity;
799 self
800 }
801
802 /// Sets custom prefix instead of default `_INBOX`.
803 ///
804 /// # Examples
805 ///
806 /// ```
807 /// # #[tokio::main]
808 /// # async fn main() -> Result<(), async_nats::Error> {
809 /// async_nats::ConnectOptions::new()
810 /// .custom_inbox_prefix("CUSTOM")
811 /// .connect("demo.nats.io")
812 /// .await?;
813 /// # Ok(())
814 /// # }
815 /// ```
816 pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
817 self.inbox_prefix = prefix.to_string();
818 self
819 }
820
821 /// Sets the name for the client.
822 ///
823 /// # Examples
824 /// ```
825 /// # #[tokio::main]
826 /// # async fn main() -> Result<(), async_nats::Error> {
827 /// async_nats::ConnectOptions::new()
828 /// .name("rust-service")
829 /// .connect("demo.nats.io")
830 /// .await?;
831 /// # Ok(())
832 /// # }
833 /// ```
834 pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
835 self.name = Some(name.to_string());
836 self
837 }
838
839 /// By default, [`ConnectOptions::connect`] will return an error if
840 /// the connection to the server cannot be established.
841 ///
842 /// Setting `retry_on_initial_connect` makes the client
843 /// establish the connection in the background.
844 pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
845 self.retry_on_initial_connect = true;
846 self
847 }
848
849 /// Specifies the number of consecutive reconnect attempts the client will
850 /// make before giving up. This is useful for preventing zombie services
851 /// from endlessly reaching the servers, but it can also be a footgun and
852 /// surprise for users who do not expect that the client can give up
853 /// entirely.
854 ///
855 /// Pass `None` or `0` for no limit.
856 ///
857 /// # Examples
858 /// ```
859 /// # #[tokio::main]
860 /// # async fn main() -> Result<(), async_nats::Error> {
861 /// async_nats::ConnectOptions::new()
862 /// .max_reconnects(None)
863 /// .connect("demo.nats.io")
864 /// .await?;
865 /// # Ok(())
866 /// # }
867 /// ```
868 pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
869 let val: Option<usize> = max_reconnects.into();
870 self.max_reconnects = if val == Some(0) { None } else { val };
871 self
872 }
873
874 /// Disables subject validation for publish operations.
875 ///
876 /// By default, the client validates publish subjects to ensure they don't contain
877 /// whitespace characters (space, tab, CR, LF).
878 ///
879 /// This option only affects **publish** validation. Subscribe and queue group
880 /// validation always runs regardless of this setting, matching the behavior
881 /// of the Go and Java NATS clients.
882 ///
883 /// # Warning
884 /// Using invalid subjects may cause protocol errors with the NATS server.
885 /// Only disable validation if you are certain all published subjects are valid.
886 ///
887 /// # Examples
888 /// ```no_run
889 /// # #[tokio::main]
890 /// # async fn main() -> Result<(), async_nats::ConnectError> {
891 /// async_nats::ConnectOptions::new()
892 /// .skip_subject_validation(true)
893 /// .connect("demo.nats.io")
894 /// .await?;
895 /// # Ok(())
896 /// # }
897 /// ```
898 pub fn skip_subject_validation(mut self, skip: bool) -> ConnectOptions {
899 self.skip_subject_validation = skip;
900 self
901 }
902
903 /// By default, a server may advertise other servers in the cluster known to it.
904 /// By setting this option, the client will ignore the advertised servers.
905 /// This may be useful if the client may not be able to reach them.
906 pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
907 self.ignore_discovered_servers = true;
908 self
909 }
910
911 /// By default, client will pick random server to which it will try connect to.
912 /// This option disables that feature, forcing it to always respect the order
913 /// in which server addresses were passed.
914 pub fn retain_servers_order(mut self) -> ConnectOptions {
915 self.retain_servers_order = true;
916 self
917 }
918
919 /// Allows passing custom rustls tls config.
920 ///
921 /// # Examples
922 /// ```
923 /// # #[tokio::main]
924 /// # async fn main() -> Result<(), async_nats::Error> {
925 /// let mut root_store = async_nats::rustls::RootCertStore::empty();
926 ///
927 /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs().certs);
928 ///
929 /// let tls_client = async_nats::rustls::ClientConfig::builder()
930 /// .with_root_certificates(root_store)
931 /// .with_no_client_auth();
932 ///
933 /// let client = async_nats::ConnectOptions::new()
934 /// .require_tls(true)
935 /// .tls_client_config(tls_client)
936 /// .connect("tls://demo.nats.io")
937 /// .await?;
938 ///
939 /// # Ok(())
940 /// # }
941 /// ```
942 pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
943 self.tls_client_config = Some(config);
944 self
945 }
946
947 /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
948 /// protocol messages.
949 ///
950 /// # Examples
951 /// ```
952 /// # #[tokio::main]
953 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
954 /// async_nats::ConnectOptions::new()
955 /// .read_buffer_capacity(65535)
956 /// .connect("demo.nats.io")
957 /// .await?;
958 /// # Ok(())
959 /// # }
960 /// ```
961 pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
962 self.read_buffer_capacity = size;
963 self
964 }
965
966 /// Sets the local socket address that the client will bind to when connecting
967 /// to the server. This is useful when the client machine has multiple
968 /// network interfaces and you want to control which one is used, or when
969 /// you need to bind to a specific local port.
970 ///
971 /// Use port `0` to let the operating system assign an ephemeral port.
972 ///
973 /// # Examples
974 /// ```no_run
975 /// # #[tokio::main]
976 /// # async fn main() -> Result<(), async_nats::ConnectError> {
977 /// // Bind to a specific IP with an OS-assigned port
978 /// let addr: std::net::SocketAddr = "192.168.1.10:0".parse().unwrap();
979 /// async_nats::ConnectOptions::new()
980 /// .local_address(addr)
981 /// .connect("demo.nats.io")
982 /// .await?;
983 ///
984 /// // Bind to a specific IP and port
985 /// let addr: std::net::SocketAddr = "192.168.1.10:9898".parse().unwrap();
986 /// async_nats::ConnectOptions::new()
987 /// .local_address(addr)
988 /// .connect("demo.nats.io")
989 /// .await?;
990 /// # Ok(())
991 /// # }
992 /// ```
993 pub fn local_address(mut self, address: SocketAddr) -> ConnectOptions {
994 self.local_address = Some(address);
995 self
996 }
997}
998
/// Shared, type-erased asynchronous callback that takes one argument of type
/// `A` and whose returned future resolves to `T`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Arc<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Newtype around [`AsyncCallbackArg1`] that can be awaited through
/// [`CallbackArg1::call`] and printed with `Debug`.
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

// Implemented by hand: `#[derive(Clone)]` would demand `A: Clone` and
// `T: Clone`, although cloning only creates another handle to the same `Arc`.
impl<A, T> Clone for CallbackArg1<A, T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the wrapped callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        (self.0.as_ref())(arg).await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Prints a fixed placeholder, as the wrapped closure cannot be inspected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.write_str("callback")
    }
}
1016
/// Error report from signing callback.
///
/// Carries only a message, which is rendered as `AuthError(<message>)` by
/// both `Display` and `Debug`.
// NOTE(review): the earlier comment here said this type was needed because
// `std::io::Error` isn't `Send`. `std::io::Error` is `Send + Sync`; what it lacks
// is `Clone`/`PartialEq`, which this type derives — confirm the original motivation.
#[derive(Clone, PartialEq)]
pub struct AuthError(String);

impl AuthError {
    /// Builds an error from anything that can be rendered as a string.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Write straight into the formatter instead of building a temporary `String`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    // `Debug` produces exactly the same text as `Display`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}

impl std::error::Error for AuthError {}