#![allow(dead_code)]
#[macro_use]
extern crate derive_builder;
#[macro_use]
extern crate log;
use crate::protocol::ServerInfo;
use crate::subs::SubscriptionManager;
use crate::tcp::TcpClient;
use crossbeam_channel::{self as channel, bounded};
use nats_types::{DeliveredMessage, PublishMessage};
use regex::Captures;
use regex::Regex;
use std::{sync::Arc, thread, time::Duration};
/// Crate-wide result alias: every fallible API in this file returns
/// `std::result::Result` with `crate::error::Error` as the error type.
pub type Result<T> = std::result::Result<T, crate::error::Error>;
/// Re-export of `nats_types::DeliveredMessage` under the shorter name `Message`,
/// which is the type handed to subscription handlers and returned by `Client::request`.
pub use nats_types::DeliveredMessage as Message;
/// The ways a client can identify itself to a server.
///
/// `ClientOptions::default()` in this file selects [`AuthenticationStyle::Anonymous`].
#[derive(Debug, Clone, PartialEq)]
pub enum AuthenticationStyle {
    /// A pair of credential strings; the credentials-file test in this file
    /// names them JWT and seed, in that order.
    UserCredentials(String, String),
    /// A single token string.
    Token(String),
    /// A user name and password pair.
    Basic {
        username: String,
        password: String,
    },
    /// No credentials at all.
    Anonymous,
}
impl AuthenticationStyle {
    /// Reads the credentials file at path `file` and parses its contents into
    /// [`AuthenticationStyle::UserCredentials`].
    ///
    /// # Errors
    /// Returns an error if the file cannot be read, or if its contents are
    /// rejected by `from_credsfile_str`.
    pub fn from_credsfile(file: &str) -> Result<AuthenticationStyle> {
        let raw = std::fs::read_to_string(file)?;
        Self::from_credsfile_str(&raw)
    }

