use async_nats::jetstream;
use async_nats::subject::Subject;
use rsigma_eval::ProcessResult;
use crate::error::RuntimeError;
use super::nats_config::NatsConnectConfig;
use super::nats_source::derive_nats_name;
/// Sink that publishes serialized results to a single NATS JetStream subject.
///
/// Built with [`NatsSink::connect`]; every message sent through this sink is
/// published to the same `subject`.
pub struct NatsSink {
/// JetStream context used for all publish calls.
jetstream: jetstream::Context,
/// Subject that every message from this sink is published to.
subject: Subject,
}
impl NatsSink {
    /// Connects to NATS using `config` and prepares a JetStream sink for `subject`.
    ///
    /// Calls `get_or_create_stream` with a stream whose name is produced by
    /// `derive_nats_name("rsigma", subject)` and whose only subject is `subject`;
    /// all other stream settings are left at their defaults.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection cannot be established or if the
    /// `get_or_create_stream` request fails.
    pub async fn connect(
        config: &NatsConnectConfig,
        subject: &str,
    ) -> Result<Self, async_nats::Error> {
        let client = config.connect().await?;
        let js = jetstream::new(client);

        js.get_or_create_stream(jetstream::stream::Config {
            name: derive_nats_name("rsigma", subject),
            subjects: vec![subject.to_string()],
            ..Default::default()
        })
        .await?;

        Ok(Self {
            jetstream: js,
            subject: Subject::from(subject),
        })
    }

    /// Publishes every detection and correlation in `result` as its own JSON message.
    ///
    /// Detections are published first, then correlations, each one awaited
    /// (including its acknowledgement) before the next is sent. When `result`
    /// contains neither, this is a no-op and nothing is logged.
    ///
    /// # Errors
    ///
    /// Stops at the first failure and returns it: either a JSON serialization
    /// error (converted into [`RuntimeError`] by `?`) or a publish/ack failure
    /// (reported as [`RuntimeError::Io`]). Messages published before the failure
    /// are not rolled back.
    pub async fn send(&self, result: &ProcessResult) -> Result<(), RuntimeError> {
        if result.detections.is_empty() && result.correlations.is_empty() {
            return Ok(());
        }

        // Serialize straight to bytes: the buffer is handed to the publish call
        // as-is, so no intermediate `String` copy is made per message.
        for detection in &result.detections {
            self.publish_bytes(serde_json::to_vec(detection)?).await?;
        }
        for correlation in &result.correlations {
            self.publish_bytes(serde_json::to_vec(correlation)?).await?;
        }

        // Reaching this point means every item above was published successfully.
        let published = result.detections.len() + result.correlations.len();
        tracing::debug!(
            subject = %self.subject,
            messages = published,
            "NATS messages published",
        );
        Ok(())
    }

    /// Publishes an already-serialized JSON string as a single message.
    ///
    /// The string is sent verbatim; it is not parsed or validated here.
    ///
    /// # Errors
    ///
    /// Returns [`RuntimeError::Io`] if the publish or its acknowledgement fails.
    pub async fn send_raw(&self, json: &str) -> Result<(), RuntimeError> {
        self.publish_one(json).await?;
        tracing::debug!(subject = %self.subject, "NATS message published (raw)");
        Ok(())
    }

    /// Publishes one JSON string to the sink's subject and waits for the ack.
    async fn publish_one(&self, json: &str) -> Result<(), RuntimeError> {
        self.publish_bytes(json.as_bytes().to_vec()).await
    }

    /// Publishes one owned payload to the sink's subject and waits for the ack.
    ///
    /// Both the publish request and the awaited acknowledgement are checked;
    /// either failure is logged at `warn` level and mapped to
    /// [`RuntimeError::Io`].
    async fn publish_bytes(&self, payload: Vec<u8>) -> Result<(), RuntimeError> {
        let ack = self
            .jetstream
            .publish(self.subject.clone(), payload.into())
            .await
            .map_err(|e| {
                tracing::warn!(subject = %self.subject, error = %e, "NATS publish failed");
                RuntimeError::Io(std::io::Error::other(e))
            })?;

        // `publish` only hands back an ack future; awaiting it is what surfaces
        // an acknowledgement failure.
        ack.await.map_err(|e| {
            tracing::warn!(subject = %self.subject, error = %e, "NATS publish ack failed");
            RuntimeError::Io(std::io::Error::other(e))
        })?;
        Ok(())
    }
}