#[cfg(feature = "nats")]
mod inner {
use crate::config::NatsConfig;
use crate::error::{ServiceError, ServiceResult};
/// Wrapper around an [`async_nats::Client`] that optionally carries a JetStream context.
///
/// Instances are built with `NatsClient::connect`; the JetStream context is only
/// present when the configuration passed to `connect` enabled it.
#[derive(Clone)]
pub struct NatsClient {
/// Core NATS client used for publish, request, subscribe and drain.
client: async_nats::Client,
/// JetStream context created from `client`; `None` when JetStream is disabled.
jetstream: Option<async_nats::jetstream::Context>,
}
impl NatsClient {
/// Opens a connection to the server at `config.url` using default connect options.
///
/// A JetStream context is derived from the new client only when
/// `config.jetstream` is `true`.
///
/// # Errors
///
/// Returns [`ServiceError::Unavailable`] when the connection attempt fails.
pub async fn connect(config: &NatsConfig) -> ServiceResult<Self> {
    let options = async_nats::ConnectOptions::new();
    let client = match options.connect(&config.url).await {
        Ok(client) => client,
        Err(e) => {
            return Err(ServiceError::Unavailable(format!(
                "NATS connect failed: {e}"
            )));
        }
    };

    // The context wraps its own clone of the client handle.
    let jetstream = config
        .jetstream
        .then(|| async_nats::jetstream::new(client.clone()));

    Ok(Self { client, jetstream })
}
pub fn client(&self) -> &async_nats::Client {
&self.client
}
pub fn jetstream(&self) -> Option<&async_nats::jetstream::Context> {
self.jetstream.as_ref()
}
/// Publishes `payload` on `subject` through the core NATS client.
///
/// # Errors
///
/// Any error from the underlying publish call is propagated with `?` and
/// converted into the service error type.
pub async fn publish(
    &self,
    subject: impl AsRef<str>,
    payload: bytes::Bytes,
) -> ServiceResult<()> {
    // The client call takes an owned subject, so copy the borrowed string once.
    let owned_subject = subject.as_ref().to_owned();
    self.client.publish(owned_subject, payload).await?;
    Ok(())
}
/// Sends `payload` as a request on `subject` and returns the message the client yields.
///
/// # Errors
///
/// Any error from the underlying request call is propagated with `?` and
/// converted into the service error type.
pub async fn request(
    &self,
    subject: impl AsRef<str>,
    payload: bytes::Bytes,
) -> ServiceResult<async_nats::Message> {
    let owned_subject = subject.as_ref().to_owned();
    let reply = self.client.request(owned_subject, payload).await?;
    Ok(reply)
}
/// Subscribes to `subject` on the core NATS client and returns the subscriber.
///
/// # Errors
///
/// Any error from the underlying subscribe call is propagated with `?` and
/// converted into the service error type.
pub async fn subscribe(
    &self,
    subject: impl AsRef<str>,
) -> ServiceResult<async_nats::Subscriber> {
    let owned_subject = subject.as_ref().to_owned();
    Ok(self.client.subscribe(owned_subject).await?)
}
/// Drains the underlying NATS client.
///
/// # Errors
///
/// Returns [`ServiceError::Internal`] when the client reports a drain failure.
pub async fn drain(&self) -> ServiceResult<()> {
    match self.client.drain().await {
        Ok(_) => Ok(()),
        Err(e) => Err(ServiceError::Internal(format!("NATS drain failed: {e}"))),
    }
}
/// Reports whether the client's current connection state is `Connected`.
pub fn is_connected(&self) -> bool {
    matches!(
        self.client.connection_state(),
        async_nats::connection::State::Connected
    )
}
/// Fetches the stream described by `config`, asking JetStream to create it if needed.
///
/// # Errors
///
/// - [`ServiceError::Configuration`] when JetStream is not enabled on this client.
/// - [`ServiceError::Internal`] when the `get_or_create_stream` call fails.
pub async fn ensure_stream(
    &self,
    config: async_nats::jetstream::stream::Config,
) -> ServiceResult<async_nats::jetstream::stream::Stream> {
    let context = self.require_jetstream()?;
    match context.get_or_create_stream(config).await {
        Ok(stream) => Ok(stream),
        Err(e) => Err(ServiceError::Internal(format!(
            "JetStream get_or_create_stream failed: {e}"
        ))),
    }
}
/// Publishes `payload` on `subject` through the JetStream context.
///
/// The returned `PublishAckFuture` is handed back without being awaited here,
/// so the caller decides when (and whether) to await the acknowledgement.
///
/// # Errors
///
/// - [`ServiceError::Configuration`] when JetStream is not enabled on this client.
/// - Any error from the JetStream publish call, propagated with `?` and
///   converted into the service error type.
pub async fn publish_jetstream(
    &self,
    subject: impl AsRef<str>,
    payload: bytes::Bytes,
) -> ServiceResult<async_nats::jetstream::context::PublishAckFuture> {
    let context = self.require_jetstream()?;
    let owned_subject = subject.as_ref().to_owned();
    Ok(context.publish(owned_subject, payload).await?)
}
/// Returns a pull consumer on the stream `stream_name`, creating it when necessary.
///
/// The consumer is looked up by `consumer_config.name`, falling back to
/// `consumer_config.durable_name`. When neither is set there is nothing to
/// look up, so the consumer is created directly from `consumer_config`
/// rather than querying the server with an empty consumer name.
///
/// # Errors
///
/// - [`ServiceError::Configuration`] when JetStream is not enabled on this client.
/// - [`ServiceError::Internal`] when the stream cannot be fetched, or the
///   consumer cannot be fetched or created.
pub async fn pull_subscribe(
    &self,
    stream_name: &str,
    consumer_config: async_nats::jetstream::consumer::pull::Config,
) -> ServiceResult<async_nats::jetstream::consumer::PullConsumer> {
    let js = self.require_jetstream()?;
    let stream = js
        .get_stream(stream_name)
        .await
        .map_err(|e| ServiceError::Internal(format!("JetStream get_stream failed: {e}")))?;

    // An owned copy is required because `consumer_config` is moved into the call below.
    let lookup_name = consumer_config
        .name
        .clone()
        .or_else(|| consumer_config.durable_name.clone());

    let consumer = match lookup_name {
        Some(name) => stream
            .get_or_create_consumer(&name, consumer_config)
            .await
            .map_err(|e| {
                ServiceError::Internal(format!("JetStream get_or_create_consumer failed: {e}"))
            })?,
        // Unnamed config: skip the lookup, which would otherwise use an empty name.
        None => stream.create_consumer(consumer_config).await.map_err(|e| {
            ServiceError::Internal(format!("JetStream create_consumer failed: {e}"))
        })?,
    };
    Ok(consumer)
}
/// Borrows the JetStream context, or fails when JetStream was not enabled.
///
/// # Errors
///
/// Returns [`ServiceError::Configuration`] with the message
/// `"jetstream not enabled"` when no context is present.
fn require_jetstream(&self) -> ServiceResult<&async_nats::jetstream::Context> {
    match &self.jetstream {
        Some(context) => Ok(context),
        None => Err(ServiceError::Configuration(
            "jetstream not enabled".to_string(),
        )),
    }
}
}
/// Debug output shows two booleans: whether the client is currently connected
/// and whether a JetStream context is present.
impl std::fmt::Debug for NatsClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let connected = self.is_connected();
        let jetstream_enabled = self.jetstream.is_some();