    /// Parses the text of a credentials file.
    ///
    /// `CREDSFILE_FORMAT` matches one dash-delimited section per match and
    /// captures the line between the header and footer as group 1. The first
    /// section supplies the JWT and the second supplies the seed.
    ///
    /// # Errors
    /// Returns an error if the pattern fails to compile or if `raw` contains
    /// fewer than two sections.
    fn from_credsfile_str(raw: &str) -> Result<AuthenticationStyle> {
        let re = Regex::new(CREDSFILE_FORMAT)?;
        let sections: Vec<Captures> = re.captures_iter(raw).collect();

        // Group 1 of the n-th section, if that section exists.
        let section_body = |idx: usize| {
            sections
                .get(idx)
                .and_then(|caps| caps.get(1))
                .map(|m| m.as_str().to_string())
        };

        match (section_body(0), section_body(1)) {
            (Some(jwt), Some(seed)) => Ok(AuthenticationStyle::UserCredentials(jwt, seed)),
            // Malformed input is reported through the same io::Error conversion
            // that `from_credsfile` already relies on, instead of panicking.
            _ => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "credentials file must contain a JWT section followed by a seed section",
            )
            .into()),
        }
    }
}
/// Regular expression matching one delimited section of a credentials file.
///
/// A match is: optional leading whitespace, a header line that contains a run of
/// three or more dashes and ends with a run of three or more dashes, then one
/// line of content (capture group 1), then a footer line of the same shape that
/// may be preceded by whitespace and must be terminated by a newline.
const CREDSFILE_FORMAT: &str =
r#"\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))"#;
/// Shared, thread-safe callback invoked with each delivered message; it reports
/// failure through the crate `Result`.
type MessageHandler = Arc<dyn Fn(&Message) -> Result<()> + Sync + Send>;
/// Configuration for a `Client`.
///
/// A `ClientOptionsBuilder` is derived for this struct. Setters accept anything
/// convertible into the field type (`setter(into)`), and the struct-level
/// `default` attribute makes unset fields fall back to the `Default` impl below.
#[derive(Debug, Clone, Builder, PartialEq)]
#[builder(setter(into), default)]
pub struct ClientOptions {
/// Server URIs; `Client::from_options` passes them to `protocol::parse_server_uris`.
cluster_uris: Vec<String>,
/// How the client authenticates; defaults to `AuthenticationStyle::Anonymous`.
authentication: AuthenticationStyle,
/// How long `Client::connect` waits for the connection-ready signal; defaults to 100 ms.
connect_timeout: Duration,
/// Defaults to 3. NOTE(review): not read in this file — presumably consumed by
/// the TCP layer when reconnecting; confirm in `tcp`.
reconnect_attempts: u8,
}
impl Default for ClientOptions {
    /// Produces options with no server URIs, anonymous authentication, a
    /// 100 ms connect timeout and 3 reconnect attempts.
    fn default() -> Self {
        let connect_timeout = Duration::from_millis(100);
        Self {
            cluster_uris: vec![],
            authentication: AuthenticationStyle::Anonymous,
            connect_timeout,
            reconnect_attempts: 3,
        }
    }
}
impl ClientOptions {
pub fn builder() -> ClientOptionsBuilder {
ClientOptionsBuilder::default()
}
}
/// A client handle. It is `Clone`; the dispatcher thread started by `connect`
/// runs on its own clone of the client.
#[derive(Clone)]
pub struct Client {
/// Identifier generated from a NUID in `from_options`.
id: String,
/// Wildcard subject `_INBOX.<id>.*` under which replies to `request` are received.
inbox_wildcard: String,
/// The options this client was built from.
opts: ClientOptions,
/// Servers parsed from `opts.cluster_uris`.
servers: Vec<ServerInfo>,
/// Initialised to 0. NOTE(review): not read anywhere in this file — confirm its use.
server_index: usize,
/// Subscription bookkeeping: handlers by subscription id and pending request inboxes.
submgr: SubscriptionManager,
/// Sending half of the unbounded delivery channel; a clone is handed to `TcpClient`.
delivery_sender: channel::Sender<DeliveredMessage>,
/// Receiving half of the delivery channel; drained by the dispatcher thread.
delivery_receiver: channel::Receiver<DeliveredMessage>,
/// Sending half of the unbounded outbound channel; `publish` pushes encoded messages here.
write_sender: channel::Sender<Vec<u8>>,
/// Receiving half of the outbound channel; a clone is handed to `TcpClient`.
write_receiver: channel::Receiver<Vec<u8>>,
}
impl Client {
pub fn from_options(opts: ClientOptions) -> Result<Client> {
let uris = opts.cluster_uris.clone();
let (ds, dr) = channel::unbounded();
let (ws, wr) = channel::unbounded();
let mut nuid = nuid::NUID::new();
nuid.randomize_prefix();
let id = nuid.next();
Ok(Client {
inbox_wildcard: format!("_INBOX.{}.*", &id),
id: id.clone(),
opts,
servers: protocol::parse_server_uris(&uris)?,
submgr: SubscriptionManager::new(id, ws.clone()),
server_index: 0,
delivery_sender: ds,
delivery_receiver: dr,
write_sender: ws,
write_receiver: wr,
})
}
pub fn new(url: &str) -> Result<Client> {
let opts = ClientOptions::builder()
.cluster_uris(vec![url.into()])
.build()
.unwrap();
Self::from_options(opts)
}
pub fn subscribe<F>(&self, subject: &str, handler: F) -> Result<()>
where
F: Fn(&Message) -> Result<()> + Sync + Send,
F: 'static,
{
self.raw_subscribe(subject, None, Arc::new(handler))
}
pub fn queue_subscribe<F>(&self, subject: &str, queue_group: &str, handler: F) -> Result<()>
where
F: Fn(&Message) -> Result<()> + Sync + Send,
F: 'static,
{
self.raw_subscribe(subject, Some(queue_group), Arc::new(handler))
}
pub fn request<T>(
&self,
subject: T,
payload: &[u8],
timeout: std::time::Duration,
) -> Result<Message>
where
T: AsRef<str>,
{
let (sender, receiver) = channel::bounded(1);
let inbox = self.submgr.add_new_inbox_sub(sender)?;
self.publish(subject.as_ref(), payload, Some(&inbox))?;
match receiver.recv_timeout(timeout) {
Ok(msg) => Ok(msg.clone()),
Err(e) => Err(err!(Timeout, "Request timeout expired: {}", e)),
}
}
pub fn unsubscribe(&self, subject: impl AsRef<str>) -> Result<()> {
let s = subject.as_ref();
self.submgr.unsubscribe_by_subject(s)
}
pub fn publish(&self, subject: &str, payload: &[u8], reply_to: Option<&str>) -> Result<()> {
let vec = PublishMessage::as_vec(subject, reply_to, payload)?;
match self.write_sender.send(vec) {
Ok(_) => Ok(()),
Err(e) => Err(err!(ConcurrencyFailure, "Concurrency failure: {}", e)),
}
}
pub fn connect(&self) -> Result<()> {
let (s, r) = bounded(1);
let tcp_client = TcpClient::new(
self.servers.clone(),
self.delivery_sender.clone(),
self.write_sender.clone(),
self.write_receiver.clone(),
self.opts.clone(),
s,
);
tcp_client.connect()?;
info!("TCP connection established.");
if r.recv_timeout(self.opts.connect_timeout).is_err() {
error!("Failed to establish NATS connection within timeout");
return Err(err!(
Timeout,
"Failed to establish connection without timeout"
));
};
let mgr = self.submgr.clone();
self.submgr.add_sub(
&self.inbox_wildcard,
None,
Arc::new(move |msg| {
if let Some(sender) = mgr.sender_for_inbox(&msg.subject) {
sender.send(msg.clone()).unwrap();
mgr.remove_inbox(&msg.subject)
}
Ok(())
}),
)?;
self.start_subscription_dispatcher(self.delivery_receiver.clone())
}
fn start_subscription_dispatcher(
&self,
receiver: channel::Receiver<DeliveredMessage>,
) -> Result<()> {
let c = self.clone();
thread::spawn(move || {
loop {
match receiver.recv() {
Ok(msg) => {
let handler = c.get_handler(msg.subscription_id as usize);
(handler)(&msg).unwrap();
}
Err(e) => {
error!("Failed to receive message: {}", e);
}
}
}
});
Ok(())
}
fn get_handler(&self, sid: usize) -> MessageHandler {
self.submgr.handler_for_sid(sid).unwrap()
}
fn raw_subscribe(
&self,
subject: &str,
queue_group: Option<&str>,
handler: MessageHandler,
) -> Result<()> {
match self.submgr.add_sub(subject, queue_group, handler) {
Ok(_) => Ok(()),
Err(e) => Err(err!(SubscriptionFailure, "Subscription failure: {}", e)),
}
}
}
impl Default for Client {
    /// Builds a client from `ClientOptions::default()`.
    ///
    /// # Panics
    /// Panics if `Client::from_options` returns an error for the default options.
    fn default() -> Self {
        let opts = ClientOptions::default();
        Self::from_options(opts).unwrap()
    }
}
// Crate modules. `error` is public and carries `#[macro_use]`, which makes any
// macros it defines visible outside the module.
// NOTE(review): `err!` is used in this file but not defined here — presumably it
// comes from `error`; confirm there.
#[macro_use]
pub mod error;
mod protocol;
mod subs;
mod tcp;
#[cfg(test)]
mod tests {
    use super::AuthenticationStyle;

