use std::time::Duration;
use async_nats::jetstream;
use futures_util::StreamExt;
use tracing::{debug, error, instrument, warn};
use crate::application::ports::{NatsError, NatsPublisher};
use crate::config::model::{ConsumerConfig, StreamConfig};
/// Back-off schedule used by `publish`, in milliseconds. The retry loop runs once per
/// entry, so the array length is also the maximum number of publish attempts.
const RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2000];
/// `NatsPublisher` implementation that performs all of its work through a
/// [`jetstream::Context`].
pub struct JetStreamPublisher {
/// JetStream context created from the NATS client handed to `new`; every operation of
/// this type goes through it.
jetstream: jetstream::Context,
}
impl JetStreamPublisher {
    /// Creates a publisher by wrapping `client` in a JetStream context.
    #[must_use]
    pub fn new(client: async_nats::Client) -> Self {
        Self {
            jetstream: jetstream::new(client),
        }
    }

    /// Borrows the JetStream context owned by this publisher.
    #[must_use]
    pub fn jetstream_context(&self) -> &jetstream::Context {
        &self.jetstream
    }
}
impl NatsPublisher for JetStreamPublisher {
    /// Publishes `payload` on `subject` through JetStream and waits for the publish
    /// acknowledgement.
    ///
    /// The message carries a `Content-Type` header set to `content_type`. All arguments are
    /// copied into owned values before the future is built, so the returned future borrows
    /// only `self`.
    ///
    /// One attempt is made per entry of `RETRY_DELAYS_MS`. After a failed attempt the matching
    /// delay is slept only when another attempt follows, so the final failure is reported
    /// immediately instead of after a back-off that nothing would use. As a consequence the
    /// last entry of the schedule is never waited on.
    ///
    /// # Errors
    ///
    /// Returns [`NatsError::PublishFailed`] with the subject and the text of the last publish
    /// or acknowledgement error once every attempt has failed.
    #[instrument(skip(self, payload, content_type), fields(subject = %subject))]
    fn publish(
        &self,
        subject: &str,
        payload: &[u8],
        content_type: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), NatsError>> + Send + '_>>
    {
        let subject = subject.to_owned();
        let payload = bytes::Bytes::copy_from_slice(payload);
        let content_type = content_type.to_owned();
        Box::pin(async move {
            let mut headers = async_nats::HeaderMap::new();
            headers.insert("Content-Type", content_type.as_str());

            let max_attempts = RETRY_DELAYS_MS.len();
            let mut last_err = None;
            for (attempt, delay_ms) in RETRY_DELAYS_MS.iter().enumerate() {
                // The publish call consumes its arguments, hence the per-attempt clones.
                let publish_result = self
                    .jetstream
                    .publish_with_headers(subject.clone(), headers.clone(), payload.clone())
                    .await;
                match publish_result {
                    Ok(ack_future) => match ack_future.await {
                        Ok(_ack) => {
                            debug!(subject = %subject, "published to jetstream");
                            return Ok(());
                        }
                        Err(e) => {
                            warn!(
                                subject = %subject,
                                attempt = attempt + 1,
                                max_attempts = max_attempts,
                                error = %e,
                                "jetstream ack failed, retrying"
                            );
                            last_err = Some(e.to_string());
                        }
                    },
                    Err(e) => {
                        warn!(
                            subject = %subject,
                            attempt = attempt + 1,
                            max_attempts = max_attempts,
                            error = %e,
                            "jetstream publish failed, retrying"
                        );
                        last_err = Some(e.to_string());
                    }
                }

                // Back off only when there is another attempt to wait for; sleeping after
                // the last failure would just delay the error returned to the caller.
                if attempt + 1 < max_attempts {
                    tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
                }
            }

            let reason = last_err.unwrap_or_else(|| "unknown error".to_owned());
            error!(subject = %subject, reason = %reason, "publish failed after all retries");
            Err(NatsError::PublishFailed { subject, reason })
        })
    }

