use rdkafka::client::ClientContext;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::error::KafkaError;
use rdkafka::statistics::Statistics;
use std::collections::HashMap;
use std::sync::RwLock;
/// Flattened snapshot of a librdkafka statistics payload.
///
/// Built by `StatsContext::convert_stats` from the raw [`Statistics`]
/// struct that librdkafka emits on its periodic stats callback, and served
/// to callers via `StatsContext::get_metrics`.
#[derive(Debug, Clone, Default)]
pub struct KafkaMetrics {
    /// Total messages transmitted (librdkafka `txmsgs`).
    pub messages_sent: i64,
    /// Total messages received (librdkafka `rxmsgs`).
    pub messages_received: i64,
    /// Total bytes transmitted (librdkafka `tx_bytes`).
    pub bytes_sent: i64,
    /// Total bytes received (librdkafka `rx_bytes`).
    pub bytes_received: i64,
    /// Messages currently sitting in internal producer queues (`msg_cnt`).
    pub queue_message_count: u64,
    /// Bytes currently sitting in internal producer queues (`msg_size`).
    pub queue_byte_count: u64,
    /// Per-broker metrics, keyed by the broker name reported in the payload.
    pub brokers: HashMap<String, BrokerMetrics>,
    /// Consumer lag keyed by `(topic, partition)`; only non-negative values
    /// are recorded (librdkafka uses negatives to mean "unknown").
    pub partition_lag: HashMap<(String, i32), i64>,
    /// Committed offset keyed by `(topic, partition)`; non-negative only.
    pub partition_committed: HashMap<(String, i32), i64>,
    /// High-watermark offset keyed by `(topic, partition)`; non-negative only.
    pub partition_high_watermark: HashMap<(String, i32), i64>,
    /// Consumer-group state string, if the stats payload contained a `cgrp`.
    pub consumer_group_state: Option<String>,
    /// Number of rebalances seen by the consumer group (0 when no `cgrp`).
    pub rebalance_count: i64,
    /// Milliseconds since the last rebalance (0 when no `cgrp`).
    pub rebalance_age_ms: i64,
    /// Timestamp copied from the stats payload's `time` field.
    pub timestamp: i64,
}
/// Per-broker counters and latency figures extracted from librdkafka stats.
#[derive(Debug, Clone, Default)]
pub struct BrokerMetrics {
    /// Broker connection state string as reported by librdkafka (e.g. "UP").
    pub state: String,
    /// Average request round-trip time in milliseconds (0.0 if unreported).
    pub rtt_avg_ms: f64,
    /// 99th-percentile request round-trip time in milliseconds.
    pub rtt_p99_ms: f64,
    /// Accumulated broker throttle time in milliseconds (window `sum`).
    pub throttle_time_ms: i64,
    /// Messages awaiting transmission in the output buffer.
    pub outbuf_msg_cnt: i64,
    /// Requests in flight, awaiting a response.
    pub waitresp_cnt: i64,
    /// Total requests sent to this broker (`tx`).
    pub requests_sent: u64,
    /// Total responses received from this broker (`rx`).
    pub responses_received: u64,
    /// Total transmission errors for this broker (`txerrs`).
    pub request_errors: u64,
}
/// rdkafka client context that captures the periodic statistics callback,
/// keeping both the raw payload and a flattened [`KafkaMetrics`] snapshot.
#[derive(Debug)]
pub struct StatsContext {
    // Most recent raw librdkafka statistics payload; `None` until the first
    // stats callback fires.
    stats: RwLock<Option<Statistics>>,
    // Flattened snapshot derived from `stats`, cloned out to callers.
    latest_metrics: RwLock<KafkaMetrics>,
}
impl Default for StatsContext {
fn default() -> Self {
Self::new()
}
}
impl StatsContext {
    /// Creates a context with empty metrics and no raw statistics yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            stats: RwLock::new(None),
            latest_metrics: RwLock::new(KafkaMetrics::default()),
        }
    }

    /// Returns a clone of the most recently converted metrics snapshot.
    ///
    /// A poisoned lock is recovered via `PoisonError::into_inner` instead of
    /// being swallowed: the guarded value is plain data that cannot be left
    /// in a torn state, and the previous `unwrap_or_default()` silently
    /// reported all-zero metrics after any panic in a writer thread.
    #[must_use]
    pub fn get_metrics(&self) -> KafkaMetrics {
        self.latest_metrics
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }

    /// Returns a clone of the last raw [`Statistics`] payload, or `None` if
    /// no stats callback has fired yet.
    ///
    /// Like [`Self::get_metrics`], lock poisoning is recovered rather than
    /// treated as missing data.
    #[must_use]
    pub fn get_raw_stats(&self) -> Option<Statistics> {
        self.stats
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }

    /// Flattens a raw librdkafka [`Statistics`] payload into [`KafkaMetrics`].
    fn convert_stats(stats: &Statistics) -> KafkaMetrics {
        let mut metrics = KafkaMetrics {
            messages_sent: stats.txmsgs,
            messages_received: stats.rxmsgs,
            bytes_sent: stats.tx_bytes,
            bytes_received: stats.rx_bytes,
            queue_message_count: stats.msg_cnt,
            queue_byte_count: stats.msg_size,
            timestamp: stats.time,
            ..Default::default()
        };
        for (name, broker) in &stats.brokers {
            // RTT windows are divided by 1000 to yield the `_ms` fields
            // (librdkafka reports these windows in microseconds).
            let rtt_avg_ms = broker.rtt.as_ref().map_or(0.0, |w| w.avg as f64 / 1000.0);
            let rtt_p99_ms = broker.rtt.as_ref().map_or(0.0, |w| w.p99 as f64 / 1000.0);
            let throttle_time_ms = broker.throttle.as_ref().map_or(0, |w| w.sum);
            metrics.brokers.insert(
                name.clone(),
                BrokerMetrics {
                    state: broker.state.clone(),
                    rtt_avg_ms,
                    rtt_p99_ms,
                    throttle_time_ms,
                    outbuf_msg_cnt: broker.outbuf_msg_cnt,
                    waitresp_cnt: broker.waitresp_cnt,
                    requests_sent: broker.tx,
                    responses_received: broker.rx,
                    request_errors: broker.txerrs,
                },
            );
        }
        for (topic_name, topic) in &stats.topics {
            for (partition_id, partition) in &topic.partitions {
                let key = (topic_name.clone(), *partition_id);
                // Negative values mean "unknown" in librdkafka; skip them so
                // the maps only ever contain meaningful lags/offsets.
                if partition.consumer_lag >= 0 {
                    metrics
                        .partition_lag
                        .insert(key.clone(), partition.consumer_lag);
                }
                if partition.committed_offset >= 0 {
                    metrics
                        .partition_committed
                        .insert(key.clone(), partition.committed_offset);
                }
                if partition.hi_offset >= 0 {
                    metrics
                        .partition_high_watermark
                        .insert(key, partition.hi_offset);
                }
            }
        }
        // Consumer-group fields are only present for consumers.
        if let Some(ref cgrp) = stats.cgrp {
            metrics.consumer_group_state = Some(cgrp.state.clone());
            metrics.rebalance_count = cgrp.rebalance_cnt;
            metrics.rebalance_age_ms = cgrp.rebalance_age;
        }
        metrics
    }
}
impl ClientContext for StatsContext {
    /// Called by librdkafka on each statistics emission interval.
    ///
    /// Converts the payload into [`KafkaMetrics`], stores both the snapshot
    /// and the raw payload, and (with the `metrics` feature) pushes gauges
    /// to the Prometheus facade.
    fn stats(&self, statistics: Statistics) {
        let metrics = Self::convert_stats(&statistics);
        // Recover from lock poisoning instead of dropping the update: the
        // guarded values are plain data, and the previous `if let Ok(..)`
        // silently skipped writes after a panic in another thread, freezing
        // the reported metrics forever.
        *self
            .latest_metrics
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = metrics;
        *self
            .stats
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(statistics);
        #[cfg(feature = "metrics")]
        self.emit_prometheus_metrics();
    }

