async_nats/
options.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17#[cfg(feature = "nkeys")]
18use base64::engine::general_purpose::URL_SAFE_NO_PAD;
19#[cfg(feature = "nkeys")]
20use base64::engine::Engine;
21use futures_util::Future;
22use std::fmt::Formatter;
23use std::{fmt, path::PathBuf, pin::Pin, time::Duration};
24#[cfg(feature = "nkeys")]
25use std::{path::Path, sync::Arc};
26#[cfg(feature = "nkeys")]
27use tokio::io;
28use tokio_rustls::rustls;
29
30/// Connect options. Used to connect with NATS when custom config is needed.
31/// # Examples
32/// ```no_run
33/// # #[tokio::main]
34/// # async fn main() -> Result<(), async_nats::ConnectError> {
35/// let mut options = async_nats::ConnectOptions::new()
36///     .require_tls(true)
37///     .ping_interval(std::time::Duration::from_secs(10))
38///     .connect("demo.nats.io")
39///     .await?;
40/// # Ok(())
41/// # }
42/// ```
43pub struct ConnectOptions {
44    pub(crate) name: Option<String>,
45    pub(crate) no_echo: bool,
46    pub(crate) max_reconnects: Option<usize>,
47    pub(crate) connection_timeout: Duration,
48    pub(crate) auth: Auth,
49    pub(crate) tls_required: bool,
50    pub(crate) tls_first: bool,
51    pub(crate) certificates: Vec<PathBuf>,
52    pub(crate) client_cert: Option<PathBuf>,
53    pub(crate) client_key: Option<PathBuf>,
54    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
55    pub(crate) ping_interval: Duration,
56    pub(crate) subscription_capacity: usize,
57    pub(crate) sender_capacity: usize,
58    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
59    pub(crate) inbox_prefix: String,
60    pub(crate) request_timeout: Option<Duration>,
61    pub(crate) retry_on_initial_connect: bool,
62    pub(crate) ignore_discovered_servers: bool,
63    pub(crate) retain_servers_order: bool,
64    pub(crate) read_buffer_capacity: u16,
65    pub(crate) reconnect_delay_callback: Box<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
66    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
67}
68
69impl fmt::Debug for ConnectOptions {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
71        f.debug_map()
72            .entry(&"name", &self.name)
73            .entry(&"no_echo", &self.no_echo)
74            .entry(&"max_reconnects", &self.max_reconnects)
75            .entry(&"connection_timeout", &self.connection_timeout)
76            .entry(&"tls_required", &self.tls_required)
77            .entry(&"certificates", &self.certificates)
78            .entry(&"client_cert", &self.client_cert)
79            .entry(&"client_key", &self.client_key)
80            .entry(&"tls_client_config", &"XXXXXXXX")
81            .entry(&"tls_first", &self.tls_first)
82            .entry(&"ping_interval", &self.ping_interval)
83            .entry(&"sender_capacity", &self.sender_capacity)
84            .entry(&"inbox_prefix", &self.inbox_prefix)
85            .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
86            .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
87            .finish()
88    }
89}
90
91impl Default for ConnectOptions {
92    fn default() -> ConnectOptions {
93        ConnectOptions {
94            name: None,
95            no_echo: false,
96            max_reconnects: None,
97            connection_timeout: Duration::from_secs(5),
98            tls_required: false,
99            tls_first: false,
100            certificates: Vec::new(),
101            client_cert: None,
102            client_key: None,
103            tls_client_config: None,
104            ping_interval: Duration::from_secs(60),
105            sender_capacity: 2048,
106            subscription_capacity: 1024 * 64,
107            event_callback: None,
108            inbox_prefix: "_INBOX".to_string(),
109            request_timeout: Some(Duration::from_secs(10)),
110            retry_on_initial_connect: false,
111            ignore_discovered_servers: false,
112            retain_servers_order: false,
113            read_buffer_capacity: 65535,
114            reconnect_delay_callback: Box::new(|attempts| {
115                connector::reconnect_delay_callback_default(attempts)
116            }),
117            auth: Default::default(),
118            auth_callback: None,
119        }
120    }
121}
122
123impl ConnectOptions {
124    /// Enables customization of NATS connection.
125    ///
126    /// # Examples
127    /// ```no_run
128    /// # #[tokio::main]
129    /// # async fn main() -> Result<(), async_nats::ConnectError> {
130    /// let mut options = async_nats::ConnectOptions::new()
131    ///     .require_tls(true)
132    ///     .ping_interval(std::time::Duration::from_secs(10))
133    ///     .connect("demo.nats.io")
134    ///     .await?;
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub fn new() -> ConnectOptions {
139        ConnectOptions::default()
140    }
141
142    /// Connect to the NATS Server leveraging all passed options.
143    ///
144    /// # Examples
145    /// ```no_run
146    /// # #[tokio::main]
147    /// # async fn main() -> Result<(), async_nats::ConnectError> {
148    /// let nc = async_nats::ConnectOptions::new()
149    ///     .require_tls(true)
150    ///     .connect("demo.nats.io")
151    ///     .await?;
152    /// # Ok(())
153    /// # }
154    /// ```
155    ///
156    /// ## Pass multiple URLs.
157    /// ```no_run
    /// # #[tokio::main]
159    /// # async fn main() -> Result<(), async_nats::Error> {
160    /// use async_nats::ServerAddr;
161    /// let client = async_nats::connect(vec![
162    ///     "demo.nats.io".parse::<ServerAddr>()?,
163    ///     "other.nats.io".parse::<ServerAddr>()?,
164    /// ])
165    /// .await
166    /// .unwrap();
167    /// # Ok(())
168    /// # }
169    /// ```
170    pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
171        crate::connect_with_options(addrs, self).await
172    }
173
174    /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
175    /// Requires an asynchronous function that accepts nonce and returns [Auth].
176    /// It will overwrite all other auth methods used.
177    ///
178    ///
179    /// # Example
180    /// ```no_run
181    /// # #[tokio::main]
182    /// # async fn main() -> Result<(), async_nats::ConnectError> {
183    /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
184    ///     let mut auth = async_nats::Auth::new();
185    ///     auth.username = Some("derek".to_string());
186    ///     auth.password = Some("s3cr3t".to_string());
187    ///     Ok(auth)
188    /// })
189    /// .connect("demo.nats.io")
190    /// .await?;
191    /// # Ok(())
192    /// # }
193    /// ```
194    pub fn with_auth_callback<F, Fut>(callback: F) -> Self
195    where
196        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
197        Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
198    {
199        let mut options = ConnectOptions::new();
200        options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Box::new(
201            move |nonce| Box::pin(callback(nonce)),
202        )));
203        options
204    }
205
206    /// Authenticate against NATS Server with the provided token.
207    ///
208    /// # Examples
209    /// ```no_run
210    /// # #[tokio::main]
211    /// # async fn main() -> Result<(), async_nats::ConnectError> {
212    /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
213    ///     .connect("demo.nats.io")
214    ///     .await?;
215    /// # Ok(())
216    /// # }
217    /// ```
218    pub fn with_token(token: String) -> Self {
219        ConnectOptions::default().token(token)
220    }
221
222    /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
223    /// This can be used as a way to mix authentication methods.
224    ///
225    /// # Examples
226    /// ```no_run
227    /// # #[tokio::main]
228    /// # async fn main() -> Result<(), async_nats::ConnectError> {
229    /// let nc = async_nats::ConnectOptions::new()
230    ///     .token("t0k3n!".into())
231    ///     .connect("demo.nats.io")
232    ///     .await?;
233    /// # Ok(())
234    /// # }
235    /// ```
236    pub fn token(mut self, token: String) -> Self {
237        self.auth.token = Some(token);
238        self
239    }
240
241    /// Authenticate against NATS Server with the provided username and password.
242    ///
243    /// # Examples
244    /// ```no_run
245    /// # #[tokio::main]
246    /// # async fn main() -> Result<(), async_nats::ConnectError> {
247    /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
248    ///     .connect("demo.nats.io")
249    ///     .await?;
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub fn with_user_and_password(user: String, pass: String) -> Self {
254        ConnectOptions::default().user_and_password(user, pass)
255    }
256
257    /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
258    /// This can be used as a way to mix authentication methods.
259    ///
260    /// # Examples
261    /// ```no_run
262    /// # #[tokio::main]
263    /// # async fn main() -> Result<(), async_nats::ConnectError> {
264    /// let nc = async_nats::ConnectOptions::new()
265    ///     .user_and_password("derek".into(), "s3cr3t!".into())
266    ///     .connect("demo.nats.io")
267    ///     .await?;
268    /// # Ok(())
269    /// # }
270    /// ```
271    pub fn user_and_password(mut self, user: String, pass: String) -> Self {
272        self.auth.username = Some(user);
273        self.auth.password = Some(pass);
274        self
275    }
276
277    /// Authenticate with an NKey. Requires an NKey Seed secret.
278    ///
279    /// # Example
280    /// ```no_run
281    /// # #[tokio::main]
282    /// # async fn main() -> Result<(), async_nats::ConnectError> {
283    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
284    /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
285    ///     .connect("localhost")
286    ///     .await?;
287    /// # Ok(())
288    /// # }
289    /// ```
290    #[cfg(feature = "nkeys")]
291    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
292    pub fn with_nkey(seed: String) -> Self {
293        ConnectOptions::default().nkey(seed)
294    }
295
296    /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
297    /// Requires an NKey Seed Secret.
298    /// This can be used as a way to mix authentication methods.
299    ///
300    /// # Example
301    /// ```no_run
302    /// # #[tokio::main]
303    /// # async fn main() -> Result<(), async_nats::ConnectError> {
304    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
305    /// let nc = async_nats::ConnectOptions::new()
306    ///     .nkey(seed.into())
307    ///     .connect("localhost")
308    ///     .await?;
309    /// # Ok(())
310    /// # }
311    /// ```
312    #[cfg(feature = "nkeys")]
313    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
314    pub fn nkey(mut self, seed: String) -> Self {
315        self.auth.nkey = Some(seed);
316        self
317    }
318
319    /// Authenticate with a JWT. Requires function to sign the server nonce.
320    /// The signing function is asynchronous.
321    ///
322    /// # Example
323    /// ```no_run
324    /// # #[tokio::main]
325    /// # async fn main() -> Result<(), async_nats::ConnectError> {
326    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
327    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
328    /// // load jwt from creds file or other secure source
329    /// async fn load_jwt() -> std::io::Result<String> {
330    ///     todo!();
331    /// }
332    /// let jwt = load_jwt().await?;
333    /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
334    ///     let key_pair = key_pair.clone();
335    ///     async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
336    /// })
337    /// .connect("localhost")
338    /// .await?;
339    /// # Ok(())
340    /// # }
341    /// ```
342    #[cfg(feature = "nkeys")]
343    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
344    pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
345    where
346        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
347        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
348    {
349        ConnectOptions::default().jwt(jwt, sign_cb)
350    }
351
352    /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
353    /// Requires an asynchronous function to sign the server nonce.
354    /// This can be used as a way to mix authentication methods.
355    ///
356    ///
357    /// # Example
358    /// ```no_run
359    /// # #[tokio::main]
360    /// # async fn main() -> Result<(), async_nats::ConnectError> {
361    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
362    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
363    /// // load jwt from creds file or other secure source
364    /// async fn load_jwt() -> std::io::Result<String> {
365    ///     todo!();
366    /// }
367    /// let jwt = load_jwt().await?;
368    /// let nc = async_nats::ConnectOptions::new()
369    ///     .jwt(jwt, move |nonce| {
370    ///         let key_pair = key_pair.clone();
371    ///         async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
372    ///     })
373    ///     .connect("localhost")
374    ///     .await?;
375    /// # Ok(())
376    /// # }
377    /// ```
378    #[cfg(feature = "nkeys")]
379    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
380    pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
381    where
382        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
383        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
384    {
385        let sign_cb = Arc::new(sign_cb);
386
387        let jwt_sign_callback = CallbackArg1(Box::new(move |nonce: String| {
388            let sign_cb = sign_cb.clone();
389            Box::pin(async move {
390                let sig = sign_cb(nonce.as_bytes().to_vec())
391                    .await
392                    .map_err(AuthError::new)?;
393                Ok(URL_SAFE_NO_PAD.encode(sig))
394            })
395        }));
396
397        self.auth.jwt = Some(jwt);
398        self.auth.signature_callback = Some(jwt_sign_callback);
399        self
400    }
401
402    /// Authenticate with NATS using a `.creds` file.
403    /// Open the provided file, load its creds,
404    /// and perform the desired authentication
405    ///
406    /// # Example
407    /// ```no_run
408    /// # #[tokio::main]
409    /// # async fn main() -> Result<(), async_nats::ConnectError> {
410    /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
411    ///     .await?
412    ///     .connect("connect.ngs.global")
413    ///     .await?;
414    /// # Ok(())
415    /// # }
416    /// ```
417    #[cfg(feature = "nkeys")]
418    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
419    pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
420        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
421        Self::with_credentials(&cred_file_contents)
422    }
423
424    /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
425    /// This will open the credentials file and load its credentials.
426    /// This can be used as a way to mix authentication methods.
427    ///
428    /// # Example
429    /// ```no_run
430    /// # #[tokio::main]
431    /// # async fn main() -> Result<(), async_nats::ConnectError> {
432    /// let nc = async_nats::ConnectOptions::new()
433    ///     .credentials_file("path/to/my.creds")
434    ///     .await?
435    ///     .connect("connect.ngs.global")
436    ///     .await?;
437    /// # Ok(())
438    /// # }
439    /// ```
440    #[cfg(feature = "nkeys")]
441    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
442    pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
443        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
444        self.credentials(&cred_file_contents)
445    }
446
447    /// Authenticate with NATS using a credential str, in the creds file format.
448    ///
449    /// # Example
450    /// ```no_run
451    /// # #[tokio::main]
452    /// # async fn main() -> Result<(), async_nats::ConnectError> {
453    /// let creds = "-----BEGIN NATS USER JWT-----
454    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
455    /// ------END NATS USER JWT------
456    ///
457    /// ************************* IMPORTANT *************************
458    /// NKEY Seed printed below can be used sign and prove identity.
459    /// NKEYs are sensitive and should be treated as secrets.
460    ///
461    /// -----BEGIN USER NKEY SEED-----
462    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
463    /// ------END USER NKEY SEED------
464    /// ";
465    ///
466    /// let nc = async_nats::ConnectOptions::with_credentials(creds)
467    ///     .expect("failed to parse static creds")
468    ///     .connect("connect.ngs.global")
469    ///     .await?;
470    /// # Ok(())
471    /// # }
472    /// ```
473    #[cfg(feature = "nkeys")]
474    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
475    pub fn with_credentials(creds: &str) -> io::Result<Self> {
476        ConnectOptions::default().credentials(creds)
477    }
478
479    /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
480    /// The string should be in the credentials file format.
481    /// This can be used as a way to mix authentication methods.
482    ///
483    /// # Example
484    /// ```no_run
485    /// # #[tokio::main]
486    /// # async fn main() -> Result<(), async_nats::ConnectError> {
487    /// let creds = "-----BEGIN NATS USER JWT-----
488    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
489    /// ------END NATS USER JWT------
490    ///
491    /// ************************* IMPORTANT *************************
492    /// NKEY Seed printed below can be used sign and prove identity.
493    /// NKEYs are sensitive and should be treated as secrets.
494    ///
495    /// -----BEGIN USER NKEY SEED-----
496    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
497    /// ------END USER NKEY SEED------
498    /// ";
499    ///
500    /// let nc = async_nats::ConnectOptions::new()
501    ///     .credentials(creds)
502    ///     .expect("failed to parse static creds")
503    ///     .connect("connect.ngs.global")
504    ///     .await?;
505    /// # Ok(())
506    /// # }
507    /// ```
508    #[cfg(feature = "nkeys")]
509    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
510    pub fn credentials(self, creds: &str) -> io::Result<Self> {
511        let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
512        let key_pair = std::sync::Arc::new(key_pair);
513
514        Ok(self.jwt(jwt.to_owned(), move |nonce| {
515            let key_pair = key_pair.clone();
516            async move { key_pair.sign(&nonce).map_err(AuthError::new) }
517        }))
518    }
519
520    /// Loads root certificates by providing the path to them.
521    ///
522    /// # Examples
523    /// ```no_run
524    /// # #[tokio::main]
525    /// # async fn main() -> Result<(), async_nats::ConnectError> {
526    /// let nc = async_nats::ConnectOptions::new()
527    ///     .add_root_certificates("mycerts.pem".into())
528    ///     .connect("demo.nats.io")
529    ///     .await?;
530    /// # Ok(())
531    /// # }
532    /// ```
533    pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
534        self.certificates = vec![path];
535        self
536    }
537
538    /// Loads client certificate by providing the path to it.
539    ///
540    /// # Examples
541    /// ```no_run
542    /// # #[tokio::main]
543    /// # async fn main() -> Result<(), async_nats::ConnectError> {
544    /// let nc = async_nats::ConnectOptions::new()
545    ///     .add_client_certificate("cert.pem".into(), "key.pem".into())
546    ///     .connect("demo.nats.io")
547    ///     .await?;
548    /// # Ok(())
549    /// # }
550    /// ```
551    pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
552        self.client_cert = Some(cert);
553        self.client_key = Some(key);
554        self
555    }
556
    /// Enables or disables the TLS requirement. If `require_tls(true)` is set and a TLS connection cannot be established, connecting returns an error.
