async_nats_flyradar/
options.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::{connector, Dialer};
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17use base64::engine::general_purpose::URL_SAFE_NO_PAD;
18use base64::engine::Engine;
19use futures::Future;
20use std::fmt::Formatter;
21use std::{
22    fmt,
23    path::{Path, PathBuf},
24    pin::Pin,
25    sync::Arc,
26    time::Duration,
27};
28use tokio::io;
29use tokio_rustls::rustls;
30
/// Connect options. Used to connect with NATS when custom config is needed.
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let client = async_nats::ConnectOptions::new()
///     .require_tls(true)
///     .ping_interval(std::time::Duration::from_secs(10))
///     .connect("demo.nats.io")
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct ConnectOptions {
    /// Optional client name, set through [`ConnectOptions::name`].
    pub(crate) name: Option<String>,
    /// When `true`, messages published from this connection are not delivered back to it.
    pub(crate) no_echo: bool,
    /// Maximum number of consecutive reconnect attempts; `None` means no limit.
    pub(crate) max_reconnects: Option<usize>,
    /// Timeout applied when establishing the underlying connection.
    pub(crate) connection_timeout: Duration,
    /// Credentials (token, user/password, NKey, JWT) collected by the auth builder methods.
    pub(crate) auth: Auth,
    /// Whether a TLS connection is required.
    pub(crate) tls_required: bool,
    /// Whether TLS is established before getting info from the server.
    pub(crate) tls_first: bool,
    /// Paths to root certificates.
    pub(crate) certificates: Vec<PathBuf>,
    /// Path to the client certificate, if any.
    pub(crate) client_cert: Option<PathBuf>,
    /// Path to the client key, if any.
    pub(crate) client_key: Option<PathBuf>,
    /// Custom rustls client configuration, if provided.
    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
    /// How often the client sends PING to the server.
    pub(crate) ping_interval: Duration,
    /// Capacity of the message buffer for subscribers.
    pub(crate) subscription_capacity: usize,
    /// Capacity of the channel set through [`ConnectOptions::client_capacity`].
    pub(crate) sender_capacity: usize,
    /// Optional asynchronous callback invoked with client [`Event`]s.
    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
    /// Prefix used for inboxes; defaults to `_INBOX`.
    pub(crate) inbox_prefix: String,
    /// Timeout for `Client::request`; `None` disables it.
    pub(crate) request_timeout: Option<Duration>,
    /// When `true`, the initial connection is established in the background.
    pub(crate) retry_on_initial_connect: bool,
    /// When `true`, servers advertised by the cluster are ignored.
    pub(crate) ignore_discovered_servers: bool,
    /// When `true`, servers are tried in the order they were passed.
    pub(crate) retain_servers_order: bool,
    /// Initial capacity of the read buffer used to gather partial protocol messages.
    pub(crate) read_buffer_capacity: u16,
    /// Maps the number of reconnect attempts to the delay before the next attempt.
    pub(crate) reconnect_delay_callback: Box<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
    /// Optional asynchronous callback that receives the nonce and returns the [`Auth`] to use.
    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
    /// Optional custom [`Dialer`], set through [`ConnectOptions::with_dialer`].
    // NOTE(review): presumably used by the connector to open the transport — confirm.
    pub(crate) dialer: Option<Arc<dyn Dialer + Send + Sync>>,
}
72
73impl fmt::Debug for ConnectOptions {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
75        f.debug_map()
76            .entry(&"name", &self.name)
77            .entry(&"no_echo", &self.no_echo)
78            .entry(&"max_reconnects", &self.max_reconnects)
79            .entry(&"connection_timeout", &self.connection_timeout)
80            .entry(&"tls_required", &self.tls_required)
81            .entry(&"certificates", &self.certificates)
82            .entry(&"client_cert", &self.client_cert)
83            .entry(&"client_key", &self.client_key)
84            .entry(&"tls_client_config", &"XXXXXXXX")
85            .entry(&"tls_first", &self.tls_first)
86            .entry(&"ping_interval", &self.ping_interval)
87            .entry(&"sender_capacity", &self.sender_capacity)
88            .entry(&"inbox_prefix", &self.inbox_prefix)
89            .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
90            .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
91            .finish()
92    }
93}
94
95impl Default for ConnectOptions {
96    fn default() -> ConnectOptions {
97        ConnectOptions {
98            name: None,
99            no_echo: false,
100            max_reconnects: None,
101            connection_timeout: Duration::from_secs(5),
102            tls_required: false,
103            tls_first: false,
104            certificates: Vec::new(),
105            client_cert: None,
106            client_key: None,
107            tls_client_config: None,
108            ping_interval: Duration::from_secs(60),
109            sender_capacity: 2048,
110            subscription_capacity: 1024 * 64,
111            event_callback: None,
112            inbox_prefix: "_INBOX".to_string(),
113            request_timeout: Some(Duration::from_secs(10)),
114            retry_on_initial_connect: false,
115            ignore_discovered_servers: false,
116            retain_servers_order: false,
117            read_buffer_capacity: 65535,
118            reconnect_delay_callback: Box::new(|attempts| {
119                connector::reconnect_delay_callback_default(attempts)
120            }),
121            auth: Default::default(),
122            auth_callback: None,
123            //INFO: Diff starts
124            dialer: None,
125            //INFO: Diff ends
126        }
127    }
128}
129
130impl ConnectOptions {
131    /// Enables customization of NATS connection.
132    ///
133    /// # Examples
134    /// ```no_run
135    /// # #[tokio::main]
136    /// # async fn main() -> Result<(), async_nats::ConnectError> {
137    /// let mut options = async_nats::ConnectOptions::new()
138    ///     .require_tls(true)
139    ///     .ping_interval(std::time::Duration::from_secs(10))
140    ///     .connect("demo.nats.io")
141    ///     .await?;
142    /// # Ok(())
143    /// # }
144    /// ```
145    pub fn new() -> ConnectOptions {
146        ConnectOptions::default()
147    }
148
149    /// Connect to the NATS Server leveraging all passed options.
150    ///
151    /// # Examples
152    /// ```no_run
153    /// # #[tokio::main]
154    /// # async fn main() -> Result<(), async_nats::ConnectError> {
155    /// let nc = async_nats::ConnectOptions::new()
156    ///     .require_tls(true)
157    ///     .connect("demo.nats.io")
158    ///     .await?;
159    /// # Ok(())
160    /// # }
161    /// ```
162    ///
163    /// ## Pass multiple URLs.
164    /// ```no_run
    /// # #[tokio::main]
166    /// # async fn main() -> Result<(), async_nats::Error> {
167    /// use async_nats::ServerAddr;
168    /// let client = async_nats::connect(vec![
169    ///     "demo.nats.io".parse::<ServerAddr>()?,
170    ///     "other.nats.io".parse::<ServerAddr>()?,
171    /// ])
172    /// .await
173    /// .unwrap();
174    /// # Ok(())
175    /// # }
176    /// ```
177    pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
178        crate::connect_with_options(addrs, self).await
179    }
180
181    //INFO: Diff starts
182    pub fn with_dialer(mut self, dialer: Arc<dyn Dialer + Send + Sync>) -> Self {
183        self.dialer = Some(dialer);
184        self
185    }
186    //INFO: Diff ends
187
188    /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
189    /// Requires an asynchronous function that accepts nonce and returns [Auth].
190    /// It will overwrite all other auth methods used.
191    ///
192    ///
193    /// # Example
194    /// ```no_run
195    /// # #[tokio::main]
196    /// # async fn main() -> Result<(), async_nats::ConnectError> {
197    /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
198    ///     let mut auth = async_nats::Auth::new();
199    ///     auth.username = Some("derek".to_string());
200    ///     auth.password = Some("s3cr3t".to_string());
201    ///     Ok(auth)
202    /// })
203    /// .connect("demo.nats.io")
204    /// .await?;
205    /// # Ok(())
206    /// # }
207    /// ```
208    pub fn with_auth_callback<F, Fut>(callback: F) -> Self
209    where
210        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
211        Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
212    {
213        let mut options = ConnectOptions::new();
214        options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Box::new(
215            move |nonce| Box::pin(callback(nonce)),
216        )));
217        options
218    }
219
220    /// Authenticate against NATS Server with the provided token.
221    ///
222    /// # Examples
223    /// ```no_run
224    /// # #[tokio::main]
225    /// # async fn main() -> Result<(), async_nats::ConnectError> {
226    /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
227    ///     .connect("demo.nats.io")
228    ///     .await?;
229    /// # Ok(())
230    /// # }
231    /// ```
232    pub fn with_token(token: String) -> Self {
233        ConnectOptions::default().token(token)
234    }
235
236    /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
237    /// This can be used as a way to mix authentication methods.
238    ///
239    /// # Examples
240    /// ```no_run
241    /// # #[tokio::main]
242    /// # async fn main() -> Result<(), async_nats::ConnectError> {
243    /// let nc = async_nats::ConnectOptions::new()
244    ///     .token("t0k3n!".into())
245    ///     .connect("demo.nats.io")
246    ///     .await?;
247    /// # Ok(())
248    /// # }
249    /// ```
250    pub fn token(mut self, token: String) -> Self {
251        self.auth.token = Some(token);
252        self
253    }
254
255    /// Authenticate against NATS Server with the provided username and password.
256    ///
257    /// # Examples
258    /// ```no_run
259    /// # #[tokio::main]
260    /// # async fn main() -> Result<(), async_nats::ConnectError> {
261    /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
262    ///     .connect("demo.nats.io")
263    ///     .await?;
264    /// # Ok(())
265    /// # }
266    /// ```
267    pub fn with_user_and_password(user: String, pass: String) -> Self {
268        ConnectOptions::default().user_and_password(user, pass)
269    }
270
271    /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
272    /// This can be used as a way to mix authentication methods.
273    ///
274    /// # Examples
275    /// ```no_run
276    /// # #[tokio::main]
277    /// # async fn main() -> Result<(), async_nats::ConnectError> {
278    /// let nc = async_nats::ConnectOptions::new()
279    ///     .user_and_password("derek".into(), "s3cr3t!".into())
280    ///     .connect("demo.nats.io")
281    ///     .await?;
282    /// # Ok(())
283    /// # }
284    /// ```
285    pub fn user_and_password(mut self, user: String, pass: String) -> Self {
286        self.auth.username = Some(user);
287        self.auth.password = Some(pass);
288        self
289    }
290
291    /// Authenticate with an NKey. Requires an NKey Seed secret.
292    ///
293    /// # Example
294    /// ```no_run
295    /// # #[tokio::main]
296    /// # async fn main() -> Result<(), async_nats::ConnectError> {
297    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
298    /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
299    ///     .connect("localhost")
300    ///     .await?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub fn with_nkey(seed: String) -> Self {
305        ConnectOptions::default().nkey(seed)
306    }
307
308    /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
309    /// Requires an NKey Seed Secret.
310    /// This can be used as a way to mix authentication methods.
311    ///
312    /// # Example
313    /// ```no_run
314    /// # #[tokio::main]
315    /// # async fn main() -> Result<(), async_nats::ConnectError> {
316    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
317    /// let nc = async_nats::ConnectOptions::new()
318    ///     .nkey(seed.into())
319    ///     .connect("localhost")
320    ///     .await?;
321    /// # Ok(())
322    /// # }
323    /// ```
324    pub fn nkey(mut self, seed: String) -> Self {
325        self.auth.nkey = Some(seed);
326        self
327    }
328
329    /// Authenticate with a JWT. Requires function to sign the server nonce.
330    /// The signing function is asynchronous.
331    ///
332    /// # Example
333    /// ```no_run
334    /// # #[tokio::main]
335    /// # async fn main() -> Result<(), async_nats::ConnectError> {
336    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
337    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
338    /// // load jwt from creds file or other secure source
339    /// async fn load_jwt() -> std::io::Result<String> {
340    ///     todo!();
341    /// }
342    /// let jwt = load_jwt().await?;
343    /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
344    ///     let key_pair = key_pair.clone();
345    ///     async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
346    /// })
347    /// .connect("localhost")
348    /// .await?;
349    /// # Ok(())
350    /// # }
351    /// ```
352    pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
353    where
354        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
355        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
356    {
357        ConnectOptions::default().jwt(jwt, sign_cb)
358    }
359
360    /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
361    /// Requires an asynchronous function to sign the server nonce.
362    /// This can be used as a way to mix authentication methods.
363    ///
364    ///
365    /// # Example
366    /// ```no_run
367    /// # #[tokio::main]
368    /// # async fn main() -> Result<(), async_nats::ConnectError> {
369    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
370    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
371    /// // load jwt from creds file or other secure source
372    /// async fn load_jwt() -> std::io::Result<String> {
373    ///     todo!();
374    /// }
375    /// let jwt = load_jwt().await?;
376    /// let nc = async_nats::ConnectOptions::new()
377    ///     .jwt(jwt, move |nonce| {
378    ///         let key_pair = key_pair.clone();
379    ///         async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
380    ///     })
381    ///     .connect("localhost")
382    ///     .await?;
383    /// # Ok(())
384    /// # }
385    /// ```
386    pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
387    where
388        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
389        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
390    {
391        let sign_cb = Arc::new(sign_cb);
392
393        let jwt_sign_callback = CallbackArg1(Box::new(move |nonce: String| {
394            let sign_cb = sign_cb.clone();
395            Box::pin(async move {
396                let sig = sign_cb(nonce.as_bytes().to_vec())
397                    .await
398                    .map_err(AuthError::new)?;
399                Ok(URL_SAFE_NO_PAD.encode(sig))
400            })
401        }));
402
403        self.auth.jwt = Some(jwt);
404        self.auth.signature_callback = Some(jwt_sign_callback);
405        self
406    }
407
408    /// Authenticate with NATS using a `.creds` file.
409    /// Open the provided file, load its creds,
410    /// and perform the desired authentication
411    ///
412    /// # Example
413    /// ```no_run
414    /// # #[tokio::main]
415    /// # async fn main() -> Result<(), async_nats::ConnectError> {
416    /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
417    ///     .await?
418    ///     .connect("connect.ngs.global")
419    ///     .await?;
420    /// # Ok(())
421    /// # }
422    /// ```
423    pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
424        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
425        Self::with_credentials(&cred_file_contents)
426    }
427
428    /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
429    /// This will open the credentials file and load its credentials.
430    /// This can be used as a way to mix authentication methods.
431    ///
432    /// # Example
433    /// ```no_run
434    /// # #[tokio::main]
435    /// # async fn main() -> Result<(), async_nats::ConnectError> {
436    /// let nc = async_nats::ConnectOptions::new()
437    ///     .credentials_file("path/to/my.creds")
438    ///     .await?
439    ///     .connect("connect.ngs.global")
440    ///     .await?;
441    /// # Ok(())
442    /// # }
443    /// ```
444    pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
445        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
446        self.credentials(&cred_file_contents)
447    }
448
449    /// Authenticate with NATS using a credential str, in the creds file format.
450    ///
451    /// # Example
452    /// ```no_run
453    /// # #[tokio::main]
454    /// # async fn main() -> Result<(), async_nats::ConnectError> {
455    /// let creds = "-----BEGIN NATS USER JWT-----
456    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
457    /// ------END NATS USER JWT------
458    ///
459    /// ************************* IMPORTANT *************************
460    /// NKEY Seed printed below can be used sign and prove identity.
461    /// NKEYs are sensitive and should be treated as secrets.
462    ///
463    /// -----BEGIN USER NKEY SEED-----
464    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
465    /// ------END USER NKEY SEED------
466    /// ";
467    ///
468    /// let nc = async_nats::ConnectOptions::with_credentials(creds)
469    ///     .expect("failed to parse static creds")
470    ///     .connect("connect.ngs.global")
471    ///     .await?;
472    /// # Ok(())
473    /// # }
474    /// ```
475    pub fn with_credentials(creds: &str) -> io::Result<Self> {
476        ConnectOptions::default().credentials(creds)
477    }
478
479    /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
480    /// The string should be in the credentials file format.
481    /// This can be used as a way to mix authentication methods.
482    ///
483    /// # Example
484    /// ```no_run
485    /// # #[tokio::main]
486    /// # async fn main() -> Result<(), async_nats::ConnectError> {
487    /// let creds = "-----BEGIN NATS USER JWT-----
488    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
489    /// ------END NATS USER JWT------
490    ///
491    /// ************************* IMPORTANT *************************
492    /// NKEY Seed printed below can be used sign and prove identity.
493    /// NKEYs are sensitive and should be treated as secrets.
494    ///
495    /// -----BEGIN USER NKEY SEED-----
496    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
497    /// ------END USER NKEY SEED------
498    /// ";
499    ///
500    /// let nc = async_nats::ConnectOptions::new()
501    ///     .credentials(creds)
502    ///     .expect("failed to parse static creds")
503    ///     .connect("connect.ngs.global")
504    ///     .await?;
505    /// # Ok(())
506    /// # }
507    /// ```
508    pub fn credentials(self, creds: &str) -> io::Result<Self> {
509        let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
510        let key_pair = std::sync::Arc::new(key_pair);
511
512        Ok(self.jwt(jwt.to_owned(), move |nonce| {
513            let key_pair = key_pair.clone();
514            async move { key_pair.sign(&nonce).map_err(AuthError::new) }
515        }))
516    }
517
518    /// Loads root certificates by providing the path to them.
519    ///
520    /// # Examples
521    /// ```no_run
522    /// # #[tokio::main]
523    /// # async fn main() -> Result<(), async_nats::ConnectError> {
524    /// let nc = async_nats::ConnectOptions::new()
525    ///     .add_root_certificates("mycerts.pem".into())
526    ///     .connect("demo.nats.io")
527    ///     .await?;
528    /// # Ok(())
529    /// # }
530    /// ```
531    pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
532        self.certificates = vec![path];
533        self
534    }
535
536    /// Loads client certificate by providing the path to it.
537    ///
538    /// # Examples
539    /// ```no_run
540    /// # #[tokio::main]
541    /// # async fn main() -> Result<(), async_nats::ConnectError> {
542    /// let nc = async_nats::ConnectOptions::new()
543    ///     .add_client_certificate("cert.pem".into(), "key.pem".into())
544    ///     .connect("demo.nats.io")
545    ///     .await?;
546    /// # Ok(())
547    /// # }
548    /// ```
549    pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
550        self.client_cert = Some(cert);
551        self.client_key = Some(key);
552        self
553    }
554
555    /// Sets or disables TLS requirement. If TLS connection is impossible while `options.require_tls(true)` connection will return error.
556    ///
557    /// # Examples
558    /// ```no_run
559    /// # #[tokio::main]
560    /// # async fn main() -> Result<(), async_nats::ConnectError> {
561    /// let nc = async_nats::ConnectOptions::new()
562    ///     .require_tls(true)
563    ///     .connect("demo.nats.io")
564    ///     .await?;
565    /// # Ok(())
566    /// # }
567    /// ```
568    pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
569        self.tls_required = is_required;
570        self
571    }
572
573    /// Changes how tls connection is established. If `tls_first` is set,
574    /// client will try to establish tls before getting info from the server.
575    /// That requires the server to enable `handshake_first` option in the config.
576    pub fn tls_first(mut self) -> ConnectOptions {
577        self.tls_first = true;
578        self.tls_required = true;
579        self
580    }
581
582    /// Sets how often Client sends PING message to the server.
583    ///
584    /// # Examples
585    /// ```no_run
586    /// # use tokio::time::Duration;
587    /// # #[tokio::main]
588    /// # async fn main() -> Result<(), async_nats::ConnectError> {
589    /// async_nats::ConnectOptions::new()
590    ///     .ping_interval(Duration::from_secs(24))
591    ///     .connect("demo.nats.io")
592    ///     .await?;
593    /// # Ok(())
594    /// # }
595    /// ```
596    pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
597        self.ping_interval = ping_interval;
598        self
599    }
600
601    /// Sets `no_echo` option which disables delivering messages that were published from the same
602    /// connection.
603    ///
604    /// # Examples
605    /// ```no_run
606    /// # #[tokio::main]
607    /// # async fn main() -> Result<(), async_nats::ConnectError> {
608    /// async_nats::ConnectOptions::new()
609    ///     .no_echo()
610    ///     .connect("demo.nats.io")
611    ///     .await?;
612    /// # Ok(())
613    /// # }
614    /// ```
615    pub fn no_echo(mut self) -> ConnectOptions {
616        self.no_echo = true;
617        self
618    }
619
620    /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
621    /// callback and drop messages.
622    /// Default is set to 65536 messages buffer.
623    ///
624    /// # Examples
625    /// ```no_run
626    /// # #[tokio::main]
627    /// # async fn main() -> Result<(), async_nats::ConnectError> {
628    /// async_nats::ConnectOptions::new()
629    ///     .subscription_capacity(1024)
630    ///     .connect("demo.nats.io")
631    ///     .await?;
632    /// # Ok(())
633    /// # }
634    /// ```
635    pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
636        self.subscription_capacity = capacity;
637        self
638    }
639
640    /// Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks.
641    /// Default is set to 5 seconds.
642    ///
643    /// # Examples
644    /// ```no_run
645    /// # #[tokio::main]
646    /// # async fn main() -> Result<(), async_nats::ConnectError> {
647    /// async_nats::ConnectOptions::new()
648    ///     .connection_timeout(tokio::time::Duration::from_secs(5))
649    ///     .connect("demo.nats.io")
650    ///     .await?;
651    /// # Ok(())
652    /// # }
653    /// ```
654    pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
655        self.connection_timeout = timeout;
656        self
657    }
658
659    /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
660    ///
661    /// # Examples
662    /// ```no_run
663    /// # #[tokio::main]
664    /// # async fn main() -> Result<(), async_nats::ConnectError> {
665    /// async_nats::ConnectOptions::new()
666    ///     .request_timeout(Some(std::time::Duration::from_secs(3)))
667    ///     .connect("demo.nats.io")
668    ///     .await?;
669    /// # Ok(())
670    /// # }
671    /// ```
672    pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
673        self.request_timeout = timeout;
674        self
675    }
676
    /// Registers an asynchronous callback that is invoked with each [`Event`] reported by the client, such as disconnects, reconnects and client errors.
