Struct async_nats::ConnectOptions
source · pub struct ConnectOptions { /* private fields */ }
Expand description
Connect options. Used to connect with NATS when custom config is needed.
Examples
let mut options =
async_nats::ConnectOptions::new()
.require_tls(true)
.ping_interval(std::time::Duration::from_secs(10))
.connect("demo.nats.io").await?;
Implementations§
source§impl ConnectOptions
impl ConnectOptions
sourcepub fn new() -> ConnectOptions
pub fn new() -> ConnectOptions
Enables customization of NATS connection.
Examples
let mut options =
async_nats::ConnectOptions::new()
.require_tls(true)
.ping_interval(std::time::Duration::from_secs(10))
.connect("demo.nats.io").await?;
sourcepub async fn connect<A: ToServerAddrs>(
self,
addrs: A
) -> Result<Client, ConnectError>
pub async fn connect<A: ToServerAddrs>( self, addrs: A ) -> Result<Client, ConnectError>
Connect to the NATS Server leveraging all passed options.
Examples
let nc = async_nats::ConnectOptions::new().require_tls(true).connect("demo.nats.io").await?;
Pass multiple URLs.
#[tokio::main]
use async_nats::ServerAddr;
let client = async_nats::connect(vec![
"demo.nats.io".parse::<ServerAddr>()?,
"other.nats.io".parse::<ServerAddr>()?,
])
.await
.unwrap();
sourcepub fn with_token(token: String) -> Self
pub fn with_token(token: String) -> Self
Auth against NATS Server with provided token.
Examples
let nc =
async_nats::ConnectOptions::with_token("t0k3n!".into()).connect("demo.nats.io").await?;
sourcepub fn with_user_and_password(user: String, pass: String) -> Self
pub fn with_user_and_password(user: String, pass: String) -> Self
Auth against NATS Server with provided username and password.
Examples
let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
.connect("demo.nats.io").await?;
sourcepub fn with_nkey(seed: String) -> Self
pub fn with_nkey(seed: String) -> Self
Authenticate with an NKey. Requires the NKey seed secret.
Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let nc = async_nats::ConnectOptions::with_nkey(seed.into())
.connect("localhost").await?;
sourcepub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self where
F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self where F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
Authenticate with a JWT. Requires a function to sign the server nonce. The signing function is asynchronous.
Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
// load jwt from creds file or other secure source
async fn load_jwt() -> std::io::Result<String> { todo!(); }
let jwt = load_jwt().await?;
let nc = async_nats::ConnectOptions::with_jwt(jwt,
move |nonce| {
let key_pair = key_pair.clone();
async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }})
.connect("localhost").await?;
sourcepub async fn with_credentials_file(path: PathBuf) -> Result<Self>
pub async fn with_credentials_file(path: PathBuf) -> Result<Self>
Authenticate with NATS using a .creds file.
Opens the provided file, loads its credentials, and performs the desired authentication.
Example
let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds".into()).await?
.connect("connect.ngs.global").await?;
sourcepub fn with_credentials(creds: &str) -> Result<Self>
pub fn with_credentials(creds: &str) -> Result<Self>
Authenticate with NATS using a credential str, in the creds file format.
Example
let creds =
"-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
------END USER NKEY SEED------
";
let nc = async_nats::ConnectOptions::with_credentials(creds)
.expect("failed to parse static creds")
.connect("connect.ngs.global").await?;
sourcepub fn add_root_certificates(self, path: PathBuf) -> ConnectOptions
pub fn add_root_certificates(self, path: PathBuf) -> ConnectOptions
Loads root certificates by providing the path to them.
Examples
let nc =
async_nats::ConnectOptions::new().add_root_certificates("mycerts.pem".into()).connect("demo.nats.io").await?;
sourcepub fn add_client_certificate(
self,
cert: PathBuf,
key: PathBuf
) -> ConnectOptions
pub fn add_client_certificate( self, cert: PathBuf, key: PathBuf ) -> ConnectOptions
Loads client certificate by providing the path to it.
Examples
let nc =
async_nats::ConnectOptions::new().add_client_certificate("cert.pem".into(), "key.pem".into()).connect("demo.nats.io").await?;
sourcepub fn require_tls(self, is_required: bool) -> ConnectOptions
pub fn require_tls(self, is_required: bool) -> ConnectOptions
Sets or disables the TLS requirement. If a TLS connection is impossible while options.require_tls(true) is set, the connection attempt will return an error.
Examples
let nc =
async_nats::ConnectOptions::new().require_tls(true).connect("demo.nats.io").await?;
sourcepub fn flush_interval(self, flush_interval: Duration) -> ConnectOptions
pub fn flush_interval(self, flush_interval: Duration) -> ConnectOptions
Sets the interval for flushing. The NATS connection will send buffered data to the NATS Server whenever the buffer limit is reached, but it is also necessary to flush once in a while if the client sends small messages infrequently. This option allows that flush interval to be changed.
Examples
async_nats::ConnectOptions::new().flush_interval(Duration::from_millis(100)).connect("demo.nats.io").await?;
sourcepub fn ping_interval(self, ping_interval: Duration) -> ConnectOptions
pub fn ping_interval(self, ping_interval: Duration) -> ConnectOptions
Sets how often Client sends PING message to the server.
Examples
async_nats::ConnectOptions::new().ping_interval(Duration::from_secs(10)).connect("demo.nats.io").await?;
sourcepub fn no_echo(self) -> ConnectOptions
pub fn no_echo(self) -> ConnectOptions
Sets the no_echo option, which disables delivery of messages that were published from the same connection.
Examples
async_nats::ConnectOptions::new().no_echo().connect("demo.nats.io").await?;
sourcepub fn subscription_capacity(self, capacity: usize) -> ConnectOptions
pub fn subscription_capacity(self, capacity: usize) -> ConnectOptions
Sets the capacity for Subscribers. Exceeding it will trigger the slow consumer error callback and drop messages.
The default is a buffer of 1024 messages.
Examples
async_nats::ConnectOptions::new().subscription_capacity(1024).connect("demo.nats.io").await?;
sourcepub fn connection_timeout(self, timeout: Duration) -> ConnectOptions
pub fn connection_timeout(self, timeout: Duration) -> ConnectOptions
Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks. Default is set to 5 seconds.
Examples
async_nats::ConnectOptions::new().connection_timeout(tokio::time::Duration::from_secs(5)).connect("demo.nats.io").await?;
sourcepub fn request_timeout(self, timeout: Option<Duration>) -> ConnectOptions
pub fn request_timeout(self, timeout: Option<Duration>) -> ConnectOptions
Sets a timeout for Client::request. The default value is 10 seconds.
Examples
async_nats::ConnectOptions::new().request_timeout(Some(std::time::Duration::from_secs(3))).connect("demo.nats.io").await?;
sourcepub fn event_callback<F, Fut>(self, cb: F) -> ConnectOptions where
F: Fn(Event) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + 'static + Send + Sync,
pub fn event_callback<F, Fut>(self, cb: F) -> ConnectOptions where F: Fn(Event) -> Fut + Send + Sync + 'static, Fut: Future<Output = ()> + 'static + Send + Sync,
Registers an asynchronous callback for client events, such as connection state changes and errors that are received over the wire from the server.
Examples
As asynchronous callbacks are still not available on the stable channel, here are some examples of how to work around this.
Basic
If you don’t need to move anything into the closure, a simple signature can be used:
async_nats::ConnectOptions::new().event_callback(|event| async move {
println!("event occurred: {}", event);
}).connect("demo.nats.io").await?;
Listening to specific event kind
async_nats::ConnectOptions::new().event_callback(|event| async move {
match event {
async_nats::Event::Disconnected => println!("disconnected"),
async_nats::Event::Connected => println!("reconnected"),
async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
other => println!("other event happened: {}", other),
}
}).connect("demo.nats.io").await?;
Advanced
If you need to move something into the closure, here’s an example of how to do that:
let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
async_nats::ConnectOptions::new().event_callback(move |event| {
let tx = tx.clone();
async move {
tx.send(event).await.unwrap();
}
}).connect("demo.nats.io").await?;
sourcepub fn client_capacity(self, capacity: usize) -> ConnectOptions
pub fn client_capacity(self, capacity: usize) -> ConnectOptions
By default, the Client dispatches its operations onto a channel with a capacity of 128. This option enables overriding that capacity.
Examples
async_nats::ConnectOptions::new().client_capacity(256).connect("demo.nats.io").await?;
sourcepub fn custom_inbox_prefix<T: ToString>(self, prefix: T) -> ConnectOptions
pub fn custom_inbox_prefix<T: ToString>(self, prefix: T) -> ConnectOptions
Sets a custom inbox prefix to use instead of the default _INBOX.
Examples
async_nats::ConnectOptions::new().custom_inbox_prefix("CUSTOM").connect("demo.nats.io").await?;
sourcepub fn name<T: ToString>(self, name: T) -> ConnectOptions
pub fn name<T: ToString>(self, name: T) -> ConnectOptions
Sets the name for the client.
Examples
async_nats::ConnectOptions::new().name("rust-service").connect("demo.nats.io").await?;
pub fn retry_on_initial_connect(self) -> ConnectOptions
pub fn ignore_discovered_servers(self) -> ConnectOptions
sourcepub fn retain_servers_order(self) -> ConnectOptions
pub fn retain_servers_order(self) -> ConnectOptions
By default, the client will pick a random server to connect to. This option disables that behavior, forcing the client to always respect the order in which the server addresses were passed.