use async_nats::jetstream;
use async_nats::jetstream::consumer::DeliverPolicy;
use tokio_stream::StreamExt;
use super::nats_config::NatsConnectConfig;
use super::{AckToken, EventSource, RawEvent};
/// Builds a name of the form `<prefix>-<subject>` from a NATS subject.
///
/// Every `.`, `>` and `*` in `subject` is replaced by `-`, and any run of
/// trailing `-` characters is stripped before the prefix is prepended.
/// For example, the subject `events.>` with prefix `rsigma` yields
/// `rsigma-events`.
///
/// A subject made up solely of replaced characters (e.g. `>`) collapses to an
/// empty stem, so the result is just `<prefix>-`.
pub(crate) fn derive_nats_name(prefix: &str, subject: &str) -> String {
    // Subject token separators and wildcards all become dashes.
    let dashed = subject.replace(['.', '>', '*'], "-");
    // A trailing wildcard would otherwise leave dangling dashes at the end.
    let stem = dashed.trim_end_matches('-');
    format!("{prefix}-{stem}")
}
/// Selects where a source starts reading when it connects.
///
/// `NatsSource::connect` translates each variant into a JetStream
/// `DeliverPolicy` for the consumer configuration it submits.
#[derive(Debug, Clone, Default)]
pub enum ReplayPolicy {
/// Default variant. Mapped to `DeliverPolicy::All`.
// NOTE(review): presumably "resume" relies on the durable consumer's stored
// position when the consumer already exists — confirm.
#[default]
Resume,
/// Start at the given stream sequence number
/// (mapped to `DeliverPolicy::ByStartSequence`).
FromSequence(u64),
/// Start at the given point in time
/// (mapped to `DeliverPolicy::ByStartTime`).
FromTime(time::OffsetDateTime),
/// Mapped to `DeliverPolicy::Last`.
Latest,
}
/// Event source that reads messages from a JetStream pull consumer.
///
/// Construct it with `NatsSource::connect`.
pub struct NatsSource {
/// Message stream obtained from the pull consumer; polled by `recv`.
messages: jetstream::consumer::pull::Stream,
}
impl NatsSource {
    /// Connects to NATS and opens a JetStream pull-consumer message stream for
    /// `subject`.
    ///
    /// Steps, in order:
    /// 1. Establish the client connection via `config.connect()`.
    /// 2. Get or create a stream named `derive_nats_name("rsigma", subject)`
    ///    that captures `subject`.
    /// 3. Get or create a durable pull consumer filtered on `subject`. Its
    ///    name is `consumer_group` verbatim when provided, otherwise
    ///    `derive_nats_name("rsigma-daemon", subject)`.
    /// 4. Open the consumer's message stream.
    ///
    /// `replay` is translated into the consumer's `DeliverPolicy`.
    ///
    /// # Errors
    ///
    /// Returns the underlying error if connecting, creating/fetching the
    /// stream or consumer, or opening the message stream fails.
    // NOTE(review): if a durable consumer with this name already exists,
    // `get_or_create_consumer` presumably returns it unchanged, in which case
    // `replay` would not take effect — confirm against the async-nats docs.
    pub async fn connect(
        config: &NatsConnectConfig,
        subject: &str,
        replay: &ReplayPolicy,
        consumer_group: Option<&str>,
    ) -> Result<Self, async_nats::Error> {
        let js = jetstream::new(config.connect().await?);

        // An explicit consumer group is used as-is; otherwise derive the
        // durable name from the subject.
        let durable = consumer_group.map_or_else(
            || derive_nats_name("rsigma-daemon", subject),
            str::to_owned,
        );

        let start_from = match replay {
            ReplayPolicy::Latest => DeliverPolicy::Last,
            ReplayPolicy::FromTime(at) => DeliverPolicy::ByStartTime { start_time: *at },
            ReplayPolicy::FromSequence(seq_no) => DeliverPolicy::ByStartSequence {
                start_sequence: *seq_no,
            },
            ReplayPolicy::Resume => DeliverPolicy::All,
        };

        let stream_cfg = jetstream::stream::Config {
            name: derive_nats_name("rsigma", subject),
            subjects: vec![subject.to_owned()],
            ..Default::default()
        };
        let consumer_cfg = jetstream::consumer::pull::Config {
            durable_name: Some(durable.clone()),
            filter_subject: subject.to_owned(),
            deliver_policy: start_from,
            ..Default::default()
        };

        let stream = js.get_or_create_stream(stream_cfg).await?;
        let consumer = stream
            .get_or_create_consumer(&durable, consumer_cfg)
            .await?;
        let messages = consumer.messages().await?;

        Ok(Self { messages })
    }
}
impl EventSource for NatsSource {
async fn recv(&mut self) -> Option<RawEvent> {
loop {
match self.messages.next().await {
Some(Ok(msg)) => {
let payload = String::from_utf8_lossy(&msg.payload).to_string();
if !payload.trim().is_empty() {
return Some(RawEvent {
payload,
ack_token: AckToken::Nats(Box::new(msg)),
});
}
}
Some(Err(e)) => {
tracing::warn!(error = %e, "NATS message error");
continue;
}
None => return None,
}
}
}
}