678    ///
679    /// # Examples
680    /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
681    /// work around this
682    ///
683    /// ## Basic
684    /// If you don't need to move anything into the closure, simple signature can be used:
685    ///
686    /// ```no_run
687    /// # #[tokio::main]
688    /// # async fn main() -> Result<(), async_nats::ConnectError> {
689    /// async_nats::ConnectOptions::new()
690    ///     .event_callback(|event| async move {
691    ///         println!("event occurred: {}", event);
692    ///     })
693    ///     .connect("demo.nats.io")
694    ///     .await?;
695    /// # Ok(())
696    /// # }
697    /// ```
698    ///
699    /// ## Listening to specific event kind
700    /// ```no_run
701    /// # #[tokio::main]
702    /// # async fn main() -> Result<(), async_nats::ConnectError> {
703    /// async_nats::ConnectOptions::new()
704    ///     .event_callback(|event| async move {
705    ///         match event {
706    ///             async_nats::Event::Disconnected => println!("disconnected"),
707    ///             async_nats::Event::Connected => println!("reconnected"),
708    ///             async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
709    ///             other => println!("other event happened: {}", other),
710    ///         }
711    ///     })
712    ///     .connect("demo.nats.io")
713    ///     .await?;
714    /// # Ok(())
715    /// # }
716    /// ```
717    ///
718    /// ## Advanced
719    /// If you need to move something into the closure, here's an example how to do that
720    ///
721    /// ```no_run
722    /// # #[tokio::main]
723    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
724    /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
725    /// async_nats::ConnectOptions::new()
726    ///     .event_callback(move |event| {
727    ///         let tx = tx.clone();
728    ///         async move {
729    ///             tx.send(event).await.unwrap();
730    ///         }
731    ///     })
732    ///     .connect("demo.nats.io")
733    ///     .await?;
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
738    where
739        F: Fn(Event) -> Fut + Send + Sync + 'static,
740        Fut: Future<Output = ()> + 'static + Send + Sync,
741    {
742        self.event_callback = Some(CallbackArg1::<Event, ()>(Box::new(move |event| {
743            Box::pin(cb(event))
744        })));
745        self
746    }
747
748    /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
749    ///
750    /// # Examples
751    /// ```no_run
752    /// # #[tokio::main]
753    /// # async fn main() -> Result<(), async_nats::ConnectError> {
754    /// async_nats::ConnectOptions::new()
755    ///     .reconnect_delay_callback(|attempts| {
756    ///         println!("no of attempts: {attempts}");
757    ///         std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
758    ///     })
759    ///     .connect("demo.nats.io")
760    ///     .await?;
761    /// # Ok(())
762    /// # }
763    /// ```
764    pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
765    where
766        F: Fn(usize) -> Duration + Send + Sync + 'static,
767    {
768        self.reconnect_delay_callback = Box::new(cb);
769        self
770    }
771
    /// By default, Client dispatches op's to the Client onto the channel with capacity of 2048.
