use crate::config::ClusterConfig;
use crate::error::{KlagError, Result};
use crate::kafka::client::TopicPartition;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
use rdkafka::Offset;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use tracing::{debug, instrument, warn};
/// Result of a single timestamp probe: the timestamp (in milliseconds since
/// the Unix epoch) attached to the message found at the requested offset.
#[derive(Debug, Clone)]
pub struct TimestampFetchResult {
    // Message timestamp as reported by `rdkafka::Timestamp::to_millis()`
    // (create-time or log-append-time depending on broker/topic config).
    pub timestamp_ms: i64,
}
/// Pool of `BaseConsumer`s dedicated to fetching single-message timestamps.
///
/// Consumers are checked out per fetch and returned afterwards; the pool is
/// bounded at `pool_size`, and extra consumers created under contention are
/// dropped on release.
pub struct TimestampConsumer {
    // Cluster connection settings; cloned so the pool owns its configuration.
    config: ClusterConfig,
    // Cached copy of `config.name`, used in log fields.
    cluster_name: String,
    // How long a single `poll` waits for a message before giving up.
    fetch_timeout: Duration,
    // Monotonic counter used to build a unique `client.id` per consumer.
    consumer_counter: AtomicU64,
    // Idle consumers available for checkout.
    pool: Mutex<Vec<BaseConsumer>>,
    // Upper bound on how many idle consumers the pool retains.
    pool_size: usize,
}
impl TimestampConsumer {
    /// Builds a pool of `pool_size` pre-created consumers for `config`.
    ///
    /// # Errors
    /// Returns the underlying rdkafka error if any pooled consumer fails to
    /// be created.
    pub fn with_pool_size(config: &ClusterConfig, pool_size: usize) -> Result<Self> {
        let mut consumer = Self {
            config: config.clone(),
            cluster_name: config.name.clone(),
            fetch_timeout: Duration::from_secs(5),
            consumer_counter: AtomicU64::new(0),
            pool: Mutex::new(Vec::with_capacity(pool_size)),
            pool_size,
        };
        // `get_mut` cannot fail here: we hold the only reference to the
        // Mutex, so it is neither contended nor poisoned yet.
        for _ in 0..pool_size {
            let c = consumer.create_consumer()?;
            consumer.pool.get_mut().unwrap().push(c);
        }
        debug!(
            cluster = %consumer.cluster_name,
            pool_size = pool_size,
            "Created timestamp consumer pool"
        );
        Ok(consumer)
    }

