// When built with the "jemalloc" feature, replace the system allocator with
// jemalloc for the whole process.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

// jemalloc runtime tuning, read at startup through the `_rjem_malloc_conf`
// symbol (must be a NUL-terminated C string): 2 arenas, 5s dirty/muzzy page
// decay, and a background purging thread.
// `#[used]` keeps the otherwise-unreferenced symbol from being stripped;
// the lowercase name is required by jemalloc's symbol lookup, hence the
// `non_upper_case_globals` allow.
#[cfg(feature = "jemalloc")]
#[used]
#[allow(non_upper_case_globals)]
#[unsafe(export_name = "_rjem_malloc_conf")]
pub static malloc_conf: &[u8] =
    b"narenas:2,dirty_decay_ms:5000,muzzy_decay_ms:5000,background_thread:true\0";
mod cluster;
mod collector;
mod config;
mod error;
mod export;
mod http;
mod kafka;
mod leadership;
mod metrics;
use crate::cluster::ClusterManager;
use crate::config::Config;
use crate::export::prometheus::PrometheusExporter;
use crate::http::server::HttpServer;
use crate::leadership::{LeadershipProvider, LeadershipStatus};
use crate::metrics::registry::MetricsRegistry;
use clap::Parser;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::broadcast;
#[cfg(not(feature = "kubernetes"))]
use tracing::warn;
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
// Command-line arguments parsed by clap's derive API.
// NOTE: `//` comments (not `///`) are used on fields on purpose — clap turns
// field doc comments into per-flag help text, which would change CLI output.
#[derive(Parser, Debug)]
#[command(name = "klag-exporter")]
#[command(about = "Kafka consumer group lag exporter with offset and time lag metrics")]
#[command(version)]
struct Args {
    // Path to the TOML configuration file (-c / --config).
    #[arg(short, long, default_value = "config.toml")]
    config: String,
    // Default tracing filter level (-l / --log-level); a set RUST_LOG env
    // variable takes precedence (see init_logging).
    #[arg(short, long, default_value = "info")]
    log_level: String,
}
/// Synchronous entry point: parse CLI args, install logging, load the
/// config file, build the multi-threaded Tokio runtime, and hand control
/// to [`async_main`] until shutdown.
fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    init_logging(&args.log_level);
    info!("Starting klag-exporter");

    let config = Config::load(Some(&args.config))?;
    info!(
        clusters = config.clusters.len(),
        poll_interval = ?config.exporter.poll_interval,
        max_blocking_threads = config.exporter.performance.max_blocking_threads,
        "Configuration loaded"
    );

    // One-time deprecation notice: warn only when the user explicitly set a
    // non-default value for the now-ignored watermark-concurrency knob.
    let default_watermarks =
        crate::config::PerformanceConfig::default().max_concurrent_watermarks;
    let configured_watermarks = config.exporter.performance.max_concurrent_watermarks;
    if configured_watermarks != default_watermarks {
        info!(
            configured_value = configured_watermarks,
            "performance.max_concurrent_watermarks is deprecated and has no effect since the \
            batched Admin API replaced per-partition watermark fetch. Safe to remove from your \
            config."
        );
    }

    // Blocking-thread cap is user-tunable; everything else uses Tokio defaults.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .max_blocking_threads(config.exporter.performance.max_blocking_threads)
        .build()?;
    rt.block_on(async_main(config))
}
/// Long-running async entry point.
///
/// Wires together the shared metrics registry, leader election, one
/// spawned `ClusterManager` task per configured cluster, the HTTP server
/// exposing Prometheus metrics, and (optionally) the OpenTelemetry
/// exporter; then blocks on a shutdown signal and tears everything down:
/// broadcast shutdown -> join HTTP server -> join cluster managers (10s
/// timeout) -> stop leadership provider.
async fn async_main(config: Config) -> anyhow::Result<()> {
    // Shared metric storage; each cluster task and exporter holds an Arc.
    let registry = Arc::new(MetricsRegistry::new());
    // Broadcast channel used only as a shutdown fan-out. The initial
    // receiver is dropped; every task obtains its own via `subscribe()`
    // *before* the final `send`, so none can miss the signal.
    let (shutdown_tx, _) = broadcast::channel::<()>(1);
    let (leadership_provider, leadership_status) =
        create_leadership_provider(&config.exporter.leadership).await?;
    if config.exporter.leadership.enabled {
        info!(
            provider = ?config.exporter.leadership.provider,
            lease_name = %config.exporter.leadership.lease_name,
            namespace = %config.exporter.leadership.lease_namespace,
            "Leader election enabled"
        );
    } else {
        info!("Running in single-instance mode (leader election disabled)");
    }
    // One independent task per cluster; a failure to construct one manager
    // is logged and skipped rather than aborting the whole exporter.
    let mut handles = Vec::new();
    for cluster_config in config.clusters.clone() {
        let registry = Arc::clone(&registry);
        let shutdown_rx = shutdown_tx.subscribe();
        let exporter_config = config.exporter.clone();
        let leadership = leadership_status.clone();
        let handle = tokio::spawn(async move {
            let manager =
                match ClusterManager::new(cluster_config.clone(), registry, &exporter_config) {
                    Ok(m) => m,
                    Err(e) => {
                        error!(
                            cluster = cluster_config.name,
                            error = %e,
                            "Failed to create cluster manager"
                        );
                        return;
                    }
                };
            manager.run(shutdown_rx, leadership).await;
        });
        handles.push(handle);
    }
    // HTTP server serving the Prometheus scrape endpoint (and, given the
    // leadership handle, presumably health/leader status — confirm in
    // http::server).
    let prometheus_exporter = PrometheusExporter::new(Arc::clone(&registry));
    let http_server = HttpServer::new(
        &config.exporter.http_host,
        config.exporter.http_port,
        prometheus_exporter,
        Arc::clone(&registry),
        leadership_status.clone(),
    );
    let shutdown_rx = shutdown_tx.subscribe();
    let server_handle = tokio::spawn(async move {
        if let Err(e) = http_server.run(shutdown_rx).await {
            error!(error = %e, "HTTP server error");
        }
    });
    if config.exporter.otel.enabled {
        let registry = Arc::clone(&registry);
        let otel_config = config.exporter.otel.clone();
        let shutdown_rx = shutdown_tx.subscribe();
        // NOTE(review): this JoinHandle is dropped, so shutdown below never
        // awaits the OTel exporter — a final flush may be cut short. Confirm
        // whether run_otel_exporter flushes on shutdown_rx before exiting.
        tokio::spawn(async move {
            if let Err(e) =
                crate::export::otel::run_otel_exporter(registry, otel_config, shutdown_rx).await
            {
                error!(error = %e, "OpenTelemetry exporter error");
            }
        });
    }
    // Park here until Ctrl+C / SIGTERM.
    shutdown_signal().await;
    info!("Shutdown signal received, stopping...");
    // Errors ignored: `send` only fails when every receiver is already gone,
    // in which case there is nothing left to notify.
    let _ = shutdown_tx.send(());
    let _ = server_handle.await;
    // Give cluster managers a bounded window to drain; log and move on if
    // they exceed it so shutdown cannot hang indefinitely.
    let shutdown_timeout = tokio::time::timeout(
        std::time::Duration::from_secs(10),
        futures::future::join_all(handles),
    );
    match shutdown_timeout.await {
        Ok(_) => info!("All cluster managers stopped"),
        Err(_) => error!("Timeout waiting for cluster managers to stop"),
    }
    // Release leadership last so this instance stays leader until its work
    // has actually stopped.
    leadership_provider.stop().await;
    info!("Leadership provider stopped");
    info!("klag-exporter stopped");
    Ok(())
}
/// Build and start the leadership provider selected by `config`.
///
/// Returns the running provider (the caller is responsible for calling
/// `stop()` on shutdown) together with a cloneable `LeadershipStatus`
/// handle that worker tasks consult.
///
/// When leader election is disabled — or when Kubernetes leadership is
/// requested but the `kubernetes` feature was not compiled in — a no-op
/// provider is used instead (presumably it always reports this instance
/// as leader; confirm in `leadership::noop`).
///
/// # Errors
/// Propagates failures from provider construction or `start()`.
async fn create_leadership_provider(
    config: &crate::config::LeadershipConfig,
) -> anyhow::Result<(Box<dyn LeadershipProvider>, LeadershipStatus)> {
    if !config.enabled {
        let provider = leadership::noop::NoopLeader::new();
        let status = provider.start().await?;
        return Ok((Box::new(provider), status));
    }
    match config.provider {
        crate::config::LeadershipProvider::Kubernetes => {
            // Real Lease-based election only when compiled with the feature.
            #[cfg(feature = "kubernetes")]
            {
                let provider = leadership::kubernetes::create_kubernetes_leader(
                    &config.lease_name,
                    &config.lease_namespace,
                    config.identity.as_deref(),
                    Some(config.lease_duration_secs),
                    Some(config.grace_period_secs),
                )?;
                let status = provider.start().await?;
                Ok((Box::new(provider), status))
            }
            // Config asked for Kubernetes but the binary can't do it:
            // loudly fall back to single-instance mode instead of failing.
            #[cfg(not(feature = "kubernetes"))]
            {
                warn!(
                    "Kubernetes leadership is enabled in config but the 'kubernetes' feature is not compiled in. \
                    Falling back to single-instance mode. \
                    Rebuild with --features kubernetes to enable leader election."
                );
                let provider = leadership::noop::NoopLeader::new();
                let status = provider.start().await?;
                Ok((Box::new(provider), status))
            }
        }
    }
}
/// Install the process-wide tracing subscriber.
///
/// A `RUST_LOG` environment variable, when present and parseable, takes
/// precedence; otherwise `level` (the `--log-level` CLI value) becomes the
/// filter directive. Output goes through the default fmt layer.
fn init_logging(level: &str) {
    let filter = match EnvFilter::try_from_default_env() {
        Ok(env_filter) => env_filter,
        Err(_) => EnvFilter::new(level),
    };
    tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer())
        .init();
}
/// Resolve once a shutdown request arrives: Ctrl+C on every platform,
/// or SIGTERM on Unix (the usual container stop signal).
async fn shutdown_signal() {
    // Future that completes on Ctrl+C (SIGINT).
    let interrupt = async {
        signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    // Future that completes on SIGTERM (Unix only).
    #[cfg(unix)]
    let terminate = async {
        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("Failed to install signal handler");
        sigterm.recv().await;
    };

    // No SIGTERM on other targets: substitute a future that never resolves
    // so the select! below waits on Ctrl+C alone.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Whichever signal fires first wins.
    tokio::select! {
        _ = interrupt => {},
        _ = terminate => {},
    }
}