773    /// This option enables overriding it.
774    ///
775    /// # Examples
776    /// ```
777    /// # #[tokio::main]
778    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
779    /// async_nats::ConnectOptions::new()
780    ///     .client_capacity(256)
781    ///     .connect("demo.nats.io")
782    ///     .await?;
783    /// # Ok(())
784    /// # }
785    /// ```
786    pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
787        self.sender_capacity = capacity;
788        self
789    }
790
791    /// Sets custom prefix instead of default `_INBOX`.
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// # #[tokio::main]
797    /// # async fn main() -> Result<(), async_nats::Error> {
798    /// async_nats::ConnectOptions::new()
799    ///     .custom_inbox_prefix("CUSTOM")
800    ///     .connect("demo.nats.io")
801    ///     .await?;
802    /// # Ok(())
803    /// # }
804    /// ```
805    pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
806        self.inbox_prefix = prefix.to_string();
807        self
808    }
809
810    /// Sets the name for the client.
811    ///
812    /// # Examples
813    /// ```
814    /// # #[tokio::main]
815    /// # async fn main() -> Result<(), async_nats::Error> {
816    /// async_nats::ConnectOptions::new()
817    ///     .name("rust-service")
818    ///     .connect("demo.nats.io")
819    ///     .await?;
820    /// # Ok(())
821    /// # }
822    /// ```
823    pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
824        self.name = Some(name.to_string());
825        self
826    }
827
828    /// By default, [`ConnectOptions::connect`] will return an error if
829    /// the connection to the server cannot be established.
830    ///
831    /// Setting `retry_on_initial_connect` makes the client
832    /// establish the connection in the background.
833    pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
834        self.retry_on_initial_connect = true;
835        self
836    }
837
838    /// Specifies the number of consecutive reconnect attempts the client will
839    /// make before giving up. This is useful for preventing zombie services
840    /// from endlessly reaching the servers, but it can also be a footgun and
841    /// surprise for users who do not expect that the client can give up
842    /// entirely.
843    ///
844    /// Pass `None` or `0` for no limit.
845    ///
846    /// # Examples
847    /// ```
848    /// # #[tokio::main]
849    /// # async fn main() -> Result<(), async_nats::Error> {
850    /// async_nats::ConnectOptions::new()
851    ///     .max_reconnects(None)
852    ///     .connect("demo.nats.io")
853    ///     .await?;
854    /// # Ok(())
855    /// # }
856    /// ```
857    pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
858        let val: Option<usize> = max_reconnects.into();
859        self.max_reconnects = if val == Some(0) { None } else { val };
860        self
861    }
862
863    /// By default, a server may advertise other servers in the cluster known to it.
864    /// By setting this option, the client will ignore the advertised servers.
865    /// This may be useful if the client may not be able to reach them.
866    pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
867        self.ignore_discovered_servers = true;
868        self
869    }
870
871    /// By default, client will pick random server to which it will try connect to.
872    /// This option disables that feature, forcing it to always respect the order
873    /// in which server addresses were passed.
874    pub fn retain_servers_order(mut self) -> ConnectOptions {
875        self.retain_servers_order = true;
876        self
877    }
878
879    /// Allows passing custom rustls tls config.
880    ///
881    /// # Examples
882    /// ```
883    /// # #[tokio::main]
884    /// # async fn main() -> Result<(), async_nats::Error> {
885    /// let mut root_store = async_nats::rustls::RootCertStore::empty();
886    ///
887    /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
888    ///
889    /// let tls_client = async_nats::rustls::ClientConfig::builder()
890    ///     .with_root_certificates(root_store)
891    ///     .with_no_client_auth();
892    ///
893    /// let client = async_nats::ConnectOptions::new()
894    ///     .require_tls(true)
895    ///     .tls_client_config(tls_client)
896    ///     .connect("tls://demo.nats.io")
897    ///     .await?;
898    ///
899    /// # Ok(())
900    /// # }
901    /// ```
902    pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
903        self.tls_client_config = Some(config);
904        self
905    }
906
907    /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
908    /// protocol messages.
909    ///
910    /// # Examples
911    /// ```
912    /// # #[tokio::main]
913    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
914    /// async_nats::ConnectOptions::new()
915    ///     .read_buffer_capacity(65535)
916    ///     .connect("demo.nats.io")
917    ///     .await?;
918    /// # Ok(())
919    /// # }
920    /// ```
921    pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
922        self.read_buffer_capacity = size;
923        self
924    }
925}
926
/// Type-erased asynchronous callback: takes one argument of type `A` and returns a
/// boxed, pinned future that resolves to `T`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Box<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Newtype around [`AsyncCallbackArg1`] so the callback can implement `Debug`.
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the wrapped callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        let callback = &self.0;
        let pending = callback(arg);
        pending.await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Writes the fixed placeholder `callback`; the closure itself cannot be inspected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        fmt::Formatter::write_str(f, "callback")
    }
}
943
/// Error report from signing callback.
///
/// Wraps a plain message string, which lets the type derive `Clone` and equality.
// NOTE(review): the previous comment said this type was needed because
// `std::io::Error` isn't `Send`. `std::io::Error` is `Send + Sync`, but it is neither
// `Clone` nor `PartialEq` — confirm which property motivated this type.
#[derive(Clone, PartialEq, Eq)]
pub struct AuthError(String);

impl AuthError {
    /// Creates an error whose message is the `ToString` rendering of `s`.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    /// Renders the error as `AuthError(<message>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Write straight into the formatter instead of building a temporary `String`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    /// Uses the same rendering as `Display`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}

impl std::error::Error for AuthError {}