use aa_core::storage::{AuditEntry, AuditSink, Result, StorageError};
use async_trait::async_trait;
use super::config::NatsConfig;
use super::subject::subject_for;
pub struct NatsAuditSink {
client: async_nats::Client,
}
impl NatsAuditSink {
#[must_use]
pub fn new(client: async_nats::Client) -> Self {
Self { client }
}
pub async fn connect(config: &NatsConfig) -> Result<Self> {
let client = config
.connect()
.await
.map_err(|e| StorageError::Backend(e.to_string()))?;
Ok(Self::new(client))
}
#[must_use]
pub fn is_connected(&self) -> bool {
self.client.connection_state() == async_nats::connection::State::Connected
}
}
#[async_trait]
impl AuditSink for NatsAuditSink {
async fn emit(&self, event: AuditEntry) -> Result<()> {
if !self.is_connected() {
return Err(StorageError::Backend("nats connection is not established".to_string()));
}
let subject = subject_for(&event);
let payload = serde_json::to_vec(&event).map_err(|e| StorageError::Serialization(e.to_string()))?;
self.client
.publish(subject, payload.into())
.await
.map_err(|e| StorageError::Backend(e.to_string()))?;
Ok(())
}
}