use super::{block_on, mq_error};
use crate::RuntimeError;
use crate::runtime;
use bytes::Bytes;
use std::time::Duration;
/// A single message received from a [`Subscription`], wrapping the
/// underlying `async_nats::Message`.
pub struct Message {
    inner: async_nats::Message,
}

impl Message {
    /// Returns the message body as a borrowed byte slice.
    pub fn payload(&self) -> &[u8] {
        self.inner.payload.as_ref()
    }

    /// Returns the subject of the wrapped message as a string slice.
    pub fn subject(&self) -> &str {
        let Self { inner } = self;
        inner.subject.as_str()
    }
}
/// Blocking handle over an `async_nats::Subscriber` message stream.
pub struct Subscription {
    inner: async_nats::Subscriber,
}

impl Subscription {
    /// Blocks until the next message arrives or `timeout` elapses.
    ///
    /// `runtime::check_cancel` is consulted both before and after the wait;
    /// an error from either check is returned as-is (a message received
    /// during the wait is dropped in that case).
    ///
    /// # Errors
    ///
    /// * `RuntimeError::Timeout` if nothing arrived within `timeout`.
    /// * `RuntimeError::ChannelClosed` if the underlying stream ended.
    pub fn next_timeout(&mut self, timeout: Duration) -> Result<Message, RuntimeError> {
        use futures_util::StreamExt;

        runtime::check_cancel()?;
        let outcome = block_on(async {
            // The timeout future is built inside the async block so it is
            // created while the block is being driven by `block_on`.
            let pending = self.inner.next();
            tokio::time::timeout(timeout, pending).await
        });
        runtime::check_cancel()?;

        let received = outcome.map_err(|_| RuntimeError::Timeout)?;
        received
            .map(|inner| Message { inner })
            .ok_or(RuntimeError::ChannelClosed)
    }

    /// Waits at most one millisecond for a message.
    ///
    /// Returns `None` both when the wait times out and when the stream has
    /// ended; the two cases are not distinguished.
    pub fn try_next(&mut self) -> Option<Message> {
        use futures_util::StreamExt;

        const POLL_WINDOW: Duration = Duration::from_millis(1);

        block_on(async {
            tokio::time::timeout(POLL_WINDOW, self.inner.next())
                .await
                .ok()
                .flatten()
                .map(|inner| Message { inner })
        })
    }
}
/// Handle to a message-queue connection, wrapping an `async_nats::Client`.
///
/// Obtained from [`connect`] or [`connect_async`]. The type derives `Clone`,
/// so a handle can be duplicated by cloning the wrapped client.
#[derive(Clone)]
pub struct Connection {
// Underlying client that every publish/subscribe method on this type uses.
client: async_nats::Client,
}
/// Connects to the server at `url`, blocking the calling thread until the
/// connection attempt finishes.
///
/// # Errors
///
/// Returns the error from `runtime::check_cancel` if that check fails before
/// connecting, or the connection error converted through `mq_error`.
pub fn connect(url: &str) -> Result<Connection, RuntimeError> {
    runtime::check_cancel()?;
    Ok(Connection {
        client: block_on(async_nats::connect(url)).map_err(mq_error)?,
    })
}
/// Async counterpart of [`connect`]: awaits the connection to `url` instead
/// of blocking. No cancellation check is performed here.
///
/// # Errors
///
/// Returns the connection error converted through `mq_error`.
pub async fn connect_async(url: &str) -> Result<Connection, RuntimeError> {
    Ok(Connection {
        client: async_nats::connect(url).await.map_err(mq_error)?,
    })
}
impl Connection {
    /// Publishes `payload` on `subject` and then flushes the client,
    /// blocking until both steps complete.
    ///
    /// `runtime::check_cancel` is consulted before publishing and again
    /// after the flush; an error from either check is returned as-is.
    ///
    /// # Errors
    ///
    /// Publish and flush failures are converted through `mq_error`.
    pub fn publish(&self, subject: &str, payload: &[u8]) -> Result<(), RuntimeError> {
        runtime::check_cancel()?;

        let body = Bytes::copy_from_slice(payload);
        let send = self.client.publish(subject.to_owned(), body);
        block_on(send).map_err(mq_error)?;

        let flush = self.client.flush();
        block_on(flush).map_err(mq_error)?;

        runtime::check_cancel()?;
        Ok(())
    }

    /// Subscribes to `subject`, blocking until the subscription is set up.
    ///
    /// # Errors
    ///
    /// Returns the error from `runtime::check_cancel` if that check fails,
    /// or the subscribe error converted through `mq_error`.
    pub fn subscribe(&self, subject: &str) -> Result<Subscription, RuntimeError> {
        runtime::check_cancel()?;
        let topic = subject.to_owned();
        let inner = block_on(self.client.subscribe(topic)).map_err(mq_error)?;
        Ok(Subscription { inner })
    }

    /// Subscribes to `subject` as a member of `queue_group`, blocking until
    /// the subscription is set up.
    ///
    /// # Errors
    ///
    /// Returns the error from `runtime::check_cancel` if that check fails,
    /// or the subscribe error converted through `mq_error`.
    pub fn queue_subscribe(
        &self,
        subject: &str,
        queue_group: &str,
    ) -> Result<Subscription, RuntimeError> {
        runtime::check_cancel()?;
        let request = self
            .client
            .queue_subscribe(subject.to_owned(), queue_group.to_owned());
        let inner = block_on(request).map_err(mq_error)?;
        Ok(Subscription { inner })
    }

    /// Async counterpart of [`Connection::publish`]: publishes `payload` on
    /// `subject` and awaits a flush. No cancellation check is performed.
    ///
    /// # Errors
    ///
    /// Publish and flush failures are converted through `mq_error`.
    pub async fn publish_async(&self, subject: &str, payload: &[u8]) -> Result<(), RuntimeError> {
        let body = Bytes::copy_from_slice(payload);
        self.client
            .publish(subject.to_owned(), body)
            .await
            .map_err(mq_error)?;
        self.client.flush().await.map_err(mq_error)?;
        Ok(())
    }

    /// Async counterpart of [`Connection::subscribe`]. No cancellation check
    /// is performed.
    ///
    /// # Errors
    ///
    /// Returns the subscribe error converted through `mq_error`.
    pub async fn subscribe_async(&self, subject: &str) -> Result<Subscription, RuntimeError> {
        let topic = subject.to_owned();
        let inner = self.client.subscribe(topic).await.map_err(mq_error)?;
        Ok(Subscription { inner })
    }
}