mod address;
pub mod error;
mod parser;
mod refs;
mod state;
#[cfg(test)]
mod tests;
use log::trace;
use serde::{Deserialize, Serialize};
use std::{
convert::Infallible,
fmt,
str::FromStr,
sync::atomic::{AtomicU64, Ordering},
};
use tokio::sync::mpsc::Sender as MpscSender;
use crate::util;
pub use self::{
address::Address,
refs::{ClientRef, ClientRefMut, StableMutexGuard},
state::{ClientState, ConnectionState, StateTransition, StateTransitionResult},
};
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct Info {
pub(crate) server_id: String,
pub(crate) version: String,
pub(crate) go: String,
pub(crate) host: String,
pub(crate) port: u16,
pub(crate) max_payload: usize,
#[serde(default)]
pub(crate) proto: i32,
pub(crate) client_id: Option<u64>,
#[serde(default)]
pub(crate) auth_required: bool,
#[serde(default)]
pub(crate) tls_required: bool,
#[serde(default)]
pub(crate) tls_verify: bool,
#[serde(default)]
pub(crate) connect_urls: Vec<Address>,
}
impl Info {
pub(crate) fn new() -> Self {
Self::default()
}
pub fn server_id(&self) -> &str {
&self.server_id
}
pub fn version(&self) -> &str {
&self.version
}
pub fn go(&self) -> &str {
&self.go
}
pub fn host(&self) -> &str {
&self.host
}
pub fn port(&self) -> u16 {
self.port
}
pub fn max_payload(&self) -> usize {
self.max_payload
}
pub fn proto(&self) -> i32 {
self.proto
}
pub fn client_id(&self) -> Option<u64> {
self.client_id
}
pub fn auth_required(&self) -> bool {
self.auth_required
}
pub fn tls_required(&self) -> bool {
self.tls_required
}
pub fn tls_verify(&self) -> bool {
self.tls_verify
}
pub fn connect_urls(&self) -> &[Address] {
&self.connect_urls
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(untagged)]
pub enum Authorization {
Token {
#[serde(rename = "auth_token")]
token: String,
},
UsernamePassword {
#[serde(rename = "user")]
username: String,
#[serde(rename = "pass")]
password: String,
},
}
impl Authorization {
pub fn token(token: String) -> Self {
Authorization::Token { token }
}
pub fn username_password(username: String, password: String) -> Self {
Authorization::UsernamePassword { username, password }
}
}
impl FromStr for Authorization {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match util::split_after(s, util::USERNAME_PASSWORD_SEPARATOR) {
(token, None) => Ok(Authorization::token(String::from(token))),
(username, Some(password)) => Ok(Authorization::username_password(
String::from(username),
String::from(password),
)),
}
}
}
impl fmt::Display for Authorization {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
Authorization::Token { token } => write!(f, "{}", token)?,
Authorization::UsernamePassword { username, password } => {
write!(f, "{}:{}", username, password)?
}
}
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Connect {
verbose: bool,
pedantic: bool,
tls_required: bool,
#[serde(flatten)]
authorization: Option<Authorization>,
name: Option<String>,
#[serde(rename = "lang")]
language: String,
version: String,
protocol: i32,
echo: bool,
}
impl Connect {
pub fn new() -> Self {
Self::default()
}
pub fn is_verbose(&self) -> bool {
self.verbose
}
pub fn verbose(&mut self, verbose: bool) -> &mut Self {
self.verbose = verbose;
self
}
pub fn is_pedantic(&self) -> bool {
self.pedantic
}
pub fn pedantic(&mut self, pedantic: bool) -> &mut Self {
self.pedantic = pedantic;
self
}
pub fn is_tls_required(&self) -> bool {
self.tls_required
}
pub fn tls_required(&mut self, tls_required: bool) -> &mut Self {
self.tls_required = tls_required;
self
}
pub fn authorization(&self) -> Option<&Authorization> {
self.authorization.as_ref()
}
pub fn token(&mut self, token: String) -> &mut Self {
self.set_authorization(Some(Authorization::token(token)))
}
pub fn username_password(&mut self, username: String, password: String) -> &mut Self {
self.set_authorization(Some(Authorization::username_password(username, password)))
}
pub fn set_authorization(&mut self, authorization: Option<Authorization>) -> &mut Self {
self.authorization = authorization;
self
}
pub fn clear_authorization(&mut self) -> &mut Self {
self.set_authorization(None)
}
pub fn get_name(&self) -> Option<&str> {
self.name.as_deref()
}
pub fn name(&mut self, name: String) -> &mut Self {
self.name = Some(name);
self
}
pub fn clear_name(&mut self) -> &mut Self {
self.name = None;
self
}
pub fn get_lang(&self) -> &str {
&self.language
}
pub fn get_version(&self) -> &str {
&self.version
}
pub fn get_protocol(&self) -> i32 {
self.protocol
}
pub fn is_echo(&self) -> bool {
self.echo
}
pub fn echo(&mut self, echo: bool) -> &mut Self {
self.echo = echo;
self
}
}
impl Default for Connect {
fn default() -> Self {
Self {
verbose: false,
pedantic: false,
tls_required: false,
authorization: None,
name: None,
language: String::from("rust"),
version: String::from(util::CLIENT_VERSION),
protocol: 1,
echo: false,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum ProtocolError {
UnknownProtocolOperation,
AttemptedToConnectToRoutePort,
AuthorizationViolation,
AuthorizationTimeout,
InvalidClientProtocol,
MaximumControlLineExceeded,
ParserError,
SecureConnectionTlsRequired,
StaleConnection,
MaximumConnectionsExceeded,
SlowConsumer,
MaximumPayloadViolation,
InvalidSubject,
PermissionsViolationForSubscription(Subject),
PermissionsViolationForPublish(Subject),
}
impl fmt::Display for ProtocolError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProtocolError::UnknownProtocolOperation => {
write!(f, "{}", util::UNKNOWN_PROTOCOL_OPERATION)?
}
ProtocolError::AttemptedToConnectToRoutePort => {
write!(f, "{}", util::ATTEMPTED_TO_CONNECT_TO_ROUTE_PORT)?
}
ProtocolError::AuthorizationViolation => {
write!(f, "{}", util::AUTHORIZATION_VIOLATION)?
}
ProtocolError::AuthorizationTimeout => write!(f, "{}", util::AUTHORIZATION_TIMEOUT)?,
ProtocolError::InvalidClientProtocol => write!(f, "{}", util::INVALID_CLIENT_PROTOCOL)?,
ProtocolError::MaximumControlLineExceeded => {
write!(f, "{}", util::MAXIMUM_CONTROL_LINE_EXCEEDED)?
}
ProtocolError::ParserError => write!(f, "{}", util::PARSER_ERROR)?,
ProtocolError::SecureConnectionTlsRequired => {
write!(f, "{}", util::SECURE_CONNECTION_TLS_REQUIRED)?
}
ProtocolError::StaleConnection => write!(f, "{}", util::STALE_CONNECTION)?,
ProtocolError::MaximumConnectionsExceeded => {
write!(f, "{}", util::MAXIMUM_CONNECTIONS_EXCEEDED)?
}
ProtocolError::SlowConsumer => write!(f, "{}", util::SLOW_CONSUMER)?,
ProtocolError::MaximumPayloadViolation => {
write!(f, "{}", util::MAXIMUM_PAYLOAD_VIOLATION)?
}
ProtocolError::InvalidSubject => write!(f, "{}", util::INVALID_SUBJECT)?,
ProtocolError::PermissionsViolationForSubscription(subject) => write!(
f,
"{} {}",
util::PERMISSIONS_VIOLATION_FOR_SUBSCRIPTION,
subject
)?,
ProtocolError::PermissionsViolationForPublish(subject) => {
write!(f, "{} {}", util::PERMISSIONS_VIOLATION_FOR_PUBLISH, subject)?
}
}
Ok(())
}
}
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Subject {
tokens: Vec<String>,
full_wildcard: bool,
}
impl fmt::Display for Subject {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.tokens.is_empty() {
write!(f, ">")?;
return Ok(());
}
write!(f, "{}", self.tokens.join("."))?;
if self.full_wildcard {
write!(f, ".>")?;
}
Ok(())
}
}
#[derive(Default)]
pub struct SubjectBuilder {
tokens: Vec<String>,
}
impl SubjectBuilder {
pub fn new() -> Self {
Self::default()
}
#[allow(clippy::should_implement_trait)]
pub fn add(mut self, subject: impl Into<String>) -> Self {
self.tokens.push(subject.into());
self
}
pub fn add_wildcard(mut self) -> Self {
self.tokens.push("*".to_string());
self
}
pub fn build(self) -> Subject {
let fwc = self.tokens.is_empty();
Subject {
tokens: self.tokens,
full_wildcard: fwc,
}
}
pub fn build_full_wildcard(self) -> Subject {
Subject {
tokens: self.tokens,
full_wildcard: true,
}
}
}
#[derive(Debug, PartialEq)]
pub struct Msg {
subject: Subject,
sid: Sid,
reply_to: Option<Subject>,
payload: Vec<u8>,
}
impl Msg {
pub(crate) fn new(
subject: Subject,
sid: Sid,
reply_to: Option<Subject>,
payload: Vec<u8>,
) -> Self {
Self {
subject,
sid,
reply_to,
payload,
}
}
pub fn subject(&self) -> &Subject {
&self.subject
}
pub fn sid(&self) -> Sid {
self.sid
}
pub fn reply_to(&self) -> Option<&Subject> {
self.reply_to.as_ref()
}
pub fn payload(&self) -> &[u8] {
&self.payload
}
pub fn into_payload(self) -> Vec<u8> {
self.payload
}
}
pub type Sid = u64;
static SID: AtomicU64 = AtomicU64::new(0);
pub struct Subscription {
subject: Subject,
sid: Sid,
queue_group: Option<String>,
pub(crate) unsubscribe_after: Option<u64>,
pub(crate) tx: MpscSender<Msg>,
}
impl Subscription {
pub(crate) fn new(subject: Subject, queue_group: Option<String>, tx: MpscSender<Msg>) -> Self {
Self {
subject,
sid: SID.fetch_add(1, Ordering::Relaxed),
queue_group,
unsubscribe_after: None,
tx,
}
}
pub fn subject(&self) -> &Subject {
&self.subject
}
pub fn sid(&self) -> Sid {
self.sid
}
pub fn queue_group(&self) -> Option<&str> {
self.queue_group.as_ref().map(String::as_ref)
}
pub fn unsubscribe_after(&self) -> Option<u64> {
self.unsubscribe_after
}
}
#[derive(Debug, PartialEq)]
pub enum ServerControl {
Info(Info),
Msg {
subject: Subject,
sid: Sid,
reply_to: Option<Subject>,
len: u64,
},
Ping,
Pong,
Ok,
Err(ProtocolError),
}
#[derive(Debug, PartialEq)]
pub enum ServerMessage {
Info(Info),
Msg(Msg),
Ping,
Pong,
Ok,
Err(ProtocolError),
}
impl From<ServerControl> for ServerMessage {
fn from(control: ServerControl) -> Self {
match control {
ServerControl::Info(info) => ServerMessage::Info(info),
ServerControl::Msg { .. } => unreachable!(),
ServerControl::Ping => ServerMessage::Ping,
ServerControl::Pong => ServerMessage::Pong,
ServerControl::Ok => ServerMessage::Ok,
ServerControl::Err(e) => ServerMessage::Err(e),
}
}
}
pub enum ClientControl<'a> {
Connect(&'a Connect),
Pub(&'a Subject, Option<&'a Subject>, usize),
Sub(&'a Subscription),
Unsub(Sid, Option<u64>),
Ping,
Pong,
}
impl fmt::Display for ClientControl<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Connect(connect) => write!(
f,
"{} {}{}",
util::CONNECT_OP_NAME,
serde_json::to_string(connect).expect("to serialize Connect"),
util::MESSAGE_TERMINATOR
),
Self::Pub(subject, reply_to, len) => {
if let Some(reply_to) = reply_to {
write!(
f,
"{} {} {} {}{}",
util::PUB_OP_NAME,
subject,
reply_to,
len,
util::MESSAGE_TERMINATOR
)
} else {
write!(
f,
"{} {} {}{}",
util::PUB_OP_NAME,
subject,
len,
util::MESSAGE_TERMINATOR
)
}
}
Self::Sub(subscription) => {
if let Some(queue_group) = &subscription.queue_group {
write!(
f,
"{} {} {} {}{}",
util::SUB_OP_NAME,
subscription.subject(),
queue_group,
subscription.sid(),
util::MESSAGE_TERMINATOR
)
} else {
write!(
f,
"{} {} {}{}",
util::SUB_OP_NAME,
subscription.subject(),
subscription.sid(),
util::MESSAGE_TERMINATOR
)
}
}
Self::Unsub(sid, max_msgs) => {
if let Some(max_msgs) = max_msgs {
write!(
f,
"{} {} {}{}",
util::UNSUB_OP_NAME,
sid,
max_msgs,
util::MESSAGE_TERMINATOR
)
} else {
write!(
f,
"{} {}{}",
util::UNSUB_OP_NAME,
sid,
util::MESSAGE_TERMINATOR
)
}
}
Self::Ping => write!(f, "{}{}", util::PING_OP_NAME, util::MESSAGE_TERMINATOR),
Self::Pong => write!(f, "{}{}", util::PONG_OP_NAME, util::MESSAGE_TERMINATOR),
}
}
}
impl ClientControl<'_> {
pub fn to_line(&self) -> String {
let s = self.to_string();
trace!("->> {:?}", s);
s
}
}