async_nats_flyradar/options.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::{connector, Dialer};
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17use base64::engine::general_purpose::URL_SAFE_NO_PAD;
18use base64::engine::Engine;
19use futures::Future;
20use std::fmt::Formatter;
21use std::{
22 fmt,
23 path::{Path, PathBuf},
24 pin::Pin,
25 sync::Arc,
26 time::Duration,
27};
28use tokio::io;
29use tokio_rustls::rustls;
30
/// Connect options. Used to connect with NATS when custom config is needed.
///
/// Values are populated through the consuming builder methods on this type and
/// then passed to the connection logic by [`ConnectOptions::connect`].
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let client = async_nats::ConnectOptions::new()
///     .require_tls(true)
///     .ping_interval(std::time::Duration::from_secs(10))
///     .connect("demo.nats.io")
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct ConnectOptions {
    /// Client name, set by [`ConnectOptions::name`]. `None` by default.
    pub(crate) name: Option<String>,
    /// When `true`, messages published from this connection are not delivered back to it.
    pub(crate) no_echo: bool,
    /// Limit of consecutive reconnect attempts; `None` means no limit.
    pub(crate) max_reconnects: Option<usize>,
    /// Timeout for establishing the underlying connection (default 5 seconds).
    pub(crate) connection_timeout: Duration,
    /// Static authentication data (token, user/password, nkey, JWT and its signer).
    pub(crate) auth: Auth,
    /// Whether a TLS connection is required.
    pub(crate) tls_required: bool,
    /// Whether TLS is established before the server info is received.
    pub(crate) tls_first: bool,
    /// Paths to root certificates.
    pub(crate) certificates: Vec<PathBuf>,
    /// Path to the client certificate, set together with `client_key`.
    pub(crate) client_cert: Option<PathBuf>,
    /// Path to the client private key, set together with `client_cert`.
    pub(crate) client_key: Option<PathBuf>,
    /// Custom rustls client configuration, if one was supplied.
    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
    /// How often the client sends PING to the server (default 60 seconds).
    pub(crate) ping_interval: Duration,
    /// Capacity of each subscriber's message buffer (default 65536).
    pub(crate) subscription_capacity: usize,
    /// Capacity set through [`ConnectOptions::client_capacity`] (default 2048).
    pub(crate) sender_capacity: usize,
    /// Optional asynchronous callback invoked with connection [`Event`]s.
    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
    /// Prefix used for inbox subjects (default `_INBOX`).
    pub(crate) inbox_prefix: String,
    /// Timeout applied to `Client::request` (default 10 seconds); `None` is also accepted.
    pub(crate) request_timeout: Option<Duration>,
    /// When `true`, the initial connection is established in the background.
    pub(crate) retry_on_initial_connect: bool,
    /// When `true`, servers advertised by the cluster are ignored.
    pub(crate) ignore_discovered_servers: bool,
    /// When `true`, servers are tried in the order they were passed.
    pub(crate) retain_servers_order: bool,
    /// Initial capacity of the read buffer (default 65535).
    pub(crate) read_buffer_capacity: u16,
    /// Maps the number of reconnect attempts to the delay before the next one.
    pub(crate) reconnect_delay_callback: Box<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
    /// Optional asynchronous callback that receives the nonce and returns the [`Auth`] to use.
    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
    //INFO: Diff starts
    /// Optional custom dialer, set by [`ConnectOptions::with_dialer`].
    // NOTE(review): presumably consulted by the connector when opening connections — confirm.
    pub(crate) dialer: Option<Arc<dyn Dialer + Send + Sync>>,
    //INFO: Diff ends
}
72
73impl fmt::Debug for ConnectOptions {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
75 f.debug_map()
76 .entry(&"name", &self.name)
77 .entry(&"no_echo", &self.no_echo)
78 .entry(&"max_reconnects", &self.max_reconnects)
79 .entry(&"connection_timeout", &self.connection_timeout)
80 .entry(&"tls_required", &self.tls_required)
81 .entry(&"certificates", &self.certificates)
82 .entry(&"client_cert", &self.client_cert)
83 .entry(&"client_key", &self.client_key)
84 .entry(&"tls_client_config", &"XXXXXXXX")
85 .entry(&"tls_first", &self.tls_first)
86 .entry(&"ping_interval", &self.ping_interval)
87 .entry(&"sender_capacity", &self.sender_capacity)
88 .entry(&"inbox_prefix", &self.inbox_prefix)
89 .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
90 .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
91 .finish()
92 }
93}
94
95impl Default for ConnectOptions {
96 fn default() -> ConnectOptions {
97 ConnectOptions {
98 name: None,
99 no_echo: false,
100 max_reconnects: None,
101 connection_timeout: Duration::from_secs(5),
102 tls_required: false,
103 tls_first: false,
104 certificates: Vec::new(),
105 client_cert: None,
106 client_key: None,
107 tls_client_config: None,
108 ping_interval: Duration::from_secs(60),
109 sender_capacity: 2048,
110 subscription_capacity: 1024 * 64,
111 event_callback: None,
112 inbox_prefix: "_INBOX".to_string(),
113 request_timeout: Some(Duration::from_secs(10)),
114 retry_on_initial_connect: false,
115 ignore_discovered_servers: false,
116 retain_servers_order: false,
117 read_buffer_capacity: 65535,
118 reconnect_delay_callback: Box::new(|attempts| {
119 connector::reconnect_delay_callback_default(attempts)
120 }),
121 auth: Default::default(),
122 auth_callback: None,
123 //INFO: Diff starts
124 dialer: None,
125 //INFO: Diff ends
126 }
127 }
128}
129
130impl ConnectOptions {
131 /// Enables customization of NATS connection.
132 ///
133 /// # Examples
134 /// ```no_run
135 /// # #[tokio::main]
136 /// # async fn main() -> Result<(), async_nats::ConnectError> {
137 /// let mut options = async_nats::ConnectOptions::new()
138 /// .require_tls(true)
139 /// .ping_interval(std::time::Duration::from_secs(10))
140 /// .connect("demo.nats.io")
141 /// .await?;
142 /// # Ok(())
143 /// # }
144 /// ```
145 pub fn new() -> ConnectOptions {
146 ConnectOptions::default()
147 }
148
149 /// Connect to the NATS Server leveraging all passed options.
150 ///
151 /// # Examples
152 /// ```no_run
153 /// # #[tokio::main]
154 /// # async fn main() -> Result<(), async_nats::ConnectError> {
155 /// let nc = async_nats::ConnectOptions::new()
156 /// .require_tls(true)
157 /// .connect("demo.nats.io")
158 /// .await?;
159 /// # Ok(())
160 /// # }
161 /// ```
162 ///
163 /// ## Pass multiple URLs.
164 /// ```no_run
165 /// #[tokio::main]
166 /// # async fn main() -> Result<(), async_nats::Error> {
167 /// use async_nats::ServerAddr;
168 /// let client = async_nats::connect(vec![
169 /// "demo.nats.io".parse::<ServerAddr>()?,
170 /// "other.nats.io".parse::<ServerAddr>()?,
171 /// ])
172 /// .await
173 /// .unwrap();
174 /// # Ok(())
175 /// # }
176 /// ```
177 pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
178 crate::connect_with_options(addrs, self).await
179 }
180
181 //INFO: Diff starts
182 pub fn with_dialer(mut self, dialer: Arc<dyn Dialer + Send + Sync>) -> Self {
183 self.dialer = Some(dialer);
184 self
185 }
186 //INFO: Diff ends
187
188 /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
189 /// Requires an asynchronous function that accepts nonce and returns [Auth].
190 /// It will overwrite all other auth methods used.
191 ///
192 ///
193 /// # Example
194 /// ```no_run
195 /// # #[tokio::main]
196 /// # async fn main() -> Result<(), async_nats::ConnectError> {
197 /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
198 /// let mut auth = async_nats::Auth::new();
199 /// auth.username = Some("derek".to_string());
200 /// auth.password = Some("s3cr3t".to_string());
201 /// Ok(auth)
202 /// })
203 /// .connect("demo.nats.io")
204 /// .await?;
205 /// # Ok(())
206 /// # }
207 /// ```
208 pub fn with_auth_callback<F, Fut>(callback: F) -> Self
209 where
210 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
211 Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
212 {
213 let mut options = ConnectOptions::new();
214 options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Box::new(
215 move |nonce| Box::pin(callback(nonce)),
216 )));
217 options
218 }
219
220 /// Authenticate against NATS Server with the provided token.
221 ///
222 /// # Examples
223 /// ```no_run
224 /// # #[tokio::main]
225 /// # async fn main() -> Result<(), async_nats::ConnectError> {
226 /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
227 /// .connect("demo.nats.io")
228 /// .await?;
229 /// # Ok(())
230 /// # }
231 /// ```
232 pub fn with_token(token: String) -> Self {
233 ConnectOptions::default().token(token)
234 }
235
236 /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
237 /// This can be used as a way to mix authentication methods.
238 ///
239 /// # Examples
240 /// ```no_run
241 /// # #[tokio::main]
242 /// # async fn main() -> Result<(), async_nats::ConnectError> {
243 /// let nc = async_nats::ConnectOptions::new()
244 /// .token("t0k3n!".into())
245 /// .connect("demo.nats.io")
246 /// .await?;
247 /// # Ok(())
248 /// # }
249 /// ```
250 pub fn token(mut self, token: String) -> Self {
251 self.auth.token = Some(token);
252 self
253 }
254
255 /// Authenticate against NATS Server with the provided username and password.
256 ///
257 /// # Examples
258 /// ```no_run
259 /// # #[tokio::main]
260 /// # async fn main() -> Result<(), async_nats::ConnectError> {
261 /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
262 /// .connect("demo.nats.io")
263 /// .await?;
264 /// # Ok(())
265 /// # }
266 /// ```
267 pub fn with_user_and_password(user: String, pass: String) -> Self {
268 ConnectOptions::default().user_and_password(user, pass)
269 }
270
271 /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
272 /// This can be used as a way to mix authentication methods.
273 ///
274 /// # Examples
275 /// ```no_run
276 /// # #[tokio::main]
277 /// # async fn main() -> Result<(), async_nats::ConnectError> {
278 /// let nc = async_nats::ConnectOptions::new()
279 /// .user_and_password("derek".into(), "s3cr3t!".into())
280 /// .connect("demo.nats.io")
281 /// .await?;
282 /// # Ok(())
283 /// # }
284 /// ```
285 pub fn user_and_password(mut self, user: String, pass: String) -> Self {
286 self.auth.username = Some(user);
287 self.auth.password = Some(pass);
288 self
289 }
290
291 /// Authenticate with an NKey. Requires an NKey Seed secret.
292 ///
293 /// # Example
294 /// ```no_run
295 /// # #[tokio::main]
296 /// # async fn main() -> Result<(), async_nats::ConnectError> {
297 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
298 /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
299 /// .connect("localhost")
300 /// .await?;
301 /// # Ok(())
302 /// # }
303 /// ```
304 pub fn with_nkey(seed: String) -> Self {
305 ConnectOptions::default().nkey(seed)
306 }
307
308 /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
309 /// Requires an NKey Seed Secret.
310 /// This can be used as a way to mix authentication methods.
311 ///
312 /// # Example
313 /// ```no_run
314 /// # #[tokio::main]
315 /// # async fn main() -> Result<(), async_nats::ConnectError> {
316 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
317 /// let nc = async_nats::ConnectOptions::new()
318 /// .nkey(seed.into())
319 /// .connect("localhost")
320 /// .await?;
321 /// # Ok(())
322 /// # }
323 /// ```
324 pub fn nkey(mut self, seed: String) -> Self {
325 self.auth.nkey = Some(seed);
326 self
327 }
328
329 /// Authenticate with a JWT. Requires function to sign the server nonce.
330 /// The signing function is asynchronous.
331 ///
332 /// # Example
333 /// ```no_run
334 /// # #[tokio::main]
335 /// # async fn main() -> Result<(), async_nats::ConnectError> {
336 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
337 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
338 /// // load jwt from creds file or other secure source
339 /// async fn load_jwt() -> std::io::Result<String> {
340 /// todo!();
341 /// }
342 /// let jwt = load_jwt().await?;
343 /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
344 /// let key_pair = key_pair.clone();
345 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
346 /// })
347 /// .connect("localhost")
348 /// .await?;
349 /// # Ok(())
350 /// # }
351 /// ```
352 pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
353 where
354 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
355 Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
356 {
357 ConnectOptions::default().jwt(jwt, sign_cb)
358 }
359
360 /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
361 /// Requires an asynchronous function to sign the server nonce.
362 /// This can be used as a way to mix authentication methods.
363 ///
364 ///
365 /// # Example
366 /// ```no_run
367 /// # #[tokio::main]
368 /// # async fn main() -> Result<(), async_nats::ConnectError> {
369 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
370 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
371 /// // load jwt from creds file or other secure source
372 /// async fn load_jwt() -> std::io::Result<String> {
373 /// todo!();
374 /// }
375 /// let jwt = load_jwt().await?;
376 /// let nc = async_nats::ConnectOptions::new()
377 /// .jwt(jwt, move |nonce| {
378 /// let key_pair = key_pair.clone();
379 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
380 /// })
381 /// .connect("localhost")
382 /// .await?;
383 /// # Ok(())
384 /// # }
385 /// ```
386 pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
387 where
388 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
389 Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
390 {
391 let sign_cb = Arc::new(sign_cb);
392
393 let jwt_sign_callback = CallbackArg1(Box::new(move |nonce: String| {
394 let sign_cb = sign_cb.clone();
395 Box::pin(async move {
396 let sig = sign_cb(nonce.as_bytes().to_vec())
397 .await
398 .map_err(AuthError::new)?;
399 Ok(URL_SAFE_NO_PAD.encode(sig))
400 })
401 }));
402
403 self.auth.jwt = Some(jwt);
404 self.auth.signature_callback = Some(jwt_sign_callback);
405 self
406 }
407
408 /// Authenticate with NATS using a `.creds` file.
409 /// Open the provided file, load its creds,
410 /// and perform the desired authentication
411 ///
412 /// # Example
413 /// ```no_run
414 /// # #[tokio::main]
415 /// # async fn main() -> Result<(), async_nats::ConnectError> {
416 /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
417 /// .await?
418 /// .connect("connect.ngs.global")
419 /// .await?;
420 /// # Ok(())
421 /// # }
422 /// ```
423 pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
424 let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
425 Self::with_credentials(&cred_file_contents)
426 }
427
428 /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
429 /// This will open the credentials file and load its credentials.
430 /// This can be used as a way to mix authentication methods.
431 ///
432 /// # Example
433 /// ```no_run
434 /// # #[tokio::main]
435 /// # async fn main() -> Result<(), async_nats::ConnectError> {
436 /// let nc = async_nats::ConnectOptions::new()
437 /// .credentials_file("path/to/my.creds")
438 /// .await?
439 /// .connect("connect.ngs.global")
440 /// .await?;
441 /// # Ok(())
442 /// # }
443 /// ```
444 pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
445 let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
446 self.credentials(&cred_file_contents)
447 }
448
449 /// Authenticate with NATS using a credential str, in the creds file format.
450 ///
451 /// # Example
452 /// ```no_run
453 /// # #[tokio::main]
454 /// # async fn main() -> Result<(), async_nats::ConnectError> {
455 /// let creds = "-----BEGIN NATS USER JWT-----
456 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
457 /// ------END NATS USER JWT------
458 ///
459 /// ************************* IMPORTANT *************************
460 /// NKEY Seed printed below can be used sign and prove identity.
461 /// NKEYs are sensitive and should be treated as secrets.
462 ///
463 /// -----BEGIN USER NKEY SEED-----
464 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
465 /// ------END USER NKEY SEED------
466 /// ";
467 ///
468 /// let nc = async_nats::ConnectOptions::with_credentials(creds)
469 /// .expect("failed to parse static creds")
470 /// .connect("connect.ngs.global")
471 /// .await?;
472 /// # Ok(())
473 /// # }
474 /// ```
475 pub fn with_credentials(creds: &str) -> io::Result<Self> {
476 ConnectOptions::default().credentials(creds)
477 }
478
479 /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
480 /// The string should be in the credentials file format.
481 /// This can be used as a way to mix authentication methods.
482 ///
483 /// # Example
484 /// ```no_run
485 /// # #[tokio::main]
486 /// # async fn main() -> Result<(), async_nats::ConnectError> {
487 /// let creds = "-----BEGIN NATS USER JWT-----
488 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
489 /// ------END NATS USER JWT------
490 ///
491 /// ************************* IMPORTANT *************************
492 /// NKEY Seed printed below can be used sign and prove identity.
493 /// NKEYs are sensitive and should be treated as secrets.
494 ///
495 /// -----BEGIN USER NKEY SEED-----
496 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
497 /// ------END USER NKEY SEED------
498 /// ";
499 ///
500 /// let nc = async_nats::ConnectOptions::new()
501 /// .credentials(creds)
502 /// .expect("failed to parse static creds")
503 /// .connect("connect.ngs.global")
504 /// .await?;
505 /// # Ok(())
506 /// # }
507 /// ```
508 pub fn credentials(self, creds: &str) -> io::Result<Self> {
509 let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
510 let key_pair = std::sync::Arc::new(key_pair);
511
512 Ok(self.jwt(jwt.to_owned(), move |nonce| {
513 let key_pair = key_pair.clone();
514 async move { key_pair.sign(&nonce).map_err(AuthError::new) }
515 }))
516 }
517
518 /// Loads root certificates by providing the path to them.
519 ///
520 /// # Examples
521 /// ```no_run
522 /// # #[tokio::main]
523 /// # async fn main() -> Result<(), async_nats::ConnectError> {
524 /// let nc = async_nats::ConnectOptions::new()
525 /// .add_root_certificates("mycerts.pem".into())
526 /// .connect("demo.nats.io")
527 /// .await?;
528 /// # Ok(())
529 /// # }
530 /// ```
531 pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
532 self.certificates = vec![path];
533 self
534 }
535
536 /// Loads client certificate by providing the path to it.
537 ///
538 /// # Examples
539 /// ```no_run
540 /// # #[tokio::main]
541 /// # async fn main() -> Result<(), async_nats::ConnectError> {
542 /// let nc = async_nats::ConnectOptions::new()
543 /// .add_client_certificate("cert.pem".into(), "key.pem".into())
544 /// .connect("demo.nats.io")
545 /// .await?;
546 /// # Ok(())
547 /// # }
548 /// ```
549 pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
550 self.client_cert = Some(cert);
551 self.client_key = Some(key);
552 self
553 }
554
555 /// Sets or disables TLS requirement. If TLS connection is impossible while `options.require_tls(true)` connection will return error.
556 ///
557 /// # Examples
558 /// ```no_run
559 /// # #[tokio::main]
560 /// # async fn main() -> Result<(), async_nats::ConnectError> {
561 /// let nc = async_nats::ConnectOptions::new()
562 /// .require_tls(true)
563 /// .connect("demo.nats.io")
564 /// .await?;
565 /// # Ok(())
566 /// # }
567 /// ```
568 pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
569 self.tls_required = is_required;
570 self
571 }
572
573 /// Changes how tls connection is established. If `tls_first` is set,
574 /// client will try to establish tls before getting info from the server.
575 /// That requires the server to enable `handshake_first` option in the config.
576 pub fn tls_first(mut self) -> ConnectOptions {
577 self.tls_first = true;
578 self.tls_required = true;
579 self
580 }
581
582 /// Sets how often Client sends PING message to the server.
583 ///
584 /// # Examples
585 /// ```no_run
586 /// # use tokio::time::Duration;
587 /// # #[tokio::main]
588 /// # async fn main() -> Result<(), async_nats::ConnectError> {
589 /// async_nats::ConnectOptions::new()
590 /// .ping_interval(Duration::from_secs(24))
591 /// .connect("demo.nats.io")
592 /// .await?;
593 /// # Ok(())
594 /// # }
595 /// ```
596 pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
597 self.ping_interval = ping_interval;
598 self
599 }
600
601 /// Sets `no_echo` option which disables delivering messages that were published from the same
602 /// connection.
603 ///
604 /// # Examples
605 /// ```no_run
606 /// # #[tokio::main]
607 /// # async fn main() -> Result<(), async_nats::ConnectError> {
608 /// async_nats::ConnectOptions::new()
609 /// .no_echo()
610 /// .connect("demo.nats.io")
611 /// .await?;
612 /// # Ok(())
613 /// # }
614 /// ```
615 pub fn no_echo(mut self) -> ConnectOptions {
616 self.no_echo = true;
617 self
618 }
619
620 /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
621 /// callback and drop messages.
622 /// Default is set to 65536 messages buffer.
623 ///
624 /// # Examples
625 /// ```no_run
626 /// # #[tokio::main]
627 /// # async fn main() -> Result<(), async_nats::ConnectError> {
628 /// async_nats::ConnectOptions::new()
629 /// .subscription_capacity(1024)
630 /// .connect("demo.nats.io")
631 /// .await?;
632 /// # Ok(())
633 /// # }
634 /// ```
635 pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
636 self.subscription_capacity = capacity;
637 self
638 }
639
640 /// Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks.
641 /// Default is set to 5 seconds.
642 ///
643 /// # Examples
644 /// ```no_run
645 /// # #[tokio::main]
646 /// # async fn main() -> Result<(), async_nats::ConnectError> {
647 /// async_nats::ConnectOptions::new()
648 /// .connection_timeout(tokio::time::Duration::from_secs(5))
649 /// .connect("demo.nats.io")
650 /// .await?;
651 /// # Ok(())
652 /// # }
653 /// ```
654 pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
655 self.connection_timeout = timeout;
656 self
657 }
658
659 /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
660 ///
661 /// # Examples
662 /// ```no_run
663 /// # #[tokio::main]
664 /// # async fn main() -> Result<(), async_nats::ConnectError> {
665 /// async_nats::ConnectOptions::new()
666 /// .request_timeout(Some(std::time::Duration::from_secs(3)))
667 /// .connect("demo.nats.io")
668 /// .await?;
669 /// # Ok(())
670 /// # }
671 /// ```
672 pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
673 self.request_timeout = timeout;
674 self
675 }
676
677 /// Registers an asynchronous callback for errors that are received over the wire from the server.
678 ///
679 /// # Examples
680 /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
681 /// work around this
682 ///
683 /// ## Basic
684 /// If you don't need to move anything into the closure, simple signature can be used:
685 ///
686 /// ```no_run
687 /// # #[tokio::main]
688 /// # async fn main() -> Result<(), async_nats::ConnectError> {
689 /// async_nats::ConnectOptions::new()
690 /// .event_callback(|event| async move {
691 /// println!("event occurred: {}", event);
692 /// })
693 /// .connect("demo.nats.io")
694 /// .await?;
695 /// # Ok(())
696 /// # }
697 /// ```
698 ///
699 /// ## Listening to specific event kind
700 /// ```no_run
701 /// # #[tokio::main]
702 /// # async fn main() -> Result<(), async_nats::ConnectError> {
703 /// async_nats::ConnectOptions::new()
704 /// .event_callback(|event| async move {
705 /// match event {
706 /// async_nats::Event::Disconnected => println!("disconnected"),
707 /// async_nats::Event::Connected => println!("reconnected"),
708 /// async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
709 /// other => println!("other event happened: {}", other),
710 /// }
711 /// })
712 /// .connect("demo.nats.io")
713 /// .await?;
714 /// # Ok(())
715 /// # }
716 /// ```
717 ///
718 /// ## Advanced
719 /// If you need to move something into the closure, here's an example how to do that
720 ///
721 /// ```no_run
722 /// # #[tokio::main]
723 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
724 /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
725 /// async_nats::ConnectOptions::new()
726 /// .event_callback(move |event| {
727 /// let tx = tx.clone();
728 /// async move {
729 /// tx.send(event).await.unwrap();
730 /// }
731 /// })
732 /// .connect("demo.nats.io")
733 /// .await?;
734 /// # Ok(())
735 /// # }
736 /// ```
737 pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
738 where
739 F: Fn(Event) -> Fut + Send + Sync + 'static,
740 Fut: Future<Output = ()> + 'static + Send + Sync,
741 {
742 self.event_callback = Some(CallbackArg1::<Event, ()>(Box::new(move |event| {
743 Box::pin(cb(event))
744 })));
745 self
746 }
747
748 /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
749 ///
750 /// # Examples
751 /// ```no_run
752 /// # #[tokio::main]
753 /// # async fn main() -> Result<(), async_nats::ConnectError> {
754 /// async_nats::ConnectOptions::new()
755 /// .reconnect_delay_callback(|attempts| {
756 /// println!("no of attempts: {attempts}");
757 /// std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
758 /// })
759 /// .connect("demo.nats.io")
760 /// .await?;
761 /// # Ok(())
762 /// # }
763 /// ```
764 pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
765 where
766 F: Fn(usize) -> Duration + Send + Sync + 'static,
767 {
768 self.reconnect_delay_callback = Box::new(cb);
769 self
770 }
771
772 /// By default, Client dispatches op's to the Client onto the channel with capacity of 128.
773 /// This option enables overriding it.
774 ///
775 /// # Examples
776 /// ```
777 /// # #[tokio::main]
778 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
779 /// async_nats::ConnectOptions::new()
780 /// .client_capacity(256)
781 /// .connect("demo.nats.io")
782 /// .await?;
783 /// # Ok(())
784 /// # }
785 /// ```
786 pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
787 self.sender_capacity = capacity;
788 self
789 }
790
791 /// Sets custom prefix instead of default `_INBOX`.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// # #[tokio::main]
797 /// # async fn main() -> Result<(), async_nats::Error> {
798 /// async_nats::ConnectOptions::new()
799 /// .custom_inbox_prefix("CUSTOM")
800 /// .connect("demo.nats.io")
801 /// .await?;
802 /// # Ok(())
803 /// # }
804 /// ```
805 pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
806 self.inbox_prefix = prefix.to_string();
807 self
808 }
809
810 /// Sets the name for the client.
811 ///
812 /// # Examples
813 /// ```
814 /// # #[tokio::main]
815 /// # async fn main() -> Result<(), async_nats::Error> {
816 /// async_nats::ConnectOptions::new()
817 /// .name("rust-service")
818 /// .connect("demo.nats.io")
819 /// .await?;
820 /// # Ok(())
821 /// # }
822 /// ```
823 pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
824 self.name = Some(name.to_string());
825 self
826 }
827
828 /// By default, [`ConnectOptions::connect`] will return an error if
829 /// the connection to the server cannot be established.
830 ///
831 /// Setting `retry_on_initial_connect` makes the client
832 /// establish the connection in the background.
833 pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
834 self.retry_on_initial_connect = true;
835 self
836 }
837
838 /// Specifies the number of consecutive reconnect attempts the client will
839 /// make before giving up. This is useful for preventing zombie services
840 /// from endlessly reaching the servers, but it can also be a footgun and
841 /// surprise for users who do not expect that the client can give up
842 /// entirely.
843 ///
844 /// Pass `None` or `0` for no limit.
845 ///
846 /// # Examples
847 /// ```
848 /// # #[tokio::main]
849 /// # async fn main() -> Result<(), async_nats::Error> {
850 /// async_nats::ConnectOptions::new()
851 /// .max_reconnects(None)
852 /// .connect("demo.nats.io")
853 /// .await?;
854 /// # Ok(())
855 /// # }
856 /// ```
857 pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
858 let val: Option<usize> = max_reconnects.into();
859 self.max_reconnects = if val == Some(0) { None } else { val };
860 self
861 }
862
863 /// By default, a server may advertise other servers in the cluster known to it.
864 /// By setting this option, the client will ignore the advertised servers.
865 /// This may be useful if the client may not be able to reach them.
866 pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
867 self.ignore_discovered_servers = true;
868 self
869 }
870
871 /// By default, client will pick random server to which it will try connect to.
872 /// This option disables that feature, forcing it to always respect the order
873 /// in which server addresses were passed.
874 pub fn retain_servers_order(mut self) -> ConnectOptions {
875 self.retain_servers_order = true;
876 self
877 }
878
879 /// Allows passing custom rustls tls config.
880 ///
881 /// # Examples
882 /// ```
883 /// # #[tokio::main]
884 /// # async fn main() -> Result<(), async_nats::Error> {
885 /// let mut root_store = async_nats::rustls::RootCertStore::empty();
886 ///
887 /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
888 ///
889 /// let tls_client = async_nats::rustls::ClientConfig::builder()
890 /// .with_root_certificates(root_store)
891 /// .with_no_client_auth();
892 ///
893 /// let client = async_nats::ConnectOptions::new()
894 /// .require_tls(true)
895 /// .tls_client_config(tls_client)
896 /// .connect("tls://demo.nats.io")
897 /// .await?;
898 ///
899 /// # Ok(())
900 /// # }
901 /// ```
902 pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
903 self.tls_client_config = Some(config);
904 self
905 }
906
907 /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
908 /// protocol messages.
909 ///
910 /// # Examples
911 /// ```
912 /// # #[tokio::main]
913 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
914 /// async_nats::ConnectOptions::new()
915 /// .read_buffer_capacity(65535)
916 /// .connect("demo.nats.io")
917 /// .await?;
918 /// # Ok(())
919 /// # }
920 /// ```
921 pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
922 self.read_buffer_capacity = size;
923 self
924 }
925}
926
/// Boxed asynchronous callback taking one argument of type `A` and resolving to `T`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Box<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Wrapper around [`AsyncCallbackArg1`] so the callback can be stored in
/// structs that need a `Debug` implementation.
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the wrapped callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        let pending = (self.0)(arg);
        pending.await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Writes the fixed placeholder `callback`; the closure itself is not printable.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("callback")
    }
}
943
/// Error report from signing callback.
///
/// Carries a plain message; both `Display` and `Debug` render it as
/// `AuthError(<message>)`.
// This was needed because std::io::Error isn't Send.
#[derive(Clone, PartialEq)]
pub struct AuthError(String);

impl AuthError {
    /// Builds an error from anything that can be turned into a string.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // `write!` formats straight into the formatter, avoiding a temporary `String`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Deliberately the same rendering as `Display`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::error::Error for AuthError {}