Skip to main content

async_nats/
options.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::connector::{ReconnectToServer, ReconnectToServerCallback, Server};
17use crate::{Client, ConnectError, Event, ServerInfo, ToServerAddrs};
18#[cfg(feature = "nkeys")]
19use base64::engine::general_purpose::URL_SAFE_NO_PAD;
20#[cfg(feature = "nkeys")]
21use base64::engine::Engine;
22use futures_util::Future;
23use std::fmt::Formatter;
24use std::net::SocketAddr;
25#[cfg(feature = "nkeys")]
26use std::path::Path;
27use std::{fmt, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
28#[cfg(feature = "nkeys")]
29use tokio::io;
30use tokio_rustls::rustls;
31
32/// Connect options. Used to connect with NATS when custom config is needed.
33/// # Examples
34/// ```no_run
35/// # #[tokio::main]
36/// # async fn main() -> Result<(), async_nats::ConnectError> {
37/// let mut options = async_nats::ConnectOptions::new()
38///     .require_tls(true)
39///     .ping_interval(std::time::Duration::from_secs(10))
40///     .connect("demo.nats.io")
41///     .await?;
42/// # Ok(())
43/// # }
44/// ```
45#[derive(Clone)]
46pub struct ConnectOptions {
47    pub(crate) name: Option<String>,
48    pub(crate) no_echo: bool,
49    pub(crate) max_reconnects: Option<usize>,
50    pub(crate) connection_timeout: Duration,
51    pub(crate) auth: Auth,
52    pub(crate) tls_required: bool,
53    pub(crate) tls_first: bool,
54    pub(crate) certificates: Vec<PathBuf>,
55    pub(crate) client_cert: Option<PathBuf>,
56    pub(crate) client_key: Option<PathBuf>,
57    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
58    pub(crate) ping_interval: Duration,
59    pub(crate) subscription_capacity: usize,
60    pub(crate) sender_capacity: usize,
61    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
62    pub(crate) inbox_prefix: String,
63    pub(crate) request_timeout: Option<Duration>,
64    pub(crate) retry_on_initial_connect: bool,
65    pub(crate) ignore_discovered_servers: bool,
66    pub(crate) retain_servers_order: bool,
67    pub(crate) read_buffer_capacity: u16,
68    pub(crate) reconnect_delay_callback: Arc<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
69    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
70    pub(crate) skip_subject_validation: bool,
71    pub(crate) local_address: Option<SocketAddr>,
72    pub(crate) reconnect_to_server_callback: Option<ReconnectToServerCallback>,
73}
74
75impl fmt::Debug for ConnectOptions {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
77        f.debug_map()
78            .entry(&"name", &self.name)
79            .entry(&"no_echo", &self.no_echo)
80            .entry(&"max_reconnects", &self.max_reconnects)
81            .entry(&"connection_timeout", &self.connection_timeout)
82            .entry(&"tls_required", &self.tls_required)
83            .entry(&"certificates", &self.certificates)
84            .entry(&"client_cert", &self.client_cert)
85            .entry(&"client_key", &self.client_key)
86            .entry(&"tls_client_config", &"XXXXXXXX")
87            .entry(&"tls_first", &self.tls_first)
88            .entry(&"ping_interval", &self.ping_interval)
89            .entry(&"sender_capacity", &self.sender_capacity)
90            .entry(&"inbox_prefix", &self.inbox_prefix)
91            .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
92            .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
93            .entry(&"skip_subject_validation", &self.skip_subject_validation)
94            .finish()
95    }
96}
97
98impl Default for ConnectOptions {
99    fn default() -> ConnectOptions {
100        ConnectOptions {
101            name: None,
102            no_echo: false,
103            max_reconnects: None,
104            connection_timeout: Duration::from_secs(5),
105            tls_required: false,
106            tls_first: false,
107            certificates: Vec::new(),
108            client_cert: None,
109            client_key: None,
110            tls_client_config: None,
111            ping_interval: Duration::from_secs(60),
112            sender_capacity: 2048,
113            subscription_capacity: 1024 * 64,
114            event_callback: None,
115            inbox_prefix: "_INBOX".to_string(),
116            request_timeout: Some(Duration::from_secs(10)),
117            retry_on_initial_connect: false,
118            ignore_discovered_servers: false,
119            retain_servers_order: false,
120            read_buffer_capacity: 65535,
121            reconnect_delay_callback: Arc::new(|attempts| {
122                connector::reconnect_delay_callback_default(attempts)
123            }),
124            auth: Default::default(),
125            auth_callback: None,
126            skip_subject_validation: false,
127            local_address: None,
128            reconnect_to_server_callback: None,
129        }
130    }
131}
132
133impl ConnectOptions {
134    /// Enables customization of NATS connection.
135    ///
136    /// # Examples
137    /// ```no_run
138    /// # #[tokio::main]
139    /// # async fn main() -> Result<(), async_nats::ConnectError> {
140    /// let mut options = async_nats::ConnectOptions::new()
141    ///     .require_tls(true)
142    ///     .ping_interval(std::time::Duration::from_secs(10))
143    ///     .connect("demo.nats.io")
144    ///     .await?;
145    /// # Ok(())
146    /// # }
147    /// ```
148    pub fn new() -> ConnectOptions {
149        ConnectOptions::default()
150    }
151
152    /// Connect to the NATS Server leveraging all passed options.
153    ///
154    /// # Examples
155    /// ```no_run
156    /// # #[tokio::main]
157    /// # async fn main() -> Result<(), async_nats::ConnectError> {
158    /// let nc = async_nats::ConnectOptions::new()
159    ///     .require_tls(true)
160    ///     .connect("demo.nats.io")
161    ///     .await?;
162    /// # Ok(())
163    /// # }
164    /// ```
165    ///
166    /// ## Pass multiple URLs.
167    /// ```no_run
168    /// #[tokio::main]
169    /// # async fn main() -> Result<(), async_nats::Error> {
170    /// use async_nats::ServerAddr;
171    /// let client = async_nats::connect(vec![
172    ///     "demo.nats.io".parse::<ServerAddr>()?,
173    ///     "other.nats.io".parse::<ServerAddr>()?,
174    /// ])
175    /// .await
176    /// .unwrap();
177    /// # Ok(())
178    /// # }
179    /// ```
180    pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
181        crate::connect_with_options(addrs, self).await
182    }
183
184    /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
185    /// Requires an asynchronous function that accepts nonce and returns [Auth].
186    /// It will overwrite all other auth methods used.
187    ///
188    ///
189    /// # Example
190    /// ```no_run
191    /// # #[tokio::main]
192    /// # async fn main() -> Result<(), async_nats::ConnectError> {
193    /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
194    ///     let mut auth = async_nats::Auth::new();
195    ///     auth.username = Some("derek".to_string());
196    ///     auth.password = Some("s3cr3t".to_string());
197    ///     Ok(auth)
198    /// })
199    /// .connect("demo.nats.io")
200    /// .await?;
201    /// # Ok(())
202    /// # }
203    /// ```
204    pub fn with_auth_callback<F, Fut>(callback: F) -> Self
205    where
206        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
207        Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
208    {
209        let mut options = ConnectOptions::new();
210        options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Arc::new(
211            move |nonce| Box::pin(callback(nonce)),
212        )));
213        options
214    }
215
216    /// Authenticate against NATS Server with the provided token.
217    ///
218    /// # Examples
219    /// ```no_run
220    /// # #[tokio::main]
221    /// # async fn main() -> Result<(), async_nats::ConnectError> {
222    /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
223    ///     .connect("demo.nats.io")
224    ///     .await?;
225    /// # Ok(())
226    /// # }
227    /// ```
228    pub fn with_token(token: String) -> Self {
229        ConnectOptions::default().token(token)
230    }
231
232    /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
233    /// This can be used as a way to mix authentication methods.
234    ///
235    /// # Examples
236    /// ```no_run
237    /// # #[tokio::main]
238    /// # async fn main() -> Result<(), async_nats::ConnectError> {
239    /// let nc = async_nats::ConnectOptions::new()
240    ///     .token("t0k3n!".into())
241    ///     .connect("demo.nats.io")
242    ///     .await?;
243    /// # Ok(())
244    /// # }
245    /// ```
246    pub fn token(mut self, token: String) -> Self {
247        self.auth.token = Some(token);
248        self
249    }
250
251    /// Authenticate against NATS Server with the provided username and password.
252    ///
253    /// # Examples
254    /// ```no_run
255    /// # #[tokio::main]
256    /// # async fn main() -> Result<(), async_nats::ConnectError> {
257    /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
258    ///     .connect("demo.nats.io")
259    ///     .await?;
260    /// # Ok(())
261    /// # }
262    /// ```
263    pub fn with_user_and_password(user: String, pass: String) -> Self {
264        ConnectOptions::default().user_and_password(user, pass)
265    }
266
267    /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
268    /// This can be used as a way to mix authentication methods.
269    ///
270    /// # Examples
271    /// ```no_run
272    /// # #[tokio::main]
273    /// # async fn main() -> Result<(), async_nats::ConnectError> {
274    /// let nc = async_nats::ConnectOptions::new()
275    ///     .user_and_password("derek".into(), "s3cr3t!".into())
276    ///     .connect("demo.nats.io")
277    ///     .await?;
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub fn user_and_password(mut self, user: String, pass: String) -> Self {
282        self.auth.username = Some(user);
283        self.auth.password = Some(pass);
284        self
285    }
286
287    /// Authenticate with an NKey. Requires an NKey Seed secret.
288    ///
289    /// # Example
290    /// ```no_run
291    /// # #[tokio::main]
292    /// # async fn main() -> Result<(), async_nats::ConnectError> {
293    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
294    /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
295    ///     .connect("localhost")
296    ///     .await?;
297    /// # Ok(())
298    /// # }
299    /// ```
300    #[cfg(feature = "nkeys")]
301    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
302    pub fn with_nkey(seed: String) -> Self {
303        ConnectOptions::default().nkey(seed)
304    }
305
306    /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
307    /// Requires an NKey Seed Secret.
308    /// This can be used as a way to mix authentication methods.
309    ///
310    /// # Example
311    /// ```no_run
312    /// # #[tokio::main]
313    /// # async fn main() -> Result<(), async_nats::ConnectError> {
314    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
315    /// let nc = async_nats::ConnectOptions::new()
316    ///     .nkey(seed.into())
317    ///     .connect("localhost")
318    ///     .await?;
319    /// # Ok(())
320    /// # }
321    /// ```
322    #[cfg(feature = "nkeys")]
323    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
324    pub fn nkey(mut self, seed: String) -> Self {
325        self.auth.nkey = Some(seed);
326        self
327    }
328
329    /// Authenticate with a JWT. Requires function to sign the server nonce.
330    /// The signing function is asynchronous.
331    ///
332    /// # Example
333    /// ```no_run
334    /// # #[tokio::main]
335    /// # async fn main() -> Result<(), async_nats::ConnectError> {
336    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
337    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
338    /// // load jwt from creds file or other secure source
339    /// async fn load_jwt() -> std::io::Result<String> {
340    ///     todo!();
341    /// }
342    /// let jwt = load_jwt().await?;
343    /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
344    ///     let key_pair = key_pair.clone();
345    ///     async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
346    /// })
347    /// .connect("localhost")
348    /// .await?;
349    /// # Ok(())
350    /// # }
351    /// ```
352    #[cfg(feature = "nkeys")]
353    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
354    pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
355    where
356        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
357        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
358    {
359        ConnectOptions::default().jwt(jwt, sign_cb)
360    }
361
362    /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
363    /// Requires an asynchronous function to sign the server nonce.
364    /// This can be used as a way to mix authentication methods.
365    ///
366    ///
367    /// # Example
368    /// ```no_run
369    /// # #[tokio::main]
370    /// # async fn main() -> Result<(), async_nats::ConnectError> {
371    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
372    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
373    /// // load jwt from creds file or other secure source
374    /// async fn load_jwt() -> std::io::Result<String> {
375    ///     todo!();
376    /// }
377    /// let jwt = load_jwt().await?;
378    /// let nc = async_nats::ConnectOptions::new()
379    ///     .jwt(jwt, move |nonce| {
380    ///         let key_pair = key_pair.clone();
381    ///         async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
382    ///     })
383    ///     .connect("localhost")
384    ///     .await?;
385    /// # Ok(())
386    /// # }
387    /// ```
388    #[cfg(feature = "nkeys")]
389    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
390    pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
391    where
392        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
393        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
394    {
395        let sign_cb = Arc::new(sign_cb);
396
397        let jwt_sign_callback = CallbackArg1(Arc::new(move |nonce: String| {
398            let sign_cb = sign_cb.clone();
399            Box::pin(async move {
400                let sig = sign_cb(nonce.as_bytes().to_vec())
401                    .await
402                    .map_err(AuthError::new)?;
403                Ok(URL_SAFE_NO_PAD.encode(sig))
404            })
405        }));
406
407        self.auth.jwt = Some(jwt);
408        self.auth.signature_callback = Some(jwt_sign_callback);
409        self
410    }
411
412    /// Authenticate with NATS using a `.creds` file.
413    /// Open the provided file, load its creds,
414    /// and perform the desired authentication
415    ///
416    /// # Example
417    /// ```no_run
418    /// # #[tokio::main]
419    /// # async fn main() -> Result<(), async_nats::ConnectError> {
420    /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
421    ///     .await?
422    ///     .connect("connect.ngs.global")
423    ///     .await?;
424    /// # Ok(())
425    /// # }
426    /// ```
427    #[cfg(feature = "nkeys")]
428    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
429    pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
430        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
431        Self::with_credentials(&cred_file_contents)
432    }
433
434    /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
435    /// This will open the credentials file and load its credentials.
436    /// This can be used as a way to mix authentication methods.
437    ///
438    /// # Example
439    /// ```no_run
440    /// # #[tokio::main]
441    /// # async fn main() -> Result<(), async_nats::ConnectError> {
442    /// let nc = async_nats::ConnectOptions::new()
443    ///     .credentials_file("path/to/my.creds")
444    ///     .await?
445    ///     .connect("connect.ngs.global")
446    ///     .await?;
447    /// # Ok(())
448    /// # }
449    /// ```
450    #[cfg(feature = "nkeys")]
451    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
452    pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
453        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
454        self.credentials(&cred_file_contents)
455    }
456
457    /// Authenticate with NATS using a credential str, in the creds file format.
458    ///
459    /// # Example
460    /// ```no_run
461    /// # #[tokio::main]
462    /// # async fn main() -> Result<(), async_nats::ConnectError> {
463    /// let creds = "-----BEGIN NATS USER JWT-----
464    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
465    /// ------END NATS USER JWT------
466    ///
467    /// ************************* IMPORTANT *************************
468    /// NKEY Seed printed below can be used sign and prove identity.
469    /// NKEYs are sensitive and should be treated as secrets.
470    ///
471    /// -----BEGIN USER NKEY SEED-----
472    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
473    /// ------END USER NKEY SEED------
474    /// ";
475    ///
476    /// let nc = async_nats::ConnectOptions::with_credentials(creds)
477    ///     .expect("failed to parse static creds")
478    ///     .connect("connect.ngs.global")
479    ///     .await?;
480    /// # Ok(())
481    /// # }
482    /// ```
483    #[cfg(feature = "nkeys")]
484    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
485    pub fn with_credentials(creds: &str) -> io::Result<Self> {
486        ConnectOptions::default().credentials(creds)
487    }
488
489    /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
490    /// The string should be in the credentials file format.
491    /// This can be used as a way to mix authentication methods.
492    ///
493    /// # Example
494    /// ```no_run
495    /// # #[tokio::main]
496    /// # async fn main() -> Result<(), async_nats::ConnectError> {
497    /// let creds = "-----BEGIN NATS USER JWT-----
498    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
499    /// ------END NATS USER JWT------
500    ///
501    /// ************************* IMPORTANT *************************
502    /// NKEY Seed printed below can be used sign and prove identity.
503    /// NKEYs are sensitive and should be treated as secrets.
504    ///
505    /// -----BEGIN USER NKEY SEED-----
506    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
507    /// ------END USER NKEY SEED------
508    /// ";
509    ///
510    /// let nc = async_nats::ConnectOptions::new()
511    ///     .credentials(creds)
512    ///     .expect("failed to parse static creds")
513    ///     .connect("connect.ngs.global")
514    ///     .await?;
515    /// # Ok(())
516    /// # }
517    /// ```
518    #[cfg(feature = "nkeys")]
519    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
520    pub fn credentials(self, creds: &str) -> io::Result<Self> {
521        let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
522        let key_pair = std::sync::Arc::new(key_pair);
523
524        Ok(self.jwt(jwt.to_owned(), move |nonce| {
525            let key_pair = key_pair.clone();
526            async move { key_pair.sign(&nonce).map_err(AuthError::new) }
527        }))
528    }
529
530    /// Loads root certificates by providing the path to them.
531    ///
532    /// # Examples
533    /// ```no_run
534    /// # #[tokio::main]
535    /// # async fn main() -> Result<(), async_nats::ConnectError> {
536    /// let nc = async_nats::ConnectOptions::new()
537    ///     .add_root_certificates("mycerts.pem".into())
538    ///     .connect("demo.nats.io")
539    ///     .await?;
540    /// # Ok(())
541    /// # }
542    /// ```
543    pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
544        self.certificates = vec![path];
545        self
546    }
547
548    /// Loads client certificate by providing the path to it.
549    ///
550    /// # Examples
551    /// ```no_run
552    /// # #[tokio::main]
553    /// # async fn main() -> Result<(), async_nats::ConnectError> {
554    /// let nc = async_nats::ConnectOptions::new()
555    ///     .add_client_certificate("cert.pem".into(), "key.pem".into())
556    ///     .connect("demo.nats.io")
557    ///     .await?;
558    /// # Ok(())
559    /// # }
560    /// ```
561    pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
562        self.client_cert = Some(cert);
563        self.client_key = Some(key);
564        self
565    }
566
567    /// Sets or disables TLS requirement. If TLS connection is impossible while `options.require_tls(true)` connection will return error.
568    ///
569    /// # Examples
570    /// ```no_run
571    /// # #[tokio::main]
572    /// # async fn main() -> Result<(), async_nats::ConnectError> {
573    /// let nc = async_nats::ConnectOptions::new()
574    ///     .require_tls(true)
575    ///     .connect("demo.nats.io")
576    ///     .await?;
577    /// # Ok(())
578    /// # }
579    /// ```
580    pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
581        self.tls_required = is_required;
582        self
583    }
584
585    /// Changes how tls connection is established. If `tls_first` is set,
586    /// client will try to establish tls before getting info from the server.
587    /// That requires the server to enable `handshake_first` option in the config.
588    pub fn tls_first(mut self) -> ConnectOptions {
589        self.tls_first = true;
590        self.tls_required = true;
591        self
592    }
593
594    /// Sets how often Client sends PING message to the server.
595    ///
596    /// # Examples
597    /// ```no_run
598    /// # use tokio::time::Duration;
599    /// # #[tokio::main]
600    /// # async fn main() -> Result<(), async_nats::ConnectError> {
601    /// async_nats::ConnectOptions::new()
602    ///     .ping_interval(Duration::from_secs(24))
603    ///     .connect("demo.nats.io")
604    ///     .await?;
605    /// # Ok(())
606    /// # }
607    /// ```
608    pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
609        self.ping_interval = ping_interval;
610        self
611    }
612
613    /// Sets `no_echo` option which disables delivering messages that were published from the same
614    /// connection.
615    ///
616    /// # Examples
617    /// ```no_run
618    /// # #[tokio::main]
619    /// # async fn main() -> Result<(), async_nats::ConnectError> {
620    /// async_nats::ConnectOptions::new()
621    ///     .no_echo()
622    ///     .connect("demo.nats.io")
623    ///     .await?;
624    /// # Ok(())
625    /// # }
626    /// ```
627    pub fn no_echo(mut self) -> ConnectOptions {
628        self.no_echo = true;
629        self
630    }
631
632    /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
633    /// callback and drop messages.
634    /// Default is set to 65536 messages buffer.
635    ///
636    /// # Examples
637    /// ```no_run
638    /// # #[tokio::main]
639    /// # async fn main() -> Result<(), async_nats::ConnectError> {
640    /// async_nats::ConnectOptions::new()
641    ///     .subscription_capacity(1024)
642    ///     .connect("demo.nats.io")
643    ///     .await?;
644    /// # Ok(())
645    /// # }
646    /// ```
647    pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
648        self.subscription_capacity = capacity;
649        self
650    }
651
652    /// Sets a timeout for the full connection establishment and handshake to avoid
653    /// hangs and deadlocks. This includes TCP/WebSocket connection, TLS setup,
654    /// waiting for the server INFO message, sending CONNECT/PING, and receiving
655    /// the initial server PONG response. Default is set to 5 seconds.
656    ///
657    /// # Examples
658    /// ```no_run
659    /// # #[tokio::main]
660    /// # async fn main() -> Result<(), async_nats::ConnectError> {
661    /// async_nats::ConnectOptions::new()
662    ///     .connection_timeout(tokio::time::Duration::from_secs(5))
663    ///     .connect("demo.nats.io")
664    ///     .await?;
665    /// # Ok(())
666    /// # }
667    /// ```
668    pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
669        self.connection_timeout = timeout;
670        self
671    }
672
673    /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
674    ///
675    /// # Examples
676    /// ```no_run
677    /// # #[tokio::main]
678    /// # async fn main() -> Result<(), async_nats::ConnectError> {
679    /// async_nats::ConnectOptions::new()
680    ///     .request_timeout(Some(std::time::Duration::from_secs(3)))
681    ///     .connect("demo.nats.io")
682    ///     .await?;
683    /// # Ok(())
684    /// # }
685    /// ```
686    pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
687        self.request_timeout = timeout;
688        self
689    }
690
691    /// Registers an asynchronous callback for errors that are received over the wire from the server.
692    ///
693    /// # Examples
694    /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
695    /// work around this
696    ///
697    /// ## Basic
698    /// If you don't need to move anything into the closure, simple signature can be used:
699    ///
700    /// ```no_run
701    /// # #[tokio::main]
702    /// # async fn main() -> Result<(), async_nats::ConnectError> {
703    /// async_nats::ConnectOptions::new()
704    ///     .event_callback(|event| async move {
705    ///         println!("event occurred: {}", event);
706    ///     })
707    ///     .connect("demo.nats.io")
708    ///     .await?;
709    /// # Ok(())
710    /// # }
711    /// ```
712    ///
713    /// ## Listening to specific event kind
714    /// ```no_run
715    /// # #[tokio::main]
716    /// # async fn main() -> Result<(), async_nats::ConnectError> {
717    /// async_nats::ConnectOptions::new()
718    ///     .event_callback(|event| async move {
719    ///         match event {
720    ///             async_nats::Event::Disconnected => println!("disconnected"),
721    ///             async_nats::Event::Connected => println!("reconnected"),
722    ///             async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
723    ///             other => println!("other event happened: {}", other),
724    ///         }
725    ///     })
726    ///     .connect("demo.nats.io")
727    ///     .await?;
728    /// # Ok(())
729    /// # }
730    /// ```
731    ///
732    /// ## Advanced
733    /// If you need to move something into the closure, here's an example how to do that
734    ///
735    /// ```no_run
736    /// # #[tokio::main]
737    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
738    /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
739    /// async_nats::ConnectOptions::new()
740    ///     .event_callback(move |event| {
741    ///         let tx = tx.clone();
742    ///         async move {
743    ///             tx.send(event).await.unwrap();
744    ///         }
745    ///     })
746    ///     .connect("demo.nats.io")
747    ///     .await?;
748    /// # Ok(())
749    /// # }
750    /// ```
751    pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
752    where
753        F: Fn(Event) -> Fut + Send + Sync + 'static,
754        Fut: Future<Output = ()> + 'static + Send + Sync,
755    {
756        self.event_callback = Some(CallbackArg1::<Event, ()>(Arc::new(move |event| {
757            Box::pin(cb(event))
758        })));
759        self
760    }
761
762    /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
763    ///
764    /// # Examples
765    /// ```no_run
766    /// # #[tokio::main]
767    /// # async fn main() -> Result<(), async_nats::ConnectError> {
768    /// async_nats::ConnectOptions::new()
769    ///     .reconnect_delay_callback(|attempts| {
770    ///         println!("no of attempts: {attempts}");
771    ///         std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
772    ///     })
773    ///     .connect("demo.nats.io")
774    ///     .await?;
775    /// # Ok(())
776    /// # }
777    /// ```
778    pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
779    where
780        F: Fn(usize) -> Duration + Send + Sync + 'static,
781    {
782        self.reconnect_delay_callback = Arc::new(cb);
783        self
784    }
785
786    /// Sets a callback invoked on each reconnect attempt to select a specific
787    /// server from the pool.
788    ///
789    /// The callback receives a snapshot of available servers (with per-server
790    /// metadata such as reconnect count) and the last known [`ServerInfo`].
791    /// It should return a [`ReconnectToServer`] specifying which server to try
792    /// and how long to wait, or `None` to use default server selection.
793    ///
794    /// If the returned server address is not in the pool, the library falls back
795    /// to default selection and emits a [`ClientError`][crate::ClientError] event.
796    ///
797    /// When this callback returns `Some`, its delay takes precedence over
798    /// [`reconnect_delay_callback`][ConnectOptions::reconnect_delay_callback].
799    ///
800    /// # Examples
801    /// ```no_run
802    /// # #[tokio::main]
803    /// # async fn main() -> Result<(), async_nats::ConnectError> {
804    /// async_nats::ConnectOptions::new()
805    ///     .reconnect_to_server_callback(|servers, _info| async move {
806    ///         // Always try the first available server immediately.
807    ///         servers.first().map(|s| async_nats::ReconnectToServer {
808    ///             addr: s.addr.clone(),
809    ///             delay: Some(std::time::Duration::ZERO),
810    ///         })
811    ///     })
812    ///     .connect("demo.nats.io")
813    ///     .await?;
814    /// # Ok(())
815    /// # }
816    /// ```
817    pub fn reconnect_to_server_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
818    where
819        F: Fn(Vec<Server>, ServerInfo) -> Fut + Send + Sync + 'static,
820        Fut: Future<Output = Option<ReconnectToServer>> + Send + Sync + 'static,
821    {
822        self.reconnect_to_server_callback = Some(CallbackArg1(Arc::new(move |(servers, info)| {
823            Box::pin(cb(servers, info))
824        })));
825        self
826    }
827
828    /// By default, Client dispatches op's to the Client onto the channel with capacity of 128.
829    /// This option enables overriding it.
830    ///
831    /// # Examples
832    /// ```
833    /// # #[tokio::main]
834    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
835    /// async_nats::ConnectOptions::new()
836    ///     .client_capacity(256)
837    ///     .connect("demo.nats.io")
838    ///     .await?;
839    /// # Ok(())
840    /// # }
841    /// ```
842    pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
843        self.sender_capacity = capacity;
844        self
845    }
846
847    /// Sets custom prefix instead of default `_INBOX`.
848    ///
849    /// # Examples
850    ///
851    /// ```
852    /// # #[tokio::main]
853    /// # async fn main() -> Result<(), async_nats::Error> {
854    /// async_nats::ConnectOptions::new()
855    ///     .custom_inbox_prefix("CUSTOM")
856    ///     .connect("demo.nats.io")
857    ///     .await?;
858    /// # Ok(())
859    /// # }
860    /// ```
861    pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
862        self.inbox_prefix = prefix.to_string();
863        self
864    }
865
866    /// Sets the name for the client.
867    ///
868    /// # Examples
869    /// ```
870    /// # #[tokio::main]
871    /// # async fn main() -> Result<(), async_nats::Error> {
872    /// async_nats::ConnectOptions::new()
873    ///     .name("rust-service")
874    ///     .connect("demo.nats.io")
875    ///     .await?;
876    /// # Ok(())
877    /// # }
878    /// ```
879    pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
880        self.name = Some(name.to_string());
881        self
882    }
883
884    /// By default, [`ConnectOptions::connect`] will return an error if
885    /// the connection to the server cannot be established.
886    ///
887    /// Setting `retry_on_initial_connect` makes the client
888    /// establish the connection in the background.
889    pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
890        self.retry_on_initial_connect = true;
891        self
892    }
893
894    /// Specifies the number of consecutive reconnect attempts the client will
895    /// make before giving up. This is useful for preventing zombie services
896    /// from endlessly reaching the servers, but it can also be a footgun and
897    /// surprise for users who do not expect that the client can give up
898    /// entirely.
899    ///
900    /// Pass `None` or `0` for no limit.
901    ///
902    /// # Examples
903    /// ```
904    /// # #[tokio::main]
905    /// # async fn main() -> Result<(), async_nats::Error> {
906    /// async_nats::ConnectOptions::new()
907    ///     .max_reconnects(None)
908    ///     .connect("demo.nats.io")
909    ///     .await?;
910    /// # Ok(())
911    /// # }
912    /// ```
913    pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
914        let val: Option<usize> = max_reconnects.into();
915        self.max_reconnects = if val == Some(0) { None } else { val };
916        self
917    }
918
919    /// Disables subject validation for publish operations.
920    ///
921    /// By default, the client validates publish subjects to ensure they don't contain
922    /// whitespace characters (space, tab, CR, LF).
923    ///
924    /// This option only affects **publish** validation. Subscribe and queue group
925    /// validation always runs regardless of this setting, matching the behavior
926    /// of the Go and Java NATS clients.
927    ///
928    /// # Warning
929    /// Using invalid subjects may cause protocol errors with the NATS server.
930    /// Only disable validation if you are certain all published subjects are valid.
931    ///
932    /// # Examples
933    /// ```no_run
934    /// # #[tokio::main]
935    /// # async fn main() -> Result<(), async_nats::ConnectError> {
936    /// async_nats::ConnectOptions::new()
937    ///     .skip_subject_validation(true)
938    ///     .connect("demo.nats.io")
939    ///     .await?;
940    /// # Ok(())
941    /// # }
942    /// ```
943    pub fn skip_subject_validation(mut self, skip: bool) -> ConnectOptions {
944        self.skip_subject_validation = skip;
945        self
946    }
947
948    /// By default, a server may advertise other servers in the cluster known to it.
949    /// By setting this option, the client will ignore the advertised servers.
950    /// This may be useful if the client may not be able to reach them.
951    pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
952        self.ignore_discovered_servers = true;
953        self
954    }
955
956    /// By default, client will pick random server to which it will try connect to.
957    /// This option disables that feature, forcing it to always respect the order
958    /// in which server addresses were passed.
959    pub fn retain_servers_order(mut self) -> ConnectOptions {
960        self.retain_servers_order = true;
961        self
962    }
963
964    /// Allows passing custom rustls tls config.
965    ///
966    /// # Examples
967    /// ```
968    /// # #[tokio::main]
969    /// # async fn main() -> Result<(), async_nats::Error> {
970    /// let mut root_store = async_nats::rustls::RootCertStore::empty();
971    ///
972    /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs().certs);
973    ///
974    /// let tls_client = async_nats::rustls::ClientConfig::builder()
975    ///     .with_root_certificates(root_store)
976    ///     .with_no_client_auth();
977    ///
978    /// let client = async_nats::ConnectOptions::new()
979    ///     .require_tls(true)
980    ///     .tls_client_config(tls_client)
981    ///     .connect("tls://demo.nats.io")
982    ///     .await?;
983    ///
984    /// # Ok(())
985    /// # }
986    /// ```
987    pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
988        self.tls_client_config = Some(config);
989        self
990    }
991
992    /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
993    /// protocol messages.
994    ///
995    /// # Examples
996    /// ```
997    /// # #[tokio::main]
998    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
999    /// async_nats::ConnectOptions::new()
1000    ///     .read_buffer_capacity(65535)
1001    ///     .connect("demo.nats.io")
1002    ///     .await?;
1003    /// # Ok(())
1004    /// # }
1005    /// ```
1006    pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
1007        self.read_buffer_capacity = size;
1008        self
1009    }
1010
1011    /// Sets the local socket address that the client will bind to when connecting
1012    /// to the server. This is useful when the client machine has multiple
1013    /// network interfaces and you want to control which one is used, or when
1014    /// you need to bind to a specific local port.
1015    ///
1016    /// Use port `0` to let the operating system assign an ephemeral port.
1017    ///
1018    /// # Examples
1019    /// ```no_run
1020    /// # #[tokio::main]
1021    /// # async fn main() -> Result<(), async_nats::ConnectError> {
1022    /// // Bind to a specific IP with an OS-assigned port
1023    /// let addr: std::net::SocketAddr = "192.168.1.10:0".parse().unwrap();
1024    /// async_nats::ConnectOptions::new()
1025    ///     .local_address(addr)
1026    ///     .connect("demo.nats.io")
1027    ///     .await?;
1028    ///
1029    /// // Bind to a specific IP and port
1030    /// let addr: std::net::SocketAddr = "192.168.1.10:9898".parse().unwrap();
1031    /// async_nats::ConnectOptions::new()
1032    ///     .local_address(addr)
1033    ///     .connect("demo.nats.io")
1034    ///     .await?;
1035    /// # Ok(())
1036    /// # }
1037    /// ```
1038    pub fn local_address(mut self, address: SocketAddr) -> ConnectOptions {
1039        self.local_address = Some(address);
1040        self
1041    }
1042}
1043
1044pub(crate) type AsyncCallbackArg1<A, T> =
1045    Arc<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;
1046
1047#[derive(Clone)]
1048pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);
1049
1050impl<A, T> CallbackArg1<A, T> {
1051    pub(crate) async fn call(&self, arg: A) -> T {
1052        (self.0.as_ref())(arg).await
1053    }
1054}
1055
1056impl<A, T> fmt::Debug for CallbackArg1<A, T> {
1057    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1058        f.write_str("callback")
1059    }
1060}
1061
1062/// Error report from signing callback.
1063// This was needed because std::io::Error isn't Send.
1064#[derive(Clone, PartialEq)]
1065pub struct AuthError(String);
1066
1067impl AuthError {
1068    pub fn new(s: impl ToString) -> Self {
1069        Self(s.to_string())
1070    }
1071}
1072
1073impl std::fmt::Display for AuthError {
1074    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1075        f.write_str(&format!("AuthError({})", &self.0))
1076    }
1077}
1078
1079impl std::fmt::Debug for AuthError {
1080    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1081        f.write_str(&format!("AuthError({})", &self.0))
1082    }
1083}
1084
1085impl std::error::Error for AuthError {}