558    ///
559    /// # Examples
560    /// ```no_run
561    /// # #[tokio::main]
562    /// # async fn main() -> Result<(), async_nats::ConnectError> {
563    /// let nc = async_nats::ConnectOptions::new()
564    ///     .require_tls(true)
565    ///     .connect("demo.nats.io")
566    ///     .await?;
567    /// # Ok(())
568    /// # }
569    /// ```
570    pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
571        self.tls_required = is_required;
572        self
573    }
574
575    /// Changes how tls connection is established. If `tls_first` is set,
576    /// client will try to establish tls before getting info from the server.
577    /// That requires the server to enable `handshake_first` option in the config.
578    pub fn tls_first(mut self) -> ConnectOptions {
579        self.tls_first = true;
580        self.tls_required = true;
581        self
582    }
583
584    /// Sets how often Client sends PING message to the server.
585    ///
586    /// # Examples
587    /// ```no_run
588    /// # use tokio::time::Duration;
589    /// # #[tokio::main]
590    /// # async fn main() -> Result<(), async_nats::ConnectError> {
591    /// async_nats::ConnectOptions::new()
592    ///     .ping_interval(Duration::from_secs(24))
593    ///     .connect("demo.nats.io")
594    ///     .await?;
595    /// # Ok(())
596    /// # }
597    /// ```
598    pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
599        self.ping_interval = ping_interval;
600        self
601    }
602
603    /// Sets `no_echo` option which disables delivering messages that were published from the same
604    /// connection.
605    ///
606    /// # Examples
607    /// ```no_run
608    /// # #[tokio::main]
609    /// # async fn main() -> Result<(), async_nats::ConnectError> {
610    /// async_nats::ConnectOptions::new()
611    ///     .no_echo()
612    ///     .connect("demo.nats.io")
613    ///     .await?;
614    /// # Ok(())
615    /// # }
616    /// ```
617    pub fn no_echo(mut self) -> ConnectOptions {
618        self.no_echo = true;
619        self
620    }
621
622    /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
623    /// callback and drop messages.
624    /// Default is set to 65536 messages buffer.
625    ///
626    /// # Examples
627    /// ```no_run
628    /// # #[tokio::main]
629    /// # async fn main() -> Result<(), async_nats::ConnectError> {
630    /// async_nats::ConnectOptions::new()
631    ///     .subscription_capacity(1024)
632    ///     .connect("demo.nats.io")
633    ///     .await?;
634    /// # Ok(())
635    /// # }
636    /// ```
637    pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
638        self.subscription_capacity = capacity;
639        self
640    }
641
642    /// Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks.
643    /// Default is set to 5 seconds.
644    ///
645    /// # Examples
646    /// ```no_run
647    /// # #[tokio::main]
648    /// # async fn main() -> Result<(), async_nats::ConnectError> {
649    /// async_nats::ConnectOptions::new()
650    ///     .connection_timeout(tokio::time::Duration::from_secs(5))
651    ///     .connect("demo.nats.io")
652    ///     .await?;
653    /// # Ok(())
654    /// # }
655    /// ```
656    pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
657        self.connection_timeout = timeout;
658        self
659    }
660
661    /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
662    ///
663    /// # Examples
664    /// ```no_run
665    /// # #[tokio::main]
666    /// # async fn main() -> Result<(), async_nats::ConnectError> {
667    /// async_nats::ConnectOptions::new()
668    ///     .request_timeout(Some(std::time::Duration::from_secs(3)))
669    ///     .connect("demo.nats.io")
670    ///     .await?;
671    /// # Ok(())
672    /// # }
673    /// ```
674    pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
675        self.request_timeout = timeout;
676        self
677    }
678
    /// Registers an asynchronous callback for events reported by the client, such as disconnects, reconnects and client errors.
