use std::sync::Arc;
use std::time::Duration;
use crate::http_client::{self, FetchError, HttpClient};
use crate::ingest::auth_header::{AuthHeader, ScraperAuthOutcome, parse_scraper_auth_header};
use crate::report::metrics::{MetricsState, ScaphandreScrapeReason};
use super::config::ScaphandreConfig;
use super::ops::{OpsSnapshotDiff, apply_scrape};
use super::parser::parse_scaphandre_metrics;
use super::state::{ScaphandreState, monotonic_ms};
/// Number of consecutive failed scrapes after which `run_scraper_loop` logs
/// its one-time warning about Scaphandre availability and platform support.
const UNSUPPORTED_PLATFORM_FAILURE_THRESHOLD: u32 = 3;
/// Issues one GET against the Scaphandre endpoint and returns the response
/// body as a `String`.
///
/// The call is delegated to `http_client::fetch_get`, which also receives the
/// literal `"perf-sentinel/scaphandre-scraper"` and a 3-second `Duration`
/// (presumably the client identifier and the request timeout; confirm against
/// `fetch_get`), plus the optional `auth` header.
///
/// # Errors
///
/// * [`ScraperError::Fetch`] when `fetch_get` returns an error.
/// * [`ScraperError::Utf8`] when the returned bytes are not valid UTF-8.
pub(super) async fn fetch_metrics_once(
    client: &HttpClient,
    uri: &hyper::Uri,
    auth: Option<&AuthHeader>,
) -> Result<String, ScraperError> {
    let fetched = http_client::fetch_get(
        client,
        uri,
        "perf-sentinel/scaphandre-scraper",
        Duration::from_secs(3),
        auth,
    )
    .await;

    match fetched {
        Ok(payload) => String::from_utf8(payload.to_vec()).map_err(ScraperError::Utf8),
        Err(cause) => Err(ScraperError::Fetch(cause)),
    }
}
/// Failures the Scaphandre scraper can hit while setting up or performing a
/// scrape. Each variant keeps the underlying error as its `source`.
#[derive(Debug, thiserror::Error)]
pub(super) enum ScraperError {
/// The configured endpoint string could not be parsed as a `hyper::Uri`.
#[error("invalid Scaphandre endpoint URI '{endpoint}'")]
InvalidUri {
/// The endpoint string exactly as it was configured.
endpoint: String,
/// The parse error reported by `hyper`.
#[source]
source: hyper::http::uri::InvalidUri,
},
/// The HTTP fetch failed; wraps the `FetchError` from `http_client`.
#[error("Scaphandre fetch failed")]
Fetch(#[source] FetchError),
/// The response body was fetched but is not valid UTF-8.
#[error("Scaphandre response was not valid UTF-8")]
Utf8(#[source] std::string::FromUtf8Error),
}
/// Maps a [`ScraperError`] to the [`ScaphandreScrapeReason`] used to label
/// failed scrapes.
///
/// Fetch failures are classified by `fetch_error_reason`; an invalid URI is
/// reported as a request error and a non-UTF-8 body as invalid UTF-8.
pub(super) fn scraper_error_reason(err: &ScraperError) -> ScaphandreScrapeReason {
    match err {
        ScraperError::InvalidUri { .. } => ScaphandreScrapeReason::RequestError,
        ScraperError::Utf8(_) => ScaphandreScrapeReason::InvalidUtf8,
        ScraperError::Fetch(cause) => fetch_error_reason(cause),
    }
}
/// Classifies a [`FetchError`] into the matching [`ScaphandreScrapeReason`].
///
/// The match is exhaustive on purpose: adding a `FetchError` variant must
/// force a decision here rather than fall into a catch-all.
fn fetch_error_reason(err: &FetchError) -> ScaphandreScrapeReason {
    match *err {
        FetchError::Timeout => ScaphandreScrapeReason::Timeout,
        FetchError::Transport(..) => ScaphandreScrapeReason::Unreachable,
        FetchError::HttpStatus(..) => ScaphandreScrapeReason::HttpError,
        FetchError::BodyRead(..) => ScaphandreScrapeReason::BodyReadError,
        FetchError::RequestBuild(..) => ScaphandreScrapeReason::RequestError,
    }
}
/// Starts the Scaphandre scraper as a background Tokio task and returns its
/// join handle.
///
/// The task runs `run_scraper_loop` with the given configuration, shared
/// scraper state and metrics registry. It uses `tokio::spawn`, so it must be
/// called from within a Tokio runtime.
#[must_use]
pub fn spawn_scraper(
    cfg: ScaphandreConfig,
    state: Arc<ScaphandreState>,
    metrics: Arc<MetricsState>,
) -> tokio::task::JoinHandle<()> {
    // Creating the future does no work; it only starts once spawned.
    let scraper = run_scraper_loop(cfg, state, metrics);
    tokio::spawn(scraper)
}
async fn run_scraper_loop(
cfg: ScaphandreConfig,
state: Arc<ScaphandreState>,
metrics: Arc<MetricsState>,
) {
use std::str::FromStr;
let uri = match hyper::Uri::from_str(&cfg.endpoint) {
Ok(u) => u,
Err(e) => {
let err = ScraperError::InvalidUri {
endpoint: cfg.endpoint.clone(),
source: e,
};
tracing::error!(error = %err, "Scaphandre scraper aborting on invalid endpoint");
return;
}
};
let redacted = http_client::redact_endpoint(&uri);
let parsed_auth: Option<AuthHeader> = match parse_scraper_auth_header(
cfg.auth_header.as_deref(),
&cfg.endpoint,
&redacted,
"scaphandre",
) {
ScraperAuthOutcome::Invalid => return,
ScraperAuthOutcome::None => None,
ScraperAuthOutcome::Some(h) => Some(h),
};
let client = http_client::build_client();
let mut ticker = tokio::time::interval(cfg.scrape_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
let mut snapshot_diff = OpsSnapshotDiff::default();
let mut first_failure_warned = false;
let mut consecutive_failures: u32 = 0;
let mut unsupported_platform_warned = false;
tracing::info!(
endpoint = %redacted,
scrape_interval_secs = cfg.scrape_interval.as_secs(),
process_count = cfg.process_map.len(),
"Scaphandre scraper started"
);
loop {
ticker.tick().await;
let current_ops = metrics.snapshot_service_io_ops();
let deltas = snapshot_diff.delta_and_advance(current_ops);
match fetch_metrics_once(&client, &uri, parsed_auth.as_ref()).await {
Ok(body) => {
first_failure_warned = false;
consecutive_failures = 0;
let readings = parse_scaphandre_metrics(&body);
let now = monotonic_ms();
apply_scrape(&state, &readings, &deltas, &cfg, now);
metrics.scaphandre_last_scrape_age_seconds.set(0.0);
metrics.scaphandre_scrape_success.inc();
tracing::debug!(
readings = readings.len(),
services_updated = deltas.len(),
"Scaphandre scrape succeeded"
);
}
Err(e) => {
consecutive_failures = consecutive_failures.saturating_add(1);
let reason = scraper_error_reason(&e);
metrics.scaphandre_scrape_failed.inc();
metrics
.scaphandre_scrape_failed_total
.with_label_values(&[reason.as_str()])
.inc();
if first_failure_warned {
tracing::debug!(error = %e, "Scaphandre scrape failed again");
} else {
tracing::warn!(
error = %e,
endpoint = %redacted,
"Scaphandre scrape failed; subsequent failures will log at debug level"
);
first_failure_warned = true;
}
if !unsupported_platform_warned
&& consecutive_failures >= UNSUPPORTED_PLATFORM_FAILURE_THRESHOLD
{
tracing::warn!(
endpoint = %redacted,
consecutive_failures = consecutive_failures,
"Scaphandre endpoint has been unreachable for {UNSUPPORTED_PLATFORM_FAILURE_THRESHOLD} consecutive scrapes. \
Check that Scaphandre is installed and serving metrics at the configured endpoint. \
Scaphandre requires Linux with Intel/AMD RAPL support; ARM64 and most cloud VMs without RAPL \
passthrough are not supported. See docs/LIMITATIONS.md#scaphandre-precision-bounds. \
The daemon is falling back to the proxy model for all services."
);
unsupported_platform_warned = true;
}
}
}
}
}