    /// Forwards librdkafka's internal log lines to `tracing` (with the
    /// `logger` feature) or stderr for error/warning severities.
    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        match level {
            RDKafkaLogLevel::Emerg
            | RDKafkaLogLevel::Alert
            | RDKafkaLogLevel::Critical
            | RDKafkaLogLevel::Error => {
                #[cfg(feature = "logger")]
                tracing::error!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                eprintln!("ERROR librdkafka: {} {}", fac, log_message);
            }
            RDKafkaLogLevel::Warning => {
                #[cfg(feature = "logger")]
                tracing::warn!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                eprintln!("WARN librdkafka: {} {}", fac, log_message);
            }
            // NOTE(review): Notice/Info are demoted to `debug!` and silently
            // dropped without the `logger` feature — confirm this noise
            // reduction is intentional.
            RDKafkaLogLevel::Notice | RDKafkaLogLevel::Info => {
                #[cfg(feature = "logger")]
                tracing::debug!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                {}
            }
            RDKafkaLogLevel::Debug => {
                #[cfg(feature = "logger")]
                tracing::debug!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                {}
            }
        }
    }

    /// Reports client-level errors via `tracing` or stderr.
    fn error(&self, error: KafkaError, reason: &str) {
        #[cfg(feature = "logger")]
        tracing::error!(target: "librdkafka", error = %error, "{}", reason);
        #[cfg(not(feature = "logger"))]
        eprintln!("ERROR librdkafka: {}: {}", error, reason);
    }
}
impl StatsContext {
    /// Publishes the latest metrics snapshot as Prometheus gauges through
    /// the `metrics` crate facade. Compiled only with the `metrics` feature.
    #[cfg(feature = "metrics")]
    pub fn emit_prometheus_metrics(&self) {
        // Cap per-partition label cardinality so the series count stays
        // bounded for very large topics.
        const MAX_PARTITIONS: usize = 256;

        let snapshot = self.get_metrics();

        metrics::gauge!("rdkafka_global_msg_cnt").set(snapshot.queue_message_count as f64);
        metrics::gauge!("rdkafka_global_msg_size_bytes").set(snapshot.queue_byte_count as f64);

        for (name, broker) in &snapshot.brokers {
            metrics::gauge!("rdkafka_broker_rtt_avg_seconds", "broker" => name.clone())
                .set(broker.rtt_avg_ms / 1000.0);
            metrics::gauge!("rdkafka_broker_outbuf_cnt", "broker" => name.clone())
                .set(broker.outbuf_msg_cnt as f64);
            metrics::gauge!("rdkafka_broker_waitresp_cnt", "broker" => name.clone())
                .set(broker.waitresp_cnt as f64);
        }

        for ((topic, partition), lag) in snapshot.partition_lag.iter().take(MAX_PARTITIONS) {
            metrics::gauge!(
                "rdkafka_topic_partition_consumer_lag",
                "topic" => topic.clone(),
                "partition" => partition.to_string()
            )
            .set(*lag as f64);
        }

        for ((topic, partition), offset) in
            snapshot.partition_committed.iter().take(MAX_PARTITIONS)
        {
            metrics::gauge!(
                "rdkafka_topic_partition_committed_offset",
                "topic" => topic.clone(),
                "partition" => partition.to_string()
            )
            .set(*offset as f64);
        }

        if snapshot.rebalance_count > 0 {
            metrics::gauge!("rdkafka_consumer_rebalance_count")
                .set(snapshot.rebalance_count as f64);
        }
    }
}
// Marker impl: lets StatsContext serve as a consumer context; every
// ConsumerContext callback keeps its default (no-op) implementation.
impl rdkafka::consumer::ConsumerContext for StatsContext {}
/// Producer-side context: delivery reports are accepted and discarded.
impl rdkafka::producer::ProducerContext for StatsContext {
    /// No per-message opaque payload is carried.
    type DeliveryOpaque = ();