680    ///
681    /// # Examples
682    /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
683    /// work around this
684    ///
685    /// ## Basic
686    /// If you don't need to move anything into the closure, simple signature can be used:
687    ///
688    /// ```no_run
689    /// # #[tokio::main]
690    /// # async fn main() -> Result<(), async_nats::ConnectError> {
691    /// async_nats::ConnectOptions::new()
692    ///     .event_callback(|event| async move {
693    ///         println!("event occurred: {}", event);
694    ///     })
695    ///     .connect("demo.nats.io")
696    ///     .await?;
697    /// # Ok(())
698    /// # }
699    /// ```
700    ///
701    /// ## Listening to specific event kind
702    /// ```no_run
703    /// # #[tokio::main]
704    /// # async fn main() -> Result<(), async_nats::ConnectError> {
705    /// async_nats::ConnectOptions::new()
706    ///     .event_callback(|event| async move {
707    ///         match event {
708    ///             async_nats::Event::Disconnected => println!("disconnected"),
709    ///             async_nats::Event::Connected => println!("reconnected"),
710    ///             async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
711    ///             other => println!("other event happened: {}", other),
712    ///         }
713    ///     })
714    ///     .connect("demo.nats.io")
715    ///     .await?;
716    /// # Ok(())
717    /// # }
718    /// ```
719    ///
720    /// ## Advanced
721    /// If you need to move something into the closure, here's an example how to do that
722    ///
723    /// ```no_run
724    /// # #[tokio::main]
725    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
726    /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
727    /// async_nats::ConnectOptions::new()
728    ///     .event_callback(move |event| {
729    ///         let tx = tx.clone();
730    ///         async move {
731    ///             tx.send(event).await.unwrap();
732    ///         }
733    ///     })
734    ///     .connect("demo.nats.io")
735    ///     .await?;
736    /// # Ok(())
737    /// # }
738    /// ```
739    pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
740    where
741        F: Fn(Event) -> Fut + Send + Sync + 'static,
742        Fut: Future<Output = ()> + 'static + Send + Sync,
743    {
744        self.event_callback = Some(CallbackArg1::<Event, ()>(Box::new(move |event| {
745            Box::pin(cb(event))
746        })));
747        self
748    }
749
750    /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
751    ///
752    /// # Examples
753    /// ```no_run
754    /// # #[tokio::main]
755    /// # async fn main() -> Result<(), async_nats::ConnectError> {
756    /// async_nats::ConnectOptions::new()
757    ///     .reconnect_delay_callback(|attempts| {
758    ///         println!("no of attempts: {attempts}");
759    ///         std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
760    ///     })
761    ///     .connect("demo.nats.io")
762    ///     .await?;
763    /// # Ok(())
764    /// # }
765    /// ```
766    pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
767    where
768        F: Fn(usize) -> Duration + Send + Sync + 'static,
769    {
770        self.reconnect_delay_callback = Box::new(cb);
771        self
772    }
773
    /// By default, Client dispatches ops onto a channel with a capacity of 2048.