        let mut out = f.debug_struct("NatsClient");
        out.field("connected", &connected);
        out.field("jetstream", &jetstream_enabled);
        out.finish()
    }
}
/// A [`NatsClient`] behind a `std::sync::Arc`.
pub type SharedNatsClient = std::sync::Arc<NatsClient>;
#[cfg(test)]
mod tests {
    use super::*;

    /// Connecting to a port nothing listens on must surface as `Unavailable`.
    #[tokio::test]
    async fn test_connect_bad_url_returns_error() {
        let config = NatsConfig {
            url: "nats://localhost:1".to_string(),
            jetstream: false,
            lease_duration: 30,
        };
        match NatsClient::connect(&config).await {
            Err(ServiceError::Unavailable(_)) => {}
            Err(other) => panic!("expected Unavailable variant, got {other:?}"),
            Ok(_) => panic!("expected connect error for unreachable port"),
        }
    }

    /// JetStream helpers must fail with `Configuration` when no context is present.
    ///
    /// The client is built with `retry_on_initial_connect`, so `connect` hands
    /// back a handle without needing a reachable server; the helpers under test
    /// return before touching the network.
    #[tokio::test]
    async fn test_jetstream_helpers_fail_without_jetstream() {
        let client = async_nats::ConnectOptions::new()
            .retry_on_initial_connect()
            .connect("nats://localhost:1")
            .await
            .expect("deferred connect should yield a client without a server");
        let nats = NatsClient {
            client,
            jetstream: None,
        };

        assert!(nats.jetstream().is_none());

        match nats.require_jetstream() {
            Err(ServiceError::Configuration(msg)) => {
                assert_eq!(msg, "jetstream not enabled");
            }
            Err(other) => panic!("unexpected variant: {other:?}"),
            Ok(_) => panic!("expected an error when JetStream is disabled"),
        }

        assert!(matches!(
            nats.publish_jetstream("subject", bytes::Bytes::new()).await,
            Err(ServiceError::Configuration(_))
        ));
        assert!(matches!(
            nats.ensure_stream(Default::default()).await,
            Err(ServiceError::Configuration(_))
        ));
        assert!(matches!(
            nats.pull_subscribe("stream", Default::default()).await,
            Err(ServiceError::Configuration(_))
        ));
    }
}
}
#[cfg(feature = "nats")]
pub use inner::{NatsClient, SharedNatsClient};