async_nats/options.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::connector::{ReconnectToServer, ReconnectToServerCallback, Server};
17use crate::{Client, ConnectError, Event, ServerInfo, ToServerAddrs};
18#[cfg(feature = "nkeys")]
19use base64::engine::general_purpose::URL_SAFE_NO_PAD;
20#[cfg(feature = "nkeys")]
21use base64::engine::Engine;
22use futures_util::Future;
23use std::fmt::Formatter;
24use std::net::SocketAddr;
25#[cfg(feature = "nkeys")]
26use std::path::Path;
27use std::{fmt, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
28#[cfg(feature = "nkeys")]
29use tokio::io;
30use tokio_rustls::rustls;
31
32/// Connect options. Used to connect with NATS when custom config is needed.
33/// # Examples
34/// ```no_run
35/// # #[tokio::main]
36/// # async fn main() -> Result<(), async_nats::ConnectError> {
37/// let mut options = async_nats::ConnectOptions::new()
38/// .require_tls(true)
39/// .ping_interval(std::time::Duration::from_secs(10))
40/// .connect("demo.nats.io")
41/// .await?;
42/// # Ok(())
43/// # }
44/// ```
45#[derive(Clone)]
46pub struct ConnectOptions {
47 pub(crate) name: Option<String>,
48 pub(crate) no_echo: bool,
49 pub(crate) max_reconnects: Option<usize>,
50 pub(crate) connection_timeout: Duration,
51 pub(crate) auth: Auth,
52 pub(crate) tls_required: bool,
53 pub(crate) tls_first: bool,
54 pub(crate) certificates: Vec<PathBuf>,
55 pub(crate) client_cert: Option<PathBuf>,
56 pub(crate) client_key: Option<PathBuf>,
57 pub(crate) tls_client_config: Option<rustls::ClientConfig>,
58 pub(crate) ping_interval: Duration,
59 pub(crate) subscription_capacity: usize,
60 pub(crate) sender_capacity: usize,
61 pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
62 pub(crate) inbox_prefix: String,
63 pub(crate) request_timeout: Option<Duration>,
64 pub(crate) retry_on_initial_connect: bool,
65 pub(crate) ignore_discovered_servers: bool,
66 pub(crate) retain_servers_order: bool,
67 pub(crate) read_buffer_capacity: u16,
68 pub(crate) reconnect_delay_callback: Arc<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
69 pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
70 pub(crate) skip_subject_validation: bool,
71 pub(crate) local_address: Option<SocketAddr>,
72 pub(crate) reconnect_to_server_callback: Option<ReconnectToServerCallback>,
73}
74
75impl fmt::Debug for ConnectOptions {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
77 f.debug_map()
78 .entry(&"name", &self.name)
79 .entry(&"no_echo", &self.no_echo)
80 .entry(&"max_reconnects", &self.max_reconnects)
81 .entry(&"connection_timeout", &self.connection_timeout)
82 .entry(&"tls_required", &self.tls_required)
83 .entry(&"certificates", &self.certificates)
84 .entry(&"client_cert", &self.client_cert)
85 .entry(&"client_key", &self.client_key)
86 .entry(&"tls_client_config", &"XXXXXXXX")
87 .entry(&"tls_first", &self.tls_first)
88 .entry(&"ping_interval", &self.ping_interval)
89 .entry(&"sender_capacity", &self.sender_capacity)
90 .entry(&"inbox_prefix", &self.inbox_prefix)
91 .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
92 .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
93 .entry(&"skip_subject_validation", &self.skip_subject_validation)
94 .finish()
95 }
96}
97
98impl Default for ConnectOptions {
99 fn default() -> ConnectOptions {
100 ConnectOptions {
101 name: None,
102 no_echo: false,
103 max_reconnects: None,
104 connection_timeout: Duration::from_secs(5),
105 tls_required: false,
106 tls_first: false,
107 certificates: Vec::new(),
108 client_cert: None,
109 client_key: None,
110 tls_client_config: None,
111 ping_interval: Duration::from_secs(60),
112 sender_capacity: 2048,
113 subscription_capacity: 1024 * 64,
114 event_callback: None,
115 inbox_prefix: "_INBOX".to_string(),
116 request_timeout: Some(Duration::from_secs(10)),
117 retry_on_initial_connect: false,
118 ignore_discovered_servers: false,
119 retain_servers_order: false,
120 read_buffer_capacity: 65535,
121 reconnect_delay_callback: Arc::new(|attempts| {
122 connector::reconnect_delay_callback_default(attempts)
123 }),
124 auth: Default::default(),
125 auth_callback: None,
126 skip_subject_validation: false,
127 local_address: None,
128 reconnect_to_server_callback: None,
129 }
130 }
131}
132
133impl ConnectOptions {
134 /// Enables customization of NATS connection.
135 ///
136 /// # Examples
137 /// ```no_run
138 /// # #[tokio::main]
139 /// # async fn main() -> Result<(), async_nats::ConnectError> {
140 /// let mut options = async_nats::ConnectOptions::new()
141 /// .require_tls(true)
142 /// .ping_interval(std::time::Duration::from_secs(10))
143 /// .connect("demo.nats.io")
144 /// .await?;
145 /// # Ok(())
146 /// # }
147 /// ```
148 pub fn new() -> ConnectOptions {
149 ConnectOptions::default()
150 }
151
152 /// Connect to the NATS Server leveraging all passed options.
153 ///
154 /// # Examples
155 /// ```no_run
156 /// # #[tokio::main]
157 /// # async fn main() -> Result<(), async_nats::ConnectError> {
158 /// let nc = async_nats::ConnectOptions::new()
159 /// .require_tls(true)
160 /// .connect("demo.nats.io")
161 /// .await?;
162 /// # Ok(())
163 /// # }
164 /// ```
165 ///
166 /// ## Pass multiple URLs.
167 /// ```no_run
/// # #[tokio::main]
169 /// # async fn main() -> Result<(), async_nats::Error> {
170 /// use async_nats::ServerAddr;
171 /// let client = async_nats::connect(vec![
172 /// "demo.nats.io".parse::<ServerAddr>()?,
173 /// "other.nats.io".parse::<ServerAddr>()?,
174 /// ])
175 /// .await
176 /// .unwrap();
177 /// # Ok(())
178 /// # }
179 /// ```
180 pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
181 crate::connect_with_options(addrs, self).await
182 }
183
184 /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
185 /// Requires an asynchronous function that accepts nonce and returns [Auth].
186 /// It will overwrite all other auth methods used.
187 ///
188 ///
189 /// # Example
190 /// ```no_run
191 /// # #[tokio::main]
192 /// # async fn main() -> Result<(), async_nats::ConnectError> {
193 /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
194 /// let mut auth = async_nats::Auth::new();
195 /// auth.username = Some("derek".to_string());
196 /// auth.password = Some("s3cr3t".to_string());
197 /// Ok(auth)
198 /// })
199 /// .connect("demo.nats.io")
200 /// .await?;
201 /// # Ok(())
202 /// # }
203 /// ```
204 pub fn with_auth_callback<F, Fut>(callback: F) -> Self
205 where
206 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
207 Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
208 {
209 let mut options = ConnectOptions::new();
210 options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Arc::new(
211 move |nonce| Box::pin(callback(nonce)),
212 )));
213 options
214 }
215
216 /// Authenticate against NATS Server with the provided token.
217 ///
218 /// # Examples
219 /// ```no_run
220 /// # #[tokio::main]
221 /// # async fn main() -> Result<(), async_nats::ConnectError> {
222 /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
223 /// .connect("demo.nats.io")
224 /// .await?;
225 /// # Ok(())
226 /// # }
227 /// ```
228 pub fn with_token(token: String) -> Self {
229 ConnectOptions::default().token(token)
230 }
231
232 /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
233 /// This can be used as a way to mix authentication methods.
234 ///
235 /// # Examples
236 /// ```no_run
237 /// # #[tokio::main]
238 /// # async fn main() -> Result<(), async_nats::ConnectError> {
239 /// let nc = async_nats::ConnectOptions::new()
240 /// .token("t0k3n!".into())
241 /// .connect("demo.nats.io")
242 /// .await?;
243 /// # Ok(())
244 /// # }
245 /// ```
246 pub fn token(mut self, token: String) -> Self {
247 self.auth.token = Some(token);
248 self
249 }
250
251 /// Authenticate against NATS Server with the provided username and password.
252 ///
253 /// # Examples
254 /// ```no_run
255 /// # #[tokio::main]
256 /// # async fn main() -> Result<(), async_nats::ConnectError> {
257 /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
258 /// .connect("demo.nats.io")
259 /// .await?;
260 /// # Ok(())
261 /// # }
262 /// ```
263 pub fn with_user_and_password(user: String, pass: String) -> Self {
264 ConnectOptions::default().user_and_password(user, pass)
265 }
266
267 /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
268 /// This can be used as a way to mix authentication methods.
269 ///
270 /// # Examples
271 /// ```no_run
272 /// # #[tokio::main]
273 /// # async fn main() -> Result<(), async_nats::ConnectError> {
274 /// let nc = async_nats::ConnectOptions::new()
275 /// .user_and_password("derek".into(), "s3cr3t!".into())
276 /// .connect("demo.nats.io")
277 /// .await?;
278 /// # Ok(())
279 /// # }
280 /// ```
281 pub fn user_and_password(mut self, user: String, pass: String) -> Self {
282 self.auth.username = Some(user);
283 self.auth.password = Some(pass);
284 self
285 }
286
287 /// Authenticate with an NKey. Requires an NKey Seed secret.
288 ///
289 /// # Example
290 /// ```no_run
291 /// # #[tokio::main]
292 /// # async fn main() -> Result<(), async_nats::ConnectError> {
293 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
294 /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
295 /// .connect("localhost")
296 /// .await?;
297 /// # Ok(())
298 /// # }
299 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn with_nkey(seed: String) -> Self {
    // Shorthand for starting from the defaults and calling `nkey`.
    Self::default().nkey(seed)
}
305
306 /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
307 /// Requires an NKey Seed Secret.
308 /// This can be used as a way to mix authentication methods.
309 ///
310 /// # Example
311 /// ```no_run
312 /// # #[tokio::main]
313 /// # async fn main() -> Result<(), async_nats::ConnectError> {
314 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
315 /// let nc = async_nats::ConnectOptions::new()
316 /// .nkey(seed.into())
317 /// .connect("localhost")
318 /// .await?;
319 /// # Ok(())
320 /// # }
321 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn nkey(self, seed: String) -> Self {
    let mut options = self;
    // The seed is stored as given; no parsing happens here.
    options.auth.nkey = Some(seed);
    options
}
328
329 /// Authenticate with a JWT. Requires function to sign the server nonce.
330 /// The signing function is asynchronous.
331 ///
332 /// # Example
333 /// ```no_run
334 /// # #[tokio::main]
335 /// # async fn main() -> Result<(), async_nats::ConnectError> {
336 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
337 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
338 /// // load jwt from creds file or other secure source
339 /// async fn load_jwt() -> std::io::Result<String> {
340 /// todo!();
341 /// }
342 /// let jwt = load_jwt().await?;
343 /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
344 /// let key_pair = key_pair.clone();
345 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
346 /// })
347 /// .connect("localhost")
348 /// .await?;
349 /// # Ok(())
350 /// # }
351 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
where
    F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
{
    // Shorthand for starting from the defaults and calling `jwt`.
    Self::default().jwt(jwt, sign_cb)
}
361
362 /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
363 /// Requires an asynchronous function to sign the server nonce.
364 /// This can be used as a way to mix authentication methods.
365 ///
366 ///
367 /// # Example
368 /// ```no_run
369 /// # #[tokio::main]
370 /// # async fn main() -> Result<(), async_nats::ConnectError> {
371 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
372 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
373 /// // load jwt from creds file or other secure source
374 /// async fn load_jwt() -> std::io::Result<String> {
375 /// todo!();
376 /// }
377 /// let jwt = load_jwt().await?;
378 /// let nc = async_nats::ConnectOptions::new()
379 /// .jwt(jwt, move |nonce| {
380 /// let key_pair = key_pair.clone();
381 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
382 /// })
383 /// .connect("localhost")
384 /// .await?;
385 /// # Ok(())
386 /// # }
387 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
where
    F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
{
    // Shared handle so each invocation of the stored callback can move its own
    // reference to the user's signer into the future it returns.
    let signer = Arc::new(sign_cb);

    let signature_callback = CallbackArg1(Arc::new(move |nonce: String| {
        let signer = Arc::clone(&signer);
        Box::pin(async move {
            // The signer works on the raw nonce bytes.
            let signature = signer(nonce.into_bytes())
                .await
                .map_err(AuthError::new)?;
            // The signature is returned base64 encoded (URL-safe, no padding).
            Ok(URL_SAFE_NO_PAD.encode(signature))
        })
    }));

    self.auth.signature_callback = Some(signature_callback);
    self.auth.jwt = Some(jwt);
    self
}
411
412 /// Authenticate with NATS using a `.creds` file.
413 /// Open the provided file, load its creds,
414 /// and perform the desired authentication
415 ///
416 /// # Example
417 /// ```no_run
418 /// # #[tokio::main]
419 /// # async fn main() -> Result<(), async_nats::ConnectError> {
420 /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
421 /// .await?
422 /// .connect("connect.ngs.global")
423 /// .await?;
424 /// # Ok(())
425 /// # }
426 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
    // Load the file first, then reuse the string-based constructor.
    let creds_path = path.as_ref();
    let creds = crate::auth_utils::load_creds(creds_path).await?;
    Self::with_credentials(&creds)
}
433
434 /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
435 /// This will open the credentials file and load its credentials.
436 /// This can be used as a way to mix authentication methods.
437 ///
438 /// # Example
439 /// ```no_run
440 /// # #[tokio::main]
441 /// # async fn main() -> Result<(), async_nats::ConnectError> {
442 /// let nc = async_nats::ConnectOptions::new()
443 /// .credentials_file("path/to/my.creds")
444 /// .await?
445 /// .connect("connect.ngs.global")
446 /// .await?;
447 /// # Ok(())
448 /// # }
449 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
    // Load the file first, then reuse the string-based builder method.
    let creds_path = path.as_ref();
    let creds = crate::auth_utils::load_creds(creds_path).await?;
    self.credentials(&creds)
}
456
457 /// Authenticate with NATS using a credential str, in the creds file format.
458 ///
459 /// # Example
460 /// ```no_run
461 /// # #[tokio::main]
462 /// # async fn main() -> Result<(), async_nats::ConnectError> {
463 /// let creds = "-----BEGIN NATS USER JWT-----
464 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
465 /// ------END NATS USER JWT------
466 ///
467 /// ************************* IMPORTANT *************************
468 /// NKEY Seed printed below can be used sign and prove identity.
469 /// NKEYs are sensitive and should be treated as secrets.
470 ///
471 /// -----BEGIN USER NKEY SEED-----
472 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
473 /// ------END USER NKEY SEED------
474 /// ";
475 ///
476 /// let nc = async_nats::ConnectOptions::with_credentials(creds)
477 /// .expect("failed to parse static creds")
478 /// .connect("connect.ngs.global")
479 /// .await?;
480 /// # Ok(())
481 /// # }
482 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn with_credentials(creds: &str) -> io::Result<Self> {
    // Shorthand for starting from the defaults and calling `credentials`.
    Self::default().credentials(creds)
}
488
489 /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
490 /// The string should be in the credentials file format.
491 /// This can be used as a way to mix authentication methods.
492 ///
493 /// # Example
494 /// ```no_run
495 /// # #[tokio::main]
496 /// # async fn main() -> Result<(), async_nats::ConnectError> {
497 /// let creds = "-----BEGIN NATS USER JWT-----
498 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
499 /// ------END NATS USER JWT------
500 ///
501 /// ************************* IMPORTANT *************************
502 /// NKEY Seed printed below can be used sign and prove identity.
503 /// NKEYs are sensitive and should be treated as secrets.
504 ///
505 /// -----BEGIN USER NKEY SEED-----
506 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
507 /// ------END USER NKEY SEED------
508 /// ";
509 ///
510 /// let nc = async_nats::ConnectOptions::new()
511 /// .credentials(creds)
512 /// .expect("failed to parse static creds")
513 /// .connect("connect.ngs.global")
514 /// .await?;
515 /// # Ok(())
516 /// # }
517 /// ```
#[cfg(feature = "nkeys")]
#[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
pub fn credentials(self, creds: &str) -> io::Result<Self> {
    let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;

    // The key pair is shared so every signing request can hold its own reference.
    let signing_key = Arc::new(key_pair);
    let sign_nonce = move |nonce: Vec<u8>| {
        let signing_key = Arc::clone(&signing_key);
        async move { signing_key.sign(&nonce).map_err(AuthError::new) }
    };

    Ok(self.jwt(jwt.to_owned(), sign_nonce))
}
529
530 /// Loads root certificates by providing the path to them.
531 ///
532 /// # Examples
533 /// ```no_run
534 /// # #[tokio::main]
535 /// # async fn main() -> Result<(), async_nats::ConnectError> {
536 /// let nc = async_nats::ConnectOptions::new()
537 /// .add_root_certificates("mycerts.pem".into())
538 /// .connect("demo.nats.io")
539 /// .await?;
540 /// # Ok(())
541 /// # }
542 /// ```
543 pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
544 self.certificates = vec![path];
545 self
546 }
547
548 /// Loads client certificate by providing the path to it.
549 ///
550 /// # Examples
551 /// ```no_run
552 /// # #[tokio::main]
553 /// # async fn main() -> Result<(), async_nats::ConnectError> {
554 /// let nc = async_nats::ConnectOptions::new()
555 /// .add_client_certificate("cert.pem".into(), "key.pem".into())
556 /// .connect("demo.nats.io")
557 /// .await?;
558 /// # Ok(())
559 /// # }
560 /// ```
561 pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
562 self.client_cert = Some(cert);
563 self.client_key = Some(key);
564 self
565 }
566
/// Enables or disables the TLS requirement. If `require_tls(true)` is set and a TLS connection cannot be established, connecting returns an error.
568 ///
569 /// # Examples
570 /// ```no_run
571 /// # #[tokio::main]
572 /// # async fn main() -> Result<(), async_nats::ConnectError> {
573 /// let nc = async_nats::ConnectOptions::new()
574 /// .require_tls(true)
575 /// .connect("demo.nats.io")
576 /// .await?;
577 /// # Ok(())
578 /// # }
579 /// ```
580 pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
581 self.tls_required = is_required;
582 self
583 }
584
585 /// Changes how tls connection is established. If `tls_first` is set,
586 /// client will try to establish tls before getting info from the server.
587 /// That requires the server to enable `handshake_first` option in the config.
588 pub fn tls_first(mut self) -> ConnectOptions {
589 self.tls_first = true;
590 self.tls_required = true;
591 self
592 }
593
594 /// Sets how often Client sends PING message to the server.
595 ///
596 /// # Examples
597 /// ```no_run
598 /// # use tokio::time::Duration;
599 /// # #[tokio::main]
600 /// # async fn main() -> Result<(), async_nats::ConnectError> {
601 /// async_nats::ConnectOptions::new()
602 /// .ping_interval(Duration::from_secs(24))
603 /// .connect("demo.nats.io")
604 /// .await?;
605 /// # Ok(())
606 /// # }
607 /// ```
608 pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
609 self.ping_interval = ping_interval;
610 self
611 }
612
613 /// Sets `no_echo` option which disables delivering messages that were published from the same
614 /// connection.
615 ///
616 /// # Examples
617 /// ```no_run
618 /// # #[tokio::main]
619 /// # async fn main() -> Result<(), async_nats::ConnectError> {
620 /// async_nats::ConnectOptions::new()
621 /// .no_echo()
622 /// .connect("demo.nats.io")
623 /// .await?;
624 /// # Ok(())
625 /// # }
626 /// ```
627 pub fn no_echo(mut self) -> ConnectOptions {
628 self.no_echo = true;
629 self
630 }
631
632 /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
633 /// callback and drop messages.
634 /// Default is set to 65536 messages buffer.
635 ///
636 /// # Examples
637 /// ```no_run
638 /// # #[tokio::main]
639 /// # async fn main() -> Result<(), async_nats::ConnectError> {
640 /// async_nats::ConnectOptions::new()
641 /// .subscription_capacity(1024)
642 /// .connect("demo.nats.io")
643 /// .await?;
644 /// # Ok(())
645 /// # }
646 /// ```
647 pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
648 self.subscription_capacity = capacity;
649 self
650 }
651
652 /// Sets a timeout for the full connection establishment and handshake to avoid
653 /// hangs and deadlocks. This includes TCP/WebSocket connection, TLS setup,
654 /// waiting for the server INFO message, sending CONNECT/PING, and receiving
655 /// the initial server PONG response. Default is set to 5 seconds.
656 ///
657 /// # Examples
658 /// ```no_run
659 /// # #[tokio::main]
660 /// # async fn main() -> Result<(), async_nats::ConnectError> {
661 /// async_nats::ConnectOptions::new()
662 /// .connection_timeout(tokio::time::Duration::from_secs(5))
663 /// .connect("demo.nats.io")
664 /// .await?;
665 /// # Ok(())
666 /// # }
667 /// ```
668 pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
669 self.connection_timeout = timeout;
670 self
671 }
672
673 /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
674 ///
675 /// # Examples
676 /// ```no_run
677 /// # #[tokio::main]
678 /// # async fn main() -> Result<(), async_nats::ConnectError> {
679 /// async_nats::ConnectOptions::new()
680 /// .request_timeout(Some(std::time::Duration::from_secs(3)))
681 /// .connect("demo.nats.io")
682 /// .await?;
683 /// # Ok(())
684 /// # }
685 /// ```
686 pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
687 self.request_timeout = timeout;
688 self
689 }
690
/// Registers an asynchronous callback for client events, such as disconnects, reconnects and errors.
692 ///
693 /// # Examples
694 /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
695 /// work around this
696 ///
697 /// ## Basic
698 /// If you don't need to move anything into the closure, simple signature can be used:
699 ///
700 /// ```no_run
701 /// # #[tokio::main]
702 /// # async fn main() -> Result<(), async_nats::ConnectError> {
703 /// async_nats::ConnectOptions::new()
704 /// .event_callback(|event| async move {
705 /// println!("event occurred: {}", event);
706 /// })
707 /// .connect("demo.nats.io")
708 /// .await?;
709 /// # Ok(())
710 /// # }
711 /// ```
712 ///
713 /// ## Listening to specific event kind
714 /// ```no_run
715 /// # #[tokio::main]
716 /// # async fn main() -> Result<(), async_nats::ConnectError> {
717 /// async_nats::ConnectOptions::new()
718 /// .event_callback(|event| async move {
719 /// match event {
720 /// async_nats::Event::Disconnected => println!("disconnected"),
721 /// async_nats::Event::Connected => println!("reconnected"),
722 /// async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
723 /// other => println!("other event happened: {}", other),
724 /// }
725 /// })
726 /// .connect("demo.nats.io")
727 /// .await?;
728 /// # Ok(())
729 /// # }
730 /// ```
731 ///
732 /// ## Advanced
733 /// If you need to move something into the closure, here's an example how to do that
734 ///
735 /// ```no_run
736 /// # #[tokio::main]
737 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
738 /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
739 /// async_nats::ConnectOptions::new()
740 /// .event_callback(move |event| {
741 /// let tx = tx.clone();
742 /// async move {
743 /// tx.send(event).await.unwrap();
744 /// }
745 /// })
746 /// .connect("demo.nats.io")
747 /// .await?;
748 /// # Ok(())
749 /// # }
750 /// ```
751 pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
752 where
753 F: Fn(Event) -> Fut + Send + Sync + 'static,
754 Fut: Future<Output = ()> + 'static + Send + Sync,
755 {
756 self.event_callback = Some(CallbackArg1::<Event, ()>(Arc::new(move |event| {
757 Box::pin(cb(event))
758 })));
759 self
760 }
761
762 /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
763 ///
764 /// # Examples
765 /// ```no_run
766 /// # #[tokio::main]
767 /// # async fn main() -> Result<(), async_nats::ConnectError> {
768 /// async_nats::ConnectOptions::new()
769 /// .reconnect_delay_callback(|attempts| {
770 /// println!("no of attempts: {attempts}");
771 /// std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
772 /// })
773 /// .connect("demo.nats.io")
774 /// .await?;
775 /// # Ok(())
776 /// # }
777 /// ```
778 pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
779 where
780 F: Fn(usize) -> Duration + Send + Sync + 'static,
781 {
782 self.reconnect_delay_callback = Arc::new(cb);
783 self
784 }
785
786 /// Sets a callback invoked on each reconnect attempt to select a specific
787 /// server from the pool.
788 ///
789 /// The callback receives a snapshot of available servers (with per-server
790 /// metadata such as reconnect count) and the last known [`ServerInfo`].
791 /// It should return a [`ReconnectToServer`] specifying which server to try
792 /// and how long to wait, or `None` to use default server selection.
793 ///
794 /// If the returned server address is not in the pool, the library falls back
795 /// to default selection and emits a [`ClientError`][crate::ClientError] event.
796 ///
797 /// When this callback returns `Some`, its delay takes precedence over
798 /// [`reconnect_delay_callback`][ConnectOptions::reconnect_delay_callback].
799 ///
800 /// # Examples
801 /// ```no_run
802 /// # #[tokio::main]
803 /// # async fn main() -> Result<(), async_nats::ConnectError> {
804 /// async_nats::ConnectOptions::new()
805 /// .reconnect_to_server_callback(|servers, _info| async move {
806 /// // Always try the first available server immediately.
807 /// servers.first().map(|s| async_nats::ReconnectToServer {
808 /// addr: s.addr.clone(),
809 /// delay: Some(std::time::Duration::ZERO),
810 /// })
811 /// })
812 /// .connect("demo.nats.io")
813 /// .await?;
814 /// # Ok(())
815 /// # }
816 /// ```
817 pub fn reconnect_to_server_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
818 where
819 F: Fn(Vec<Server>, ServerInfo) -> Fut + Send + Sync + 'static,
820 Fut: Future<Output = Option<ReconnectToServer>> + Send + Sync + 'static,
821 {
822 self.reconnect_to_server_callback = Some(CallbackArg1(Arc::new(move |(servers, info)| {
823 Box::pin(cb(servers, info))
824 })));
825 self
826 }
827
/// By default, the Client dispatches its operations onto a channel with a capacity of 2048.
/// This option enables overriding it.
830 ///
831 /// # Examples
832 /// ```
833 /// # #[tokio::main]
834 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
835 /// async_nats::ConnectOptions::new()
836 /// .client_capacity(256)
837 /// .connect("demo.nats.io")
838 /// .await?;
839 /// # Ok(())
840 /// # }
841 /// ```
842 pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
843 self.sender_capacity = capacity;
844 self
845 }
846
847 /// Sets custom prefix instead of default `_INBOX`.
848 ///
849 /// # Examples
850 ///
851 /// ```
852 /// # #[tokio::main]
853 /// # async fn main() -> Result<(), async_nats::Error> {
854 /// async_nats::ConnectOptions::new()
855 /// .custom_inbox_prefix("CUSTOM")
856 /// .connect("demo.nats.io")
857 /// .await?;
858 /// # Ok(())
859 /// # }
860 /// ```
861 pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
862 self.inbox_prefix = prefix.to_string();
863 self
864 }
865
866 /// Sets the name for the client.
867 ///
868 /// # Examples
869 /// ```
870 /// # #[tokio::main]
871 /// # async fn main() -> Result<(), async_nats::Error> {
872 /// async_nats::ConnectOptions::new()
873 /// .name("rust-service")
874 /// .connect("demo.nats.io")
875 /// .await?;
876 /// # Ok(())
877 /// # }
878 /// ```
879 pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
880 self.name = Some(name.to_string());
881 self
882 }
883
884 /// By default, [`ConnectOptions::connect`] will return an error if
885 /// the connection to the server cannot be established.
886 ///
887 /// Setting `retry_on_initial_connect` makes the client
888 /// establish the connection in the background.
889 pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
890 self.retry_on_initial_connect = true;
891 self
892 }
893
894 /// Specifies the number of consecutive reconnect attempts the client will
895 /// make before giving up. This is useful for preventing zombie services
896 /// from endlessly reaching the servers, but it can also be a footgun and
897 /// surprise for users who do not expect that the client can give up
898 /// entirely.
899 ///
900 /// Pass `None` or `0` for no limit.
901 ///
902 /// # Examples
903 /// ```
904 /// # #[tokio::main]
905 /// # async fn main() -> Result<(), async_nats::Error> {
906 /// async_nats::ConnectOptions::new()
907 /// .max_reconnects(None)
908 /// .connect("demo.nats.io")
909 /// .await?;
910 /// # Ok(())
911 /// # }
912 /// ```
913 pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
914 let val: Option<usize> = max_reconnects.into();
915 self.max_reconnects = if val == Some(0) { None } else { val };
916 self
917 }
918
919 /// Disables subject validation for publish operations.
920 ///
921 /// By default, the client validates publish subjects to ensure they don't contain
922 /// whitespace characters (space, tab, CR, LF).
923 ///
924 /// This option only affects **publish** validation. Subscribe and queue group
925 /// validation always runs regardless of this setting, matching the behavior
926 /// of the Go and Java NATS clients.
927 ///
928 /// # Warning
929 /// Using invalid subjects may cause protocol errors with the NATS server.
930 /// Only disable validation if you are certain all published subjects are valid.
931 ///
932 /// # Examples
933 /// ```no_run
934 /// # #[tokio::main]
935 /// # async fn main() -> Result<(), async_nats::ConnectError> {
936 /// async_nats::ConnectOptions::new()
937 /// .skip_subject_validation(true)
938 /// .connect("demo.nats.io")
939 /// .await?;
940 /// # Ok(())
941 /// # }
942 /// ```
943 pub fn skip_subject_validation(mut self, skip: bool) -> ConnectOptions {
944 self.skip_subject_validation = skip;
945 self
946 }
947
948 /// By default, a server may advertise other servers in the cluster known to it.
949 /// By setting this option, the client will ignore the advertised servers.
950 /// This may be useful if the client may not be able to reach them.
951 pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
952 self.ignore_discovered_servers = true;
953 self
954 }
955
956 /// By default, client will pick random server to which it will try connect to.
957 /// This option disables that feature, forcing it to always respect the order
958 /// in which server addresses were passed.
959 pub fn retain_servers_order(mut self) -> ConnectOptions {
960 self.retain_servers_order = true;
961 self
962 }
963
964 /// Allows passing custom rustls tls config.
965 ///
966 /// # Examples
967 /// ```
968 /// # #[tokio::main]
969 /// # async fn main() -> Result<(), async_nats::Error> {
970 /// let mut root_store = async_nats::rustls::RootCertStore::empty();
971 ///
972 /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs().certs);
973 ///
974 /// let tls_client = async_nats::rustls::ClientConfig::builder()
975 /// .with_root_certificates(root_store)
976 /// .with_no_client_auth();
977 ///
978 /// let client = async_nats::ConnectOptions::new()
979 /// .require_tls(true)
980 /// .tls_client_config(tls_client)
981 /// .connect("tls://demo.nats.io")
982 /// .await?;
983 ///
984 /// # Ok(())
985 /// # }
986 /// ```
987 pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
988 self.tls_client_config = Some(config);
989 self
990 }
991
992 /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
993 /// protocol messages.
994 ///
995 /// # Examples
996 /// ```
997 /// # #[tokio::main]
998 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
999 /// async_nats::ConnectOptions::new()
1000 /// .read_buffer_capacity(65535)
1001 /// .connect("demo.nats.io")
1002 /// .await?;
1003 /// # Ok(())
1004 /// # }
1005 /// ```
1006 pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
1007 self.read_buffer_capacity = size;
1008 self
1009 }
1010
1011 /// Sets the local socket address that the client will bind to when connecting
1012 /// to the server. This is useful when the client machine has multiple
1013 /// network interfaces and you want to control which one is used, or when
1014 /// you need to bind to a specific local port.
1015 ///
1016 /// Use port `0` to let the operating system assign an ephemeral port.
1017 ///
1018 /// # Examples
1019 /// ```no_run
1020 /// # #[tokio::main]
1021 /// # async fn main() -> Result<(), async_nats::ConnectError> {
1022 /// // Bind to a specific IP with an OS-assigned port
1023 /// let addr: std::net::SocketAddr = "192.168.1.10:0".parse().unwrap();
1024 /// async_nats::ConnectOptions::new()
1025 /// .local_address(addr)
1026 /// .connect("demo.nats.io")
1027 /// .await?;
1028 ///
1029 /// // Bind to a specific IP and port
1030 /// let addr: std::net::SocketAddr = "192.168.1.10:9898".parse().unwrap();
1031 /// async_nats::ConnectOptions::new()
1032 /// .local_address(addr)
1033 /// .connect("demo.nats.io")
1034 /// .await?;
1035 /// # Ok(())
1036 /// # }
1037 /// ```
1038 pub fn local_address(mut self, address: SocketAddr) -> ConnectOptions {
1039 self.local_address = Some(address);
1040 self
1041 }
1042}
1043
/// Reference-counted, type-erased async callback taking one argument.
///
/// The closure receives a value of type `A` and returns a pinned, boxed
/// future that resolves to `T`. Both the closure and the future it returns
/// are required to be `Send + Sync`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Arc<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Newtype around [`AsyncCallbackArg1`] that adds a `call` helper and an
/// opaque `Debug` representation.
#[derive(Clone)]
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the wrapped callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        let pending = (self.0)(arg);
        pending.await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Writes the fixed placeholder `callback`; the wrapped closure itself is
    /// not printed.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("callback")
    }
}
1061
/// Error report from signing callback.
// This was needed because std::io::Error isn't Send.
#[derive(Clone, PartialEq)]
pub struct AuthError(String);

impl AuthError {
    /// Creates an error whose message is the `ToString` rendering of `s`.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    /// Writes the message wrapped as `AuthError(<message>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // `write!` streams directly into the formatter, so no temporary
        // `String` is allocated just to be copied out again.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    /// Produces exactly the same text as the `Display` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Delegate so the two representations cannot drift apart.
        fmt::Display::fmt(self, f)
    }
}

impl std::error::Error for AuthError {}