775    /// This option enables overriding it.
776    ///
777    /// # Examples
778    /// ```
779    /// # #[tokio::main]
780    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
781    /// async_nats::ConnectOptions::new()
782    ///     .client_capacity(256)
783    ///     .connect("demo.nats.io")
784    ///     .await?;
785    /// # Ok(())
786    /// # }
787    /// ```
788    pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
789        self.sender_capacity = capacity;
790        self
791    }
792
793    /// Sets custom prefix instead of default `_INBOX`.
794    ///
795    /// # Examples
796    ///
797    /// ```
798    /// # #[tokio::main]
799    /// # async fn main() -> Result<(), async_nats::Error> {
800    /// async_nats::ConnectOptions::new()
801    ///     .custom_inbox_prefix("CUSTOM")
802    ///     .connect("demo.nats.io")
803    ///     .await?;
804    /// # Ok(())
805    /// # }
806    /// ```
807    pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
808        self.inbox_prefix = prefix.to_string();
809        self
810    }
811
812    /// Sets the name for the client.
813    ///
814    /// # Examples
815    /// ```
816    /// # #[tokio::main]
817    /// # async fn main() -> Result<(), async_nats::Error> {
818    /// async_nats::ConnectOptions::new()
819    ///     .name("rust-service")
820    ///     .connect("demo.nats.io")
821    ///     .await?;
822    /// # Ok(())
823    /// # }
824    /// ```
825    pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
826        self.name = Some(name.to_string());
827        self
828    }
829
830    /// By default, [`ConnectOptions::connect`] will return an error if
831    /// the connection to the server cannot be established.
832    ///
833    /// Setting `retry_on_initial_connect` makes the client
834    /// establish the connection in the background.
835    pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
836        self.retry_on_initial_connect = true;
837        self
838    }
839
840    /// Specifies the number of consecutive reconnect attempts the client will
841    /// make before giving up. This is useful for preventing zombie services
842    /// from endlessly reaching the servers, but it can also be a footgun and
843    /// surprise for users who do not expect that the client can give up
844    /// entirely.
845    ///
846    /// Pass `None` or `0` for no limit.
847    ///
848    /// # Examples
849    /// ```
850    /// # #[tokio::main]
851    /// # async fn main() -> Result<(), async_nats::Error> {
852    /// async_nats::ConnectOptions::new()
853    ///     .max_reconnects(None)
854    ///     .connect("demo.nats.io")
855    ///     .await?;
856    /// # Ok(())
857    /// # }
858    /// ```
859    pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
860        let val: Option<usize> = max_reconnects.into();
861        self.max_reconnects = if val == Some(0) { None } else { val };
862        self
863    }
864
865    /// By default, a server may advertise other servers in the cluster known to it.
866    /// By setting this option, the client will ignore the advertised servers.
867    /// This may be useful if the client may not be able to reach them.
868    pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
869        self.ignore_discovered_servers = true;
870        self
871    }
872
873    /// By default, client will pick random server to which it will try connect to.
874    /// This option disables that feature, forcing it to always respect the order
875    /// in which server addresses were passed.
876    pub fn retain_servers_order(mut self) -> ConnectOptions {
877        self.retain_servers_order = true;
878        self
879    }
880
881    /// Allows passing custom rustls tls config.
882    ///
883    /// # Examples
884    /// ```
885    /// # #[tokio::main]
886    /// # async fn main() -> Result<(), async_nats::Error> {
887    /// let mut root_store = async_nats::rustls::RootCertStore::empty();
888    ///
889    /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
890    ///
891    /// let tls_client = async_nats::rustls::ClientConfig::builder()
892    ///     .with_root_certificates(root_store)
893    ///     .with_no_client_auth();
894    ///
895    /// let client = async_nats::ConnectOptions::new()
896    ///     .require_tls(true)
897    ///     .tls_client_config(tls_client)
898    ///     .connect("tls://demo.nats.io")
899    ///     .await?;
900    ///
901    /// # Ok(())
902    /// # }
903    /// ```
904    pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
905        self.tls_client_config = Some(config);
906        self
907    }
908
909    /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
910    /// protocol messages.
911    ///
912    /// # Examples
913    /// ```
914    /// # #[tokio::main]
915    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
916    /// async_nats::ConnectOptions::new()
917    ///     .read_buffer_capacity(65535)
918    ///     .connect("demo.nats.io")
919    ///     .await?;
920    /// # Ok(())
921    /// # }
922    /// ```
923    pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
924        self.read_buffer_capacity = size;
925        self
926    }
927}
928
/// Boxed one-argument callback: takes an `A` and returns a pinned, boxed
/// future that resolves to a `T`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Box<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Newtype around [`AsyncCallbackArg1`], which gives the callback a `Debug`
/// implementation and an awaitable `call` method.
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the wrapped callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        let pending = (self.0)(arg);
        pending.await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Always prints the fixed placeholder `callback`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "callback")
    }
}
945
/// Error report from signing callback.
///
/// Wraps a plain message and renders it as `AuthError(<message>)` through both
/// `Display` and `Debug`.
// NOTE(review): the previous comment here said this type was needed because
// `std::io::Error` isn't `Send`. `std::io::Error` is `Send + Sync`, but it is
// neither `Clone` nor `PartialEq`, both of which are derived below — confirm
// the actual motivation.
#[derive(Clone, PartialEq)]
pub struct AuthError(String);

impl AuthError {
    /// Creates an `AuthError` whose message is `s.to_string()`.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    /// Writes `AuthError(<message>)` directly into the formatter, without
    /// building an intermediate `String`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    /// Produces the same text as the `Display` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl std::error::Error for AuthError {}