use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::{
error::FusekiResult,
streaming::{RDFEvent, StreamConsumer, StreamProducer},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsConfig {
pub url: String,
pub subject_prefix: String,
pub token: Option<String>,
pub username: Option<String>,
pub password: Option<String>,
}
impl Default for NatsConfig {
fn default() -> Self {
Self {
url: "nats://localhost:4222".to_string(),
subject_prefix: "rdf".to_string(),
token: None,
username: None,
password: None,
}
}
}
impl From<crate::streaming::NatsConfig> for NatsConfig {
fn from(config: crate::streaming::NatsConfig) -> Self {
let url = config
.servers
.first()
.map(|u| u.to_string())
.unwrap_or_else(|| "nats://localhost:4222".to_string());
let (username, password, token) = match config.auth {
Some(crate::streaming::NatsAuth::UserPass { username, password }) => {
(Some(username), Some(password), None)
}
Some(crate::streaming::NatsAuth::Token(token)) => (None, None, Some(token)),
Some(crate::streaming::NatsAuth::NKey { .. }) => {
(None, None, None)
}
None => (None, None, None),
};
Self {
url,
subject_prefix: config.subject_prefix,
token,
username,
password,
}
}
}
pub struct NatsProducer {
config: NatsConfig,
}
impl NatsProducer {
pub async fn new(config: NatsConfig) -> FusekiResult<Self> {
tracing::info!("Creating NATS producer for: {}", config.url);
Ok(Self { config })
}
}
#[async_trait]
impl StreamProducer for NatsProducer {
async fn send(&self, event: RDFEvent) -> FusekiResult<()> {
tracing::debug!("Sending RDF event to NATS");
tracing::info!("Would send to NATS: {:?}", event);
Ok(())
}
async fn send_batch(&self, events: Vec<RDFEvent>) -> FusekiResult<()> {
tracing::debug!("Sending batch of {} RDF events to NATS", events.len());
for event in events {
self.send(event).await?;
}
Ok(())
}
async fn flush(&self) -> FusekiResult<()> {
tracing::debug!("Flushing NATS producer");
Ok(())
}
}
pub struct NatsConsumer {
config: NatsConfig,
}
impl NatsConsumer {
pub async fn new(config: NatsConfig) -> FusekiResult<Self> {
tracing::info!("Creating NATS consumer for: {}", config.url);
Ok(Self { config })
}
}
#[async_trait]
impl StreamConsumer for NatsConsumer {
async fn subscribe(
&self,
_handler: Box<dyn crate::streaming::EventHandler>,
) -> FusekiResult<()> {
tracing::info!("Subscribing to NATS events with handler");
Ok(())
}
async fn unsubscribe(&self) -> FusekiResult<()> {
tracing::info!("Unsubscribing from NATS events");
Ok(())
}
async fn commit(&self) -> FusekiResult<()> {
tracing::debug!("Committing NATS consumer (no-op for NATS)");
Ok(())
}
}