    /// Delivery-report callback; intentionally a no-op.
    fn delivery(
        &self,
        _result: &rdkafka::producer::DeliveryResult<'_>,
        _opaque: Self::DeliveryOpaque,
    ) {
    }
}
/// Sums the consumer lag across every tracked `(topic, partition)` pair.
#[must_use]
pub fn total_consumer_lag(metrics: &KafkaMetrics) -> i64 {
    metrics
        .partition_lag
        .values()
        .fold(0_i64, |total, lag| total + lag)
}
/// Counts brokers whose reported connection state is exactly "UP".
#[must_use]
pub fn healthy_broker_count(metrics: &KafkaMetrics) -> usize {
    metrics
        .brokers
        .iter()
        .filter(|(_, broker)| broker.state == "UP")
        .count()
}
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly built context must report an all-zero snapshot.
    #[test]
    fn test_stats_context_creation() {
        let context = StatsContext::new();
        let snapshot = context.get_metrics();
        assert_eq!(snapshot.messages_sent, 0);
        assert_eq!(snapshot.messages_received, 0);
    }

    // Default metrics carry zeroed counters and empty maps.
    #[test]
    fn test_kafka_metrics_default() {
        let snapshot = KafkaMetrics::default();
        assert_eq!(snapshot.messages_sent, 0);
        assert!(snapshot.brokers.is_empty());
        assert!(snapshot.partition_lag.is_empty());
    }

    // Default broker metrics: empty state string and zero RTT.
    #[test]
    fn test_broker_metrics_default() {
        let broker = BrokerMetrics::default();
        assert_eq!(broker.state, "");
        assert!(broker.rtt_avg_ms.abs() < f64::EPSILON);
    }

    // Lag totals should sum across all partitions of a topic.
    #[test]
    fn test_total_consumer_lag() {
        let mut snapshot = KafkaMetrics::default();
        for (partition, lag) in [(0, 100), (1, 200), (2, 50)] {
            snapshot
                .partition_lag
                .insert(("topic".to_string(), partition), lag);
        }
        assert_eq!(total_consumer_lag(&snapshot), 350);
    }

    // Only brokers in the "UP" state count as healthy.
    #[test]
    fn test_healthy_broker_count() {
        let mut snapshot = KafkaMetrics::default();
        for (name, state) in [("broker1", "UP"), ("broker2", "DOWN"), ("broker3", "UP")] {
            snapshot.brokers.insert(
                name.to_string(),
                BrokerMetrics {
                    state: state.to_string(),
                    ..Default::default()
                },
            );
        }
        assert_eq!(healthy_broker_count(&snapshot), 2);
    }

    // The context is shared across rdkafka's threads, so it must be
    // Send + Sync; this is a compile-time check.
    #[test]
    fn test_stats_context_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<StatsContext>();
    }
}