    /// Placeholder sanity check that the test harness runs.
    #[test]
    fn it_works() {
        assert_eq!(2 + 2, 4);
    }

    /// Parses a sample credentials file and checks the extracted seed.
    ///
    /// Any variant other than `UserCredentials` fails the test instead of
    /// letting it pass without asserting anything.
    #[test]
    fn credsfile_parses() {
        let credsfile = r#"-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJBNDNRN1NLT0tCT0tYUDc1WVhMWjcyVDZKNDVIVzJKR0ZRWUJFQ1I2VE1FWEZFN1RKSjVBIiwiaWF0IjoxNTU0ODk2OTQ1LCJpc3MiOiJBQU9KV0RRV1pPQkNFTUVWWUQ2VEhPTUVCSExYS0NBMzZGU0dJVUxINFBWRU1ORDVUMjNEUEM0VSIsIm5hbWUiOiJiZW1pc191c2VyIiwic3ViIjoiVUNMRkYzTFBLTTQ3WTZGNkQ3UExMVzU2MzZKU1JDUFhFUUFDTEdVWTZNT01BS1lXMkk2VUFFRUQiLCJ0eXBlIjoidXNlciIsIm5hdHMiOnsicHViIjp7fSwic3ViIjp7fX19.3aH-hCSTS8z8rg2km7Q_aat5VpwT-t9swSmh3bnVBY_9IV9wE9mjSOUgHE2sq-7pR4HTCpYa0RPrNcgNfaVuBg
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
SUACGBWJZLVP4CHF7WTY65KT3I4QHAQ5DEZMFAJKTIUIRQPXE6DVMFQEUU
------END USER NKEY SEED------
*************************************************************"#;
        let authstyle = AuthenticationStyle::from_credsfile_str(credsfile)
            .expect("well-formed credentials text should parse");
        match authstyle {
            AuthenticationStyle::UserCredentials(_jwt, seed) => {
                assert_eq!(
                    "SUACGBWJZLVP4CHF7WTY65KT3I4QHAQ5DEZMFAJKTIUIRQPXE6DVMFQEUU",
                    seed
                );
            }
            other => panic!("expected UserCredentials, got {:?}", other),
        }
    }
}