use std::{fmt::Debug, sync::Arc};
use async_nats::{
Client, ServerAddr,
jetstream::{self, Context},
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tracing::instrument;
use topmesys::{EventConsumer, EventMessage};
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct NatsConfig {
url: String,
stream: String,
consumer: String,
}
impl NatsConfig {
pub fn new(url: String, stream: String, consumer: String) -> Self {
Self {
url,
stream,
consumer,
}
}
pub async fn create_client(&self) -> anyhow::Result<async_nats::Client> {
let addrs: Vec<ServerAddr> = self
.url
.split(",")
.filter_map(|url| url.parse::<ServerAddr>().ok())
.collect();
Ok(async_nats::connect(addrs).await?)
}
pub fn stream(&self) -> &str {
&self.stream
}
pub async fn generate_consumer(
&self,
client: async_nats::Client,
) -> anyhow::Result<async_nats::jetstream::consumer::PullConsumer> {
Ok(async_nats::jetstream::new(client)
.get_consumer_from_stream(&self.consumer, &self.stream)
.await?)
}
pub fn consumer(&self) -> &str {
&self.consumer
}
}
#[derive(Debug)]
pub struct NatsGateway {
stream: String,
ctx: Context,
broker_topic: String,
}
impl NatsGateway {
#[instrument]
pub fn new<S: Into<String> + Debug>(client: Client, stream: S, broker_topic: S) -> Self {
Self {
stream: stream.into(),
ctx: jetstream::new(client.clone()),
broker_topic: broker_topic.into(),
}
}
#[instrument(skip(self), level = "debug")]
pub async fn publish_to_stream(&self, msg: &EventMessage) -> anyhow::Result<()> {
self.ctx
.publish(
format!("{}.{}", self.stream, msg.topic().text()),
msg.content().clone(),
)
.await?
.await?;
Ok(())
}
pub fn stream(&self) -> &str {
&self.stream
}
pub fn ctx(&self) -> &Context {
&self.ctx
}
}
#[async_trait]
impl EventConsumer for NatsGateway {
fn consumes(&self) -> &String {
&self.broker_topic
}
async fn handle_event(&self, event: Arc<EventMessage>) -> anyhow::Result<()> {
self.publish_to_stream(event.as_ref()).await
}
}