use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::{
error::FusekiResult,
streaming::{RDFEvent, StreamConsumer, StreamProducer},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConfig {
pub brokers: Vec<String>,
pub security_protocol: Option<String>,
pub sasl_mechanism: Option<String>,
pub sasl_username: Option<String>,
pub sasl_password: Option<String>,
pub properties: HashMap<String, String>,
}
impl Default for KafkaConfig {
fn default() -> Self {
Self {
brokers: vec!["localhost:9092".to_string()],
security_protocol: None,
sasl_mechanism: None,
sasl_username: None,
sasl_password: None,
properties: HashMap::new(),
}
}
}
impl From<crate::streaming::KafkaConfig> for KafkaConfig {
fn from(config: crate::streaming::KafkaConfig) -> Self {
Self {
brokers: config.brokers,
security_protocol: None,
sasl_mechanism: None,
sasl_username: None,
sasl_password: None,
properties: HashMap::new(),
}
}
}
pub struct KafkaProducer {
config: KafkaConfig,
}
impl KafkaProducer {
pub async fn new(config: KafkaConfig) -> FusekiResult<Self> {
tracing::info!("Creating Kafka producer with brokers: {:?}", config.brokers);
Ok(Self { config })
}
}
#[async_trait]
impl StreamProducer for KafkaProducer {
async fn send(&self, event: RDFEvent) -> FusekiResult<()> {
tracing::debug!("Sending RDF event to Kafka");
tracing::info!("Would send to Kafka: {:?}", event);
Ok(())
}
async fn send_batch(&self, events: Vec<RDFEvent>) -> FusekiResult<()> {
tracing::debug!("Sending batch of {} RDF events to Kafka", events.len());
for event in events {
self.send(event).await?;
}
Ok(())
}
async fn flush(&self) -> FusekiResult<()> {
tracing::debug!("Flushing Kafka producer");
Ok(())
}
}
pub struct KafkaConsumer {
config: KafkaConfig,
}
impl KafkaConsumer {
pub async fn new(config: KafkaConfig) -> FusekiResult<Self> {
tracing::info!("Creating Kafka consumer with brokers: {:?}", config.brokers);
Ok(Self { config })
}
}
#[async_trait]
impl StreamConsumer for KafkaConsumer {
async fn subscribe(
&self,
_handler: Box<dyn crate::streaming::EventHandler>,
) -> FusekiResult<()> {
tracing::info!("Subscribing to Kafka events with handler");
Ok(())
}
async fn unsubscribe(&self) -> FusekiResult<()> {
tracing::info!("Unsubscribing from Kafka events");
Ok(())
}
async fn commit(&self) -> FusekiResult<()> {
tracing::debug!("Committing Kafka consumer offsets");
Ok(())
}
}