use std::fmt;
use std::io;
use std::net::IpAddr;
use std::path::Path;
use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use blocking::unblock;
use crossbeam_channel::{Receiver, Sender};
use crate::header::HeaderMap;
use crate::IntoServerList;
/// Connects to the servers described by `nats_url` using default [`Options`].
///
/// Equivalent to `Options::new().connect(nats_url)`.
///
/// # Errors
///
/// Returns whatever error [`Options::connect`] produces.
pub async fn connect<I>(nats_url: I) -> io::Result<Connection>
where
    I: IntoServerList,
{
    let defaults = Options::new();
    defaults.connect(nats_url).await
}
/// Async facade over the blocking `crate::Connection`.
///
/// The async methods on this type either try a non-blocking fast path or hand
/// the blocking call to `blocking::unblock`, so the calling task is not stalled
/// while the blocking client does its work.
#[derive(Clone, Debug)]
pub struct Connection {
// The wrapped blocking connection; it is cloned into each `unblock` closure.
inner: crate::Connection,
}
impl Connection {
fn new(inner: crate::Connection) -> Connection {
Self { inner }
}
pub async fn publish(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<()> {
self.publish_with_reply_or_headers(subject, None, None, msg)
.await
}
pub async fn publish_request(
&self,
subject: &str,
reply: &str,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
if let Some(res) =
self.inner
.try_publish_with_reply_or_headers(subject, Some(reply), None, &msg)
{
return res;
}
let subject = subject.to_string();
let reply = reply.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || inner.publish_request(&subject, &reply, msg)).await
}
pub fn new_inbox(&self) -> String {
self.inner.new_inbox()
}
pub async fn request(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<Message> {
let subject = subject.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let msg = unblock(move || inner.request(&subject, msg)).await?;
Ok(msg.into())
}
pub async fn request_timeout(
&self,
subject: &str,
msg: impl AsRef<[u8]>,
timeout: Duration,
) -> io::Result<Message> {
let subject = subject.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let msg = unblock(move || inner.request_timeout(&subject, msg, timeout)).await?;
Ok(msg.into())
}
pub async fn request_multi(
&self,
subject: &str,
msg: impl AsRef<[u8]>,
) -> io::Result<Subscription> {
let subject = subject.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let sub = unblock(move || inner.request_multi(&subject, msg)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner: sub,
_closer_tx,
closer_rx,
})
}
pub async fn subscribe(&self, subject: &str) -> io::Result<Subscription> {
let subject = subject.to_string();
let inner = self.inner.clone();
let inner = unblock(move || inner.subscribe(&subject)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner,
_closer_tx,
closer_rx,
})
}
pub async fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result<Subscription> {
let subject = subject.to_string();
let queue = queue.to_string();
let inner = self.inner.clone();
let inner = unblock(move || inner.queue_subscribe(&subject, &queue)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner,
_closer_tx,
closer_rx,
})
}
pub async fn flush(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.flush()).await
}
pub async fn flush_timeout(&self, timeout: Duration) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.flush_timeout(timeout)).await
}
pub async fn rtt(&self) -> io::Result<Duration> {
let inner = self.inner.clone();
unblock(move || inner.rtt()).await
}
pub fn client_ip(&self) -> io::Result<IpAddr> {
self.inner.client_ip()
}
pub fn client_id(&self) -> u64 {
self.inner.client_id()
}
pub async fn drain(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.drain()).await
}
pub async fn close(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.close()).await;
Ok(())
}
pub async fn publish_with_reply_or_headers(
&self,
subject: &str,
reply: Option<&str>,
headers: Option<&HeaderMap>,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
if let Some(res) = self
.inner
.try_publish_with_reply_or_headers(subject, reply, headers, &msg)
{
return res;
}
let subject = subject.to_string();
let reply = reply.map(str::to_owned);
let headers = headers.map(HeaderMap::clone);
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || {
inner.publish_with_reply_or_headers(&subject, reply.as_deref(), headers.as_ref(), msg)
})
.await
}
}
/// Async facade over the blocking `crate::Subscription`.
///
/// Besides the wrapped subscription it owns both halves of a zero-capacity
/// "closer" channel that `next` selects on while waiting for a message.
#[derive(Debug)]
pub struct Subscription {
// The wrapped blocking subscription.
inner: crate::Subscription,
// Sending half of the closer channel. Nothing in this file sends on it; it is
// only held so that it is dropped together with the `Subscription`.
// NOTE(review): presumably that drop is what wakes a pending `next` — confirm.
_closer_tx: Sender<()>,
// Receiving half of the closer channel; cloned into the blocking wait in `next`.
closer_rx: Receiver<()>,
}
impl Subscription {
pub async fn next(&self) -> Option<Message> {
if let Some(msg) = self.inner.try_next() {
return Some(msg.into());
}
let inner = self.inner.clone();
let closer = self.closer_rx.clone();
let msg = unblock(move || {
crossbeam_channel::select! {
recv(closer) -> _ => None,
recv(inner.receiver()) -> msg => msg.ok(),
}
})
.await?;
Some(msg.into())
}
pub fn try_next(&self) -> Option<Message> {
self.inner.try_next().map(From::from)
}
pub async fn drain(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.drain()).await
}
pub async fn unsubscribe(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.unsubscribe()).await
}
}
/// A message as seen through the async API.
///
/// Carries the same fields as the blocking `crate::Message`, from which it is
/// converted field by field.
#[derive(Clone)]
pub struct Message {
/// Subject string of the message.
pub subject: String,
/// Optional reply subject; `respond` publishes to it when present.
pub reply: Option<String>,
/// Raw payload bytes.
pub data: Vec<u8>,
/// Optional headers attached to the message.
pub headers: Option<HeaderMap>,
/// Client handle used by `respond` to publish the reply; `None` for messages
/// created through `Default`/`Message::new`.
#[doc(hidden)]
pub client: Option<crate::client::Client>,
/// Shared flag carried over from the blocking message. It is not read anywhere
/// in this file. NOTE(review): presumably tracks double-acknowledgement — confirm.
#[doc(hidden)]
pub double_acked: Arc<AtomicBool>,
}
impl From<crate::Message> for Message {
fn from(sync: crate::Message) -> Message {
Message {
subject: sync.subject,
reply: sync.reply,
data: sync.data,
headers: sync.headers,
client: sync.client,
double_acked: sync.double_acked,
}
}
}
impl Message {
    /// Builds a message from its parts.
    ///
    /// `subject`, `reply` and `data` are copied into owned values; the
    /// remaining fields take their `Default` values, so the result is not
    /// bound to any client.
    pub fn new(
        subject: &str,
        reply: Option<&str>,
        data: impl AsRef<[u8]>,
        headers: Option<HeaderMap>,
    ) -> Message {
        Self {
            subject: subject.to_owned(),
            reply: reply.map(str::to_owned),
            data: data.as_ref().to_vec(),
            headers,
            ..Self::default()
        }
    }

