Skip to main content

async_nats/
options.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17#[cfg(feature = "nkeys")]
18use base64::engine::general_purpose::URL_SAFE_NO_PAD;
19#[cfg(feature = "nkeys")]
20use base64::engine::Engine;
21use futures_util::Future;
22use std::fmt::Formatter;
23use std::net::SocketAddr;
24#[cfg(feature = "nkeys")]
25use std::path::Path;
26use std::{fmt, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
27#[cfg(feature = "nkeys")]
28use tokio::io;
29use tokio_rustls::rustls;
30
31/// Connect options. Used to connect with NATS when custom config is needed.
32/// # Examples
33/// ```no_run
34/// # #[tokio::main]
35/// # async fn main() -> Result<(), async_nats::ConnectError> {
36/// let mut options = async_nats::ConnectOptions::new()
37///     .require_tls(true)
38///     .ping_interval(std::time::Duration::from_secs(10))
39///     .connect("demo.nats.io")
40///     .await?;
41/// # Ok(())
42/// # }
43/// ```
44#[derive(Clone)]
45pub struct ConnectOptions {
46    pub(crate) name: Option<String>,
47    pub(crate) no_echo: bool,
48    pub(crate) max_reconnects: Option<usize>,
49    pub(crate) connection_timeout: Duration,
50    pub(crate) auth: Auth,
51    pub(crate) tls_required: bool,
52    pub(crate) tls_first: bool,
53    pub(crate) certificates: Vec<PathBuf>,
54    pub(crate) client_cert: Option<PathBuf>,
55    pub(crate) client_key: Option<PathBuf>,
56    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
57    pub(crate) ping_interval: Duration,
58    pub(crate) subscription_capacity: usize,
59    pub(crate) sender_capacity: usize,
60    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
61    pub(crate) inbox_prefix: String,
62    pub(crate) request_timeout: Option<Duration>,
63    pub(crate) retry_on_initial_connect: bool,
64    pub(crate) ignore_discovered_servers: bool,
65    pub(crate) retain_servers_order: bool,
66    pub(crate) read_buffer_capacity: u16,
67    pub(crate) reconnect_delay_callback: Arc<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
68    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
69    pub(crate) skip_subject_validation: bool,
70    pub(crate) local_address: Option<SocketAddr>,
71}
72
73impl fmt::Debug for ConnectOptions {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
75        f.debug_map()
76            .entry(&"name", &self.name)
77            .entry(&"no_echo", &self.no_echo)
78            .entry(&"max_reconnects", &self.max_reconnects)
79            .entry(&"connection_timeout", &self.connection_timeout)
80            .entry(&"tls_required", &self.tls_required)
81            .entry(&"certificates", &self.certificates)
82            .entry(&"client_cert", &self.client_cert)
83            .entry(&"client_key", &self.client_key)
84            .entry(&"tls_client_config", &"XXXXXXXX")
85            .entry(&"tls_first", &self.tls_first)
86            .entry(&"ping_interval", &self.ping_interval)
87            .entry(&"sender_capacity", &self.sender_capacity)
88            .entry(&"inbox_prefix", &self.inbox_prefix)
89            .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
90            .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
91            .entry(&"skip_subject_validation", &self.skip_subject_validation)
92            .finish()
93    }
94}
95
96impl Default for ConnectOptions {
97    fn default() -> ConnectOptions {
98        ConnectOptions {
99            name: None,
100            no_echo: false,
101            max_reconnects: None,
102            connection_timeout: Duration::from_secs(5),
103            tls_required: false,
104            tls_first: false,
105            certificates: Vec::new(),
106            client_cert: None,
107            client_key: None,
108            tls_client_config: None,
109            ping_interval: Duration::from_secs(60),
110            sender_capacity: 2048,
111            subscription_capacity: 1024 * 64,
112            event_callback: None,
113            inbox_prefix: "_INBOX".to_string(),
114            request_timeout: Some(Duration::from_secs(10)),
115            retry_on_initial_connect: false,
116            ignore_discovered_servers: false,
117            retain_servers_order: false,
118            read_buffer_capacity: 65535,
119            reconnect_delay_callback: Arc::new(|attempts| {
120                connector::reconnect_delay_callback_default(attempts)
121            }),
122            auth: Default::default(),
123            auth_callback: None,
124            skip_subject_validation: false,
125            local_address: None,
126        }
127    }
128}
129
130impl ConnectOptions {
131    /// Enables customization of NATS connection.
132    ///
133    /// # Examples
134    /// ```no_run
135    /// # #[tokio::main]
136    /// # async fn main() -> Result<(), async_nats::ConnectError> {
137    /// let mut options = async_nats::ConnectOptions::new()
138    ///     .require_tls(true)
139    ///     .ping_interval(std::time::Duration::from_secs(10))
140    ///     .connect("demo.nats.io")
141    ///     .await?;
142    /// # Ok(())
143    /// # }
144    /// ```
145    pub fn new() -> ConnectOptions {
146        ConnectOptions::default()
147    }
148
149    /// Connect to the NATS Server leveraging all passed options.
150    ///
151    /// # Examples
152    /// ```no_run
153    /// # #[tokio::main]
154    /// # async fn main() -> Result<(), async_nats::ConnectError> {
155    /// let nc = async_nats::ConnectOptions::new()
156    ///     .require_tls(true)
157    ///     .connect("demo.nats.io")
158    ///     .await?;
159    /// # Ok(())
160    /// # }
161    /// ```
162    ///
163    /// ## Pass multiple URLs.
164    /// ```no_run
165    /// #[tokio::main]
166    /// # async fn main() -> Result<(), async_nats::Error> {
167    /// use async_nats::ServerAddr;
168    /// let client = async_nats::connect(vec![
169    ///     "demo.nats.io".parse::<ServerAddr>()?,
170    ///     "other.nats.io".parse::<ServerAddr>()?,
171    /// ])
172    /// .await
173    /// .unwrap();
174    /// # Ok(())
175    /// # }
176    /// ```
177    pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
178        crate::connect_with_options(addrs, self).await
179    }
180
181    /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
182    /// Requires an asynchronous function that accepts nonce and returns [Auth].
183    /// It will overwrite all other auth methods used.
184    ///
185    ///
186    /// # Example
187    /// ```no_run
188    /// # #[tokio::main]
189    /// # async fn main() -> Result<(), async_nats::ConnectError> {
190    /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
191    ///     let mut auth = async_nats::Auth::new();
192    ///     auth.username = Some("derek".to_string());
193    ///     auth.password = Some("s3cr3t".to_string());
194    ///     Ok(auth)
195    /// })
196    /// .connect("demo.nats.io")
197    /// .await?;
198    /// # Ok(())
199    /// # }
200    /// ```
201    pub fn with_auth_callback<F, Fut>(callback: F) -> Self
202    where
203        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
204        Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
205    {
206        let mut options = ConnectOptions::new();
207        options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Arc::new(
208            move |nonce| Box::pin(callback(nonce)),
209        )));
210        options
211    }
212
213    /// Authenticate against NATS Server with the provided token.
214    ///
215    /// # Examples
216    /// ```no_run
217    /// # #[tokio::main]
218    /// # async fn main() -> Result<(), async_nats::ConnectError> {
219    /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
220    ///     .connect("demo.nats.io")
221    ///     .await?;
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub fn with_token(token: String) -> Self {
226        ConnectOptions::default().token(token)
227    }
228
229    /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
230    /// This can be used as a way to mix authentication methods.
231    ///
232    /// # Examples
233    /// ```no_run
234    /// # #[tokio::main]
235    /// # async fn main() -> Result<(), async_nats::ConnectError> {
236    /// let nc = async_nats::ConnectOptions::new()
237    ///     .token("t0k3n!".into())
238    ///     .connect("demo.nats.io")
239    ///     .await?;
240    /// # Ok(())
241    /// # }
242    /// ```
243    pub fn token(mut self, token: String) -> Self {
244        self.auth.token = Some(token);
245        self
246    }
247
248    /// Authenticate against NATS Server with the provided username and password.
249    ///
250    /// # Examples
251    /// ```no_run
252    /// # #[tokio::main]
253    /// # async fn main() -> Result<(), async_nats::ConnectError> {
254    /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
255    ///     .connect("demo.nats.io")
256    ///     .await?;
257    /// # Ok(())
258    /// # }
259    /// ```
260    pub fn with_user_and_password(user: String, pass: String) -> Self {
261        ConnectOptions::default().user_and_password(user, pass)
262    }
263
264    /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
265    /// This can be used as a way to mix authentication methods.
266    ///
267    /// # Examples
268    /// ```no_run
269    /// # #[tokio::main]
270    /// # async fn main() -> Result<(), async_nats::ConnectError> {
271    /// let nc = async_nats::ConnectOptions::new()
272    ///     .user_and_password("derek".into(), "s3cr3t!".into())
273    ///     .connect("demo.nats.io")
274    ///     .await?;
275    /// # Ok(())
276    /// # }
277    /// ```
278    pub fn user_and_password(mut self, user: String, pass: String) -> Self {
279        self.auth.username = Some(user);
280        self.auth.password = Some(pass);
281        self
282    }
283
284    /// Authenticate with an NKey. Requires an NKey Seed secret.
285    ///
286    /// # Example
287    /// ```no_run
288    /// # #[tokio::main]
289    /// # async fn main() -> Result<(), async_nats::ConnectError> {
290    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
291    /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
292    ///     .connect("localhost")
293    ///     .await?;
294    /// # Ok(())
295    /// # }
296    /// ```
297    #[cfg(feature = "nkeys")]
298    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
299    pub fn with_nkey(seed: String) -> Self {
300        ConnectOptions::default().nkey(seed)
301    }
302
303    /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
304    /// Requires an NKey Seed Secret.
305    /// This can be used as a way to mix authentication methods.
306    ///
307    /// # Example
308    /// ```no_run
309    /// # #[tokio::main]
310    /// # async fn main() -> Result<(), async_nats::ConnectError> {
311    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
312    /// let nc = async_nats::ConnectOptions::new()
313    ///     .nkey(seed.into())
314    ///     .connect("localhost")
315    ///     .await?;
316    /// # Ok(())
317    /// # }
318    /// ```
319    #[cfg(feature = "nkeys")]
320    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
321    pub fn nkey(mut self, seed: String) -> Self {
322        self.auth.nkey = Some(seed);
323        self
324    }
325
326    /// Authenticate with a JWT. Requires function to sign the server nonce.
327    /// The signing function is asynchronous.
328    ///
329    /// # Example
330    /// ```no_run
331    /// # #[tokio::main]
332    /// # async fn main() -> Result<(), async_nats::ConnectError> {
333    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
334    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
335    /// // load jwt from creds file or other secure source
336    /// async fn load_jwt() -> std::io::Result<String> {
337    ///     todo!();
338    /// }
339    /// let jwt = load_jwt().await?;
340    /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
341    ///     let key_pair = key_pair.clone();
342    ///     async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
343    /// })
344    /// .connect("localhost")
345    /// .await?;
346    /// # Ok(())
347    /// # }
348    /// ```
349    #[cfg(feature = "nkeys")]
350    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
351    pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
352    where
353        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
354        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
355    {
356        ConnectOptions::default().jwt(jwt, sign_cb)
357    }
358
359    /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
360    /// Requires an asynchronous function to sign the server nonce.
361    /// This can be used as a way to mix authentication methods.
362    ///
363    ///
364    /// # Example
365    /// ```no_run
366    /// # #[tokio::main]
367    /// # async fn main() -> Result<(), async_nats::ConnectError> {
368    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
369    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
370    /// // load jwt from creds file or other secure source
371    /// async fn load_jwt() -> std::io::Result<String> {
372    ///     todo!();
373    /// }
374    /// let jwt = load_jwt().await?;
375    /// let nc = async_nats::ConnectOptions::new()
376    ///     .jwt(jwt, move |nonce| {
377    ///         let key_pair = key_pair.clone();
378    ///         async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
379    ///     })
380    ///     .connect("localhost")
381    ///     .await?;
382    /// # Ok(())
383    /// # }
384    /// ```
385    #[cfg(feature = "nkeys")]
386    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
387    pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
388    where
389        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
390        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
391    {
392        let sign_cb = Arc::new(sign_cb);
393
394        let jwt_sign_callback = CallbackArg1(Arc::new(move |nonce: String| {
395            let sign_cb = sign_cb.clone();
396            Box::pin(async move {
397                let sig = sign_cb(nonce.as_bytes().to_vec())
398                    .await
399                    .map_err(AuthError::new)?;
400                Ok(URL_SAFE_NO_PAD.encode(sig))
401            })
402        }));
403
404        self.auth.jwt = Some(jwt);
405        self.auth.signature_callback = Some(jwt_sign_callback);
406        self
407    }
408
409    /// Authenticate with NATS using a `.creds` file.
410    /// Open the provided file, load its creds,
411    /// and perform the desired authentication
412    ///
413    /// # Example
414    /// ```no_run
415    /// # #[tokio::main]
416    /// # async fn main() -> Result<(), async_nats::ConnectError> {
417    /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
418    ///     .await?
419    ///     .connect("connect.ngs.global")
420    ///     .await?;
421    /// # Ok(())
422    /// # }
423    /// ```
424    #[cfg(feature = "nkeys")]
425    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
426    pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
427        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
428        Self::with_credentials(&cred_file_contents)
429    }
430
431    /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
432    /// This will open the credentials file and load its credentials.
433    /// This can be used as a way to mix authentication methods.
434    ///
435    /// # Example
436    /// ```no_run
437    /// # #[tokio::main]
438    /// # async fn main() -> Result<(), async_nats::ConnectError> {
439    /// let nc = async_nats::ConnectOptions::new()
440    ///     .credentials_file("path/to/my.creds")
441    ///     .await?
442    ///     .connect("connect.ngs.global")
443    ///     .await?;
444    /// # Ok(())
445    /// # }
446    /// ```
447    #[cfg(feature = "nkeys")]
448    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
449    pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
450        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
451        self.credentials(&cred_file_contents)
452    }
453
454    /// Authenticate with NATS using a credential str, in the creds file format.
455    ///
456    /// # Example
457    /// ```no_run
458    /// # #[tokio::main]
459    /// # async fn main() -> Result<(), async_nats::ConnectError> {
460    /// let creds = "-----BEGIN NATS USER JWT-----
461    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
462    /// ------END NATS USER JWT------
463    ///
464    /// ************************* IMPORTANT *************************
465    /// NKEY Seed printed below can be used sign and prove identity.
466    /// NKEYs are sensitive and should be treated as secrets.
467    ///
468    /// -----BEGIN USER NKEY SEED-----
469    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
470    /// ------END USER NKEY SEED------
471    /// ";
472    ///
473    /// let nc = async_nats::ConnectOptions::with_credentials(creds)
474    ///     .expect("failed to parse static creds")
475    ///     .connect("connect.ngs.global")
476    ///     .await?;
477    /// # Ok(())
478    /// # }
479    /// ```
480    #[cfg(feature = "nkeys")]
481    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
482    pub fn with_credentials(creds: &str) -> io::Result<Self> {
483        ConnectOptions::default().credentials(creds)
484    }
485
486    /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
487    /// The string should be in the credentials file format.
488    /// This can be used as a way to mix authentication methods.
489    ///
490    /// # Example
491    /// ```no_run
492    /// # #[tokio::main]
493    /// # async fn main() -> Result<(), async_nats::ConnectError> {
494    /// let creds = "-----BEGIN NATS USER JWT-----
495    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
496    /// ------END NATS USER JWT------
497    ///
498    /// ************************* IMPORTANT *************************
499    /// NKEY Seed printed below can be used sign and prove identity.
500    /// NKEYs are sensitive and should be treated as secrets.
501    ///
502    /// -----BEGIN USER NKEY SEED-----
503    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
504    /// ------END USER NKEY SEED------
505    /// ";
506    ///
507    /// let nc = async_nats::ConnectOptions::new()
508    ///     .credentials(creds)
509    ///     .expect("failed to parse static creds")
510    ///     .connect("connect.ngs.global")
511    ///     .await?;
512    /// # Ok(())
513    /// # }
514    /// ```
515    #[cfg(feature = "nkeys")]
516    #[cfg_attr(docsrs, doc(cfg(feature = "nkeys")))]
517    pub fn credentials(self, creds: &str) -> io::Result<Self> {
518        let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
519        let key_pair = std::sync::Arc::new(key_pair);
520
521        Ok(self.jwt(jwt.to_owned(), move |nonce| {
522            let key_pair = key_pair.clone();
523            async move { key_pair.sign(&nonce).map_err(AuthError::new) }
524        }))
525    }
526
527    /// Loads root certificates by providing the path to them.
528    ///
529    /// # Examples
530    /// ```no_run
531    /// # #[tokio::main]
532    /// # async fn main() -> Result<(), async_nats::ConnectError> {
533    /// let nc = async_nats::ConnectOptions::new()
534    ///     .add_root_certificates("mycerts.pem".into())
535    ///     .connect("demo.nats.io")
536    ///     .await?;
537    /// # Ok(())
538    /// # }
539    /// ```
540    pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
541        self.certificates = vec![path];
542        self
543    }
544
545    /// Loads client certificate by providing the path to it.
546    ///
547    /// # Examples
548    /// ```no_run
549    /// # #[tokio::main]
550    /// # async fn main() -> Result<(), async_nats::ConnectError> {
551    /// let nc = async_nats::ConnectOptions::new()
552    ///     .add_client_certificate("cert.pem".into(), "key.pem".into())
553    ///     .connect("demo.nats.io")
554    ///     .await?;
555    /// # Ok(())
556    /// # }
557    /// ```
558    pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
559        self.client_cert = Some(cert);
560        self.client_key = Some(key);
561        self
562    }
563
564    /// Sets or disables TLS requirement. If TLS connection is impossible while `options.require_tls(true)` connection will return error.
565    ///
566    /// # Examples
567    /// ```no_run
568    /// # #[tokio::main]
569    /// # async fn main() -> Result<(), async_nats::ConnectError> {
570    /// let nc = async_nats::ConnectOptions::new()
571    ///     .require_tls(true)
572    ///     .connect("demo.nats.io")
573    ///     .await?;
574    /// # Ok(())
575    /// # }
576    /// ```
577    pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
578        self.tls_required = is_required;
579        self
580    }
581
582    /// Changes how tls connection is established. If `tls_first` is set,
583    /// client will try to establish tls before getting info from the server.
584    /// That requires the server to enable `handshake_first` option in the config.
585    pub fn tls_first(mut self) -> ConnectOptions {
586        self.tls_first = true;
587        self.tls_required = true;
588        self
589    }
590
591    /// Sets how often Client sends PING message to the server.
592    ///
593    /// # Examples
594    /// ```no_run
595    /// # use tokio::time::Duration;
596    /// # #[tokio::main]
597    /// # async fn main() -> Result<(), async_nats::ConnectError> {
598    /// async_nats::ConnectOptions::new()
599    ///     .ping_interval(Duration::from_secs(24))
600    ///     .connect("demo.nats.io")
601    ///     .await?;
602    /// # Ok(())
603    /// # }
604    /// ```
605    pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
606        self.ping_interval = ping_interval;
607        self
608    }
609
610    /// Sets `no_echo` option which disables delivering messages that were published from the same
611    /// connection.
612    ///
613    /// # Examples
614    /// ```no_run
615    /// # #[tokio::main]
616    /// # async fn main() -> Result<(), async_nats::ConnectError> {
617    /// async_nats::ConnectOptions::new()
618    ///     .no_echo()
619    ///     .connect("demo.nats.io")
620    ///     .await?;
621    /// # Ok(())
622    /// # }
623    /// ```
624    pub fn no_echo(mut self) -> ConnectOptions {
625        self.no_echo = true;
626        self
627    }
628
629    /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
630    /// callback and drop messages.
631    /// Default is set to 65536 messages buffer.
632    ///
633    /// # Examples
634    /// ```no_run
635    /// # #[tokio::main]
636    /// # async fn main() -> Result<(), async_nats::ConnectError> {
637    /// async_nats::ConnectOptions::new()
638    ///     .subscription_capacity(1024)
639    ///     .connect("demo.nats.io")
640    ///     .await?;
641    /// # Ok(())
642    /// # }
643    /// ```
644    pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
645        self.subscription_capacity = capacity;
646        self
647    }
648
649    /// Sets a timeout for the full connection establishment and handshake to avoid
650    /// hangs and deadlocks. This includes TCP/WebSocket connection, TLS setup,
651    /// waiting for the server INFO message, sending CONNECT/PING, and receiving
652    /// the initial server PONG response. Default is set to 5 seconds.
653    ///
654    /// # Examples
655    /// ```no_run
656    /// # #[tokio::main]
657    /// # async fn main() -> Result<(), async_nats::ConnectError> {
658    /// async_nats::ConnectOptions::new()
659    ///     .connection_timeout(tokio::time::Duration::from_secs(5))
660    ///     .connect("demo.nats.io")
661    ///     .await?;
662    /// # Ok(())
663    /// # }
664    /// ```
665    pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
666        self.connection_timeout = timeout;
667        self
668    }
669
670    /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
671    ///
672    /// # Examples
673    /// ```no_run
674    /// # #[tokio::main]
675    /// # async fn main() -> Result<(), async_nats::ConnectError> {
676    /// async_nats::ConnectOptions::new()
677    ///     .request_timeout(Some(std::time::Duration::from_secs(3)))
678    ///     .connect("demo.nats.io")
679    ///     .await?;
680    /// # Ok(())
681    /// # }
682    /// ```
683    pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
684        self.request_timeout = timeout;
685        self
686    }
687
688    /// Registers an asynchronous callback for errors that are received over the wire from the server.
689    ///
690    /// # Examples
691    /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
692    /// work around this
693    ///
694    /// ## Basic
695    /// If you don't need to move anything into the closure, simple signature can be used:
696    ///
697    /// ```no_run
698    /// # #[tokio::main]
699    /// # async fn main() -> Result<(), async_nats::ConnectError> {
700    /// async_nats::ConnectOptions::new()
701    ///     .event_callback(|event| async move {
702    ///         println!("event occurred: {}", event);
703    ///     })
704    ///     .connect("demo.nats.io")
705    ///     .await?;
706    /// # Ok(())
707    /// # }
708    /// ```
709    ///
710    /// ## Listening to specific event kind
711    /// ```no_run
712    /// # #[tokio::main]
713    /// # async fn main() -> Result<(), async_nats::ConnectError> {
714    /// async_nats::ConnectOptions::new()
715    ///     .event_callback(|event| async move {
716    ///         match event {
717    ///             async_nats::Event::Disconnected => println!("disconnected"),
718    ///             async_nats::Event::Connected => println!("reconnected"),
719    ///             async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
720    ///             other => println!("other event happened: {}", other),
721    ///         }
722    ///     })
723    ///     .connect("demo.nats.io")
724    ///     .await?;
725    /// # Ok(())
726    /// # }
727    /// ```
728    ///
729    /// ## Advanced
730    /// If you need to move something into the closure, here's an example how to do that
731    ///
732    /// ```no_run
733    /// # #[tokio::main]
734    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
735    /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
736    /// async_nats::ConnectOptions::new()
737    ///     .event_callback(move |event| {
738    ///         let tx = tx.clone();
739    ///         async move {
740    ///             tx.send(event).await.unwrap();
741    ///         }
742    ///     })
743    ///     .connect("demo.nats.io")
744    ///     .await?;
745    /// # Ok(())
746    /// # }
747    /// ```
748    pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
749    where
750        F: Fn(Event) -> Fut + Send + Sync + 'static,
751        Fut: Future<Output = ()> + 'static + Send + Sync,
752    {
753        self.event_callback = Some(CallbackArg1::<Event, ()>(Arc::new(move |event| {
754            Box::pin(cb(event))
755        })));
756        self
757    }
758
759    /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
760    ///
761    /// # Examples
762    /// ```no_run
763    /// # #[tokio::main]
764    /// # async fn main() -> Result<(), async_nats::ConnectError> {
765    /// async_nats::ConnectOptions::new()
766    ///     .reconnect_delay_callback(|attempts| {
767    ///         println!("no of attempts: {attempts}");
768    ///         std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
769    ///     })
770    ///     .connect("demo.nats.io")
771    ///     .await?;
772    /// # Ok(())
773    /// # }
774    /// ```
775    pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
776    where
777        F: Fn(usize) -> Duration + Send + Sync + 'static,
778    {
779        self.reconnect_delay_callback = Arc::new(cb);
780        self
781    }
782
783    /// By default, Client dispatches op's to the Client onto the channel with capacity of 128.
784    /// This option enables overriding it.
785    ///
786    /// # Examples
787    /// ```
788    /// # #[tokio::main]
789    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
790    /// async_nats::ConnectOptions::new()
791    ///     .client_capacity(256)
792    ///     .connect("demo.nats.io")
793    ///     .await?;
794    /// # Ok(())
795    /// # }
796    /// ```
797    pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
798        self.sender_capacity = capacity;
799        self
800    }
801
802    /// Sets custom prefix instead of default `_INBOX`.
803    ///
804    /// # Examples
805    ///
806    /// ```
807    /// # #[tokio::main]
808    /// # async fn main() -> Result<(), async_nats::Error> {
809    /// async_nats::ConnectOptions::new()
810    ///     .custom_inbox_prefix("CUSTOM")
811    ///     .connect("demo.nats.io")
812    ///     .await?;
813    /// # Ok(())
814    /// # }
815    /// ```
816    pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
817        self.inbox_prefix = prefix.to_string();
818        self
819    }
820
821    /// Sets the name for the client.
822    ///
823    /// # Examples
824    /// ```
825    /// # #[tokio::main]
826    /// # async fn main() -> Result<(), async_nats::Error> {
827    /// async_nats::ConnectOptions::new()
828    ///     .name("rust-service")
829    ///     .connect("demo.nats.io")
830    ///     .await?;
831    /// # Ok(())
832    /// # }
833    /// ```
834    pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
835        self.name = Some(name.to_string());
836        self
837    }
838
839    /// By default, [`ConnectOptions::connect`] will return an error if
840    /// the connection to the server cannot be established.
841    ///
842    /// Setting `retry_on_initial_connect` makes the client
843    /// establish the connection in the background.
844    pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
845        self.retry_on_initial_connect = true;
846        self
847    }
848
849    /// Specifies the number of consecutive reconnect attempts the client will
850    /// make before giving up. This is useful for preventing zombie services
851    /// from endlessly reaching the servers, but it can also be a footgun and
852    /// surprise for users who do not expect that the client can give up
853    /// entirely.
854    ///
855    /// Pass `None` or `0` for no limit.
856    ///
857    /// # Examples
858    /// ```
859    /// # #[tokio::main]
860    /// # async fn main() -> Result<(), async_nats::Error> {
861    /// async_nats::ConnectOptions::new()
862    ///     .max_reconnects(None)
863    ///     .connect("demo.nats.io")
864    ///     .await?;
865    /// # Ok(())
866    /// # }
867    /// ```
868    pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
869        let val: Option<usize> = max_reconnects.into();
870        self.max_reconnects = if val == Some(0) { None } else { val };
871        self
872    }
873
874    /// Disables subject validation for publish operations.
875    ///
876    /// By default, the client validates publish subjects to ensure they don't contain
877    /// whitespace characters (space, tab, CR, LF).
878    ///
879    /// This option only affects **publish** validation. Subscribe and queue group
880    /// validation always runs regardless of this setting, matching the behavior
881    /// of the Go and Java NATS clients.
882    ///
883    /// # Warning
884    /// Using invalid subjects may cause protocol errors with the NATS server.
885    /// Only disable validation if you are certain all published subjects are valid.
886    ///
887    /// # Examples
888    /// ```no_run
889    /// # #[tokio::main]
890    /// # async fn main() -> Result<(), async_nats::ConnectError> {
891    /// async_nats::ConnectOptions::new()
892    ///     .skip_subject_validation(true)
893    ///     .connect("demo.nats.io")
894    ///     .await?;
895    /// # Ok(())
896    /// # }
897    /// ```
898    pub fn skip_subject_validation(mut self, skip: bool) -> ConnectOptions {
899        self.skip_subject_validation = skip;
900        self
901    }
902
903    /// By default, a server may advertise other servers in the cluster known to it.
904    /// By setting this option, the client will ignore the advertised servers.
905    /// This may be useful if the client may not be able to reach them.
906    pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
907        self.ignore_discovered_servers = true;
908        self
909    }
910
911    /// By default, client will pick random server to which it will try connect to.
912    /// This option disables that feature, forcing it to always respect the order
913    /// in which server addresses were passed.
914    pub fn retain_servers_order(mut self) -> ConnectOptions {
915        self.retain_servers_order = true;
916        self
917    }
918
919    /// Allows passing custom rustls tls config.
920    ///
921    /// # Examples
922    /// ```
923    /// # #[tokio::main]
924    /// # async fn main() -> Result<(), async_nats::Error> {
925    /// let mut root_store = async_nats::rustls::RootCertStore::empty();
926    ///
927    /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs().certs);
928    ///
929    /// let tls_client = async_nats::rustls::ClientConfig::builder()
930    ///     .with_root_certificates(root_store)
931    ///     .with_no_client_auth();
932    ///
933    /// let client = async_nats::ConnectOptions::new()
934    ///     .require_tls(true)
935    ///     .tls_client_config(tls_client)
936    ///     .connect("tls://demo.nats.io")
937    ///     .await?;
938    ///
939    /// # Ok(())
940    /// # }
941    /// ```
942    pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
943        self.tls_client_config = Some(config);
944        self
945    }
946
947    /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
948    /// protocol messages.
949    ///
950    /// # Examples
951    /// ```
952    /// # #[tokio::main]
953    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
954    /// async_nats::ConnectOptions::new()
955    ///     .read_buffer_capacity(65535)
956    ///     .connect("demo.nats.io")
957    ///     .await?;
958    /// # Ok(())
959    /// # }
960    /// ```
961    pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
962        self.read_buffer_capacity = size;
963        self
964    }
965
966    /// Sets the local socket address that the client will bind to when connecting
967    /// to the server. This is useful when the client machine has multiple
968    /// network interfaces and you want to control which one is used, or when
969    /// you need to bind to a specific local port.
970    ///
971    /// Use port `0` to let the operating system assign an ephemeral port.
972    ///
973    /// # Examples
974    /// ```no_run
975    /// # #[tokio::main]
976    /// # async fn main() -> Result<(), async_nats::ConnectError> {
977    /// // Bind to a specific IP with an OS-assigned port
978    /// let addr: std::net::SocketAddr = "192.168.1.10:0".parse().unwrap();
979    /// async_nats::ConnectOptions::new()
980    ///     .local_address(addr)
981    ///     .connect("demo.nats.io")
982    ///     .await?;
983    ///
984    /// // Bind to a specific IP and port
985    /// let addr: std::net::SocketAddr = "192.168.1.10:9898".parse().unwrap();
986    /// async_nats::ConnectOptions::new()
987    ///     .local_address(addr)
988    ///     .connect("demo.nats.io")
989    ///     .await?;
990    /// # Ok(())
991    /// # }
992    /// ```
993    pub fn local_address(mut self, address: SocketAddr) -> ConnectOptions {
994        self.local_address = Some(address);
995        self
996    }
997}
998
999pub(crate) type AsyncCallbackArg1<A, T> =
1000    Arc<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;
1001
1002#[derive(Clone)]
1003pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);
1004
1005impl<A, T> CallbackArg1<A, T> {
1006    pub(crate) async fn call(&self, arg: A) -> T {
1007        (self.0.as_ref())(arg).await
1008    }
1009}
1010
1011impl<A, T> fmt::Debug for CallbackArg1<A, T> {
1012    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1013        f.write_str("callback")
1014    }
1015}
1016
1017/// Error report from signing callback.
1018// This was needed because std::io::Error isn't Send.
1019#[derive(Clone, PartialEq)]
1020pub struct AuthError(String);
1021
1022impl AuthError {
1023    pub fn new(s: impl ToString) -> Self {
1024        Self(s.to_string())
1025    }
1026}
1027
1028impl std::fmt::Display for AuthError {
1029    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1030        f.write_str(&format!("AuthError({})", &self.0))
1031    }
1032}
1033
1034impl std::fmt::Debug for AuthError {
1035    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1036        f.write_str(&format!("AuthError({})", &self.0))
1037    }
1038}
1039
1040impl std::error::Error for AuthError {}