use crate::collector::lag_calculator::LagCalculator;
use crate::collector::offset_collector::OffsetCollector;
use crate::collector::timestamp_sampler::TimestampSampler;
use crate::config::{ClusterConfig, ExporterConfig, Granularity};
use crate::error::Result;
use crate::kafka::client::KafkaClient;
use crate::kafka::TimestampConsumer;
use crate::leadership::LeadershipStatus;
use crate::metrics::registry::MetricsRegistry;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tracing::{debug, error, info, instrument, warn};
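/// Fallback ceiling for a single collection cycle, applied when the poll
/// interval is too short to leave headroom.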
const DEFAULT_COLLECTION_TIMEOUT: Duration = Duration::from_secs(60);
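/// Orchestrates metric collection for a single Kafka cluster: polls consumer
/// group offsets and watermarks on a fixed interval, optionally samples
/// message timestamps, and publishes the resulting lag metrics to the shared
/// registry. Collection runs only while this instance holds leadership.
///
/// A minimal usage sketch, assuming `cluster_cfg`, `exporter_cfg`, `registry`,
/// `shutdown_rx`, and `leadership` are constructed elsewhere:
///
/// ```ignore
/// let manager = ClusterManager::new(cluster_cfg, Arc::clone(&registry), &exporter_cfg)?;
/// tokio::spawn(manager.run(shutdown_rx, leadership));
/// ```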
pub struct ClusterManager {
cluster_name: String,
cluster_labels: HashMap<String, String>,
client: Arc<KafkaClient>,
offset_collector: OffsetCollector,
timestamp_sampler: Option<TimestampSampler>,
registry: Arc<MetricsRegistry>,
poll_interval: Duration,
max_backoff: Duration,
granularity: Granularity,
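    /// Concurrency cap for timestamp fetches, taken from the timestamp-sampling config.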
max_concurrent_fetches: usize,
cache_cleanup_interval: Duration,
collection_timeout: Duration,
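    /// Recycle Kafka clients after this many successful cycles; 0 disables recycling.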
client_recycle_interval: u64,
}
impl ClusterManager {
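    /// Builds the manager for one cluster: compiles topic and group filters,
    /// creates the Kafka client and offset collector, and wires up message- or
    /// rate-based timestamp sampling when enabled.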
pub fn new(
config: ClusterConfig,
registry: Arc<MetricsRegistry>,
exporter_config: &ExporterConfig,
) -> Result<Self> {
let cluster_name = config.name.clone();
let cluster_labels = config.labels.clone();
let filters = config.compile_filters()?;
let performance = exporter_config.performance.clone();
let client = Arc::new(KafkaClient::with_performance(&config, performance.clone())?);
let offset_collector = OffsetCollector::with_performance(
Arc::clone(&client),
filters,
performance.clone(),
exporter_config.granularity,
);
let ts_cfg = &exporter_config.timestamp_sampling;
let timestamp_sampler = if ts_cfg.enabled {
Some(match ts_cfg.mode {
crate::config::TimestampSamplingMode::Message => {
let ts_consumer =
TimestampConsumer::with_pool_size(&config, ts_cfg.max_concurrent_fetches)?;
TimestampSampler::new_message(ts_consumer, ts_cfg.cache_ttl)
}
crate::config::TimestampSamplingMode::Rate => {
let rs = crate::collector::rate_sampler::RateSampler::new(
ts_cfg.rate_history_samples,
ts_cfg.rate_history_max_age,
ts_cfg.rate_min_msgs_per_sec,
);
TimestampSampler::new_rate(rs)
}
})
} else {
None
};
info!(
cluster = cluster_name,
timestamp_sampling = ts_cfg.enabled,
timestamp_sampling_mode = ?ts_cfg.mode,
poll_interval = ?exporter_config.poll_interval,
granularity = ?exporter_config.granularity,
max_concurrent_fetches = ts_cfg.max_concurrent_fetches,
custom_labels = ?cluster_labels,
"Created cluster manager"
);
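        // Leave 5s of headroom below the poll interval so consecutive cycles
        // cannot overlap; for intervals of 10s or less, allow the full
        // interval (bounded by DEFAULT_COLLECTION_TIMEOUT).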
let collection_timeout = if exporter_config.poll_interval > Duration::from_secs(10) {
exporter_config.poll_interval - Duration::from_secs(5)
} else {
DEFAULT_COLLECTION_TIMEOUT.min(exporter_config.poll_interval)
};
Ok(Self {
cluster_name,
cluster_labels,
client,
offset_collector,
timestamp_sampler,
registry,
poll_interval: exporter_config.poll_interval,
max_backoff: Duration::from_secs(300),
granularity: exporter_config.granularity,
max_concurrent_fetches: exporter_config.timestamp_sampling.max_concurrent_fetches,
cache_cleanup_interval: exporter_config.timestamp_sampling.cache_ttl * 2,
collection_timeout,
client_recycle_interval: exporter_config.performance.client_recycle_interval,
})
}
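    /// Main collection loop. On each poll tick, collects metrics while this
    /// instance is the leader, pausing (and removing the cluster's metrics)
    /// when leadership is lost; exits on the shutdown signal.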
#[instrument(skip(self, shutdown, leadership), fields(cluster = %self.cluster_name))]
pub async fn run(self, mut shutdown: broadcast::Receiver<()>, leadership: LeadershipStatus) {
info!(cluster = %self.cluster_name, "Starting collection loop");
let mut interval = tokio::time::interval(self.poll_interval);
let mut cache_cleanup_interval = tokio::time::interval(self.cache_cleanup_interval);
let mut consecutive_errors = 0u32;
let mut current_backoff = Duration::from_secs(1);
let mut was_leader = leadership.is_leader();
let mut cycle_count: u64 = 0;
if !was_leader {
info!(
cluster = %self.cluster_name,
"Starting in standby mode - waiting for leadership"
);
}
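        // Three event sources: the poll tick (collect), the cache-cleanup
        // tick, and the shutdown signal.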
loop {
tokio::select! {
_ = interval.tick() => {
let is_leader = leadership.is_leader();
if is_leader != was_leader {
if is_leader {
info!(cluster = %self.cluster_name, "Acquired leadership - starting collection");
} else {
info!(cluster = %self.cluster_name, "Lost leadership - pausing collection");
self.registry.remove_cluster(&self.cluster_name);
}
was_leader = is_leader;
}
if !is_leader {
debug!(cluster = %self.cluster_name, "Standby mode - skipping collection");
continue;
}
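                    // Bound the cycle so a hung broker cannot stall the loop
                    // beyond collection_timeout.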
let collection_result = tokio::time::timeout(
self.collection_timeout,
self.collect_once()
).await;
match collection_result {
Ok(Ok(())) => {
consecutive_errors = 0;
current_backoff = Duration::from_secs(1);
self.registry.set_healthy(true);
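                            // Recycle the Kafka clients every
                            // `client_recycle_interval` successful cycles;
                            // recycling blocks, so it runs off the async runtime.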
if self.client_recycle_interval > 0 {
cycle_count += 1;
if cycle_count >= self.client_recycle_interval {
cycle_count = 0;
let client = Arc::clone(&self.client);
let sampler = self.timestamp_sampler.clone();
let cluster = self.cluster_name.clone();
let _ = tokio::task::spawn_blocking(move || {
if let Err(e) = client.recycle() {
warn!(
cluster = %cluster,
error = %e,
"Failed to recycle Kafka clients"
);
}
if let Some(ref sampler) = sampler {
if let Err(e) = sampler.recycle_pool() {
warn!(
cluster = %cluster,
error = %e,
"Failed to recycle timestamp consumer pool"
);
}
}
}).await;
}
}
}
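                        // Count the failure; after three in a row, mark the
                        // exporter unhealthy and back off exponentially up to
                        // `max_backoff`.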
Ok(Err(e)) => {
consecutive_errors += 1;
error!(
cluster = %self.cluster_name,
error = %e,
consecutive_errors = consecutive_errors,
"Collection failed"
);
if consecutive_errors >= 3 {
self.registry.set_healthy(false);
let backoff = current_backoff.min(self.max_backoff);
warn!(
cluster = %self.cluster_name,
backoff_secs = backoff.as_secs(),
"Applying backoff due to consecutive errors"
);
tokio::time::sleep(backoff).await;
current_backoff = (current_backoff * 2).min(self.max_backoff);
}
}
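                        // Timeouts count toward the unhealthy threshold but
                        // skip the backoff sleep.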
Err(_timeout) => {
consecutive_errors += 1;
error!(
cluster = %self.cluster_name,
timeout_secs = self.collection_timeout.as_secs(),
consecutive_errors = consecutive_errors,
"Collection timed out"
);
if consecutive_errors >= 3 {
self.registry.set_healthy(false);
}
}
}
}
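                // Periodically evict expired timestamp-cache entries while
                // holding leadership.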
_ = cache_cleanup_interval.tick() => {
if leadership.is_leader() {
if let Some(ref sampler) = self.timestamp_sampler {
let before = sampler.cache_size();
sampler.clear_stale_entries();
let after = sampler.cache_size();
if before != after {
debug!(
cluster = %self.cluster_name,
before = before,
after = after,
"Cleaned up stale cache entries"
);
}
}
}
}
_ = shutdown.recv() => {
info!(cluster = %self.cluster_name, "Received shutdown signal");
break;
}
}
}
self.registry.remove_cluster(&self.cluster_name);
info!(cluster = %self.cluster_name, "Collection loop stopped");
}
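    /// Runs one collection cycle: fetches offsets and watermarks, optionally
    /// computes per-partition time lags, derives lag metrics, and publishes
    /// them to the registry.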
#[instrument(skip(self), fields(cluster = %self.cluster_name))]
async fn collect_once(&self) -> Result<()> {
let start = Instant::now();
let snapshot = self.offset_collector.collect_parallel().await?;
debug!(
cluster = %self.cluster_name,
groups = snapshot.groups.len(),
partitions = snapshot.watermarks.len(),
compacted_topics = snapshot.compacted_topics.len(),
"Collected offsets"
);
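        // Wall-clock reference point (epoch milliseconds) for time-lag arithmetic.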
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let timestamps = if let Some(ref sampler) = self.timestamp_sampler {
sampler
.compute_time_lags(&snapshot, now_ms, self.max_concurrent_fetches)
.await
} else {
HashMap::new()
};
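        // Elapsed time so far spans offset collection plus timestamp sampling.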
let poll_time_ms = start.elapsed().as_millis() as u64;
let lag_metrics = LagCalculator::calculate(
&snapshot,
        &timestamps,
now_ms,
poll_time_ms,
&snapshot.compacted_topics,
);
self.registry.update_with_options(
&self.cluster_name,
lag_metrics,
self.granularity,
&self.cluster_labels,
);
let scrape_duration_ms = start.elapsed().as_millis() as u64;
self.registry.set_scrape_duration_ms(scrape_duration_ms);
debug!(
cluster = %self.cluster_name,
elapsed_ms = scrape_duration_ms,
timestamp_cache_size = self.timestamp_sampler.as_ref().map(|s| s.cache_size()).unwrap_or(0),
"Collection cycle completed"
);
Ok(())
}
}
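// Manual Debug impl: only summary fields are shown; the Kafka client,
// collectors, and registry are omitted.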
impl std::fmt::Debug for ClusterManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClusterManager")
.field("cluster_name", &self.cluster_name)
.field("poll_interval", &self.poll_interval)
.field("granularity", &self.granularity)
.finish()
}
}