    /// Makes sure a stream named `config.name` exists, creating it from `config` when the
    /// lookup does not succeed.
    ///
    /// A stream that is found is left as it is; its configuration is not compared with
    /// `config`. Any lookup error leads to a create attempt, and the lookup error itself is
    /// logged at debug level so it is not lost.
    ///
    /// Unrecognised `storage`, `retention` and `discard` strings fall back to file storage,
    /// limits retention and discard-old respectively.
    ///
    /// # Errors
    ///
    /// Returns [`NatsError::StreamSetupFailed`] when creating the stream fails.
    #[instrument(skip(self, config), fields(stream = %config.name))]
    fn ensure_stream(
        &self,
        config: &StreamConfig,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), NatsError>> + Send + '_>>
    {
        let config = config.clone();
        Box::pin(async move {
            match self.jetstream.get_stream(&config.name).await {
                Ok(_existing) => {
                    debug!(stream = %config.name, "stream already exists");
                    return Ok(());
                }
                Err(e) => {
                    debug!(
                        stream = %config.name,
                        error = %e,
                        "stream lookup failed, attempting to create it"
                    );
                }
            }

            // The stream configuration is only needed on the create path.
            let storage = match config.storage.as_str() {
                "memory" => jetstream::stream::StorageType::Memory,
                _ => jetstream::stream::StorageType::File,
            };
            let retention = match config.retention.as_str() {
                "interest" => jetstream::stream::RetentionPolicy::Interest,
                "workqueue" => jetstream::stream::RetentionPolicy::WorkQueue,
                _ => jetstream::stream::RetentionPolicy::Limits,
            };
            let discard = match config.discard.as_str() {
                "new" => jetstream::stream::DiscardPolicy::New,
                _ => jetstream::stream::DiscardPolicy::Old,
            };
            let stream_config = jetstream::stream::Config {
                name: config.name.clone(),
                subjects: config.subjects.clone(),
                storage,
                retention,
                max_age: Duration::from_secs(config.max_age_secs),
                max_bytes: config.max_bytes,
                max_messages: config.max_msgs,
                max_message_size: config.max_msg_size,
                discard,
                num_replicas: config.num_replicas,
                duplicate_window: Duration::from_secs(config.duplicate_window_secs),
                ..Default::default()
            };

            self.jetstream
                .create_stream(stream_config)
                .await
                .map_err(|e| NatsError::StreamSetupFailed {
                    stream: config.name.clone(),
                    reason: e.to_string(),
                })?;
            debug!(stream = %config.name, "stream created");
            Ok(())
        })
    }

    /// Makes sure a pull consumer named `config.name` exists on the stream `config.stream`,
    /// creating it from `config` when the lookup does not succeed.
    ///
    /// A consumer that is found is left as it is. Any lookup error leads to a create attempt,
    /// and the lookup error itself is logged at debug level so it is not lost.
    ///
    /// Unrecognised `ack_policy` and `deliver_policy` strings fall back to explicit acks and
    /// deliver-all respectively. `durable_name` is set only when `config.durable` is true,
    /// and a missing `filter_subject` becomes the empty string.
    ///
    /// # Errors
    ///
    /// Returns [`NatsError::ConsumerSetupFailed`] when the stream lookup fails (reason
    /// prefixed with `stream not found:`) or when creating the consumer fails.
    #[instrument(skip(self, config), fields(consumer = %config.name, stream = %config.stream))]
    fn ensure_consumer(
        &self,
        config: &ConsumerConfig,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), NatsError>> + Send + '_>>
    {
        let config = config.clone();
        Box::pin(async move {
            let stream = self
                .jetstream
                .get_stream(&config.stream)
                .await
                .map_err(|e| NatsError::ConsumerSetupFailed {
                    consumer: config.name.clone(),
                    reason: format!("stream not found: {e}"),
                })?;

            match stream
                .get_consumer::<jetstream::consumer::pull::Config>(&config.name)
                .await
            {
                Ok(_existing) => {
                    debug!(consumer = %config.name, "consumer already exists");
                    return Ok(());
                }
                Err(e) => {
                    debug!(
                        consumer = %config.name,
                        error = %e,
                        "consumer lookup failed, attempting to create it"
                    );
                }
            }

            // The consumer configuration is only needed on the create path.
            let ack_policy = match config.ack_policy.as_str() {
                "none" => jetstream::consumer::AckPolicy::None,
                "all" => jetstream::consumer::AckPolicy::All,
                _ => jetstream::consumer::AckPolicy::Explicit,
            };
            let deliver_policy = match config.deliver_policy.as_str() {
                "last" => jetstream::consumer::DeliverPolicy::Last,
                "new" => jetstream::consumer::DeliverPolicy::New,
                _ => jetstream::consumer::DeliverPolicy::All,
            };
            let consumer_config = jetstream::consumer::pull::Config {
                durable_name: config.durable.then(|| config.name.clone()),
                name: Some(config.name.clone()),
                ack_policy,
                ack_wait: Duration::from_secs(config.ack_wait_secs),
                max_deliver: config.max_deliver,
                deliver_policy,
                filter_subject: config.filter_subject.clone().unwrap_or_default(),
                max_ack_pending: config.max_ack_pending,
                // `Duration::from_secs(0)` is `Duration::ZERO`, so zero needs no special case.
                inactive_threshold: Duration::from_secs(config.inactive_threshold_secs),
                ..Default::default()
            };

            stream.create_consumer(consumer_config).await.map_err(|e| {
                NatsError::ConsumerSetupFailed {
                    consumer: config.name.clone(),
                    reason: e.to_string(),
                }
            })?;
            debug!(consumer = %config.name, "consumer created");
            Ok(())
        })
    }

    /// Probes JetStream by polling the first item of the stream listing.
    ///
    /// Receiving a stream entry or an empty listing both count as healthy.
    ///
    /// # Errors
    ///
    /// Returns [`NatsError::HealthCheckFailed`] with the error text when the first item of
    /// the listing is an error.
    fn health_check(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), NatsError>> + Send + '_>>
    {
        Box::pin(async move {
            let mut stream_list = self.jetstream.streams();
            match stream_list.next().await {
                Some(Ok(_)) | None => Ok(()),
                Some(Err(e)) => Err(NatsError::HealthCheckFailed(e.to_string())),
            }
        })
    }
}