async_nats/options.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17#[cfg(feature = "nkeys")]
18use base64::engine::general_purpose::URL_SAFE_NO_PAD;
19#[cfg(feature = "nkeys")]
20use base64::engine::Engine;
21use futures_util::Future;
22use std::fmt::Formatter;
23use std::{fmt, path::PathBuf, pin::Pin, time::Duration};
24#[cfg(feature = "nkeys")]
25use std::{path::Path, sync::Arc};
26#[cfg(feature = "nkeys")]
27use tokio::io;
28use tokio_rustls::rustls;
29
30/// Connect options. Used to connect with NATS when custom config is needed.
31/// # Examples
32/// ```no_run
33/// # #[tokio::main]
34/// # async fn main() -> Result<(), async_nats::ConnectError> {
35/// let mut options = async_nats::ConnectOptions::new()
36/// .require_tls(true)
37/// .ping_interval(std::time::Duration::from_secs(10))
38/// .connect("demo.nats.io")
39/// .await?;
40/// # Ok(())
41/// # }
42/// ```
43pub struct ConnectOptions {
44 pub(crate) name: Option<String>,
45 pub(crate) no_echo: bool,
46 pub(crate) max_reconnects: Option<usize>,
47 pub(crate) connection_timeout: Duration,
48 pub(crate) auth: Auth,
49 pub(crate) tls_required: bool,
50 pub(crate) tls_first: bool,
51 pub(crate) certificates: Vec<PathBuf>,
52 pub(crate) client_cert: Option<PathBuf>,
53 pub(crate) client_key: Option<PathBuf>,
54 pub(crate) tls_client_config: Option<rustls::ClientConfig>,
55 pub(crate) ping_interval: Duration,
56 pub(crate) subscription_capacity: usize,
57 pub(crate) sender_capacity: usize,
58 pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
59 pub(crate) inbox_prefix: String,
60 pub(crate) request_timeout: Option<Duration>,
61 pub(crate) retry_on_initial_connect: bool,
62 pub(crate) ignore_discovered_servers: bool,
63 pub(crate) retain_servers_order: bool,
64 pub(crate) read_buffer_capacity: u16,
65 pub(crate) reconnect_delay_callback: Box<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
66 pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
67}
68
69impl fmt::Debug for ConnectOptions {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
71 f.debug_map()
72 .entry(&"name", &self.name)
73 .entry(&"no_echo", &self.no_echo)
74 .entry(&"max_reconnects", &self.max_reconnects)
75 .entry(&"connection_timeout", &self.connection_timeout)
76 .entry(&"tls_required", &self.tls_required)
77 .entry(&"certificates", &self.certificates)
78 .entry(&"client_cert", &self.client_cert)
79 .entry(&"client_key", &self.client_key)
80 .entry(&"tls_client_config", &"XXXXXXXX")
81 .entry(&"tls_first", &self.tls_first)
82 .entry(&"ping_interval", &self.ping_interval)
83 .entry(&"sender_capacity", &self.sender_capacity)
84 .entry(&"inbox_prefix", &self.inbox_prefix)
85 .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
86 .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
87 .finish()
88 }
89}
90
91impl Default for ConnectOptions {
92 fn default() -> ConnectOptions {
93 ConnectOptions {
94 name: None,
95 no_echo: false,
96 max_reconnects: None,
97 connection_timeout: Duration::from_secs(5),
98 tls_required: false,
99 tls_first: false,
100 certificates: Vec::new(),
101 client_cert: None,
102 client_key: None,
103 tls_client_config: None,
104 ping_interval: Duration::from_secs(60),
105 sender_capacity: 2048,
106 subscription_capacity: 1024 * 64,
107 event_callback: None,
108 inbox_prefix: "_INBOX".to_string(),
109 request_timeout: Some(Duration::from_secs(10)),
110 retry_on_initial_connect: false,
111 ignore_discovered_servers: false,
112 retain_servers_order: false,
113 read_buffer_capacity: 65535,
114 reconnect_delay_callback: Box::new(|attempts| {
115 connector::reconnect_delay_callback_default(attempts)
116 }),
117 auth: Default::default(),
118 auth_callback: None,
119 }
120 }
121}
122
123impl ConnectOptions {
124 /// Enables customization of NATS connection.
125 ///
126 /// # Examples
127 /// ```no_run
128 /// # #[tokio::main]
129 /// # async fn main() -> Result<(), async_nats::ConnectError> {
130 /// let mut options = async_nats::ConnectOptions::new()
131 /// .require_tls(true)
132 /// .ping_interval(std::time::Duration::from_secs(10))
133 /// .connect("demo.nats.io")
134 /// .await?;
135 /// # Ok(())
136 /// # }
137 /// ```
138 pub fn new() -> ConnectOptions {
139 ConnectOptions::default()
140 }
141
142 /// Connect to the NATS Server leveraging all passed options.
143 ///
144 /// # Examples
145 /// ```no_run
146 /// # #[tokio::main]
147 /// # async fn main() -> Result<(), async_nats::ConnectError> {
148 /// let nc = async_nats::ConnectOptions::new()
149 /// .require_tls(true)
150 /// .connect("demo.nats.io")
151 /// .await?;
152 /// # Ok(())
153 /// # }
154 /// ```
155 ///
156 /// ## Pass multiple URLs.
157 /// ```no_run
158 /// #[tokio::main]
159 /// # async fn main() -> Result<(), async_nats::Error> {
160 /// use async_nats::ServerAddr;
161 /// let client = async_nats::connect(vec![
162 /// "demo.nats.io".parse::<ServerAddr>()?,
163 /// "other.nats.io".parse::<ServerAddr>()?,
164 /// ])
165 /// .await
166 /// .unwrap();
167 /// # Ok(())
168 /// # }
169 /// ```
170 pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
171 crate::connect_with_options(addrs, self).await
172 }
173
174 /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
175 /// Requires an asynchronous function that accepts nonce and returns [Auth].
176 /// It will overwrite all other auth methods used.
177 ///
178 ///
179 /// # Example
180 /// ```no_run
181 /// # #[tokio::main]
182 /// # async fn main() -> Result<(), async_nats::ConnectError> {
183 /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
184 /// let mut auth = async_nats::Auth::new();
185 /// auth.username = Some("derek".to_string());
186 /// auth.password = Some("s3cr3t".to_string());
187 /// Ok(auth)
188 /// })
189 /// .connect("demo.nats.io")
190 /// .await?;
191 /// # Ok(())
192 /// # }
193 /// ```
194 pub fn with_auth_callback<F, Fut>(callback: F) -> Self
195 where
196 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
197 Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
198 {
199 let mut options = ConnectOptions::new();
200 options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Box::new(
201 move |nonce| Box::pin(callback(nonce)),
202 )));
203 options
204 }
205
206 /// Authenticate against NATS Server with the provided token.
207 ///
208 /// # Examples
209 /// ```no_run
210 /// # #[tokio::main]
211 /// # async fn main() -> Result<(), async_nats::ConnectError> {
212 /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
213 /// .connect("demo.nats.io")
214 /// .await?;
215 /// # Ok(())
216 /// # }
217 /// ```
218 pub fn with_token(token: String) -> Self {
219 ConnectOptions::default().token(token)
220 }
221
222 /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
223 /// This can be used as a way to mix authentication methods.
224 ///
225 /// # Examples
226 /// ```no_run
227 /// # #[tokio::main]
228 /// # async fn main() -> Result<(), async_nats::ConnectError> {
229 /// let nc = async_nats::ConnectOptions::new()
230 /// .token("t0k3n!".into())
231 /// .connect("demo.nats.io")
232 /// .await?;
233 /// # Ok(())
234 /// # }
235 /// ```
236 pub fn token(mut self, token: String) -> Self {
237 self.auth.token = Some(token);
238 self
239 }
240
241 /// Authenticate against NATS Server with the provided username and password.
242 ///
243 /// # Examples
244 /// ```no_run
245 /// # #[tokio::main]
246 /// # async fn main() -> Result<(), async_nats::ConnectError> {
247 /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
248 /// .connect("demo.nats.io")
249 /// .await?;
250 /// # Ok(())
251 /// # }
252 /// ```
253 pub fn with_user_and_password(user: String, pass: String) -> Self {
254 ConnectOptions::default().user_and_password(user, pass)
255 }
256
257 /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
258 /// This can be used as a way to mix authentication methods.
259 ///
260 /// # Examples
261 /// ```no_run
262 /// # #[tokio::main]
263 /// # async fn main() -> Result<(), async_nats::ConnectError> {
264 /// let nc = async_nats::ConnectOptions::new()
265 /// .user_and_password("derek".into(), "s3cr3t!".into())
266 /// .connect("demo.nats.io")
267 /// .await?;
268 /// # Ok(())
269 /// # }
270 /// ```
271 pub fn user_and_password(mut self, user: String, pass: String) -> Self {
272 self.auth.username = Some(user);
273 self.auth.password = Some(pass);
274 self
275 }
276
277 /// Authenticate with an NKey. Requires an NKey Seed secret.
278 ///
279 /// # Example
280 /// ```no_run
281 /// # #[tokio::main]
282 /// # async fn main() -> Result<(), async_nats::ConnectError> {
283 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
284 /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
285 /// .connect("localhost")
286 /// .await?;
287 /// # Ok(())
288 /// # }
289 /// ```
290 #[cfg(feature = "nkeys")]
291 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
292 pub fn with_nkey(seed: String) -> Self {
293 ConnectOptions::default().nkey(seed)
294 }
295
296 /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
297 /// Requires an NKey Seed Secret.
298 /// This can be used as a way to mix authentication methods.
299 ///
300 /// # Example
301 /// ```no_run
302 /// # #[tokio::main]
303 /// # async fn main() -> Result<(), async_nats::ConnectError> {
304 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
305 /// let nc = async_nats::ConnectOptions::new()
306 /// .nkey(seed.into())
307 /// .connect("localhost")
308 /// .await?;
309 /// # Ok(())
310 /// # }
311 /// ```
312 #[cfg(feature = "nkeys")]
313 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
314 pub fn nkey(mut self, seed: String) -> Self {
315 self.auth.nkey = Some(seed);
316 self
317 }
318
319 /// Authenticate with a JWT. Requires function to sign the server nonce.
320 /// The signing function is asynchronous.
321 ///
322 /// # Example
323 /// ```no_run
324 /// # #[tokio::main]
325 /// # async fn main() -> Result<(), async_nats::ConnectError> {
326 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
327 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
328 /// // load jwt from creds file or other secure source
329 /// async fn load_jwt() -> std::io::Result<String> {
330 /// todo!();
331 /// }
332 /// let jwt = load_jwt().await?;
333 /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
334 /// let key_pair = key_pair.clone();
335 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
336 /// })
337 /// .connect("localhost")
338 /// .await?;
339 /// # Ok(())
340 /// # }
341 /// ```
342 #[cfg(feature = "nkeys")]
343 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
344 pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
345 where
346 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
347 Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
348 {
349 ConnectOptions::default().jwt(jwt, sign_cb)
350 }
351
352 /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
353 /// Requires an asynchronous function to sign the server nonce.
354 /// This can be used as a way to mix authentication methods.
355 ///
356 ///
357 /// # Example
358 /// ```no_run
359 /// # #[tokio::main]
360 /// # async fn main() -> Result<(), async_nats::ConnectError> {
361 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
362 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
363 /// // load jwt from creds file or other secure source
364 /// async fn load_jwt() -> std::io::Result<String> {
365 /// todo!();
366 /// }
367 /// let jwt = load_jwt().await?;
368 /// let nc = async_nats::ConnectOptions::new()
369 /// .jwt(jwt, move |nonce| {
370 /// let key_pair = key_pair.clone();
371 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
372 /// })
373 /// .connect("localhost")
374 /// .await?;
375 /// # Ok(())
376 /// # }
377 /// ```
378 #[cfg(feature = "nkeys")]
379 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
380 pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
381 where
382 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
383 Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
384 {
385 let sign_cb = Arc::new(sign_cb);
386
387 let jwt_sign_callback = CallbackArg1(Box::new(move |nonce: String| {
388 let sign_cb = sign_cb.clone();
389 Box::pin(async move {
390 let sig = sign_cb(nonce.as_bytes().to_vec())
391 .await
392 .map_err(AuthError::new)?;
393 Ok(URL_SAFE_NO_PAD.encode(sig))
394 })
395 }));
396
397 self.auth.jwt = Some(jwt);
398 self.auth.signature_callback = Some(jwt_sign_callback);
399 self
400 }
401
402 /// Authenticate with NATS using a `.creds` file.
403 /// Open the provided file, load its creds,
404 /// and perform the desired authentication
405 ///
406 /// # Example
407 /// ```no_run
408 /// # #[tokio::main]
409 /// # async fn main() -> Result<(), async_nats::ConnectError> {
410 /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
411 /// .await?
412 /// .connect("connect.ngs.global")
413 /// .await?;
414 /// # Ok(())
415 /// # }
416 /// ```
417 #[cfg(feature = "nkeys")]
418 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
419 pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
420 let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
421 Self::with_credentials(&cred_file_contents)
422 }
423
424 /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
425 /// This will open the credentials file and load its credentials.
426 /// This can be used as a way to mix authentication methods.
427 ///
428 /// # Example
429 /// ```no_run
430 /// # #[tokio::main]
431 /// # async fn main() -> Result<(), async_nats::ConnectError> {
432 /// let nc = async_nats::ConnectOptions::new()
433 /// .credentials_file("path/to/my.creds")
434 /// .await?
435 /// .connect("connect.ngs.global")
436 /// .await?;
437 /// # Ok(())
438 /// # }
439 /// ```
440 #[cfg(feature = "nkeys")]
441 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
442 pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
443 let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
444 self.credentials(&cred_file_contents)
445 }
446
447 /// Authenticate with NATS using a credential str, in the creds file format.
448 ///
449 /// # Example
450 /// ```no_run
451 /// # #[tokio::main]
452 /// # async fn main() -> Result<(), async_nats::ConnectError> {
453 /// let creds = "-----BEGIN NATS USER JWT-----
454 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
455 /// ------END NATS USER JWT------
456 ///
457 /// ************************* IMPORTANT *************************
458 /// NKEY Seed printed below can be used sign and prove identity.
459 /// NKEYs are sensitive and should be treated as secrets.
460 ///
461 /// -----BEGIN USER NKEY SEED-----
462 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
463 /// ------END USER NKEY SEED------
464 /// ";
465 ///
466 /// let nc = async_nats::ConnectOptions::with_credentials(creds)
467 /// .expect("failed to parse static creds")
468 /// .connect("connect.ngs.global")
469 /// .await?;
470 /// # Ok(())
471 /// # }
472 /// ```
473 #[cfg(feature = "nkeys")]
474 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
475 pub fn with_credentials(creds: &str) -> io::Result<Self> {
476 ConnectOptions::default().credentials(creds)
477 }
478
479 /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
480 /// The string should be in the credentials file format.
481 /// This can be used as a way to mix authentication methods.
482 ///
483 /// # Example
484 /// ```no_run
485 /// # #[tokio::main]
486 /// # async fn main() -> Result<(), async_nats::ConnectError> {
487 /// let creds = "-----BEGIN NATS USER JWT-----
488 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
489 /// ------END NATS USER JWT------
490 ///
491 /// ************************* IMPORTANT *************************
492 /// NKEY Seed printed below can be used sign and prove identity.
493 /// NKEYs are sensitive and should be treated as secrets.
494 ///
495 /// -----BEGIN USER NKEY SEED-----
496 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
497 /// ------END USER NKEY SEED------
498 /// ";
499 ///
500 /// let nc = async_nats::ConnectOptions::new()
501 /// .credentials(creds)
502 /// .expect("failed to parse static creds")
503 /// .connect("connect.ngs.global")
504 /// .await?;
505 /// # Ok(())
506 /// # }
507 /// ```
508 #[cfg(feature = "nkeys")]
509 #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
510 pub fn credentials(self, creds: &str) -> io::Result<Self> {
511 let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
512 let key_pair = std::sync::Arc::new(key_pair);
513
514 Ok(self.jwt(jwt.to_owned(), move |nonce| {
515 let key_pair = key_pair.clone();
516 async move { key_pair.sign(&nonce).map_err(AuthError::new) }
517 }))
518 }
519
520 /// Loads root certificates by providing the path to them.
521 ///
522 /// # Examples
523 /// ```no_run
524 /// # #[tokio::main]
525 /// # async fn main() -> Result<(), async_nats::ConnectError> {
526 /// let nc = async_nats::ConnectOptions::new()
527 /// .add_root_certificates("mycerts.pem".into())
528 /// .connect("demo.nats.io")
529 /// .await?;
530 /// # Ok(())
531 /// # }
532 /// ```
533 pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
534 self.certificates = vec![path];
535 self
536 }
537
538 /// Loads client certificate by providing the path to it.
539 ///
540 /// # Examples
541 /// ```no_run
542 /// # #[tokio::main]
543 /// # async fn main() -> Result<(), async_nats::ConnectError> {
544 /// let nc = async_nats::ConnectOptions::new()
545 /// .add_client_certificate("cert.pem".into(), "key.pem".into())
546 /// .connect("demo.nats.io")
547 /// .await?;
548 /// # Ok(())
549 /// # }
550 /// ```
551 pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
552 self.client_cert = Some(cert);
553 self.client_key = Some(key);
554 self
555 }
556
557 /// Sets or disables TLS requirement. If TLS connection is impossible while `options.require_tls(true)` connection will return error.
558 ///
559 /// # Examples
560 /// ```no_run
561 /// # #[tokio::main]
562 /// # async fn main() -> Result<(), async_nats::ConnectError> {
563 /// let nc = async_nats::ConnectOptions::new()
564 /// .require_tls(true)
565 /// .connect("demo.nats.io")
566 /// .await?;
567 /// # Ok(())
568 /// # }
569 /// ```
570 pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
571 self.tls_required = is_required;
572 self
573 }
574
575 /// Changes how tls connection is established. If `tls_first` is set,
576 /// client will try to establish tls before getting info from the server.
577 /// That requires the server to enable `handshake_first` option in the config.
578 pub fn tls_first(mut self) -> ConnectOptions {
579 self.tls_first = true;
580 self.tls_required = true;
581 self
582 }
583
584 /// Sets how often Client sends PING message to the server.
585 ///
586 /// # Examples
587 /// ```no_run
588 /// # use tokio::time::Duration;
589 /// # #[tokio::main]
590 /// # async fn main() -> Result<(), async_nats::ConnectError> {
591 /// async_nats::ConnectOptions::new()
592 /// .ping_interval(Duration::from_secs(24))
593 /// .connect("demo.nats.io")
594 /// .await?;
595 /// # Ok(())
596 /// # }
597 /// ```
598 pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
599 self.ping_interval = ping_interval;
600 self
601 }
602
603 /// Sets `no_echo` option which disables delivering messages that were published from the same
604 /// connection.
605 ///
606 /// # Examples
607 /// ```no_run
608 /// # #[tokio::main]
609 /// # async fn main() -> Result<(), async_nats::ConnectError> {
610 /// async_nats::ConnectOptions::new()
611 /// .no_echo()
612 /// .connect("demo.nats.io")
613 /// .await?;
614 /// # Ok(())
615 /// # }
616 /// ```
617 pub fn no_echo(mut self) -> ConnectOptions {
618 self.no_echo = true;
619 self
620 }
621
622 /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
623 /// callback and drop messages.
624 /// Default is set to 65536 messages buffer.
625 ///
626 /// # Examples
627 /// ```no_run
628 /// # #[tokio::main]
629 /// # async fn main() -> Result<(), async_nats::ConnectError> {
630 /// async_nats::ConnectOptions::new()
631 /// .subscription_capacity(1024)
632 /// .connect("demo.nats.io")
633 /// .await?;
634 /// # Ok(())
635 /// # }
636 /// ```
637 pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
638 self.subscription_capacity = capacity;
639 self
640 }
641
642 /// Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks.
643 /// Default is set to 5 seconds.
644 ///
645 /// # Examples
646 /// ```no_run
647 /// # #[tokio::main]
648 /// # async fn main() -> Result<(), async_nats::ConnectError> {
649 /// async_nats::ConnectOptions::new()
650 /// .connection_timeout(tokio::time::Duration::from_secs(5))
651 /// .connect("demo.nats.io")
652 /// .await?;
653 /// # Ok(())
654 /// # }
655 /// ```
656 pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
657 self.connection_timeout = timeout;
658 self
659 }
660
661 /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
662 ///
663 /// # Examples
664 /// ```no_run
665 /// # #[tokio::main]
666 /// # async fn main() -> Result<(), async_nats::ConnectError> {
667 /// async_nats::ConnectOptions::new()
668 /// .request_timeout(Some(std::time::Duration::from_secs(3)))
669 /// .connect("demo.nats.io")
670 /// .await?;
671 /// # Ok(())
672 /// # }
673 /// ```
674 pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
675 self.request_timeout = timeout;
676 self
677 }
678
679 /// Registers an asynchronous callback for errors that are received over the wire from the server.
680 ///
681 /// # Examples
682 /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
683 /// work around this
684 ///
685 /// ## Basic
686 /// If you don't need to move anything into the closure, simple signature can be used:
687 ///
688 /// ```no_run
689 /// # #[tokio::main]
690 /// # async fn main() -> Result<(), async_nats::ConnectError> {
691 /// async_nats::ConnectOptions::new()
692 /// .event_callback(|event| async move {
693 /// println!("event occurred: {}", event);
694 /// })
695 /// .connect("demo.nats.io")
696 /// .await?;
697 /// # Ok(())
698 /// # }
699 /// ```
700 ///
701 /// ## Listening to specific event kind
702 /// ```no_run
703 /// # #[tokio::main]
704 /// # async fn main() -> Result<(), async_nats::ConnectError> {
705 /// async_nats::ConnectOptions::new()
706 /// .event_callback(|event| async move {
707 /// match event {
708 /// async_nats::Event::Disconnected => println!("disconnected"),
709 /// async_nats::Event::Connected => println!("reconnected"),
710 /// async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
711 /// other => println!("other event happened: {}", other),
712 /// }
713 /// })
714 /// .connect("demo.nats.io")
715 /// .await?;
716 /// # Ok(())
717 /// # }
718 /// ```
719 ///
720 /// ## Advanced
721 /// If you need to move something into the closure, here's an example how to do that
722 ///
723 /// ```no_run
724 /// # #[tokio::main]
725 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
726 /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
727 /// async_nats::ConnectOptions::new()
728 /// .event_callback(move |event| {
729 /// let tx = tx.clone();
730 /// async move {
731 /// tx.send(event).await.unwrap();
732 /// }
733 /// })
734 /// .connect("demo.nats.io")
735 /// .await?;
736 /// # Ok(())
737 /// # }
738 /// ```
739 pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
740 where
741 F: Fn(Event) -> Fut + Send + Sync + 'static,
742 Fut: Future<Output = ()> + 'static + Send + Sync,
743 {
744 self.event_callback = Some(CallbackArg1::<Event, ()>(Box::new(move |event| {
745 Box::pin(cb(event))
746 })));
747 self
748 }
749
750 /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
751 ///
752 /// # Examples
753 /// ```no_run
754 /// # #[tokio::main]
755 /// # async fn main() -> Result<(), async_nats::ConnectError> {
756 /// async_nats::ConnectOptions::new()
757 /// .reconnect_delay_callback(|attempts| {
758 /// println!("no of attempts: {attempts}");
759 /// std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
760 /// })
761 /// .connect("demo.nats.io")
762 /// .await?;
763 /// # Ok(())
764 /// # }
765 /// ```
766 pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
767 where
768 F: Fn(usize) -> Duration + Send + Sync + 'static,
769 {
770 self.reconnect_delay_callback = Box::new(cb);
771 self
772 }
773
774 /// By default, Client dispatches op's to the Client onto the channel with capacity of 128.
775 /// This option enables overriding it.
776 ///
777 /// # Examples
778 /// ```
779 /// # #[tokio::main]
780 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
781 /// async_nats::ConnectOptions::new()
782 /// .client_capacity(256)
783 /// .connect("demo.nats.io")
784 /// .await?;
785 /// # Ok(())
786 /// # }
787 /// ```
788 pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
789 self.sender_capacity = capacity;
790 self
791 }
792
793 /// Sets custom prefix instead of default `_INBOX`.
794 ///
795 /// # Examples
796 ///
797 /// ```
798 /// # #[tokio::main]
799 /// # async fn main() -> Result<(), async_nats::Error> {
800 /// async_nats::ConnectOptions::new()
801 /// .custom_inbox_prefix("CUSTOM")
802 /// .connect("demo.nats.io")
803 /// .await?;
804 /// # Ok(())
805 /// # }
806 /// ```
807 pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
808 self.inbox_prefix = prefix.to_string();
809 self
810 }
811
812 /// Sets the name for the client.
813 ///
814 /// # Examples
815 /// ```
816 /// # #[tokio::main]
817 /// # async fn main() -> Result<(), async_nats::Error> {
818 /// async_nats::ConnectOptions::new()
819 /// .name("rust-service")
820 /// .connect("demo.nats.io")
821 /// .await?;
822 /// # Ok(())
823 /// # }
824 /// ```
825 pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
826 self.name = Some(name.to_string());
827 self
828 }
829
830 /// By default, [`ConnectOptions::connect`] will return an error if
831 /// the connection to the server cannot be established.
832 ///
833 /// Setting `retry_on_initial_connect` makes the client
834 /// establish the connection in the background.
835 pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
836 self.retry_on_initial_connect = true;
837 self
838 }
839
840 /// Specifies the number of consecutive reconnect attempts the client will
841 /// make before giving up. This is useful for preventing zombie services
842 /// from endlessly reaching the servers, but it can also be a footgun and
843 /// surprise for users who do not expect that the client can give up
844 /// entirely.
845 ///
846 /// Pass `None` or `0` for no limit.
847 ///
848 /// # Examples
849 /// ```
850 /// # #[tokio::main]
851 /// # async fn main() -> Result<(), async_nats::Error> {
852 /// async_nats::ConnectOptions::new()
853 /// .max_reconnects(None)
854 /// .connect("demo.nats.io")
855 /// .await?;
856 /// # Ok(())
857 /// # }
858 /// ```
859 pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
860 let val: Option<usize> = max_reconnects.into();
861 self.max_reconnects = if val == Some(0) { None } else { val };
862 self
863 }
864
865 /// By default, a server may advertise other servers in the cluster known to it.
866 /// By setting this option, the client will ignore the advertised servers.
867 /// This may be useful if the client may not be able to reach them.
868 pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
869 self.ignore_discovered_servers = true;
870 self
871 }
872
873 /// By default, client will pick random server to which it will try connect to.
874 /// This option disables that feature, forcing it to always respect the order
875 /// in which server addresses were passed.
876 pub fn retain_servers_order(mut self) -> ConnectOptions {
877 self.retain_servers_order = true;
878 self
879 }
880
881 /// Allows passing custom rustls tls config.
882 ///
883 /// # Examples
884 /// ```
885 /// # #[tokio::main]
886 /// # async fn main() -> Result<(), async_nats::Error> {
887 /// let mut root_store = async_nats::rustls::RootCertStore::empty();
888 ///
889 /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
890 ///
891 /// let tls_client = async_nats::rustls::ClientConfig::builder()
892 /// .with_root_certificates(root_store)
893 /// .with_no_client_auth();
894 ///
895 /// let client = async_nats::ConnectOptions::new()
896 /// .require_tls(true)
897 /// .tls_client_config(tls_client)
898 /// .connect("tls://demo.nats.io")
899 /// .await?;
900 ///
901 /// # Ok(())
902 /// # }
903 /// ```
904 pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
905 self.tls_client_config = Some(config);
906 self
907 }
908
909 /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
910 /// protocol messages.
911 ///
912 /// # Examples
913 /// ```
914 /// # #[tokio::main]
915 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
916 /// async_nats::ConnectOptions::new()
917 /// .read_buffer_capacity(65535)
918 /// .connect("demo.nats.io")
919 /// .await?;
920 /// # Ok(())
921 /// # }
922 /// ```
923 pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
924 self.read_buffer_capacity = size;
925 self
926 }
927}
928
/// Type-erased asynchronous callback: a boxed function taking one argument of type `A`
/// and returning a boxed, pinned future that resolves to `T`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Box<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Wrapper around [`AsyncCallbackArg1`] that adds a `call` helper and a `Debug` impl.
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the wrapped callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        let pending = (self.0)(arg);
        pending.await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Always prints the literal text `callback`; the closure itself is opaque.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "callback")
    }
}
945
/// Error report from signing callback.
///
/// Carries only a message string, built from anything that implements `ToString`.
// NOTE(review): the original comment said this type was needed because
// `std::io::Error` isn't `Send`; `std::io::Error` is `Send + Sync` but it is not
// `Clone` or `PartialEq` — confirm the actual motivation.
#[derive(Clone, PartialEq)]
pub struct AuthError(String);

impl AuthError {
    /// Creates an error whose message is `s.to_string()`.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    /// Writes `AuthError(<message>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Write straight into the formatter instead of allocating a temporary `String`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    /// Same text as the `Display` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl std::error::Error for AuthError {}