use std::{fmt::Debug, sync::Arc};
use async_nats::{
Client,
jetstream::{self, Context},
};
use async_trait::async_trait;
use tracing::instrument;
use topmesys::{EventConsumer, EventMessage};
/// Event consumer that republishes the events it receives through a NATS
/// JetStream context.
#[derive(Debug)]
pub struct NatsGateway {
    /// Stream name; used as the leading segment of every published subject.
    stream: String,
    /// JetStream context used for publishing.
    ctx: Context,
    /// Broker topic reported by the `EventConsumer::consumes` implementation.
    broker_topic: String,
}
impl NatsGateway {
    /// Creates a gateway that publishes through a JetStream context built
    /// from `client`.
    ///
    /// * `client` - NATS client the JetStream context is created from.
    /// * `stream` - stored as the stream name and used as the subject prefix
    ///   when publishing.
    /// * `broker_topic` - stored as the topic this gateway reports via
    ///   `consumes`.
    #[instrument]
    pub fn new<S: Into<String> + Debug>(client: Client, stream: S, broker_topic: S) -> Self {
        Self {
            stream: stream.into(),
            // `client` is owned and not used afterwards, so it is moved into
            // the context instead of being cloned.
            ctx: jetstream::new(client),
            broker_topic: broker_topic.into(),
        }
    }

    /// Publishes the content of `msg` to the subject `"<stream>.<topic>"`,
    /// where `<topic>` is `msg.topic().text()`.
    ///
    /// # Errors
    ///
    /// Returns an error if either of the two awaited steps of the publish
    /// fails.
    #[instrument(skip(self), level = "debug")]
    pub async fn publish_to_stream(&self, msg: &EventMessage) -> anyhow::Result<()> {
        let subject = format!("{}.{}", self.stream, msg.topic().text());
        let payload = msg.content().clone();
        // The first await issues the publish; the second awaits the value it
        // returned (presumably the JetStream acknowledgement - confirm
        // against the async-nats version in use).
        self.ctx.publish(subject, payload).await?.await?;
        Ok(())
    }

    /// Returns the stream name this gateway was constructed with.
    pub fn stream(&self) -> &str {
        &self.stream
    }

    /// Returns the underlying JetStream context.
    pub fn ctx(&self) -> &Context {
        &self.ctx
    }
}
#[async_trait]
impl EventConsumer for NatsGateway {
    /// Returns the broker topic this gateway was constructed with.
    fn consumes(&self) -> &String {
        &self.broker_topic
    }

    /// Forwards `event` to `publish_to_stream` and returns its result
    /// unchanged.
    async fn handle_event(&self, event: Arc<EventMessage>) -> anyhow::Result<()> {
        // `&Arc<EventMessage>` deref-coerces to the `&EventMessage` the
        // publisher expects.
        self.publish_to_stream(&event).await
    }
}