mod address;
pub mod error;
mod parser;
mod refs;
mod state;
#[cfg(test)]
mod tests;
#[cfg(feature = "tls")]
pub mod tls;
use log::trace;
use serde::{Deserialize, Serialize};
use std::{
convert::Infallible,
fmt,
str::FromStr,
sync::atomic::{AtomicU64, Ordering},
};
use tokio::sync::mpsc::Sender as MpscSender;
use crate::util;
pub use self::{
address::Address,
refs::{ClientRef, ClientRefMut, StableMutexGuard},
state::{ClientState, ConnectionState, StateTransition, StateTransitionResult},
};
///////////////////////////////////////////////////////////////////////////////////////////////////
/// The [`INFO`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#info) message sent by the server
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct Info {
pub(crate) server_id: String,
pub(crate) version: String,
pub(crate) go: String,
pub(crate) host: String,
pub(crate) port: u16,
pub(crate) max_payload: usize,
#[serde(default)]
pub(crate) proto: i32,
pub(crate) client_id: Option<u64>,
#[serde(default)]
pub(crate) auth_required: bool,
#[serde(default)]
pub(crate) tls_required: bool,
#[serde(default)]
pub(crate) tls_verify: bool,
#[serde(default)]
pub(crate) connect_urls: Vec<Address>,
}
impl Info {
/// Construct a new default `Info`
pub(crate) fn new() -> Self {
Self::default()
}
/// The unique identifier of the NATS server
pub fn server_id(&self) -> &str {
&self.server_id
}
/// The version of the NATS server
pub fn version(&self) -> &str {
&self.version
}
/// The version of golang the NATS server was built with
pub fn go(&self) -> &str {
&self.go
}
/// The IP address used to start the NATS server, by default this will be 0.0.0.0 and can be
/// configured with -client_advertise host:port
pub fn host(&self) -> &str {
&self.host
}
/// The port number the NATS server is configured to listen on
pub fn port(&self) -> u16 {
self.port
}
/// Maximum payload size, in bytes, that the server will accept from the client.
pub fn max_payload(&self) -> usize {
self.max_payload
}
/// An integer indicating the protocol version of the server. The server version 1.2.0 sets
/// this to 1 to indicate that it supports the "Echo" feature.
pub fn proto(&self) -> i32 {
self.proto
}
/// An optional unsigned integer (64 bits) representing the internal client identifier in the
/// server. This can be used to filter client connections in monitoring, correlate with error
/// logs, etc...
pub fn client_id(&self) -> Option<u64> {
self.client_id
}
/// If this is set, then the client should try to authenticate upon connect.
pub fn auth_required(&self) -> bool {
self.auth_required
}
/// If this is set, then the client must perform the TLS/1.2 handshake. Note, this used to be
/// ssl_required and has been updated along with the protocol from SSL to TLS.
pub fn tls_required(&self) -> bool {
self.tls_required
}
/// If this is set, the client must provide a valid certificate during the TLS handshake.
pub fn tls_verify(&self) -> bool {
self.tls_verify
}
/// An optional list of server urls that a client can connect to.
pub fn connect_urls(&self) -> &[Address] {
&self.connect_urls
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// The methods of client authorization set in the [`Connect`](struct.Connect.html) message
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(untagged)]
pub enum Authorization {
/// Use the `auth_token` authorization method
Token {
#[serde(rename = "auth_token")]
token: String,
},
/// Use the `user` and `pass` authorization method
UsernamePassword {
#[serde(rename = "user")]
username: String,
#[serde(rename = "pass")]
password: String,
},
}
impl Authorization {
/// Create a `Authorization::token`
pub fn token(token: String) -> Self {
Authorization::Token { token }
}
/// Create a `Authorization::UsernamePassword`
pub fn username_password(username: String, password: String) -> Self {
Authorization::UsernamePassword { username, password }
}
}
impl FromStr for Authorization {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match util::split_after(s, util::USERNAME_PASSWORD_SEPARATOR) {
(token, None) => Ok(Authorization::token(String::from(token))),
(username, Some(password)) => Ok(Authorization::username_password(
String::from(username),
String::from(password),
)),
}
}
}
impl fmt::Display for Authorization {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
Authorization::Token { token } => write!(f, "{}", token)?,
Authorization::UsernamePassword { username, password } => {
write!(f, "{}:{}", username, password)?
}
}
Ok(())
}
}
/// The [`CONNECT`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#connect) message sent by the client
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Connect {
verbose: bool,
pedantic: bool,
tls_required: bool,
#[serde(flatten)]
authorization: Option<Authorization>,
name: Option<String>,
#[serde(rename = "lang")]
language: String,
version: String,
protocol: i32,
echo: bool,
}
impl Connect {
/// Construct a new default `Connect`
pub fn new() -> Self {
Self::default()
}
/// Return `true` if the connection is verbose.
pub fn is_verbose(&self) -> bool {
self.verbose
}
/// Turns on +OK protocol acknowledgements. [default = `false`]
pub fn verbose(&mut self, verbose: bool) -> &mut Self {
self.verbose = verbose;
self
}
/// Return `true` if the connection is pedantic.
pub fn is_pedantic(&self) -> bool {
self.pedantic
}
/// Turns on additional strict format checking, e.g. for properly formed subjects [default =
/// `false`]
pub fn pedantic(&mut self, pedantic: bool) -> &mut Self {
self.pedantic = pedantic;
self
}
/// Return `true` if the connection requires TLS.
pub fn is_tls_required(&self) -> bool {
self.tls_required
}
/// Indicates whether the client requires an SSL connection. [default = `false`]
pub fn tls_required(&mut self, tls_required: bool) -> &mut Self {
self.tls_required = tls_required;
self
}
/// Get the [`Authorization`](enum.Authorization.html)
pub fn authorization(&self) -> Option<&Authorization> {
self.authorization.as_ref()
}
/// Set the authorization to use a token
pub fn token(&mut self, token: String) -> &mut Self {
self.set_authorization(Some(Authorization::token(token)))
}
/// Set the authorization to use a username and password
pub fn username_password(&mut self, username: String, password: String) -> &mut Self {
self.set_authorization(Some(Authorization::username_password(username, password)))
}
/// Set the authorization
pub fn set_authorization(&mut self, authorization: Option<Authorization>) -> &mut Self {
self.authorization = authorization;
self
}
/// Remove the authorization
pub fn clear_authorization(&mut self) -> &mut Self {
self.set_authorization(None)
}
/// Get the optional name of the client.
pub fn get_name(&self) -> Option<&str> {
self.name.as_deref()
}
/// Set the optional client name. [default = `None`]
pub fn name(&mut self, name: String) -> &mut Self {
self.name = Some(name);
self
}
/// Remove the optional client name [default = `None`]
pub fn clear_name(&mut self) -> &mut Self {
self.name = None;
self
}
/// The implementation language of the client. [always = `"rust"`]
pub fn get_lang(&self) -> &str {
&self.language
}
/// The version of the client. [always = `"<the crate version>"`]
pub fn get_version(&self) -> &str {
&self.version
}
/// Optional int. Sending 0 (or absent) indicates client supports original protocol. Sending 1
/// indicates that the client supports dynamic reconfiguration of cluster topology changes by
/// asynchronously receiving INFO messages with known servers it can reconnect to. [always =
/// `1`]
pub fn get_protocol(&self) -> i32 {
self.protocol
}
/// Return `true` if echo is enabled on the connection.
pub fn is_echo(&self) -> bool {
self.echo
}
/// Optional boolean. If set to true, the server (version 1.2.0+) will send originating
/// messages from this connection to its own subscriptions. Clients should set this to true
/// only for server supporting this feature, which is when proto in the INFO protocol is set to
/// at least 1 [default = `false`]
pub fn echo(&mut self, echo: bool) -> &mut Self {
self.echo = echo;
self
}
}
impl Default for Connect {
fn default() -> Self {
Self {
verbose: false,
pedantic: false,
tls_required: false,
authorization: None,
name: None,
language: String::from("rust"),
version: String::from(util::CLIENT_VERSION),
protocol: 1,
echo: false,
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// The [`-ERR`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#okerr) messages sent from the server
#[derive(Clone, Debug, PartialEq)]
pub enum ProtocolError {
/// Unknown protocol error
UnknownProtocolOperation,
/// Client attempted to connect to a route port instead of the client port
AttemptedToConnectToRoutePort,
/// Client failed to authenticate to the server with credentials specified in the CONNECT
/// message
AuthorizationViolation,
/// Client took too long to authenticate to the server after establishing a connection
/// (default 1 second)
AuthorizationTimeout,
/// Client specified an invalid protocol version in the CONNECT message
InvalidClientProtocol,
/// Message destination subject and reply subject length exceeded the maximum control line
/// value specified by the max_control_line server option. The default is 1024 bytes.
MaximumControlLineExceeded,
/// Cannot parse the protocol message sent by the client
ParserError,
/// The server requires TLS and the client does not have TLS enabled.
SecureConnectionTlsRequired,
/// The server hasn't received a message from the client, including a PONG in too long.
StaleConnection,
/// This error is sent by the server when creating a new connection and the server has exceeded
/// the maximum number of connections specified by the max_connections server option. The
/// default is 64k.
MaximumConnectionsExceeded,
/// The server pending data size for the connection has reached the maximum size (default 10MB).
SlowConsumer,
/// Client attempted to publish a message with a payload size that exceeds the max_payload size
/// configured on the server. This value is supplied to the client upon connection in the
/// initial INFO message. The client is expected to do proper accounting of byte size to be
/// sent to the server in order to handle this error synchronously.
MaximumPayloadViolation,
/// Client sent a malformed subject (e.g. sub foo. 90)
InvalidSubject,
/// The user specified in the CONNECT message does not have permission to subscribe to the
/// subject.
PermissionsViolationForSubscription(Subject),
/// The user specified in the CONNECT message does not have permissions to publish to the
/// subject.
PermissionsViolationForPublish(Subject),
}
impl fmt::Display for ProtocolError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProtocolError::UnknownProtocolOperation => {
write!(f, "{}", util::UNKNOWN_PROTOCOL_OPERATION)?
}
ProtocolError::AttemptedToConnectToRoutePort => {
write!(f, "{}", util::ATTEMPTED_TO_CONNECT_TO_ROUTE_PORT)?
}
ProtocolError::AuthorizationViolation => {
write!(f, "{}", util::AUTHORIZATION_VIOLATION)?
}
ProtocolError::AuthorizationTimeout => write!(f, "{}", util::AUTHORIZATION_TIMEOUT)?,
ProtocolError::InvalidClientProtocol => write!(f, "{}", util::INVALID_CLIENT_PROTOCOL)?,
ProtocolError::MaximumControlLineExceeded => {
write!(f, "{}", util::MAXIMUM_CONTROL_LINE_EXCEEDED)?
}
ProtocolError::ParserError => write!(f, "{}", util::PARSER_ERROR)?,
ProtocolError::SecureConnectionTlsRequired => {
write!(f, "{}", util::SECURE_CONNECTION_TLS_REQUIRED)?
}
ProtocolError::StaleConnection => write!(f, "{}", util::STALE_CONNECTION)?,
ProtocolError::MaximumConnectionsExceeded => {
write!(f, "{}", util::MAXIMUM_CONNECTIONS_EXCEEDED)?
}
ProtocolError::SlowConsumer => write!(f, "{}", util::SLOW_CONSUMER)?,
ProtocolError::MaximumPayloadViolation => {
write!(f, "{}", util::MAXIMUM_PAYLOAD_VIOLATION)?
}
ProtocolError::InvalidSubject => write!(f, "{}", util::INVALID_SUBJECT)?,
ProtocolError::PermissionsViolationForSubscription(subject) => write!(
f,
"{} {}",
util::PERMISSIONS_VIOLATION_FOR_SUBSCRIPTION,
subject
)?,
ProtocolError::PermissionsViolationForPublish(subject) => {
write!(f, "{} {}", util::PERMISSIONS_VIOLATION_FOR_PUBLISH, subject)?
}
}
Ok(())
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// A [subject](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#protocol-conventions) to publish or subscribe to
///
/// `Subject`s can be created by parsing a String or via a SubjectBuilder
///
/// # Example
/// ```
/// use rants::{ Subject, SubjectBuilder };
///
/// let subject = "foo.bar.*.>".parse::<Subject>();
/// assert!(subject.is_ok());
///
/// let subject = SubjectBuilder::new()
/// .add("foo")
/// .add("bar")
/// .add_wildcard()
/// .build_full_wildcard();
///
/// assert_eq!(format!("{}", subject), "foo.bar.*.>");
/// ```
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Subject {
tokens: Vec<String>,
full_wildcard: bool,
}
impl fmt::Display for Subject {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Is the whole subject a full wildcard
if self.tokens.is_empty() {
write!(f, ">")?;
return Ok(());
}
write!(f, "{}", self.tokens.join("."))?;
if self.full_wildcard {
write!(f, ".>")?;
}
Ok(())
}
}
#[derive(Default)]
pub struct SubjectBuilder {
tokens: Vec<String>,
}
impl SubjectBuilder {
/// Create a new `SubjectBuilder`
pub fn new() -> Self {
Self::default()
}
/// Add a component to the subject
#[allow(clippy::should_implement_trait)]
pub fn add(mut self, subject: impl Into<String>) -> Self {
// Need to add some checks here to check for illegal characters
self.tokens.push(subject.into());
self
}
/// Add a wildcard to the subject
pub fn add_wildcard(mut self) -> Self {
self.tokens.push("*".to_string());
self
}
/// Generate the subject
pub fn build(self) -> Subject {
let fwc = self.tokens.is_empty();
Subject {
tokens: self.tokens,
full_wildcard: fwc,
}
}
/// Generate the subject with a full wildcard ending
pub fn build_full_wildcard(self) -> Subject {
Subject {
tokens: self.tokens,
full_wildcard: true,
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// The [`MSG`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#msg) message sent by the server
#[derive(Debug, PartialEq)]
pub struct Msg {
subject: Subject,
sid: Sid,
reply_to: Option<Subject>,
payload: Vec<u8>,
}
impl Msg {
pub(crate) fn new(
subject: Subject,
sid: Sid,
reply_to: Option<Subject>,
payload: Vec<u8>,
) -> Self {
Self {
subject,
sid,
reply_to,
payload,
}
}
/// Get the [`Subject`](struct.Subject.html)
pub fn subject(&self) -> &Subject {
&self.subject
}
/// Get the subscription id
pub fn sid(&self) -> Sid {
self.sid
}
/// Get the optional reply to [`Subject`](struct.Subject.html)
pub fn reply_to(&self) -> Option<&Subject> {
self.reply_to.as_ref()
}
/// Get the payload
pub fn payload(&self) -> &[u8] {
&self.payload
}
/// Take ownership of the payload
pub fn into_payload(self) -> Vec<u8> {
self.payload
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// The type used for subscription IDs
///
/// This is a unique identifier the client uses when routing messages from the server. A
/// subscription ID can be any ASCII string, but within this client library, we always use
/// the string representation of an atomically increasing `u64` counter.
pub type Sid = u64;
static SID: AtomicU64 = AtomicU64::new(0);
/// A subscription to receive messages from a particular [`Subject`](struct.Subject.html)
pub struct Subscription {
subject: Subject,
sid: Sid,
queue_group: Option<String>,
pub(crate) unsubscribe_after: Option<u64>,
pub(crate) tx: MpscSender<Msg>,
}
impl Subscription {
pub(crate) fn new(subject: Subject, queue_group: Option<String>, tx: MpscSender<Msg>) -> Self {
Self {
subject,
sid: SID.fetch_add(1, Ordering::Relaxed),
queue_group,
unsubscribe_after: None,
tx,
}
}
/// The [`Subject`](struct.Subject.html) of the subscription
pub fn subject(&self) -> &Subject {
&self.subject
}
/// The unique subscription ID
pub fn sid(&self) -> Sid {
self.sid
}
/// The optional queue group of the subscription
pub fn queue_group(&self) -> Option<&str> {
self.queue_group.as_ref().map(String::as_ref)
}
/// If this is of type `Some`, it means the subscription will automatically unsubscribe
/// after receiving the indicated number of messages
pub fn unsubscribe_after(&self) -> Option<u64> {
self.unsubscribe_after
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// Representation of all possible server control lines. A control line is the first line of a
/// message
#[derive(Debug, PartialEq)]
pub enum ServerControl {
Info(Info),
Msg {
subject: Subject,
sid: Sid,
reply_to: Option<Subject>,
len: u64,
},
Ping,
Pong,
Ok,
Err(ProtocolError),
}
/// Representation of all possible server messages. This is similar to `ServerControl` however it
/// contains a full message type
#[derive(Debug, PartialEq)]
pub enum ServerMessage {
Info(Info),
Msg(Msg),
Ping,
Pong,
Ok,
Err(ProtocolError),
}
impl From<ServerControl> for ServerMessage {
fn from(control: ServerControl) -> Self {
match control {
ServerControl::Info(info) => ServerMessage::Info(info),
// We should never try to directly convert a `ServerControl::Msg` to
// `ServerMessage::Msg`. The reason is the `Msg` message has a payload and therefore
// requires further parsing.
ServerControl::Msg { .. } => unreachable!(),
ServerControl::Ping => ServerMessage::Ping,
ServerControl::Pong => ServerMessage::Pong,
ServerControl::Ok => ServerMessage::Ok,
ServerControl::Err(e) => ServerMessage::Err(e),
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
pub enum ClientControl<'a> {
Connect(&'a Connect),
Pub(&'a Subject, Option<&'a Subject>, usize),
Sub(&'a Subscription),
Unsub(Sid, Option<u64>),
Ping,
Pong,
}
impl fmt::Display for ClientControl<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Connect(connect) => write!(
f,
"{} {}{}",
util::CONNECT_OP_NAME,
serde_json::to_string(connect).expect("to serialize Connect"),
util::MESSAGE_TERMINATOR
),
Self::Pub(subject, reply_to, len) => {
if let Some(reply_to) = reply_to {
write!(
f,
"{} {} {} {}{}",
util::PUB_OP_NAME,
subject,
reply_to,
len,
util::MESSAGE_TERMINATOR
)
} else {
write!(
f,
"{} {} {}{}",
util::PUB_OP_NAME,
subject,
len,
util::MESSAGE_TERMINATOR
)
}
}
Self::Sub(subscription) => {
if let Some(queue_group) = &subscription.queue_group {
write!(
f,
"{} {} {} {}{}",
util::SUB_OP_NAME,
subscription.subject(),
queue_group,
subscription.sid(),
util::MESSAGE_TERMINATOR
)
} else {
write!(
f,
"{} {} {}{}",
util::SUB_OP_NAME,
subscription.subject(),
subscription.sid(),
util::MESSAGE_TERMINATOR
)
}
}
Self::Unsub(sid, max_msgs) => {
if let Some(max_msgs) = max_msgs {
write!(
f,
"{} {} {}{}",
util::UNSUB_OP_NAME,
sid,
max_msgs,
util::MESSAGE_TERMINATOR
)
} else {
write!(
f,
"{} {}{}",
util::UNSUB_OP_NAME,
sid,
util::MESSAGE_TERMINATOR
)
}
}
Self::Ping => write!(f, "{}{}", util::PING_OP_NAME, util::MESSAGE_TERMINATOR),
Self::Pong => write!(f, "{}{}", util::PONG_OP_NAME, util::MESSAGE_TERMINATOR),
}
}
}
impl ClientControl<'_> {
pub fn to_line(&self) -> String {
let s = self.to_string();
trace!("->> {:?}", s);
s
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////