use crate::sinks::Sink;
use crate::{LogRecord, UnifiedLogEntry};
use async_trait::async_trait;
use std::sync::Arc;
pub struct KafkaSink {
producer: rdkafka::producer::FutureProducer,
topic: String,
batch_size: usize,
batch_timeout_ms: u64,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
}
impl KafkaSink {
pub fn new(producer: rdkafka::producer::FutureProducer, topic: String, batch_size: usize, batch_timeout_ms: u64) -> Self {
let entries = Arc::new(tokio::sync::Mutex::new(Vec::with_capacity(batch_size)));
let last_flush = Arc::new(std::sync::atomic::AtomicU64::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
));
let sink = KafkaSink {
producer: producer.clone(),
topic: topic.clone(),
batch_size,
batch_timeout_ms,
entries: entries.clone(),
last_flush: last_flush.clone(),
};
tokio::spawn(Self::periodic_flush(producer, topic, entries, last_flush, batch_timeout_ms));
sink
}
#[allow(dead_code)]
pub fn batch_timeout(&self) -> u64 {
self.batch_timeout_ms
}
#[allow(dead_code)]
pub fn set_batch_timeout(&mut self, new_timeout_ms: u64) {
self.batch_timeout_ms = new_timeout_ms;
}
async fn periodic_flush(
producer: rdkafka::producer::FutureProducer,
topic: String,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
timeout_ms: u64,
) {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(timeout_ms / 2)).await;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = last_flush.load(std::sync::atomic::Ordering::Relaxed);
if now - last >= timeout_ms {
let mut batch = entries.lock().await;
if !batch.is_empty() {
Self::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
}
}
async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec<UnifiedLogEntry>) {
for entry in entries {
let payload = match serde_json::to_string(&entry) {
Ok(p) => p,
Err(e) => {
eprintln!("Failed to serialize log entry: {e}");
continue;
}
};
let span_id = entry.get_timestamp().to_rfc3339();
let _ = producer
.send(
rdkafka::producer::FutureRecord::to(topic).payload(&payload).key(&span_id),
std::time::Duration::from_secs(5),
)
.await;
}
}
}
#[async_trait]
impl Sink for KafkaSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut batch = self.entries.lock().await;
batch.push(entry.clone());
let should_flush_by_size = batch.len() >= self.batch_size;
let should_flush_by_time = {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
now - last >= self.batch_timeout_ms
};
if should_flush_by_size || should_flush_by_time {
let entries_to_send: Vec<UnifiedLogEntry> = batch.drain(..).collect();
let producer = self.producer.clone();
let topic = self.topic.clone();
self.last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
tokio::spawn(async move {
KafkaSink::send_batch(&producer, &topic, entries_to_send).await;
});
}
}
}
impl Drop for KafkaSink {
fn drop(&mut self) {
let producer = self.producer.clone();
let topic = self.topic.clone();
let entries = self.entries.clone();
let last_flush = self.last_flush.clone();
tokio::spawn(async move {
let mut batch = entries.lock().await;
if !batch.is_empty() {
KafkaSink::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
}
});
eprintln!("Dropping KafkaSink with topic: {0}", self.topic);
}
}