    /// Publishes `msg` to this message's reply subject.
    ///
    /// A non-blocking `try_publish` is attempted first; if it yields `None`,
    /// the client, reply subject and payload are cloned and the blocking
    /// `publish` is run via `unblock`.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` when the message has no reply subject.
    /// * `NotConnected` when the message is not bound to a client.
    /// * Any error reported by the underlying publish call.
    pub async fn respond(&self, msg: impl AsRef<[u8]>) -> io::Result<()> {
        let reply = match self.reply.as_deref() {
            Some(subject) => subject,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "No reply subject to reply to",
                ))
            }
        };
        let client = match &self.client {
            Some(bound) => bound,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::NotConnected,
                    crate::message::MESSAGE_NOT_BOUND,
                ))
            }
        };

        let attempt = client.try_publish(reply, None, None, msg.as_ref());
        if let Some(outcome) = attempt {
            return outcome;
        }

        // Clone only now that the data really has to move to another thread.
        let client = client.clone();
        let reply = reply.to_owned();
        let payload = msg.as_ref().to_vec();
        unblock(move || client.publish(&reply, None, None, payload.as_ref())).await
    }
}
impl Default for Message {
fn default() -> Message {
Message {
subject: String::from(""),
reply: None,
data: Vec::new(),
headers: None,
client: None,
double_acked: Arc::new(AtomicBool::new(false)),
}
}
}
impl fmt::Debug for Message {
    /// Formats the subject, headers and reply subject, and reports the payload
    /// only by its byte length under the name `length`. The `client` and
    /// `double_acked` fields are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        let payload_len = self.data.len();
        let mut repr = f.debug_struct("Message");
        repr.field("subject", &self.subject);
        repr.field("headers", &self.headers);
        repr.field("reply", &self.reply);
        repr.field("length", &payload_len);
        repr.finish()
    }
}
/// Async-facing connection options.
///
/// A thin wrapper around the blocking `crate::Options`: every constructor and
/// builder method forwards to the method of the same name on the wrapped
/// value and re-wraps the result.
#[derive(Debug, Default)]
pub struct Options {
// The wrapped blocking options that all builder methods delegate to.
inner: crate::Options,
}
impl Options {
pub fn new() -> Options {
Options {
inner: crate::Options::new(),
}
}
pub fn with_token(token: &str) -> Options {
Options {
inner: crate::Options::with_token(token),
}
}
pub fn with_user_pass(user: &str, password: &str) -> Options {
Options {
inner: crate::Options::with_user_pass(user, password),
}
}
pub fn with_credentials(path: impl AsRef<Path>) -> Options {
Options {
inner: crate::Options::with_credentials(path),
}
}
pub fn with_jwt<J, S>(jwt_cb: J, sig_cb: S) -> Options
where
J: Fn() -> io::Result<String> + Send + Sync + 'static,
S: Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
{
Options {
inner: crate::Options::with_jwt(jwt_cb, sig_cb),
}
}
pub fn with_nkey<F>(nkey: &str, sig_cb: F) -> Options
where
F: Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
{
Options {
inner: crate::Options::with_nkey(nkey, sig_cb),
}
}
pub fn client_cert(self, cert: impl AsRef<Path>, key: impl AsRef<Path>) -> Options {
Options {
inner: self.inner.client_cert(cert, key),
}
}
pub fn with_name(self, name: &str) -> Options {
Options {
inner: self.inner.with_name(name),
}
}
pub fn no_echo(self) -> Options {
Options {
inner: self.inner.no_echo(),
}
}
pub fn retry_on_failed_connect(self) -> Options {
Options {
inner: self.inner.retry_on_failed_connect(),
}
}
pub fn max_reconnects<T: Into<Option<usize>>>(self, max_reconnects: T) -> Options {
Options {
inner: self.inner.max_reconnects(max_reconnects),
}
}
pub fn reconnect_buffer_size(self, reconnect_buffer_size: usize) -> Options {
Options {
inner: self.inner.reconnect_buffer_size(reconnect_buffer_size),
}
}
pub async fn connect<I>(self, nats_url: I) -> io::Result<Connection>
where
I: IntoServerList,
{
let servers = nats_url.into_server_list()?;
let conn = unblock(move || self.inner.connect(servers)).await?;
Ok(Connection::new(conn))
}
pub fn disconnect_callback<F>(self, cb: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Options {
inner: self.inner.disconnect_callback(cb),
}
}
pub fn reconnect_callback<F>(self, cb: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Options {
inner: self.inner.reconnect_callback(cb),
}
}
pub fn close_callback<F>(self, cb: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
Options {
inner: self.inner.close_callback(cb),
}
}
pub fn reconnect_delay_callback<F>(self, cb: F) -> Self
where
F: Fn(usize) -> Duration + Send + Sync + 'static,
{
Options {
inner: self.inner.reconnect_delay_callback(cb),
}
}
pub fn tls_required(self, tls_required: bool) -> Options {
Options {
inner: self.inner.tls_required(tls_required),
}
}
pub fn add_root_certificate(self, path: impl AsRef<Path>) -> Options {
Options {
inner: self.inner.add_root_certificate(path),
}
}
}