#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(test, deny(warnings))]
#![cfg_attr(
feature = "fault_injection",
deny(
future_incompatible,
missing_copy_implementations,
missing_docs,
nonstandard_style,
rust_2018_idioms,
trivial_casts,
trivial_numeric_casts,
unsafe_code,
unused,
unused_qualifications
)
)]
#![cfg_attr(feature = "fault_injection", deny(
// over time, consider enabling the following commented-out lints:
// clippy::else_if_without_else,
// clippy::indexing_slicing,
// clippy::multiple_crate_versions,
// clippy::missing_const_for_fn,
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::checked_conversions,
clippy::decimal_literal_representation,
clippy::doc_markdown,
clippy::empty_enum,
clippy::explicit_into_iter_loop,
clippy::explicit_iter_loop,
clippy::expl_impl_clone_on_copy,
clippy::fallible_impl_from,
clippy::filter_map_next,
clippy::float_arithmetic,
clippy::get_unwrap,
clippy::if_not_else,
clippy::inline_always,
clippy::invalid_upcast_comparisons,
clippy::items_after_statements,
clippy::manual_filter_map,
clippy::manual_find_map,
clippy::map_flatten,
clippy::map_unwrap_or,
clippy::match_same_arms,
clippy::maybe_infinite_iter,
clippy::mem_forget,
clippy::needless_borrow,
clippy::needless_continue,
clippy::needless_pass_by_value,
clippy::non_ascii_literal,
clippy::path_buf_push_overwrite,
clippy::print_stdout,
clippy::single_match_else,
clippy::string_add,
clippy::string_add_assign,
clippy::type_repetition_in_bounds,
clippy::unicode_not_nfc,
clippy::unimplemented,
clippy::unseparated_literal_suffix,
clippy::wildcard_dependencies,
clippy::wildcard_enum_match_arm,
))]
#![allow(
clippy::match_like_matches_macro,
clippy::await_holding_lock,
clippy::shadow_reuse,
clippy::shadow_same,
clippy::shadow_unrelated,
clippy::wildcard_enum_match_arm,
clippy::module_name_repetitions
)]
pub mod asynk;
mod auth_utils;
mod client;
mod connect;
mod connector;
mod message;
mod options;
mod proto;
mod secure_wipe;
mod subscription;
pub mod header;
pub mod jetstream;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable")))]
pub mod kv;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable")))]
pub mod object_store;
#[cfg(feature = "fault_injection")]
mod fault_injection;
#[cfg(feature = "fault_injection")]
use fault_injection::{inject_delay, inject_io_failure};
#[cfg(not(feature = "fault_injection"))]
fn inject_delay() {}
#[cfg(not(feature = "fault_injection"))]
fn inject_io_failure() -> io::Result<()> {
Ok(())
}
#[doc(hidden)]
#[deprecated(since = "0.6.0", note = "this has been renamed to `Options`.")]
pub type ConnectionOptions = Options;
#[doc(hidden)]
#[deprecated(since = "0.17.0", note = "this has been moved to `header::HeaderMap`.")]
pub type Headers = HeaderMap;
pub use header::HeaderMap;
use std::{
io::{self, Error, ErrorKind},
sync::Arc,
time::{Duration, Instant},
};
use lazy_static::lazy_static;
use regex::Regex;
pub use connector::{IntoServerList, ServerAddress};
pub use jetstream::JetStreamOptions;
pub use message::Message;
pub use options::Options;
pub use subscription::{Handler, Subscription};
pub use rustls;
#[doc(hidden)]
pub use connect::ConnectInfo;
use client::Client;
use options::AuthStyle;
use secure_wipe::{SecureString, SecureVec};
const VERSION: &str = env!("CARGO_PKG_VERSION");
const LANG: &str = "rust";
const DEFAULT_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
lazy_static! {
static ref VERSION_RE: Regex = Regex::new(r#"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?"#).unwrap();
}
#[allow(unused)]
#[derive(Debug, Default, Clone)]
pub struct ServerInfo {
pub server_id: String,
pub server_name: String,
pub host: String,
pub port: u16,
pub version: String,
pub auth_required: bool,
pub tls_required: bool,
pub max_payload: usize,
pub proto: i8,
pub client_id: u64,
pub go: String,
pub nonce: String,
pub connect_urls: Vec<String>,
pub client_ip: String,
pub headers: bool,
pub lame_duck_mode: bool,
}
impl ServerInfo {
fn parse(s: &str) -> Option<ServerInfo> {
let mut obj = json::parse(s).ok()?;
Some(ServerInfo {
server_id: obj["server_id"].take_string()?,
server_name: obj["server_name"].take_string().unwrap_or_default(),
host: obj["host"].take_string()?,
port: obj["port"].as_u16()?,
version: obj["version"].take_string()?,
auth_required: obj["auth_required"].as_bool().unwrap_or(false),
tls_required: obj["tls_required"].as_bool().unwrap_or(false),
max_payload: obj["max_payload"].as_usize()?,
proto: obj["proto"].as_i8()?,
client_id: obj["client_id"].as_u64()?,
go: obj["go"].take_string()?,
nonce: obj["nonce"].take_string().unwrap_or_default(),
connect_urls: obj["connect_urls"]
.members_mut()
.filter_map(|m| m.take_string())
.collect(),
client_ip: obj["client_ip"].take_string().unwrap_or_default(),
headers: obj["headers"].as_bool().unwrap_or(false),
lame_duck_mode: obj["ldm"].as_bool().unwrap_or(false),
})
}
}
#[derive(Clone, Debug)]
pub struct Connection(pub(crate) Arc<Inner>);
#[derive(Clone, Debug)]
struct Inner {
client: Client,
}
impl Drop for Inner {
fn drop(&mut self) {
self.client.shutdown();
}
}
pub fn connect<I: IntoServerList>(nats_urls: I) -> io::Result<Connection> {
Options::new().connect(nats_urls)
}
impl Connection {
pub(crate) fn connect_with_options<I>(urls: I, options: Options) -> io::Result<Connection>
where
I: IntoServerList,
{
let urls = urls.into_server_list()?;
let client = Client::connect(urls, options)?;
client.flush(DEFAULT_FLUSH_TIMEOUT)?;
Ok(Connection(Arc::new(Inner { client })))
}
pub fn subscribe(&self, subject: &str) -> io::Result<Subscription> {
self.do_subscribe(subject, None)
}
pub fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result<Subscription> {
self.do_subscribe(subject, Some(queue))
}
pub fn publish(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<()> {
self.publish_with_reply_or_headers(subject, None, None, msg)
}
pub fn publish_request(
&self,
subject: &str,
reply: &str,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
self.0
.client
.publish(subject, Some(reply), None, msg.as_ref())
}
pub fn new_inbox(&self) -> String {
format!("_INBOX.{}", nuid::next())
}
pub fn request(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<Message> {
self.request_with_headers_or_timeout(subject, None, None, msg)
}
pub fn request_timeout(
&self,
subject: &str,
msg: impl AsRef<[u8]>,
timeout: Duration,
) -> io::Result<Message> {
self.request_with_headers_or_timeout(subject, None, Some(timeout), msg)
}
pub fn request_with_headers(
&self,
subject: &str,
msg: impl AsRef<[u8]>,
headers: &HeaderMap,
) -> io::Result<Message> {
self.request_with_headers_or_timeout(subject, Some(headers), None, msg)
}
pub fn request_with_headers_or_timeout(
&self,
subject: &str,
maybe_headers: Option<&HeaderMap>,
maybe_timeout: Option<Duration>,
msg: impl AsRef<[u8]>,
) -> io::Result<Message> {
let reply = self.new_inbox();
let sub = self.subscribe(&reply)?;
self.publish_with_reply_or_headers(subject, Some(reply.as_str()), maybe_headers, msg)?;
let result = if let Some(timeout) = maybe_timeout {
sub.next_timeout(timeout)
} else if let Some(msg) = sub.next() {
Ok(msg)
} else {
Err(ErrorKind::ConnectionReset.into())
};
if let Ok(msg) = result.as_ref() {
if msg.is_no_responders() {
return Err(Error::new(ErrorKind::NotFound, "no responders"));
}
}
result
}
pub fn request_multi(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<Subscription> {
let reply = self.new_inbox();
let sub = self.subscribe(&reply)?;
self.publish_with_reply_or_headers(subject, Some(reply.as_str()), None, msg)?;
Ok(sub)
}
pub fn flush(&self) -> io::Result<()> {
self.flush_timeout(DEFAULT_FLUSH_TIMEOUT)
}
pub fn flush_timeout(&self, duration: Duration) -> io::Result<()> {
self.0.client.flush(duration)
}
pub fn close(self) {
self.0.client.flush(DEFAULT_FLUSH_TIMEOUT).ok();
self.0.client.close();
}
pub fn rtt(&self) -> io::Result<Duration> {
let start = Instant::now();
self.flush()?;
Ok(start.elapsed())
}
pub fn is_server_compatible_version(&self, major: i64, minor: i64, patch: i64) -> bool {
let server_info = self.0.client.server_info();
let server_version_captures = VERSION_RE.captures(&server_info.version).unwrap();
let server_major = server_version_captures
.get(1)
.map(|m| m.as_str().parse::<i64>().unwrap())
.unwrap();
let server_minor = server_version_captures
.get(2)
.map(|m| m.as_str().parse::<i64>().unwrap())
.unwrap();
let server_patch = server_version_captures
.get(3)
.map(|m| m.as_str().parse::<i64>().unwrap())
.unwrap();
if server_major < major
|| (server_major == major && server_minor < minor)
|| (server_major == major && server_minor == minor && server_patch < patch)
{
return false;
}
true
}
pub fn client_ip(&self) -> io::Result<std::net::IpAddr> {
let info = self.0.client.server_info();
match info.client_ip.as_str() {
"" => Err(Error::new(
ErrorKind::Other,
&*format!(
"client_ip was not provided by the server. It is \
supported on servers above version 2.1.6. The server \
version is {}",
info.version
),
)),
ip => match ip.parse() {
Ok(addr) => Ok(addr),
Err(_) => Err(Error::new(
ErrorKind::InvalidData,
&*format!(
"client_ip provided by the server cannot be parsed. \
The server provided IP: {}",
info.client_ip
),
)),
},
}
}
pub fn client_id(&self) -> u64 {
self.0.client.server_info().client_id
}
pub fn drain(&self) -> io::Result<()> {
self.0.client.flush(DEFAULT_FLUSH_TIMEOUT)?;
self.0.client.close();
Ok(())
}
pub fn publish_with_reply_or_headers(
&self,
subject: &str,
reply: Option<&str>,
headers: Option<&HeaderMap>,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
self.0.client.publish(subject, reply, headers, msg.as_ref())
}
pub fn max_payload(&self) -> usize {
self.0.client.server_info.lock().max_payload
}
fn do_subscribe(&self, subject: &str, queue: Option<&str>) -> io::Result<Subscription> {
let (sid, receiver) = self.0.client.subscribe(subject, queue)?;
Ok(Subscription::new(
sid,
subject.to_string(),
receiver,
self.0.client.clone(),
))
}
#[doc(hidden)]
pub fn try_publish_with_reply_or_headers(
&self,
subject: &str,
reply: Option<&str>,
headers: Option<&HeaderMap>,
msg: impl AsRef<[u8]>,
) -> Option<io::Result<()>> {
self.0
.client
.try_publish(subject, reply, headers, msg.as_ref())
}
}