use async_nats::jetstream::consumer::{PullConsumer, pull::Config as ConsumerConfig};
use ruststream::Broker;
use crate::{
error::NatsError, publisher::NatsPublisher, subscribe_options::SubscribeOptions,
subscriber::NatsSubscriber,
};
#[derive(Clone)]
pub struct NatsBroker {
client: async_nats::Client,
}
impl std::fmt::Debug for NatsBroker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NatsBroker").finish_non_exhaustive()
}
}
impl NatsBroker {
pub async fn connect(addrs: impl async_nats::ToServerAddrs) -> Result<Self, NatsError> {
let client = async_nats::connect(addrs)
.await
.map_err(|err| NatsError::Connect(Box::new(err)))?;
Ok(Self { client })
}
#[must_use]
pub const fn from_client(client: async_nats::Client) -> Self {
Self { client }
}
#[must_use]
pub fn client(&self) -> async_nats::Client {
self.client.clone()
}
pub async fn subscribe(&self, opts: SubscribeOptions) -> Result<NatsSubscriber, NatsError> {
opts.validate()?;
if opts.is_jetstream() {
self.subscribe_jetstream(opts).await
} else {
self.subscribe_core(opts).await
}
}
async fn subscribe_core(&self, opts: SubscribeOptions) -> Result<NatsSubscriber, NatsError> {
let subject = opts.subject().to_owned();
let inner = if let Some(queue) = opts.queue_group_ref() {
self.client
.queue_subscribe(subject.clone(), queue.to_owned())
.await
.map_err(|err| NatsError::Subscribe(Box::new(err)))?
} else {
self.client
.subscribe(subject.clone())
.await
.map_err(|err| NatsError::Subscribe(Box::new(err)))?
};
Ok(NatsSubscriber::from_core(subject, inner))
}
async fn subscribe_jetstream(
&self,
opts: SubscribeOptions,
) -> Result<NatsSubscriber, NatsError> {
let ctx = async_nats::jetstream::new(self.client.clone());
let stream_name = opts
.stream_ref()
.expect("validated jetstream option")
.to_owned();
let stream = ctx
.get_stream(&stream_name)
.await
.map_err(|err| NatsError::JetStream(Box::new(err)))?;
let consumer_cfg = ConsumerConfig {
durable_name: opts.durable_ref().map(str::to_owned),
filter_subject: opts.filter_subject_or_default(),
max_ack_pending: opts.max_ack_pending_or_default(),
ack_wait: opts.ack_wait_or_default(),
deliver_policy: opts.deliver_policy_or_default(),
..Default::default()
};
let consumer: PullConsumer = stream
.create_consumer(consumer_cfg)
.await
.map_err(|err| NatsError::JetStream(Box::new(err)))?;
let messages = consumer
.messages()
.await
.map_err(|err| NatsError::JetStream(Box::new(err)))?;
Ok(NatsSubscriber::from_jetstream(
opts.subject().to_owned(),
stream_name,
messages,
))
}
#[must_use]
pub fn publisher(&self) -> NatsPublisher {
NatsPublisher::new(self.client.clone())
}
pub async fn shutdown_client(&self) {
let _ = self.client.drain().await;
}
}
impl Broker for NatsBroker {
type Subscriber = NatsSubscriber;
type Publisher = NatsPublisher;
type Error = NatsError;
async fn connect(&self) -> Result<(), Self::Error> {
Ok(())
}
async fn shutdown(&self) -> Result<(), Self::Error> {
self.shutdown_client().await;
Ok(())
}
}