use async_nats::jetstream;
use tokio_stream::StreamExt;
use super::EventSource;
/// Builds a JetStream-safe name of the form `<prefix>-<subject>`.
///
/// Every `.`, `>` and `*` in `subject` is replaced by `-`, and any run of
/// trailing `-` left on the subject part is dropped before it is joined to
/// `prefix`. The joining hyphen is always kept, so a subject that sanitizes
/// to nothing (e.g. `">"`) yields `"<prefix>-"`.
///
/// NOTE(review): presumably these characters are replaced because they are
/// not accepted in stream/consumer names — confirm against the NATS naming
/// rules.
fn derive_nats_name(prefix: &str, subject: &str) -> String {
    let replaced = subject.replace(|ch: char| matches!(ch, '.' | '>' | '*'), "-");
    let tail = replaced.trim_end_matches('-');

    let mut name = String::with_capacity(prefix.len() + 1 + tail.len());
    name.push_str(prefix);
    name.push('-');
    name.push_str(tail);
    name
}
/// Event source that yields payloads read from a JetStream pull consumer.
pub struct NatsSource {
    /// Message stream obtained from the pull consumer; polled by `recv`.
    messages: jetstream::consumer::pull::Stream,
}
impl NatsSource {
    /// Connects to the NATS server at `url` and opens a message stream for
    /// `subject`.
    ///
    /// The stream and the durable pull consumer are looked up by names
    /// derived from `subject` (prefixes `rsigma` and `rsigma-daemon`
    /// respectively) and created if they do not exist yet. The consumer is
    /// filtered to `subject`.
    ///
    /// # Errors
    ///
    /// Returns the underlying error if connecting, getting/creating the
    /// stream or consumer, or opening the message stream fails.
    pub async fn connect(url: &str, subject: &str) -> Result<Self, async_nats::Error> {
        let context = jetstream::new(async_nats::connect(url).await?);

        let stream_config = jetstream::stream::Config {
            name: derive_nats_name("rsigma", subject),
            subjects: vec![subject.to_string()],
            ..Default::default()
        };
        let stream = context.get_or_create_stream(stream_config).await?;

        // The same derived name is used both as the lookup key and as the
        // durable name of the consumer.
        let durable = derive_nats_name("rsigma-daemon", subject);
        let consumer_config = jetstream::consumer::pull::Config {
            durable_name: Some(durable.clone()),
            filter_subject: subject.to_string(),
            ..Default::default()
        };
        let consumer = stream
            .get_or_create_consumer(&durable, consumer_config)
            .await?;

        let messages = consumer.messages().await?;
        Ok(Self { messages })
    }
}
impl EventSource for NatsSource {
async fn recv(&mut self) -> Option<String> {
loop {
match self.messages.next().await {
Some(Ok(msg)) => {
let payload = String::from_utf8_lossy(&msg.payload).to_string();
if let Err(e) = msg.ack().await {
tracing::warn!(error = %e, "Failed to ack NATS message");
}
if !payload.trim().is_empty() {
return Some(payload);
}
}
Some(Err(e)) => {
tracing::warn!(error = %e, "NATS message error");
continue;
}
None => return None,
}
}
}
}