    /// Creates one `BaseConsumer` tuned for single-message timestamp probes:
    /// no auto-commit, small fetch sizes, and tiny client-side queues.
    fn create_consumer(&self) -> Result<BaseConsumer> {
        // Unique client.id suffix per consumer; Relaxed ordering is enough
        // because only uniqueness matters, not ordering between threads.
        let counter = self.consumer_counter.fetch_add(1, Ordering::Relaxed);
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &self.config.bootstrap_servers)
            .set(
                "client.id",
                format!("klag-exporter-ts-{}-{}", self.config.name, counter),
            )
            .set(
                "group.id",
                format!("klag-exporter-ts-internal-{}", self.config.name),
            )
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            // Keep fetches small — each assignment only ever needs one message.
            .set("fetch.max.bytes", "1048576")
            .set("max.partition.fetch.bytes", "262144")
            .set("queued.min.messages", "100")
            .set("queued.max.messages.kbytes", "1024")
            // Assignments are short-lived; periodic metadata refresh is noise.
            .set("topic.metadata.refresh.interval.ms", "-1");
        // User-supplied properties are applied last so they can override the
        // defaults above.
        for (key, value) in &self.config.consumer_properties {
            client_config.set(key, value);
        }
        client_config.create().map_err(KlagError::Kafka)
    }

    /// Checks a consumer out of the pool, creating a fresh one when the pool
    /// is empty (pool is bounded on release, not on acquire).
    fn acquire(&self) -> Result<BaseConsumer> {
        let mut pool = self
            .pool
            .lock()
            // Consumers carry no invariants a panic could corrupt, so a
            // poisoned lock is safe to recover from.
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(consumer) = pool.pop() {
            Ok(consumer)
        } else {
            self.create_consumer()
        }
    }

    /// Returns a consumer to the pool after clearing its assignment.
    ///
    /// A consumer that fails to unassign, or that would push the pool past
    /// `pool_size`, is dropped instead of retained.
    fn release(&self, consumer: BaseConsumer) {
        let empty = rdkafka::TopicPartitionList::new();
        if let Err(e) = consumer.assign(&empty) {
            warn!(error = %e, "Failed to unassign consumer before returning to pool");
            return;
        }
        let mut pool = self
            .pool
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if pool.len() < self.pool_size {
            pool.push(consumer);
        }
    }

    /// Fetches the timestamp of the message stored at `offset` in `tp`.
    ///
    /// Returns `Ok(None)` when nothing arrives within the fetch timeout
    /// (e.g. the offset is at or beyond the high watermark) or when the
    /// message carries no timestamp.
    ///
    /// # Errors
    /// Propagates rdkafka errors from building the assignment, assigning the
    /// partition, or fetching the message.
    #[instrument(skip(self), fields(cluster = %self.cluster_name, topic = %tp.topic, partition = tp.partition, offset = offset))]
    pub fn fetch_timestamp(
        &self,
        tp: &TopicPartition,
        offset: i64,
    ) -> Result<Option<TimestampFetchResult>> {
        use rdkafka::TopicPartitionList;
        // RAII guard: returns the consumer to the pool on every exit path
        // (including `?` early returns and panics) unless the consumer has
        // been taken out of it first.
        struct PoolGuard<'a> {
            consumer: Option<BaseConsumer>,
            pool: &'a TimestampConsumer,
        }
        impl Drop for PoolGuard<'_> {
            fn drop(&mut self) {
                if let Some(consumer) = self.consumer.take() {
                    self.pool.release(consumer);
                }
            }
        }
        let mut guard = PoolGuard {
            consumer: Some(self.acquire()?),
            pool: self,
        };

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_offset(&tp.topic, tp.partition, Offset::Offset(offset))
            .map_err(KlagError::Kafka)?;

        let result = {
            let consumer = guard
                .consumer
                .as_ref()
                .expect("guard holds the consumer until explicitly taken");
            match consumer.assign(&tpl).map_err(KlagError::Kafka) {
                Err(e) => Err(e),
                Ok(()) => match consumer.poll(self.fetch_timeout) {
                    Some(Ok(msg)) => {
                        let timestamp = msg.timestamp().to_millis();
                        debug!(
                            topic = %tp.topic,
                            partition = tp.partition,
                            requested_offset = offset,
                            actual_offset = msg.offset(),
                            timestamp = ?timestamp,
                            "Fetched message timestamp"
                        );
                        Ok(timestamp.map(|ts| TimestampFetchResult { timestamp_ms: ts }))
                    }
                    Some(Err(e)) => {
                        warn!(
                            topic = %tp.topic,
                            partition = tp.partition,
                            error = %e,
                            "Failed to fetch message"
                        );
                        Err(KlagError::Kafka(e))
                    }
                    None => {
                        debug!(
                            topic = %tp.topic,
                            partition = tp.partition,
                            offset = offset,
                            "No message available at offset (may be beyond high watermark)"
                        );
                        Ok(None)
                    }
                },
            }
        };
        // A consumer whose assign or fetch just failed may be in a bad state;
        // drop it instead of recycling it — `acquire` will lazily create a
        // replacement the next time the pool runs dry.
        if result.is_err() {
            drop(guard.consumer.take());
        }
        result
    }

    /// Replaces every idle consumer in the pool with a freshly created one.
    ///
    /// Consumers currently checked out are unaffected; they are subject to
    /// the normal bounded-release rules when returned.
    ///
    /// # Errors
    /// Fails without touching the existing pool if any replacement consumer
    /// cannot be created.
    pub fn recycle_pool(&self) -> Result<()> {
        // Build the full replacement set before taking the lock so a creation
        // failure leaves the current pool intact.
        let mut new_consumers = Vec::with_capacity(self.pool_size);
        for _ in 0..self.pool_size {
            new_consumers.push(self.create_consumer()?);
        }
        let mut pool = self
            .pool
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *pool = new_consumers;
        debug!(
            cluster = %self.cluster_name,
            pool_size = self.pool_size,
            "Recycled timestamp consumer pool"
        );
        Ok(())
    }

    /// Convenience wrapper: fetches timestamps for several `(partition,
    /// offset)` pairs sequentially, pairing each request with its result.
    #[allow(dead_code)]
    pub fn fetch_timestamps_batch(
        &self,
        requests: &[(TopicPartition, i64)],
    ) -> Vec<(TopicPartition, Result<Option<TimestampFetchResult>>)> {
        requests
            .iter()
            .map(|(tp, offset)| {
                let result = self.fetch_timestamp(tp, *offset);
                (tp.clone(), result)
            })
            .collect()
    }
}
impl std::fmt::Debug for TimestampConsumer {
    /// Hand-written `Debug`: `BaseConsumer` is not `Debug`, so only the
    /// cluster name and configured pool size are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("TimestampConsumer");
        builder.field("cluster", &self.cluster_name);
        builder.field("pool_size", &self.pool_size);
        builder.finish()
    }
}