pub mod client;
mod client_inner;
mod converters;
use crate::net::nats_tcp_stream::NatsTcpStream;
use crate::ops::{ServerInfo, Op, Message, Subscribe};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::fmt::Debug;
use futures::stream::{ SplitSink};
use futures::lock::Mutex;
use nom::lib::std::collections::HashMap;
use tokio::sync::mpsc::UnboundedSender;
/// Newtype around a `String`; the wrapped value is only visible inside this crate.
///
/// NOTE(review): presumably the subscription id ("sid") handed out by the
/// client — confirm against the code that constructs it.
#[derive(Debug, Clone)]
pub struct NatsSid(pub(crate) String);
/// Settings stored by the client (see `NatsClientInner::opts`).
///
/// The `Builder` derive generates `NatsClientOptionsBuilder`, which is what
/// `NatsClientOptions::builder()` returns.
/// NOTE(review): presumably this is `derive_builder`; if so, `setter(into)`
/// makes every setter accept `impl Into<T>` and the struct-level `default`
/// fills unset fields from the `Default` impl — confirm against the crate's
/// dependencies.
///
/// NOTE(review): the derived `Debug` prints every field, including
/// `password`, `auth_token` and `nkey`; take care when logging this value.
#[derive(Debug, Clone, Builder, PartialEq)]
#[builder(setter(into), default)]
pub struct NatsClientOptions {
/// Defaults to an empty string. Presumably the user name sent when connecting — TODO confirm.
pub username: String,
/// Defaults to an empty string. Presumably the password paired with `username` — TODO confirm.
pub password: String,
/// Defaults to an empty string. Presumably a token-based alternative to user/password — TODO confirm.
pub auth_token: String,
/// Defaults to `false`.
pub tls_required: bool,
/// Defaults to `true`.
pub verbose: bool,
/// Defaults to `false`.
pub pedantic: bool,
/// Defaults to `true`.
pub echo: bool,
/// Defaults to an empty string. Presumably a client name reported to the server — TODO confirm.
pub name: String,
/// Defaults to an empty list.
pub cluster_uris: UriVec,
/// Defaults to `5`. NOTE(review): the unit is not visible here — confirm where it is consumed.
pub ping_interval: u16,
/// Defaults to `3`. Presumably the number of unanswered pings tolerated — TODO confirm.
pub ping_max_out: u16,
/// Defaults to `true`.
pub subscribe_on_reconnect: bool,
/// Defaults to `true`.
pub ensure_connect: bool,
/// Defaults to `1000`. NOTE(review): the unit is not visible here — confirm where it is consumed.
pub reconnect_timeout: u64,
/// Defaults to `None`. Optional JWT plus signing callback, see `UserJWT`.
pub user_jwt: Option<UserJWT>,
/// Defaults to `None`.
pub nkey: Option<String>,
}
/// Baseline configuration used by `..Default::default()` and as the starting
/// point for options that only override a few fields.
impl Default for NatsClientOptions {
    /// Builds options with empty credentials, no cluster URIs, `verbose`,
    /// `echo`, `subscribe_on_reconnect` and `ensure_connect` switched on, and
    /// every optional field left as `None`.
    fn default() -> Self {
        Self {
            // Credentials: all blank until the caller supplies them.
            username: String::default(),
            password: String::default(),
            auth_token: String::default(),

            // Connection flags.
            tls_required: false,
            verbose: true,
            pedantic: false,
            echo: true,

            // Identity and server list.
            name: String::default(),
            cluster_uris: UriVec(Vec::default()),

            // Keep-alive settings.
            ping_interval: 5,
            ping_max_out: 3,

            // Reconnect behaviour.
            subscribe_on_reconnect: true,
            ensure_connect: true,
            reconnect_timeout: 1000,

            // Optional JWT / nkey authentication.
            user_jwt: None,
            nkey: None,
        }
    }
}
impl NatsClientOptions {
pub fn builder() -> NatsClientOptionsBuilder {
NatsClientOptionsBuilder::default()
}
}
/// Value held in `NatsClientInner::state`.
///
/// NOTE(review): the variants presumably track the connection lifecycle; the
/// transitions between them are implemented outside this section — confirm
/// their exact meaning there.
#[derive(PartialEq, Clone, Debug)]
pub enum NatsClientState {
Connecting,
Connected,
Reconnecting,
Disconnected,
Shutdown,
}
/// Boxed, thread-safe callback that receives a reference to the client.
/// `NatsClient::reconnect_handlers` stores a list of these.
pub(crate) type ReconnectHandler = Box<dyn Fn(&NatsClient) + Send + Sync>;
pub use crate::ops::Message as NatsMessage;
/// Public client handle: a shared `NatsClientInner` plus a list of reconnect
/// callbacks.
pub struct NatsClient {
/// Shared inner state; reference-counted so it can be held from several places.
inner: Arc<NatsClientInner>,
/// Registered `ReconnectHandler` callbacks, guarded by an async `RwLock`.
/// NOTE(review): presumably invoked after a successful reconnect — confirm in the client code.
reconnect_handlers: RwLock<Vec<ReconnectHandler>>,
}
/// Item type sent over the per-subscription channel stored in
/// `NatsClientInner::subscriptions`: either a `Message` or a `Close` marker.
///
/// NOTE(review): `Close` presumably tells the receiver to stop — confirm
/// where the channel is consumed.
#[derive(Debug)]
pub (crate) enum ClosableMessage {
Message(Message),
Close,
}
/// Shared state behind a `NatsClient` (held there as `Arc<NatsClientInner>`).
pub struct NatsClientInner {
/// Sink half of a split `NatsTcpStream` that accepts `Op` values, shared
/// behind an `Arc` and a `futures::lock::Mutex`.
conn_sink: Arc<Mutex<SplitSink<NatsTcpStream, Op>>>,
/// Options this client was created with.
opts: NatsClientOptions,
/// Optional `ServerInfo`.
/// NOTE(review): presumably filled in once the server has announced itself — confirm.
server_info: RwLock<Option<ServerInfo>>,
/// Map from a `String` key to the channel sender and the `Subscribe` value
/// of a subscription.
/// NOTE(review): the key is presumably the subscription id — confirm.
subscriptions: Arc<Mutex<HashMap<String, (UnboundedSender<ClosableMessage>, Subscribe)>>>,
/// Optional pinned, boxed future, guarded by a `tokio::sync::Mutex`.
/// NOTE(review): presumably awaited when a reconnect happens — confirm.
on_reconnect: tokio::sync::Mutex<Option<Pin<Box<dyn Future<Output=()> + Send + Sync>>>>,
/// Current `NatsClientState`.
state: RwLock<NatsClientState>,
/// NOTE(review): presumably a timestamp of the most recent ping; the unit
/// is not visible here — confirm.
last_ping: RwLock<u128>,
/// Optional strong reference back to the owning `NatsClient`.
/// NOTE(review): `NatsClient` holds an `Arc<NatsClientInner>`, so while this
/// is `Some` the two form a strong reference cycle — confirm it is cleared
/// somewhere (e.g. on shutdown).
client_ref: RwLock<Option<Arc<NatsClient>>>,
/// NOTE(review): presumably a counter or stamp identifying the current
/// reconnect attempt — confirm.
reconnect_version: RwLock<u128>,
}
/// Hand-written `Debug`: only the client's options are shown; the rest of the
/// inner state and the reconnect handlers are left out.
impl std::fmt::Debug for NatsClient {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let options = &self.inner.opts;
        let mut view = formatter.debug_struct("NatsClient");
        view.field("opts", options);
        view.finish()
    }
}
/// Newtype around `Vec<String>`, used as the type of
/// `NatsClientOptions::cluster_uris`. The inner vector is private to this
/// module and its children.
///
/// NOTE(review): conversions into this type presumably live in the
/// `converters` module — confirm.
#[derive(Clone, Debug, PartialEq)]
pub struct UriVec(Vec<String>);
/// Shortcut for options that only name a single server.
impl From<String> for NatsClientOptions {
    /// Uses `uri` as the only entry of `cluster_uris` and takes every other
    /// field from `Default`.
    fn from(uri: String) -> Self {
        let cluster_uris = UriVec(vec![uri]);
        Self {
            cluster_uris,
            ..Self::default()
        }
    }
}
/// Shared, thread-safe callback that takes a byte slice and returns either a
/// byte vector or a boxed error. Stored in `UserJWT::signer`.
///
/// NOTE(review): presumably signs the data it is given during authentication
/// — confirm where it is invoked.
pub type SignerCallback =
Arc<dyn Fn(&[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> + Send + Sync>;
/// A JWT string paired with a `SignerCallback`.
///
/// `Clone` is derived; because `signer` is an `Arc`, a clone shares the same
/// callback. `Debug` and `PartialEq` are implemented by hand since the
/// callback can be neither printed nor compared.
#[derive(Clone)]
pub struct UserJWT {
/// The JWT text.
jwt: String,
/// Signing callback associated with the JWT.
signer: SignerCallback,
}
/// Hand-written `Debug`: prints the JWT text and a fixed `(func)` placeholder
/// for the signer callback, which cannot be formatted.
impl Debug for UserJWT {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let token = &self.jwt;
        out.write_fmt(format_args!(
            "UserJWT {{ jwt: {}, signer: (func) }}",
            token
        ))
    }
}
impl PartialEq for UserJWT {
fn eq(&self, other: &UserJWT) -> bool {
self.jwt == other.jwt
}
}
impl UserJWT {
pub fn new(jwt: String, signer: SignerCallback) -> UserJWT {
UserJWT { jwt, signer }
}
}