use crate::error::RatsioError;
use crate::net::*;
use crate::ops::{Connect, Message, Op, ServerInfo, Subscribe};
use futures::{
prelude::*,
stream,
sync::mpsc::{self, UnboundedSender},
Future, Stream,
};
use parking_lot::RwLock;
use std::fmt::Debug;
use std::{collections::HashMap, sync::Arc};
/// Sink half of a split `NatsConnSinkStream`.
type NatsSink = stream::SplitSink<NatsConnSinkStream>;
/// Stream half of a split `NatsConnSinkStream`.
type NatsStream = stream::SplitStream<NatsConnSinkStream>;
// Declares the `client` submodule, whose body lives in a separate file.
mod client;
/// Cloneable handle used to queue outgoing `Op`s.
///
/// It only holds the sending half of an unbounded channel; the receiving
/// half is drained into the connection sink by the task spawned in `new`.
#[derive(Clone, Debug)]
pub struct NatsClientSender {
/// Sending half of the unbounded `Op` channel.
tx: UnboundedSender<Op>,
}
impl NatsClientSender {
    /// Creates a sender whose queued operations are forwarded into `sink`.
    ///
    /// An unbounded channel is created and a task is spawned with
    /// `tokio::spawn` that feeds everything received on the channel into
    /// `sink` through `send_all`. Both the success value and any error of
    /// that forwarding task are discarded.
    fn new(sink: NatsSink) -> Self {
        let (tx, queue) = mpsc::unbounded();
        // Give the receiver an error type of `RatsioError` before handing
        // it to `send_all`.
        let outgoing = queue.map_err(|_| RatsioError::InnerBrokenChain);
        let forwarder = sink
            .send_all(outgoing)
            .map_err(|_| ())
            .map(|_| ());
        tokio::spawn(forwarder);
        Self { tx }
    }

    /// Queues `op` on the internal channel.
    ///
    /// The push happens immediately; the returned future is already
    /// resolved. It yields `()` when the operation was queued and
    /// `RatsioError::InnerBrokenChain` when `unbounded_send` failed.
    pub fn send(&self, op: Op) -> impl Future<Item = (), Error = RatsioError> {
        let queued = match self.tx.unbounded_send(op) {
            Ok(()) => Ok(()),
            Err(_) => Err(RatsioError::InnerBrokenChain),
        };
        queued.into_future()
    }
}
/// Item type sent over a `SubscriptionSink`'s channel.
#[derive(Debug, Clone)]
pub(crate) enum SinkMessage {
/// Carries a `Message`.
Message(Message),
/// Payload-free variant.
/// NOTE(review): presumably tells the receiver that the subscription is
/// being closed — confirm at the use sites.
CLOSE,
}
/// Per-subscription record: the `Subscribe` command together with the
/// channel on which `SinkMessage`s are pushed.
#[derive(Debug, Clone)]
pub(crate) struct SubscriptionSink {
/// The `Subscribe` command associated with this sink.
cmd: Subscribe,
/// Sending half of the unbounded `SinkMessage` channel.
tx: mpsc::UnboundedSender<SinkMessage>,
/// Optional message limit.
/// NOTE(review): presumably the number of messages after which the
/// subscription is dropped — confirm against the multiplexer logic.
max_count: Option<u32>,
/// Running counter.
/// NOTE(review): presumably the number of messages delivered so far — confirm.
count: u32,
}
/// Holds a control channel for `Op`s and a shared, lock-protected map of
/// `SubscriptionSink`s.
#[derive(Debug)]
pub struct NatsClientMultiplexer {
/// Sending half of an unbounded `Op` channel.
control_tx: mpsc::UnboundedSender<Op>,
/// Subscription sinks keyed by a `String`.
/// NOTE(review): the key is presumably the subscription id — confirm.
subs_map: Arc<RwLock<HashMap<String, SubscriptionSink>>>,
}
/// Newtype around an ordered list of URI strings.
///
/// Conversions are provided from a single `&str` or `String` and from a
/// `Vec` of either, so callers can pass whichever form they have.
#[derive(Clone, Debug, PartialEq)]
pub struct UriVec(Vec<String>);

