use async_nats::HeaderMap;
use async_nats::jetstream::{self, Context};
use std::sync::Arc;
use std::time::Duration;
use super::publisher::NatsError;
pub const EVENT_STREAM: &str = "noetl_events";
pub const EVENT_SUBJECT_PREFIX: &str = "noetl.events";
pub const EVENT_SUBJECT_WILDCARD: &str = "noetl.events.>";
#[derive(Clone)]
pub struct EventStreamPublisher {
js: Context,
}
impl EventStreamPublisher {
pub async fn new(
client: Arc<async_nats::Client>,
dedup_window: Duration,
max_age: Duration,
) -> Result<Self, NatsError> {
let js = jetstream::new((*client).clone());
Self::ensure_stream(&js, dedup_window, max_age).await?;
Ok(Self { js })
}
async fn ensure_stream(
js: &Context,
dedup_window: Duration,
max_age: Duration,
) -> Result<(), NatsError> {
match js.get_stream(EVENT_STREAM).await {
Ok(_) => {
tracing::debug!(stream = EVENT_STREAM, "Using existing event stream");
Ok(())
}
Err(_) => {
let config = jetstream::stream::Config {
name: EVENT_STREAM.to_string(),
subjects: vec![EVENT_SUBJECT_WILDCARD.to_string()],
storage: jetstream::stream::StorageType::File,
duplicate_window: dedup_window,
max_age,
..Default::default()
};
js.create_stream(config)
.await
.map_err(|e| NatsError::JetStream(e.to_string()))?;
tracing::info!(
stream = EVENT_STREAM,
subject = EVENT_SUBJECT_WILDCARD,
"Created event stream (CQRS write-path queue, #103 phase 2a)"
);
Ok(())
}
}
}
pub async fn publish_event(
&self,
event_id: i64,
event_type: &str,
payload: &[u8],
) -> Result<(), NatsError> {
let subject = format!("{EVENT_SUBJECT_PREFIX}.{event_type}");
let mut headers = HeaderMap::new();
headers.insert("Nats-Msg-Id", event_id.to_string().as_str());
self.js
.publish_with_headers(subject, headers, payload.to_vec().into())
.await
.map_err(|e| NatsError::Publish(e.to_string()))?
.await
.map_err(|e| NatsError::Publish(e.to_string()))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stream_constants_are_stable() {
assert_eq!(EVENT_STREAM, "noetl_events");
assert_eq!(EVENT_SUBJECT_WILDCARD, "noetl.events.>");
assert!(EVENT_SUBJECT_WILDCARD.starts_with(EVENT_SUBJECT_PREFIX));
}
}