use async_nats::jetstream;
use async_trait::async_trait;
use serde_json;
use super::{EventPublisher, QueueError, QueueResult};
use crate::runtime::AgentEvent;
/// Event publisher that serializes each `AgentEvent` to JSON and publishes it
/// through a NATS JetStream context on a single, fixed subject.
pub struct NatsEventPublisher {
/// JetStream context used to send every published message.
jetstream: jetstream::Context,
/// Subject every event is published to; set once at construction.
subject: String,
}
impl NatsEventPublisher {
    /// Builds a publisher from an already-initialised JetStream context.
    ///
    /// `subject` is converted to an owned `String` and used as the target of
    /// every subsequent publish. No network I/O happens here.
    #[must_use]
    pub fn new(jetstream: jetstream::Context, subject: impl Into<String>) -> Self {
        let subject = subject.into();
        Self { jetstream, subject }
    }

    /// Connects to the NATS server at `url`, wraps the resulting client in a
    /// JetStream context and returns a publisher bound to `subject`.
    ///
    /// # Errors
    ///
    /// Returns [`QueueError::ConnectionFailed`] when the NATS connection
    /// cannot be established; the underlying error is embedded in the message.
    pub async fn connect(url: &str, subject: impl Into<String>) -> QueueResult<Self> {
        let connection = async_nats::connect(url).await;
        let client = connection.map_err(|e| QueueError::ConnectionFailed {
            message: format!("NATS connect failed: {e}"),
        })?;

        // Delegate to `new` so there is a single place that assembles the struct.
        Ok(Self::new(jetstream::new(client), subject))
    }
}
#[async_trait]
impl EventPublisher for NatsEventPublisher {
    /// Serializes `event` to JSON and publishes it to the configured subject,
    /// returning only once JetStream has acknowledged the message.
    ///
    /// # Errors
    ///
    /// * [`QueueError::SerializationFailed`] if `event` cannot be encoded as JSON.
    /// * [`QueueError::PublishFailed`] if the message cannot be sent, or if the
    ///   server does not acknowledge it.
    async fn publish(&self, event: AgentEvent) -> QueueResult<()> {
        let payload = serde_json::to_vec(&event).map_err(|e| QueueError::SerializationFailed {
            message: format!("failed to serialize AgentEvent: {e}"),
        })?;

        // The first await only hands the message to the client and yields a
        // future for the server acknowledgement; it does not confirm delivery.
        let ack_future = self
            .jetstream
            .publish(self.subject.clone(), payload.into())
            .await
            .map_err(|e| QueueError::PublishFailed {
                message: format!("NATS publish failed: {e}"),
            })?;

        // Await the acknowledgement itself. Dropping this future would hide
        // server-side rejections and report success for an unstored message.
        ack_future.await.map_err(|e| QueueError::PublishFailed {
            message: format!("NATS publish ack failed: {e}"),
        })?;

        tracing::debug!(
            subject = %self.subject,
            "published AgentEvent to NATS JetStream",
        );
        Ok(())
    }
}