use super::types::KafkaProducerConfig;
use super::event::KafkaEvent;
use crate::event::StreamEvent;
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info};
/// Kafka producer handle pairing its configuration with shared send statistics.
pub struct KafkaProducer {
/// Producer settings supplied at construction; the methods read `topic` and
/// `transaction_id` from it.
config: KafkaProducerConfig,
/// Statistics counters guarded by an async `RwLock` and held in an `Arc`.
stats: Arc<RwLock<ProducerStats>>,
}
/// Counters describing the activity of a `KafkaProducer`.
///
/// The derived `Default` starts every counter at zero.
#[derive(Debug, Default)]
pub struct ProducerStats {
/// Number of events counted as sent by `KafkaProducer::send_event`.
pub messages_sent: u64,
/// Number of messages recorded as failed.
pub messages_failed: u64,
/// Running total of payload sizes, measured as the byte length of each
/// JSON-serialized event.
pub bytes_sent: u64,
/// Average latency in milliseconds, going by the field name.
/// NOTE(review): nothing in this part of the file writes this field; confirm
/// where it is maintained.
pub avg_latency_ms: f64,
/// Number of commits recorded by `KafkaProducer::commit_transaction`.
pub transactions_committed: u64,
/// Number of aborts recorded by `KafkaProducer::abort_transaction`.
pub transactions_aborted: u64,
}
impl KafkaProducer {
pub fn new(config: KafkaProducerConfig) -> Result<Self> {
info!("Creating Kafka producer with config: {:?}", config);
Ok(Self {
config,
stats: Arc::new(RwLock::new(ProducerStats::default())),
})
}
pub async fn send_event(&self, event: StreamEvent) -> Result<()> {
let kafka_event = KafkaEvent::from(event);
debug!("Sending Kafka event: {:?}", kafka_event);
{
let mut stats = self.stats.write().await;
stats.messages_sent += 1;
stats.bytes_sent += serde_json::to_string(&kafka_event)?.len() as u64;
}
info!("Successfully sent event to Kafka topic: {}", self.config.topic);
Ok(())
}
pub async fn send_batch(&self, events: Vec<StreamEvent>) -> Result<Vec<Result<(), String>>> {
let mut results = Vec::new();
for event in events {
match self.send_event(event).await {
Ok(()) => results.push(Ok(())),
Err(e) => results.push(Err(e.to_string())),
}
}
Ok(results)
}
pub async fn get_stats(&self) -> ProducerStats {
self.stats.read().await.clone()
}
pub async fn flush(&self) -> Result<()> {
info!("Flushing Kafka producer");
Ok(())
}
pub async fn begin_transaction(&self) -> Result<()> {
if self.config.transaction_id.is_some() {
info!("Beginning Kafka transaction");
}
Ok(())
}
pub async fn commit_transaction(&self) -> Result<()> {
if self.config.transaction_id.is_some() {
let mut stats = self.stats.write().await;
stats.transactions_committed += 1;
info!("Committed Kafka transaction");
}
Ok(())
}
pub async fn abort_transaction(&self) -> Result<()> {
if self.config.transaction_id.is_some() {
let mut stats = self.stats.write().await;
stats.transactions_aborted += 1;
info!("Aborted Kafka transaction");
}
Ok(())
}
}
impl Clone for ProducerStats {
    /// Produces an independent, field-for-field copy of the counters.
    fn clone(&self) -> Self {
        // Every field is a `Copy` scalar, so destructuring through the
        // reference copies each value out. The pattern is exhaustive, so a
        // newly added field fails to compile here until it is handled.
        let Self {
            messages_sent,
            messages_failed,
            bytes_sent,
            avg_latency_ms,
            transactions_committed,
            transactions_aborted,
        } = *self;

        Self {
            messages_sent,
            messages_failed,
            bytes_sent,
            avg_latency_ms,
            transactions_committed,
            transactions_aborted,
        }
    }
}