use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::http_client::{self, FetchError, HttpClient};
use crate::ingest::auth_header::{AuthHeader, ScraperAuthOutcome, parse_scraper_auth_header};
use crate::report::metrics::{KeplerScrapeReason, MetricsState};
use crate::score::ops_snapshot_diff::OpsSnapshotDiff;
use super::apply::process_scrape;
use super::config::KeplerConfig;
use super::parser::parse_kepler_metrics;
use super::state::{KeplerState, monotonic_ms};
/// Number of consecutive failed scrapes after which `handle_kepler_failure`
/// emits its additional "check that Kepler is installed" warning. That warning
/// is guarded by a flag the scrape loop never resets, so it fires at most once
/// per scraper task.
const UNSUPPORTED_PLATFORM_FAILURE_THRESHOLD: u32 = 3;
/// Number of consecutive successful fetches that matched zero samples after
/// which `track_zero_sample_streak` warns (once per streak) about a likely
/// metric-name / `metric_kind` / `service_mappings` mismatch.
const ZERO_SAMPLE_WARN_THRESHOLD: u32 = 3;
/// Performs one GET against the Kepler metrics endpoint and returns the body
/// as a UTF-8 `String`.
///
/// # Errors
///
/// * [`ScraperError::Fetch`] when `http_client::fetch_get` fails for any reason.
/// * [`ScraperError::Utf8`] when the body was received but is not valid UTF-8.
pub(super) async fn fetch_metrics_once(
    client: &HttpClient,
    uri: &hyper::Uri,
    auth: Option<&AuthHeader>,
) -> Result<String, ScraperError> {
    // Both values are forwarded to `fetch_get`; presumably the client
    // identifier (User-Agent) and the per-request timeout — confirm there.
    const USER_AGENT: &str = "perf-sentinel/kepler-scraper";
    const TIMEOUT: Duration = Duration::from_secs(3);

    let payload = http_client::fetch_get(client, uri, USER_AGENT, TIMEOUT, auth)
        .await
        .map_err(ScraperError::Fetch)?;
    let raw: Vec<u8> = payload.to_vec();
    String::from_utf8(raw).map_err(ScraperError::Utf8)
}
/// Failure modes of a single Kepler scrape (see `fetch_metrics_once`).
///
/// The `Display` text of each variant is a fixed message that does not include
/// the underlying cause; the cause is exposed through
/// `std::error::Error::source` via the `#[source]` attributes. Callers that
/// need the failure category should use `scraper_error_reason`.
#[derive(Debug, thiserror::Error)]
pub(super) enum ScraperError {
    /// The HTTP request did not yield a body (transport error, timeout,
    /// non-success status, body read failure, or request build failure).
    #[error("Kepler fetch failed")]
    Fetch(#[source] FetchError),
    /// A body was received but it is not valid UTF-8.
    #[error("Kepler response was not valid UTF-8")]
    Utf8(#[source] std::string::FromUtf8Error),
}
/// Classifies a [`ScraperError`] into the `KeplerScrapeReason` used as the
/// `reason` label of the `kepler_scrape_failed_total` metric.
///
/// Fetch failures are refined further by `fetch_error_reason`; a body that is
/// not UTF-8 always maps to `InvalidUtf8`.
pub(super) fn scraper_error_reason(err: &ScraperError) -> KeplerScrapeReason {
    match err {
        ScraperError::Utf8(_) => KeplerScrapeReason::InvalidUtf8,
        ScraperError::Fetch(fetch_err) => fetch_error_reason(fetch_err),
    }
}
/// Maps each `FetchError` variant one-to-one onto a `KeplerScrapeReason`.
///
/// Arms are listed in the order the corresponding failure can occur during a
/// request: building it, connecting, waiting, checking the status, reading
/// the body. The match is exhaustive on purpose, so a new `FetchError`
/// variant forces a decision here.
fn fetch_error_reason(err: &FetchError) -> KeplerScrapeReason {
    match err {
        FetchError::RequestBuild(_) => KeplerScrapeReason::RequestError,
        FetchError::Transport(_) => KeplerScrapeReason::Unreachable,
        FetchError::Timeout => KeplerScrapeReason::Timeout,
        FetchError::HttpStatus(_) => KeplerScrapeReason::HttpError,
        FetchError::BodyRead(_) => KeplerScrapeReason::BodyReadError,
    }
}
/// Starts the Kepler scraper as a background Tokio task and returns its handle.
///
/// The task loops for the lifetime of the process. It returns early only when
/// the configuration cannot be used (for example, an endpoint that is not a
/// valid URI or an auth header that fails to parse). Dropping the returned
/// handle detaches the task rather than cancelling it.
#[must_use]
pub fn spawn_scraper(
    cfg: KeplerConfig,
    state: Arc<KeplerState>,
    metrics: Arc<MetricsState>,
) -> tokio::task::JoinHandle<()> {
    let scraper = run_scraper_loop(cfg, state, metrics);
    tokio::spawn(scraper)
}
/// Body of the Kepler scraper task. It returns only when the configuration is
/// unusable; otherwise it loops forever.
///
/// Setup:
/// * rejects a zero `scrape_interval` (`tokio::time::interval` panics on a
///   zero period),
/// * parses `cfg.endpoint` as a URI and the optional `cfg.auth_header`,
///   returning if either is invalid,
/// * builds the HTTP client and a ticker whose immediate first tick is
///   consumed, so the first scrape happens one interval after startup.
///
/// Each tick then:
/// 1. snapshots the per-service I/O op counters,
/// 2. fetches the Kepler endpoint,
/// 3. on success, diffs the op snapshot against the baseline of the previous
///    successful scrape, parses the samples, hands both to `process_scrape`,
///    and updates the success metrics and zero-sample streak;
/// 4. on failure, discards the snapshot (the op baseline is left untouched)
///    and delegates metrics and rate-limited logging to `handle_kepler_failure`.
///
/// The op baseline is advanced only on success so that the op window matches
/// `last_raw_joules`. That map is only handed to `process_scrape`, and only on
/// successful scrapes. It presumably holds the previous cumulative joule
/// readings (TODO confirm against `process_scrape`). Advancing the op baseline
/// on failed ticks as well would drop those ticks' ops, while the joule deltas
/// would still span them.
async fn run_scraper_loop(
    cfg: KeplerConfig,
    state: Arc<KeplerState>,
    metrics: Arc<MetricsState>,
) {
    use std::str::FromStr;

    // `tokio::time::interval` panics on a zero period; refuse the config with
    // a clear log line instead of letting the spawned task die on a panic.
    if cfg.scrape_interval.is_zero() {
        tracing::error!(
            endpoint = %http_client::redact_endpoint_str(&cfg.endpoint),
            "Kepler scraper aborting on zero scrape_interval"
        );
        return;
    }

    let uri = match hyper::Uri::from_str(&cfg.endpoint) {
        Ok(u) => u,
        Err(e) => {
            tracing::error!(
                endpoint = %http_client::redact_endpoint_str(&cfg.endpoint),
                error = %e,
                "Kepler scraper aborting on invalid endpoint URI"
            );
            return;
        }
    };
    let redacted = http_client::redact_endpoint(&uri);
    let parsed_auth: Option<AuthHeader> = match parse_scraper_auth_header(
        cfg.auth_header.as_deref(),
        &cfg.endpoint,
        &redacted,
        "kepler",
    ) {
        // NOTE(review): presumably `parse_scraper_auth_header` has already
        // logged why the header was rejected — confirm.
        ScraperAuthOutcome::Invalid => return,
        ScraperAuthOutcome::None => None,
        ScraperAuthOutcome::Some(h) => Some(h),
    };

    let client = http_client::build_client();
    let mut ticker = tokio::time::interval(cfg.scrape_interval);
    // After a slow fetch, wait a full period instead of bursting to catch up.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick completes immediately; consume it so the first scrape
    // happens one interval after startup.
    ticker.tick().await;

    let mut snapshot_diff = OpsSnapshotDiff::default();
    let mut last_raw_joules: HashMap<String, f64> =
        HashMap::with_capacity(cfg.service_mappings.len());
    // Streak state lent to `handle_kepler_failure` / `track_zero_sample_streak`.
    let mut failure_streak_warned = false;
    let mut consecutive_failures: u32 = 0;
    let mut unsupported_platform_warned = false;
    let mut consecutive_zero_sample_ticks: u32 = 0;
    let mut zero_sample_warned = false;
    // Start the staleness clock at startup so the age gauge is meaningful even
    // before the first successful scrape.
    let mut last_success_ms: u64 = monotonic_ms();
    let metric_name = cfg.metric_kind.metric_name();
    let label_key = cfg.metric_kind.label_key();

    tracing::info!(
        endpoint = %redacted,
        scrape_interval_secs = cfg.scrape_interval.as_secs(),
        metric = metric_name,
        service_count = cfg.service_mappings.len(),
        "Kepler scraper started"
    );

    loop {
        ticker.tick().await;
        // Snapshot before the fetch so the op window ends at the tick boundary.
        let current_ops = metrics.snapshot_service_io_ops();
        match fetch_metrics_once(&client, &uri, parsed_auth.as_ref()).await {
            Ok(body) => {
                failure_streak_warned = false;
                consecutive_failures = 0;
                // Advance the op baseline only now that there are joule
                // readings to pair it with (see the function docs).
                let deltas = snapshot_diff.delta_and_advance(current_ops);
                let samples = parse_kepler_metrics(&body, metric_name, label_key);
                let now = monotonic_ms();
                process_scrape(&state, &samples, &deltas, &cfg, &mut last_raw_joules, now);
                last_success_ms = now;
                metrics.kepler_last_scrape_age_seconds.set(0.0);
                metrics.kepler_scrape_success.inc();
                track_zero_sample_streak(
                    samples.len(),
                    deltas.len(),
                    &redacted,
                    metric_name,
                    label_key,
                    &mut consecutive_zero_sample_ticks,
                    &mut zero_sample_warned,
                );
            }
            Err(e) => {
                // `current_ops` is intentionally not folded into the diff: the
                // baseline stays at the last success, so this tick's ops are
                // counted in the next successful window.
                consecutive_failures = consecutive_failures.saturating_add(1);
                consecutive_zero_sample_ticks = 0;
                zero_sample_warned = false;
                handle_kepler_failure(
                    &e,
                    &metrics,
                    &redacted,
                    last_success_ms,
                    monotonic_ms(),
                    consecutive_failures,
                    &mut failure_streak_warned,
                    &mut unsupported_platform_warned,
                );
            }
        }
    }
}
/// Records one failed Kepler scrape in metrics and in (rate-limited) logs.
///
/// * Sets `kepler_last_scrape_age_seconds` to the time since the last
///   successful scrape, `(now_ms - last_success_ms) / 1000`, saturating at 0.
/// * Increments `kepler_scrape_failed` and the `kepler_scrape_failed_total`
///   counter labelled with the classified failure reason.
/// * Logs the first failure of a streak at `warn` and later ones at `debug`.
///   `failure_streak_warned` carries that state; the scrape loop clears it on
///   success.
/// * Once `consecutive_failures` reaches `UNSUPPORTED_PLATFORM_FAILURE_THRESHOLD`,
///   emits one extra installation hint, guarded by `unsupported_platform_warned`.
///
/// The log events carry the classified reason label. `ScraperError`'s display
/// text alone (e.g. "Kepler fetch failed") does not say *why* the scrape
/// failed.
// Eight parameters: the scrape loop owns the streak state and lends it here
// by `&mut`, which exceeds clippy's default argument-count threshold.
#[allow(clippy::too_many_arguments)]
fn handle_kepler_failure(
    err: &ScraperError,
    metrics: &MetricsState,
    redacted: &str,
    last_success_ms: u64,
    now_ms: u64,
    consecutive_failures: u32,
    failure_streak_warned: &mut bool,
    unsupported_platform_warned: &mut bool,
) {
    let age_secs = now_ms.saturating_sub(last_success_ms) as f64 / 1000.0;
    metrics.kepler_last_scrape_age_seconds.set(age_secs);

    let reason = scraper_error_reason(err);
    let reason_label = reason.as_str();
    metrics.kepler_scrape_failed.inc();
    metrics
        .kepler_scrape_failed_total
        .with_label_values(&[reason_label])
        .inc();

    if *failure_streak_warned {
        tracing::debug!(
            error = %err,
            reason = reason_label,
            "Kepler scrape failed again"
        );
    } else {
        tracing::warn!(
            error = %err,
            reason = reason_label,
            endpoint = %redacted,
            "Kepler scrape failed; subsequent failures will log at debug level"
        );
        *failure_streak_warned = true;
    }

    if !*unsupported_platform_warned
        && consecutive_failures >= UNSUPPORTED_PLATFORM_FAILURE_THRESHOLD
    {
        tracing::warn!(
            endpoint = %redacted,
            consecutive_failures = consecutive_failures,
            last_reason = reason_label,
            "Kepler endpoint has been unreachable for {UNSUPPORTED_PLATFORM_FAILURE_THRESHOLD} consecutive scrapes. \
             Check that Kepler is installed and serving metrics at the configured endpoint. \
             The daemon is falling back through the precedence chain for affected services. \
             See docs/LIMITATIONS.md#kepler-precision-bounds."
        );
        *unsupported_platform_warned = true;
    }
}
/// Tracks runs of successful fetches that matched no samples, warning once
/// per run.
///
/// The scrape loop calls this after every successful fetch.
/// * `samples_len == 0` extends the streak. When the streak reaches
///   `ZERO_SAMPLE_WARN_THRESHOLD` and no warning has fired for it yet, a
///   single `warn` lists the likely misconfigurations.
/// * Any non-empty result ends the streak and re-arms the warning.
///
/// A `debug` summary with `samples_len` and `services_updated` is logged on
/// every call.
// The streak state lives in the caller's loop and is lent here by `&mut`;
// the allow keeps clippy quiet about the argument count.
#[allow(clippy::too_many_arguments)]
pub(super) fn track_zero_sample_streak(
    samples_len: usize,
    services_updated: usize,
    redacted: &str,
    metric_name: &str,
    label_key: &str,
    consecutive_zero_sample_ticks: &mut u32,
    zero_sample_warned: &mut bool,
) {
    if samples_len > 0 {
        // Samples arrived: the empty streak is over, so re-arm the warning.
        *consecutive_zero_sample_ticks = 0;
        *zero_sample_warned = false;
    } else {
        let ticks = consecutive_zero_sample_ticks.saturating_add(1);
        *consecutive_zero_sample_ticks = ticks;
        let already_warned = *zero_sample_warned;
        if ticks >= ZERO_SAMPLE_WARN_THRESHOLD && !already_warned {
            tracing::warn!(
                endpoint = %redacted,
                metric = metric_name,
                label = label_key,
                ticks,
                "Kepler endpoint replied HTTP 200 but no samples matched \
                 the configured metric across the last {ticks} ticks. \
                 Most common cause: the cluster runs a Kepler exporter \
                 older than v0.10 (legacy metric names without the \
                 '_cpu_' infix). Other causes: metric_kind mismatched \
                 with the deployment topology, or service_mappings label \
                 values that do not exist on the wire.",
            );
            *zero_sample_warned = true;
        }
    }

    tracing::debug!(
        samples = samples_len,
        services_updated = services_updated,
        "Kepler scrape succeeded"
    );
}