// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! An async Rust client for the NATS.io ecosystem.
//!
//! `git clone https://github.com/nats-io/nats.rs`
//!
//! NATS.io is a simple, secure and high performance open source messaging
//! system for cloud native applications, `IoT` messaging, and microservices
//! architectures.
//!
//! For more information see [https://nats.io/].
//!
//! [https://nats.io/]: https://nats.io/
//!
//! ## Examples
//!
//! Basic connections, and those with options. The compiler will force these to
//! be correct.
//!
//! ```no_run
//! # smol::block_on(async {
//! let nc = nats::asynk::connect("demo.nats.io").await?;
//!
//! let nc2 = nats::asynk::Options::with_user_pass("derek", "s3cr3t!")
//! .with_name("My Rust NATS App")
//! .connect("127.0.0.1")
//! .await?;
//!
//! let nc3 = nats::asynk::Options::with_credentials("path/to/my.creds")
//! .connect("connect.ngs.global")
//! .await?;
//!
//! let nc4 = nats::asynk::Options::new()
//! .add_root_certificate("my-certs.pem")
//! .connect("tls://demo.nats.io:4443")
//! .await?;
//! # std::io::Result::Ok(()) });
//! ```
//!
//! ### Publish
//!
//! ```
//! # smol::block_on(async {
//! let nc = nats::asynk::connect("demo.nats.io").await?;
//! nc.publish("my.subject", "Hello World!").await?;
//!
//! nc.publish("my.subject", "my message").await?;
//!
//! // Publish a request manually.
//! let reply = nc.new_inbox();
//! let rsub = nc.subscribe(&reply).await?;
//! nc.publish_request("my.subject", &reply, "Help me!").await?;
//! # std::io::Result::Ok(()) });
//! ```
//!
//! ### Subscribe
//!
//! ```no_run
//! # smol::block_on(async {
//! # use std::time::Duration;
//! let nc = nats::asynk::connect("demo.nats.io").await?;
//! let sub = nc.subscribe("foo").await?;
//!
//! // Receive a message.
//! if let Some(msg) = sub.next().await {}
//!
//! // Queue subscription.
//! let qsub = nc.queue_subscribe("foo", "my_group").await?;
//! # std::io::Result::Ok(()) });
//! ```
//!
//! ### Request/Response
//!
//! ```no_run
//! # use std::time::Duration;
//! # smol::block_on(async {
//! let nc = nats::asynk::connect("demo.nats.io").await?;
//! let resp = nc.request("foo", "Help me?").await?;
//!
//! // With multiple responses.
//! let rsub = nc.request_multi("foo", "Help").await?;
//! if let Some(msg) = rsub.next().await {}
//! if let Some(msg) = rsub.next().await {}
//!
//! // Publish a request manually.
//! let reply = nc.new_inbox();
//! let rsub = nc.subscribe(&reply).await?;
//! nc.publish_request("foo", &reply, "Help me!").await?;
//! let response = rsub.next().await;
//! # std::io::Result::Ok(()) });
//! ```
use std::fmt;
use std::io;
use std::net::IpAddr;
use std::path::Path;
use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use blocking::unblock;
use crossbeam_channel::{Receiver, Sender};
use crate::header::HeaderMap;
use crate::IntoServerList;
/// Opens a connection to the NATS server(s) described by `nats_url`, using
/// the default [`Options`].
///
/// This is shorthand for `Options::new().connect(nats_url)`.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::connect("demo.nats.io").await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn connect<I>(nats_url: I) -> io::Result<Connection>
where
    I: IntoServerList,
{
    let default_options = Options::new();
    default_options.connect(nats_url).await
}
/// A NATS client connection.
///
/// This is an async facade over the blocking [`crate::Connection`]: the
/// methods in this file forward to the wrapped connection, offloading calls
/// that may block through `blocking::unblock`.
#[derive(Clone, Debug)]
pub struct Connection {
// The wrapped blocking connection that every method delegates to.
inner: crate::Connection,
}
impl Connection {
fn new(inner: crate::Connection) -> Connection {
Self { inner }
}
/// Publishes a message.
pub async fn publish(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<()> {
self.publish_with_reply_or_headers(subject, None, None, msg)
.await
}
/// Publishes a message with a reply subject.
pub async fn publish_request(
&self,
subject: &str,
reply: &str,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
if let Some(res) =
self.inner
.try_publish_with_reply_or_headers(subject, Some(reply), None, &msg)
{
return res;
}
let subject = subject.to_string();
let reply = reply.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || inner.publish_request(&subject, &reply, msg)).await
}
/// Creates a new unique subject for receiving replies.
pub fn new_inbox(&self) -> String {
self.inner.new_inbox()
}
/// Publishes a message and waits for the response.
pub async fn request(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<Message> {
let subject = subject.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let msg = unblock(move || inner.request(&subject, msg)).await?;
Ok(msg.into())
}
/// Publishes a message and waits for the response or until the
/// timeout duration is reached
pub async fn request_timeout(
&self,
subject: &str,
msg: impl AsRef<[u8]>,
timeout: Duration,
) -> io::Result<Message> {
let subject = subject.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let msg = unblock(move || inner.request_timeout(&subject, msg, timeout)).await?;
Ok(msg.into())
}
/// Publishes a message and returns a subscription for awaiting the
/// response.
pub async fn request_multi(
&self,
subject: &str,
msg: impl AsRef<[u8]>,
) -> io::Result<Subscription> {
let subject = subject.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let sub = unblock(move || inner.request_multi(&subject, msg)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner: sub,
_closer_tx,
closer_rx,
})
}
/// Creates a subscription.
pub async fn subscribe(&self, subject: &str) -> io::Result<Subscription> {
let subject = subject.to_string();
let inner = self.inner.clone();
let inner = unblock(move || inner.subscribe(&subject)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner,
_closer_tx,
closer_rx,
})
}
/// Creates a queue subscription.
pub async fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result<Subscription> {
let subject = subject.to_string();
let queue = queue.to_string();
let inner = self.inner.clone();
let inner = unblock(move || inner.queue_subscribe(&subject, &queue)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner,
_closer_tx,
closer_rx,
})
}
/// Flushes by performing a round trip to the server.
pub async fn flush(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.flush()).await
}
/// Flushes by performing a round trip to the server or times out after a
/// duration of time.
pub async fn flush_timeout(&self, timeout: Duration) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.flush_timeout(timeout)).await
}
/// Calculates the round trip time between this client and the server.
pub async fn rtt(&self) -> io::Result<Duration> {
let inner = self.inner.clone();
unblock(move || inner.rtt()).await
}
/// Returns the client IP as known by the most recently connected server.
///
/// Supported as of server version 2.1.6.
pub fn client_ip(&self) -> io::Result<IpAddr> {
self.inner.client_ip()
}
/// Returns the client ID as known by the most recently connected server.
pub fn client_id(&self) -> u64 {
self.inner.client_id()
}
/// Unsubscribes all subscriptions and flushes the connection.
///
/// Remaining messages can still be received by existing [`Subscription`]s.
pub async fn drain(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.drain()).await
}
/// Closes the connection.
pub async fn close(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.close()).await;
Ok(())
}
/// Publish a message which may have a reply subject or headers set.
pub async fn publish_with_reply_or_headers(
&self,
subject: &str,
reply: Option<&str>,
headers: Option<&HeaderMap>,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
if let Some(res) = self
.inner
.try_publish_with_reply_or_headers(subject, reply, headers, &msg)
{
return res;
}
let subject = subject.to_string();
let reply = reply.map(str::to_owned);
let headers = headers.map(HeaderMap::clone);
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || {
inner.publish_with_reply_or_headers(&subject, reply.as_deref(), headers.as_ref(), msg)
})
.await
}
}
/// A subscription to a subject.
///
/// Wraps the blocking `crate::Subscription` together with a "closer" channel
/// pair that lets blocking waits notice when this value has been dropped.
#[derive(Debug)]
pub struct Subscription {
// The wrapped blocking subscription.
inner: crate::Subscription,
// Dropping this sender disconnects the closer channel, which signals to any
// receivers that the subscription has been closed. Struct fields are dropped
// in declaration order, so the closer fields must stay declared after `inner`:
// that way `inner` is dropped first, and a thread that is currently blocking
// on the receiver is released only afterwards.
_closer_tx: Sender<()>,
// Receiving side of the closer channel; `next` clones it into its blocking
// wait and selects on it alongside the message receiver.
closer_rx: Receiver<()>,
}
impl Subscription {
/// Gets the next message, or returns `None` if the subscription
/// has been unsubscribed or the connection is closed.
pub async fn next(&self) -> Option<Message> {
if let Some(msg) = self.inner.try_next() {
return Some(msg.into());
}
let inner = self.inner.clone();
let closer = self.closer_rx.clone();
let msg = unblock(move || {
// If the subscription is dropped, we should stop blocking this thread immediately.
crossbeam_channel::select! {
recv(closer) -> _ => None,
recv(inner.receiver()) -> msg => msg.ok(),
}
})
.await?;
Some(msg.into())
}
/// Try to get the next message, or None if no messages
/// are present or if the subscription has been unsubscribed
/// or the connection closed.
///
/// # Example
/// ```
/// # fn main() -> std::io::Result<()> {
/// # let nc = nats::connect("demo.nats.io")?;
/// # let sub = nc.subscribe("foo")?;
/// if let Some(msg) = sub.try_next() {
/// println!("Received {}", msg);
/// }
/// # Ok(())
/// # }
/// ```
pub fn try_next(&self) -> Option<Message> {
self.inner.try_next().map(From::from)
}
/// Stops listening for new messages, but the remaining queued messages can
/// still be received.
pub async fn drain(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.drain()).await
}
/// Stops listening for new messages and discards the remaining queued
/// messages.
pub async fn unsubscribe(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.unsubscribe()).await
}
}
/// A message received on a subject.
///
/// This is the async-module counterpart of `crate::Message`; it carries the
/// same six fields, and a `From<crate::Message>` conversion is provided in
/// this file.
#[derive(Clone)]
pub struct Message {
/// The subject this message came from.
pub subject: String,
/// Optional reply subject that may be used for sending a response to this
/// message.
pub reply: Option<String>,
/// The message contents.
pub data: Vec<u8>,
/// Optional headers associated with this `Message`.
pub headers: Option<HeaderMap>,
/// Client for publishing on the reply subject.
///
/// `None` for messages that are not bound to a client, in which case
/// `respond` returns an error.
#[doc(hidden)]
pub client: Option<crate::client::Client>,
/// Whether this message has already been successfully double-acked
/// using `JetStream`.
#[doc(hidden)]
pub double_acked: Arc<AtomicBool>,
}
impl From<crate::Message> for Message {
fn from(sync: crate::Message) -> Message {
Message {
subject: sync.subject,
reply: sync.reply,
data: sync.data,
headers: sync.headers,
client: sync.client,
double_acked: sync.double_acked,
}
}
}
impl Message {
/// Creates new empty `Message`, without a Client.
/// Useful for passing `Message` data or creating `Message` instance without caring about `Client`,
/// but cannot be used on it's own for associated methods as those require `Client` injected into `Message`
/// and will error without it.
pub fn new(
subject: &str,
reply: Option<&str>,
data: impl AsRef<[u8]>,
headers: Option<HeaderMap>,
) -> Message {
Message {
subject: subject.to_string(),
reply: reply.map(String::from),
data: data.as_ref().to_vec(),
headers,
..Default::default()
}
}
/// Respond to a request message.
pub async fn respond(&self, msg: impl AsRef<[u8]>) -> io::Result<()> {
let reply = self.reply.as_ref().ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "No reply subject to reply to")
})?;
let client = self.client.as_ref().ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotConnected,
crate::message::MESSAGE_NOT_BOUND,
)
})?;
if let Some(res) = client.try_publish(reply.as_str(), None, None, msg.as_ref()) {
return res;
}
// clone only if we have to move the data to the thread
let client = client.clone();
let reply = reply.to_owned();
let msg = msg.as_ref().to_vec();
unblock(move || client.publish(&reply, None, None, msg.as_ref())).await
}
}
impl Default for Message {
fn default() -> Message {
Message {
subject: String::from(""),
reply: None,
data: Vec::new(),
headers: None,
client: None,
double_acked: Arc::new(AtomicBool::new(false)),
}
}
}
impl fmt::Debug for Message {
    /// Formats the message as a `Message` struct showing the subject,
    /// headers, reply subject and payload length (not the payload itself).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        let payload_len = self.data.len();
        let mut out = f.debug_struct("Message");
        out.field("subject", &self.subject);
        out.field("headers", &self.headers);
        out.field("reply", &self.reply);
        out.field("length", &payload_len);
        out.finish()
    }
}
/// Connect options.
///
/// A builder that wraps the blocking `crate::Options`; each builder method in
/// this file forwards to the wrapped value and re-wraps the result.
#[derive(Debug, Default)]
pub struct Options {
// The wrapped blocking options that the builder methods delegate to.
inner: crate::Options,
}
impl Options {
/// `Options` for establishing a new NATS `Connection`.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let options = nats::asynk::Options::new();
/// let nc = options.connect("demo.nats.io").await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn new() -> Options {
Options {
inner: crate::Options::new(),
}
}
/// Authenticate with NATS using a token.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::with_token("t0k3n!")
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn with_token(token: &str) -> Options {
Options {
inner: crate::Options::with_token(token),
}
}
/// Authenticate with NATS using a username and password.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::with_user_pass("derek", "s3cr3t!")
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn with_user_pass(user: &str, password: &str) -> Options {
Options {
inner: crate::Options::with_user_pass(user, password),
}
}
/// Authenticate with NATS using a `.creds` file.
///
/// # Example
/// ```no_run
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::with_credentials("path/to/my.creds")
/// .connect("connect.ngs.global")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn with_credentials(path: impl AsRef<Path>) -> Options {
Options {
inner: crate::Options::with_credentials(path),
}
}
/// Authenticate with a function that loads user JWT and a signature
/// function.
///
/// # Example
/// ```no_run
/// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
/// let kp = nkeys::KeyPair::from_seed(seed).unwrap();
///
/// fn load_jwt() -> std::io::Result<String> {
/// todo!()
/// }
///
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::with_jwt(load_jwt, move |nonce| kp.sign(nonce).unwrap())
/// .connect("localhost")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn with_jwt<J, S>(jwt_cb: J, sig_cb: S) -> Options
where
J: Fn() -> io::Result<String> + Send + Sync + 'static,
S: Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
{
Options {
inner: crate::Options::with_jwt(jwt_cb, sig_cb),
}
}
/// Authenticate with NATS using a public key and a signature function.
///
/// # Example
/// ```no_run
/// let nkey = "UAMMBNV2EYR65NYZZ7IAK5SIR5ODNTTERJOBOF4KJLMWI45YOXOSWULM";
/// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
/// let kp = nkeys::KeyPair::from_seed(seed).unwrap();
///
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::with_nkey(nkey, move |nonce| kp.sign(nonce).unwrap())
/// .connect("localhost")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn with_nkey<F>(nkey: &str, sig_cb: F) -> Options
where
F: Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
{
Options {
inner: crate::Options::with_nkey(nkey, sig_cb),
}
}
/// Set client certificate and private key files.
///
/// # Example
/// ```no_run
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .client_cert("client-cert.pem", "client-key.pem")
/// .connect("nats://localhost:4443")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn client_cert(self, cert: impl AsRef<Path>, key: impl AsRef<Path>) -> Options {
Options {
inner: self.inner.client_cert(cert, key),
}
}
/// Add a name option to this configuration.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .with_name("My App")
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn with_name(self, name: &str) -> Options {
Options {
inner: self.inner.with_name(name),
}
}
/// Select option to not deliver messages that we have published.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .no_echo()
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn no_echo(self) -> Options {
Options {
inner: self.inner.no_echo(),
}
}
/// Select option to enable reconnect with backoff
/// on first failed connection attempt.
/// The reconnect logic with `max_reconnects` and the
/// `reconnect_delay_callback` will be specified the same
/// as before but will be invoked on the first failed
/// connection attempt.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .retry_on_failed_connect()
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn retry_on_failed_connect(self) -> Options {
Options {
inner: self.inner.retry_on_failed_connect(),
}
}
/// Set the maximum number of reconnect attempts.
/// If no servers remain that are under this threshold,
/// then no further reconnect shall be attempted.
/// The reconnect attempt for a server is reset upon
/// successfull connection.
/// If None then there is no maximum number of attempts.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .max_reconnects(3)
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn max_reconnects<T: Into<Option<usize>>>(self, max_reconnects: T) -> Options {
Options {
inner: self.inner.max_reconnects(max_reconnects),
}
}
/// Set the maximum amount of bytes to buffer
/// when accepting outgoing traffic in disconnected
/// mode.
///
/// The default value is 8mb.
///
/// # Example
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .reconnect_buffer_size(64 * 1024)
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn reconnect_buffer_size(self, reconnect_buffer_size: usize) -> Options {
Options {
inner: self.inner.reconnect_buffer_size(reconnect_buffer_size),
}
}
/// Establish a `Connection` with a NATS server.
///
/// Multiple servers may be specified by separating
/// them with commas.
///
/// # Example
///
/// ```
/// # smol::block_on(async {
/// let options = nats::asynk::Options::new();
/// let nc = options.connect("demo.nats.io").await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// In the below case, the second server is configured
/// to use TLS but the first one is not. Using the
/// `tls_required` method can ensure that all
/// servers are connected to with TLS, if that is
/// your intention.
///
///
/// ```
/// # smol::block_on(async {
/// let options = nats::asynk::Options::new();
/// let nc = options
/// .connect("nats://demo.nats.io:4222,tls://demo.nats.io:4443")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn connect<I>(self, nats_url: I) -> io::Result<Connection>
where
I: IntoServerList,
{
let servers = nats_url.into_server_list()?;
let conn = unblock(move || self.inner.connect(servers)).await?;
Ok(Connection::new(conn))
}
/// Set a callback to be executed when connectivity to
/// a server has been lost.
///
/// # Example
///
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .disconnect_callback(|| println!("connection has been lost"))
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn disconnect_callback<F>(self, cb: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Options {
inner: self.inner.disconnect_callback(cb),
}
}
/// Set a callback to be executed when connectivity to a
/// server has been reestablished.
///
/// # Example
///
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .reconnect_callback(|| println!("connection has been reestablished"))
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn reconnect_callback<F>(self, cb: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Options {
inner: self.inner.reconnect_callback(cb),
}
}
/// Set a callback to be executed when the client has been
/// closed due to exhausting reconnect retries to known servers
/// or by completing a drain request.
///
/// # Example
///
/// ```
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .close_callback(|| println!("connection has been closed"))
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn close_callback<F>(self, cb: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Options {
inner: self.inner.close_callback(cb),
}
}
/// Set a callback to be executed for calculating the backoff duration
/// to wait before a server reconnection attempt.
///
/// The function takes the number of reconnects as an argument
/// and returns the `Duration` that should be waited before
/// making the next connection attempt.
///
/// It is recommended that some random jitter is added to
/// your returned `Duration`.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .reconnect_delay_callback(|c| Duration::from_millis(std::cmp::min((c * 100) as u64, 8000)))
/// .connect("demo.nats.io")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn reconnect_delay_callback<F>(self, cb: F) -> Self
where
F: Fn(usize) -> Duration + Send + Sync + 'static,
{
Options {
inner: self.inner.reconnect_delay_callback(cb),
}
}
/// Setting this requires that TLS be set for all server connections.
///
/// If you only want to use TLS for some server connections, you may
/// declare them separately in the connect string by prefixing them
/// with `tls://host:port` instead of `nats://host:port`.
///
/// # Examples
/// ```no_run
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .tls_required(true)
/// .connect("tls://demo.nats.io:4443")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn tls_required(self, tls_required: bool) -> Options {
Options {
inner: self.inner.tls_required(tls_required),
}
}
/// Adds a root certificate file.
///
/// The file must be PEM encoded. All certificates in the file will be used.
///
/// # Examples
/// ```no_run
/// # smol::block_on(async {
/// let nc = nats::asynk::Options::new()
/// .add_root_certificate("my-certs.pem")
/// .connect("tls://demo.nats.io:4443")
/// .await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn add_root_certificate(self, path: impl AsRef<Path>) -> Options {
Options {
inner: self.inner.add_root_certificate(path),
}
}
}