impl From<Vec<&str>> for UriVec {
    /// Copies every borrowed entry into an owned `String`, keeping the order.
    fn from(xs: Vec<&str>) -> Self {
        let owned: Vec<String> = xs.into_iter().map(String::from).collect();
        Self(owned)
    }
}

impl From<Vec<String>> for UriVec {
    /// Wraps the vector as-is, without copying.
    fn from(xs: Vec<String>) -> Self {
        Self(xs)
    }
}

impl From<String> for UriVec {
    /// Produces a list containing exactly the given string.
    fn from(x: String) -> Self {
        Self(vec![x])
    }
}

impl From<&str> for UriVec {
    /// Produces a list containing an owned copy of the given string slice.
    fn from(x: &str) -> Self {
        Self::from(String::from(x))
    }
}
/// Shared, thread-safe callback that maps a byte slice to a byte vector or
/// a boxed error.
///
/// NOTE(review): presumably used to sign a server-provided nonce during
/// authentication — confirm at the call sites.
pub type SignerCallback =
    Arc<dyn Fn(&[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> + Send + Sync>;

/// A JWT string paired with the callback stored alongside it.
#[derive(Clone)]
pub struct UserJWT {
    /// The JWT text.
    jwt: String,
    /// Callback kept with the token; see [`SignerCallback`].
    signer: SignerCallback,
}

impl Debug for UserJWT {
    /// Prints the JWT text and a fixed `(func)` placeholder for the callback,
    /// because closures have no `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "UserJWT {{ jwt: {}, signer: (func) }}", self.jwt)
    }
}

impl PartialEq for UserJWT {
    /// Two values are equal when their JWT strings are equal; the signer
    /// callback is not compared.
    fn eq(&self, other: &UserJWT) -> bool {
        self.jwt == other.jwt
    }
}

impl UserJWT {
    /// Bundles a JWT string with its signer callback.
    pub fn new(jwt: String, signer: SignerCallback) -> UserJWT {
        UserJWT { jwt, signer }
    }
}
/// Configuration for a NATS client.
///
/// A builder is derived for this struct; its setters accept anything that
/// converts `into` the field type, and unset fields fall back to this
/// struct's `Default` implementation.
#[derive(Debug, Clone, Builder, PartialEq)]
#[builder(setter(into), default)]
pub struct NatsClientOptions {
/// User name; empty by default.
pub username: String,
/// Password; empty by default.
pub password: String,
/// Authentication token; empty by default.
pub auth_token: String,
/// Defaults to `false`.
/// NOTE(review): presumably requires a TLS connection — confirm in the connection code.
pub tls_required: bool,
/// Defaults to `true`.
/// NOTE(review): presumably the protocol's `verbose` CONNECT flag — confirm.
pub verbose: bool,
/// Defaults to `false`.
/// NOTE(review): presumably the protocol's `pedantic` CONNECT flag — confirm.
pub pedantic: bool,
/// Defaults to `true`.
/// NOTE(review): presumably the protocol's `echo` CONNECT flag — confirm.
pub echo: bool,
/// Client name; empty by default.
pub name: String,
/// List of server URIs; empty by default.
pub cluster_uris: UriVec,
/// Defaults to `5`.
/// NOTE(review): unit is not visible here (presumably seconds) — confirm.
pub ping_interval: u16,
/// Defaults to `3`.
/// NOTE(review): presumably the number of unanswered pings tolerated — confirm.
pub ping_max_out: u16,
/// Defaults to `true`.
/// NOTE(review): presumably re-issues subscriptions after a reconnect — confirm.
pub subscribe_on_reconnect: bool,
/// Defaults to `true`.
/// NOTE(review): presumably keeps retrying until the first connection succeeds — confirm.
pub ensure_connect: bool,
/// Defaults to `1000`.
/// NOTE(review): unit is not visible here (presumably milliseconds) — confirm.
pub reconnect_timeout: u64,
/// Optional JWT plus signer callback; `None` by default.
pub user_jwt: Option<UserJWT>,
}
impl Default for NatsClientOptions {
    /// Baseline configuration: no credentials, no TLS requirement, no
    /// cluster URIs and no JWT; `verbose`, `echo`, `subscribe_on_reconnect`
    /// and `ensure_connect` enabled; `ping_interval` 5, `ping_max_out` 3
    /// and `reconnect_timeout` 1000.
    fn default() -> Self {
        Self {
            // Credentials and identification start out empty.
            username: String::default(),
            password: String::default(),
            auth_token: String::default(),
            name: String::default(),
            user_jwt: None,
            // No servers are known until the caller supplies them.
            cluster_uris: UriVec(Vec::default()),
            // Boolean switches.
            tls_required: false,
            pedantic: false,
            verbose: true,
            echo: true,
            subscribe_on_reconnect: true,
            ensure_connect: true,
            // Keep-alive and reconnect tuning.
            ping_interval: 5,
            ping_max_out: 3,
            reconnect_timeout: 1000,
        }
    }
}
impl NatsClientOptions {
/// Returns a fresh `NatsClientOptionsBuilder` in its default state.
pub fn builder() -> NatsClientOptionsBuilder {
NatsClientOptionsBuilder::default()
}
}
/// Connection lifecycle states tracked in `NatsClient`'s `state` field.
#[derive(PartialEq, Clone, Debug)]
pub enum NatsClientState {
/// NOTE(review): presumably the initial connection attempt is in progress — confirm.
Connecting,
/// NOTE(review): presumably a connection is established — confirm.
Connected,
/// NOTE(review): presumably a lost connection is being re-established — confirm.
Reconnecting,
/// NOTE(review): presumably no connection and no attempt in progress — confirm.
Disconnected,
}
type HandlerMap = HashMap<String, Box<Fn(Arc<NatsClient>) -> () + Send + Sync>>;
/// NATS client state: the connection, its options, the sending and
/// receiving halves, and bookkeeping shared behind `Arc<RwLock<..>>`.
///
/// The type also implements `Stream`, yielding the `Op`s produced by
/// `unsub_receiver`.
pub struct NatsClient {
/// Shared handle to the underlying `NatsConnection`.
connection: Arc<NatsConnection>,
/// Options this client holds.
opts: NatsClientOptions,
/// Optional `ServerInfo`.
/// NOTE(review): presumably the last `INFO` received from the server — confirm.
server_info: Arc<RwLock<Option<ServerInfo>>>,
/// Boxed stream of `Op`s that the `Stream` implementation polls.
unsub_receiver: Box<dyn Stream<Item = Op, Error = RatsioError> + Send + Sync>,
/// Handle used to queue outgoing operations.
pub sender: Arc<RwLock<NatsClientSender>>,
/// Multiplexer holding the subscription sinks.
pub receiver: Arc<RwLock<NatsClientMultiplexer>>,
/// Sending half of an unbounded `Op` channel.
/// NOTE(review): presumably the same control channel the multiplexer uses — confirm.
control_tx: Arc<RwLock<UnboundedSender<Op>>>,
/// Current `NatsClientState`.
state: Arc<RwLock<NatsClientState>>,
/// Callbacks stored by name.
/// NOTE(review): presumably invoked after a reconnect — confirm.
reconnect_handlers: Arc<RwLock<HandlerMap>>,
}
impl ::std::fmt::Debug for NatsClient {
    /// Writes a struct-style debug view containing `opts`, `sender` and
    /// `receiver`, plus a fixed placeholder entry named `other_rx`. The
    /// remaining fields are omitted.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let mut view = f.debug_struct("NatsClient");
        view.field("opts", &self.opts);
        view.field("sender", &self.sender);
        view.field("receiver", &self.receiver);
        // The boxed stream has no `Debug` output, so a literal stands in for it.
        view.field("other_rx", &"Box<Stream>...");
        view.finish()
    }
}
impl Stream for NatsClient {
    type Error = RatsioError;
    type Item = Op;

    /// Polls the inner `unsub_receiver` stream and passes its readiness
    /// state through unchanged.
    ///
    /// Any error reported by the inner stream is replaced with
    /// `RatsioError::InnerBrokenChain`; the original error value is dropped.
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        match self.unsub_receiver.poll() {
            Ok(readiness) => Ok(readiness),
            Err(_) => Err(RatsioError::InnerBrokenChain),
        }
    }
}