use std::collections::HashSet;
use async_nats::jetstream;
use async_nats::subject::Subject;
use parking_lot::Mutex;
use rsigma_eval::ProcessResult;
use crate::error::RuntimeError;
use super::nats_config::NatsConnectConfig;
use super::nats_source::derive_nats_name;
pub struct NatsSink {
jetstream: jetstream::Context,
subject: Subject,
ensured: Mutex<HashSet<String>>,
}
impl NatsSink {
pub async fn connect(
config: &NatsConnectConfig,
subject: &str,
) -> Result<Self, async_nats::Error> {
let client = config.connect().await?;
let js = jetstream::new(client);
let stream_name = derive_nats_name("rsigma", subject);
js.get_or_create_stream(jetstream::stream::Config {
name: stream_name,
subjects: vec![subject.to_string()],
..Default::default()
})
.await?;
let ensured = Mutex::new(HashSet::from([subject.to_string()]));
Ok(NatsSink {
jetstream: js,
subject: Subject::from(subject),
ensured,
})
}
pub async fn send(&self, result: &ProcessResult) -> Result<(), RuntimeError> {
if result.is_empty() {
return Ok(());
}
let mut published = 0_usize;
for m in result {
let json = serde_json::to_string(m)?;
self.publish_one(&self.subject, &json).await?;
published += 1;
}
tracing::debug!(
subject = %self.subject,
messages = published,
"NATS messages published",
);
Ok(())
}
pub async fn send_raw(&self, json: &str) -> Result<(), RuntimeError> {
self.publish_one(&self.subject, json).await?;
tracing::debug!(subject = %self.subject, "NATS message published (raw)");
Ok(())
}
pub async fn send_incident(
&self,
json: &str,
subject_override: Option<&str>,
) -> Result<(), RuntimeError> {
match subject_override {
Some(subject) => {
self.ensure_stream(subject).await?;
self.publish_one(&Subject::from(subject.to_string()), json)
.await
}
None => self.publish_one(&self.subject, json).await,
}
}
async fn ensure_stream(&self, subject: &str) -> Result<(), RuntimeError> {
if self.ensured.lock().contains(subject) {
return Ok(());
}
let stream_name = derive_nats_name("rsigma", subject);
self.jetstream
.get_or_create_stream(jetstream::stream::Config {
name: stream_name,
subjects: vec![subject.to_string()],
..Default::default()
})
.await
.map_err(|e| {
tracing::warn!(subject, error = %e, "NATS incident stream ensure failed");
RuntimeError::Io(std::io::Error::other(e))
})?;
self.ensured.lock().insert(subject.to_string());
Ok(())
}
async fn publish_one(&self, subject: &Subject, json: &str) -> Result<(), RuntimeError> {
let ack = self
.jetstream
.publish(subject.clone(), json.to_string().into())
.await
.map_err(|e| {
tracing::warn!(subject = %subject, error = %e, "NATS publish failed");
RuntimeError::Io(std::io::Error::other(e))
})?;
ack.await.map_err(|e| {
tracing::warn!(subject = %subject, error = %e, "NATS publish ack failed");
RuntimeError::Io(std::io::Error::other(e))
})?;
Ok(())
}
}