use std::collections::{HashMap, HashSet};
use std::time::Duration;
use crate::http_client::build_client as build_scraper_client;
use crate::score::ops_snapshot_diff::OpsSnapshotDiff;
use super::config::{ProcessMatcher, ScaphandreConfig};
use super::ops::{apply_scrape, compute_energy_per_op_kwh};
use super::parser::{ProcessPower, parse_scaphandre_metrics};
use super::scraper::{
ScraperError, ZERO_SAMPLE_WARN_MARKER, fetch_metrics_once, scraper_error_reason, spawn_scraper,
track_zero_sample_streak,
};
use super::state::ScaphandreState;
#[test]
fn parse_empty_body() {
assert!(parse_scaphandre_metrics("").is_empty());
}
#[test]
fn parse_comments_only() {
let body = "# HELP scaph_host_power_microwatts host power\n\
# TYPE scaph_host_power_microwatts gauge\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_single_process_power() {
let body = r#"# HELP scaph_process_power_consumption_microwatts per-process power
# TYPE scaph_process_power_consumption_microwatts gauge
scaph_process_power_consumption_microwatts{exe="java",cmdline="java -jar app.jar",pid="1234"} 12500000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "java");
assert_eq!(parsed[0].cmdline, "java -jar app.jar");
assert!((parsed[0].power_microwatts - 12_500_000.0).abs() < f64::EPSILON);
}
#[test]
fn parse_preserves_cmdline_when_argv_concatenated() {
let body = r#"scaph_process_power_consumption_microwatts{pid="3139",cmdline="java-jar/tmp/svc-a.jar",exe="/usr/lib/jvm/temurin-25-jdk-amd64/bin/java"} 0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "/usr/lib/jvm/temurin-25-jdk-amd64/bin/java");
assert_eq!(parsed[0].cmdline, "java-jar/tmp/svc-a.jar");
}
#[test]
fn parse_defaults_cmdline_to_empty_when_absent() {
let body = "scaph_process_power_consumption_microwatts{exe=\"native\"} 42.0\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "native");
assert_eq!(parsed[0].cmdline, "");
}
#[test]
fn parse_skips_other_metrics() {
let body = r#"scaph_host_power_microwatts 50000000.0
scaph_socket_power_microwatts{socket_id="0"} 25000000.0
scaph_process_power_consumption_microwatts{exe="java"} 8000000.0
scaph_process_power_consumption_microwatts{exe="dotnet"} 3000000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 2);
assert!(parsed.iter().any(|p| p.exe == "java"));
assert!(parsed.iter().any(|p| p.exe == "dotnet"));
}
#[test]
fn parse_handles_escaped_quotes_in_cmdline() {
let body = r#"scaph_process_power_consumption_microwatts{exe="java",cmdline="java -Dfoo=\"bar baz\" -jar app.jar"} 12000000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "java");
}
#[test]
fn parse_handles_escaped_backslash() {
let body = r#"scaph_process_power_consumption_microwatts{exe="weird\\path",cmdline="..."} 100.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "weird\\path");
}
#[test]
fn parse_skips_malformed_lines() {
let body = r#"scaph_process_power_consumption_microwatts{exe="java"} not_a_number
scaph_process_power_consumption_microwatts broken no braces
scaph_process_power_consumption_microwatts{exe="dotnet"} 5000000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "dotnet");
}
#[test]
fn parse_unmatched_brace_is_skipped() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\",cmdline=\"broken 100.0\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_unescapes_newline_escape() {
let body = "scaph_process_power_consumption_microwatts{exe=\"multi\\nline\"} 1.0\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "multi\nline");
}
#[test]
fn compute_energy_per_op_basic() {
let got = compute_energy_per_op_kwh(12_000_000.0, 5.0, 8000).unwrap();
let expected = (12.0 * 5.0 / 3_600_000.0) / 8000.0;
assert!((got - expected).abs() < 1e-18);
}
#[test]
fn compute_energy_per_op_zero_ops_returns_none() {
assert!(compute_energy_per_op_kwh(12_000_000.0, 5.0, 0).is_none());
}
#[test]
fn compute_energy_per_op_negative_power_returns_none() {
assert!(compute_energy_per_op_kwh(-1.0, 5.0, 100).is_none());
}
#[test]
fn compute_energy_per_op_nan_returns_none() {
assert!(compute_energy_per_op_kwh(f64::NAN, 5.0, 100).is_none());
assert!(compute_energy_per_op_kwh(f64::INFINITY, 5.0, 100).is_none());
}
#[test]
fn ops_snapshot_diff_first_call_counts_all() {
let mut diff = OpsSnapshotDiff::default();
let mut current = HashMap::new();
current.insert("order-svc".to_string(), 100u64);
current.insert("chat-svc".to_string(), 50u64);
let deltas = diff.delta_and_advance(current);
assert_eq!(deltas.get("order-svc"), Some(&100));
assert_eq!(deltas.get("chat-svc"), Some(&50));
}
#[test]
fn ops_snapshot_diff_second_call_subtracts() {
let mut diff = OpsSnapshotDiff::default();
let mut first = HashMap::new();
first.insert("order-svc".to_string(), 100u64);
diff.delta_and_advance(first);
let mut second = HashMap::new();
second.insert("order-svc".to_string(), 160u64);
let deltas = diff.delta_and_advance(second);
assert_eq!(deltas.get("order-svc"), Some(&60));
}
#[test]
fn ops_snapshot_diff_no_change_produces_empty() {
let mut diff = OpsSnapshotDiff::default();
let mut first = HashMap::new();
first.insert("order-svc".to_string(), 100u64);
diff.delta_and_advance(first.clone());
let deltas = diff.delta_and_advance(first);
assert!(deltas.is_empty());
}
#[test]
fn ops_snapshot_diff_counter_reset_produces_zero_delta() {
let mut diff = OpsSnapshotDiff::default();
let mut first = HashMap::new();
first.insert("order-svc".to_string(), 100u64);
diff.delta_and_advance(first);
let mut second = HashMap::new();
second.insert("order-svc".to_string(), 10u64);
let deltas = diff.delta_and_advance(second);
assert!(!deltas.contains_key("order-svc"));
}
fn test_scrape_fixture(
power_microwatts: f64,
ops: u64,
) -> (
ScaphandreState,
Vec<ProcessPower>,
HashMap<String, u64>,
ScaphandreConfig,
) {
let state = ScaphandreState::default();
let readings = vec![ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/order-svc.jar".to_string(),
power_microwatts,
}];
let mut deltas = HashMap::new();
if ops > 0 {
deltas.insert("order-svc".to_string(), ops);
}
let mut process_map = HashMap::new();
process_map.insert(
"order-svc".to_string(),
ProcessMatcher {
exe_contains: "bin/java".to_string(),
cmdline_contains: Some("order-svc.jar".to_string()),
},
);
let cfg = ScaphandreConfig {
endpoint: "http://localhost:8080/metrics".to_string(),
scrape_interval: Duration::from_secs(5),
process_map,
auth_header: None,
};
(state, readings, deltas, cfg)
}
#[test]
fn apply_scrape_disambiguates_three_jvms_via_cmdline() {
let state = ScaphandreState::default();
let readings = vec![
ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/account.jar".to_string(),
power_microwatts: 6_000_000.0,
},
ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/chat.jar".to_string(),
power_microwatts: 9_000_000.0,
},
ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/game.jar".to_string(),
power_microwatts: 12_000_000.0,
},
];
let mut deltas = HashMap::new();
deltas.insert("account".to_string(), 1_000u64);
deltas.insert("chat".to_string(), 1_000u64);
deltas.insert("game".to_string(), 1_000u64);
let mut process_map = HashMap::new();
for (svc, jar) in [
("account", "account.jar"),
("chat", "chat.jar"),
("game", "game.jar"),
] {
process_map.insert(
svc.to_string(),
ProcessMatcher {
exe_contains: "bin/java".to_string(),
cmdline_contains: Some(jar.to_string()),
},
);
}
let cfg = ScaphandreConfig {
endpoint: "http://localhost:8080/metrics".to_string(),
scrape_interval: Duration::from_secs(5),
process_map,
auth_header: None,
};
apply_scrape(&state, &readings, &deltas, &cfg, &mut HashSet::new(), 100);
let snap = state.snapshot(100, 60_000);
let account = *snap.get("account").unwrap();
let chat = *snap.get("chat").unwrap();
let game = *snap.get("game").unwrap();
assert!(account < chat, "account {account} should be < chat {chat}");
assert!(chat < game, "chat {chat} should be < game {game}");
}
#[test]
fn apply_scrape_matches_on_exe_only_when_cmdline_unset() {
let state = ScaphandreState::default();
let readings = vec![ProcessPower {
exe: "/opt/native-svc/bin/native-svc".to_string(),
cmdline: "/opt/native-svc/bin/native-svc".to_string(),
power_microwatts: 5_000_000.0,
}];
let mut deltas = HashMap::new();
deltas.insert("native-svc".to_string(), 2_000u64);
let mut process_map = HashMap::new();
process_map.insert(
"native-svc".to_string(),
ProcessMatcher {
exe_contains: "/opt/native-svc/bin/native-svc".to_string(),
cmdline_contains: None,
},
);
let cfg = ScaphandreConfig {
endpoint: "http://localhost:8080/metrics".to_string(),
scrape_interval: Duration::from_secs(5),
process_map,
auth_header: None,
};
apply_scrape(&state, &readings, &deltas, &cfg, &mut HashSet::new(), 100);
assert!(state.snapshot(100, 60_000).contains_key("native-svc"));
}
#[test]
fn apply_scrape_skips_on_ambiguous_matcher() {
let state = ScaphandreState::default();
let readings = vec![
ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/account.jar".to_string(),
power_microwatts: 6_000_000.0,
},
ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/chat.jar".to_string(),
power_microwatts: 9_000_000.0,
},
];
let mut deltas = HashMap::new();
deltas.insert("ambiguous-svc".to_string(), 1_000u64);
let mut process_map = HashMap::new();
process_map.insert(
"ambiguous-svc".to_string(),
ProcessMatcher {
exe_contains: "bin/java".to_string(),
cmdline_contains: Some("java-jar".to_string()),
},
);
let cfg = ScaphandreConfig {
endpoint: "http://localhost:8080/metrics".to_string(),
scrape_interval: Duration::from_secs(5),
process_map,
auth_header: None,
};
apply_scrape(&state, &readings, &deltas, &cfg, &mut HashSet::new(), 100);
assert!(
!state.snapshot(100, 60_000).contains_key("ambiguous-svc"),
"ambiguous matcher must not publish energy",
);
}
#[test]
fn apply_scrape_ambiguous_warn_latch_persists_then_clears_on_clean_match() {
let state = ScaphandreState::default();
let ambiguous_readings = vec![
ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/a.jar".to_string(),
power_microwatts: 6_000_000.0,
},
ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/b.jar".to_string(),
power_microwatts: 9_000_000.0,
},
];
let unique_reading = vec![ProcessPower {
exe: "/usr/bin/java".to_string(),
cmdline: "java-jar/tmp/a.jar".to_string(),
power_microwatts: 6_000_000.0,
}];
let mut deltas = HashMap::new();
deltas.insert("svc".to_string(), 1_000u64);
let mut process_map = HashMap::new();
process_map.insert(
"svc".to_string(),
ProcessMatcher {
exe_contains: "bin/java".to_string(),
cmdline_contains: Some("java-jar".to_string()),
},
);
let cfg = ScaphandreConfig {
endpoint: "http://localhost:8080/metrics".to_string(),
scrape_interval: Duration::from_secs(5),
process_map,
auth_header: None,
};
let mut warned: HashSet<String> = HashSet::new();
apply_scrape(&state, &ambiguous_readings, &deltas, &cfg, &mut warned, 100);
assert!(warned.contains("svc"), "first ambiguous tick must latch");
apply_scrape(&state, &ambiguous_readings, &deltas, &cfg, &mut warned, 200);
assert!(warned.contains("svc"));
assert_eq!(warned.len(), 1);
apply_scrape(&state, &unique_reading, &deltas, &cfg, &mut warned, 300);
assert!(
!warned.contains("svc"),
"clean match must clear the latch so a later flap re-warns"
);
}
#[test]
fn apply_scrape_updates_mapped_service() {
let (state, readings, deltas, cfg) = test_scrape_fixture(12_000_000.0, 8000);
apply_scrape(&state, &readings, &deltas, &cfg, &mut HashSet::new(), 100);
let snap = state.snapshot(100, 60_000);
let got = *snap.get("order-svc").unwrap();
let expected = (12.0 * 5.0 / 3_600_000.0) / 8000.0;
assert!((got - expected).abs() < 1e-18);
}
#[test]
fn apply_scrape_keeps_previous_when_ops_zero() {
let (state, readings, _, cfg) = test_scrape_fixture(10_000_000.0, 5000);
let mut deltas = HashMap::new();
deltas.insert("order-svc".to_string(), 5000u64);
apply_scrape(&state, &readings, &deltas, &cfg, &mut HashSet::new(), 100);
let first = *state.snapshot(100, 60_000).get("order-svc").unwrap();
apply_scrape(
&state,
&readings,
&HashMap::new(),
&cfg,
&mut HashSet::new(),
110,
);
let second = *state.snapshot(110, 60_000).get("order-svc").unwrap();
assert!((first - second).abs() < f64::EPSILON);
}
#[test]
fn scaphandre_state_snapshot_filters_stale() {
let state = ScaphandreState::default();
state.insert_for_test("order-svc".to_string(), 1e-7, 100);
let snap = state.snapshot(1000, 500);
assert!(snap.is_empty());
let snap = state.snapshot(1000, 2000);
assert_eq!(snap.len(), 1);
}
#[test]
fn scaphandre_state_snapshot_clock_skew_kept_as_fresh() {
let state = ScaphandreState::default();
state.insert_for_test("order-svc".to_string(), 1e-7, 1000);
let snap = state.snapshot(500, 100);
assert_eq!(snap.len(), 1);
}
#[tokio::test]
async fn fetch_metrics_once_reads_from_fake_server() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let body = "# HELP scaph_process_power_consumption_microwatts per-process power\n\
# TYPE scaph_process_power_consumption_microwatts gauge\n\
scaph_host_power_microwatts 50000000.0\n\
scaph_process_power_consumption_microwatts{exe=\"java\",pid=\"1234\"} 12000000.0\n\
scaph_process_power_consumption_microwatts{exe=\"dotnet\",pid=\"5678\"} 3000000.0\n";
let response_owned = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain; version=0.0.4\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{body}",
body.len()
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
socket.write_all(response_owned.as_bytes()).await.unwrap();
socket.flush().await.unwrap();
let _ = socket.shutdown().await;
});
let client = build_scraper_client();
let uri = <hyper::Uri as std::str::FromStr>::from_str(&endpoint).unwrap();
let fetched = fetch_metrics_once(&client, &uri, None)
.await
.expect("fake server fetch should succeed");
server.await.unwrap();
let readings = parse_scaphandre_metrics(&fetched);
assert_eq!(readings.len(), 2);
let java = readings.iter().find(|p| p.exe == "java").unwrap();
assert!((java.power_microwatts - 12_000_000.0).abs() < f64::EPSILON);
let dotnet = readings.iter().find(|p| p.exe == "dotnet").unwrap();
assert!((dotnet.power_microwatts - 3_000_000.0).abs() < f64::EPSILON);
let state = ScaphandreState::default();
let mut deltas = HashMap::new();
deltas.insert("order-svc".to_string(), 10_000u64);
let mut process_map = HashMap::new();
process_map.insert(
"order-svc".to_string(),
ProcessMatcher {
exe_contains: "java".to_string(),
cmdline_contains: None,
},
);
let cfg = ScaphandreConfig {
endpoint,
scrape_interval: Duration::from_secs(5),
process_map,
auth_header: None,
};
apply_scrape(&state, &readings, &deltas, &cfg, &mut HashSet::new(), 1_000);
let snap = state.snapshot(1_000, 60_000);
let got = *snap.get("order-svc").unwrap();
let expected = (12.0 * 5.0 / 3_600_000.0) / 10_000.0;
assert!(
(got - expected).abs() < 1e-18,
"expected {expected}, got {got}"
);
}
#[tokio::test]
async fn fetch_metrics_once_surfaces_http_error_status() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
let resp = "HTTP/1.1 500 Internal Server Error\r\n\
Content-Length: 0\r\n\
Connection: close\r\n\
\r\n";
socket.write_all(resp.as_bytes()).await.unwrap();
let _ = socket.shutdown().await;
});
let client = build_scraper_client();
let uri = <hyper::Uri as std::str::FromStr>::from_str(&endpoint).unwrap();
let err = fetch_metrics_once(&client, &uri, None)
.await
.expect_err("500 should error");
server.await.unwrap();
match err {
ScraperError::Fetch(crate::http_client::FetchError::HttpStatus(500)) => {}
other => panic!("expected Fetch(HttpStatus(500)), got {other:?}"),
}
}
#[test]
fn parse_rejects_prefix_collision_metric() {
let body = "scaph_process_power_consumption_microwatts_total{exe=\"java\"} 123\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_skips_metric_without_exe_label() {
let body =
"scaph_process_power_consumption_microwatts{pid=\"1234\",cmdline=\"java -jar\"} 50000\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_extracts_exe_when_not_first_label() {
let body = "scaph_process_power_consumption_microwatts{pid=\"42\",exe=\"postgres\"} 12345\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "postgres");
assert!((parsed[0].power_microwatts - 12345.0).abs() < f64::EPSILON);
}
#[test]
fn parse_rejects_label_with_unterminated_value() {
let body = "scaph_process_power_consumption_microwatts{exe=\"unclosed} 100\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_skips_unmatched_label_block() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\",pid=\"1 20000\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_skips_line_with_non_numeric_value() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\"} not-a-number\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_unescapes_quote_backslash_and_newline_in_exe_label() {
let body = "scaph_process_power_consumption_microwatts{exe=\"a\\\"b\"} 10\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "a\"b");
let body = "scaph_process_power_consumption_microwatts{exe=\"a\\\\b\"} 20\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed[0].exe, "a\\b");
let body = "scaph_process_power_consumption_microwatts{exe=\"line1\\nline2\"} 30\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed[0].exe, "line1\nline2");
}
#[test]
fn parse_preserves_unknown_escape_in_exe_label() {
let body = "scaph_process_power_consumption_microwatts{exe=\"a\\tb\"} 5\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "a\\tb");
}
#[test]
fn parse_value_with_trailing_timestamp() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\"} 12345 1700000000\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert!((parsed[0].power_microwatts - 12345.0).abs() < f64::EPSILON);
}
#[tokio::test]
async fn spawn_scraper_happy_path_updates_state() {
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let body = "scaph_process_power_consumption_microwatts{exe=\"java\"} 10000000\n";
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{body}",
body.len()
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let response_arc = Arc::new(response);
let server = tokio::spawn(async move {
loop {
let Ok((mut socket, _)) = listener.accept().await else {
return;
};
let resp = response_arc.clone();
tokio::spawn(async move {
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await;
let _ = socket.write_all(resp.as_bytes()).await;
let _ = socket.shutdown().await;
});
}
});
let mut process_map = HashMap::new();
process_map.insert(
"order-svc".to_string(),
ProcessMatcher {
exe_contains: "java".to_string(),
cmdline_contains: None,
},
);
let cfg = ScaphandreConfig {
endpoint,
scrape_interval: Duration::from_millis(50),
process_map,
auth_header: None,
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
metrics
.service_io_ops_total
.with_label_values(&["order-svc"])
.inc_by(5_000.0);
let handle = spawn_scraper(cfg, state.clone(), metrics.clone());
tokio::time::sleep(Duration::from_millis(220)).await;
handle.abort();
let _ = handle.await;
server.abort();
let _ = server.await;
let snap = state.snapshot(crate::score::scaphandre::state::monotonic_ms(), 60_000);
assert!(
snap.contains_key("order-svc"),
"state should have been populated for order-svc; got {snap:?}"
);
}
#[tokio::test]
async fn spawn_scraper_500_keeps_running_and_state_empty() {
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
loop {
let Ok((mut socket, _)) = listener.accept().await else {
return;
};
tokio::spawn(async move {
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await;
let resp = "HTTP/1.1 500 Internal Server Error\r\n\
Content-Length: 0\r\n\
Connection: close\r\n\
\r\n";
let _ = socket.write_all(resp.as_bytes()).await;
let _ = socket.shutdown().await;
});
}
});
let cfg = ScaphandreConfig {
endpoint,
scrape_interval: Duration::from_millis(40),
process_map: HashMap::new(),
auth_header: None,
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
let handle = spawn_scraper(cfg, state.clone(), metrics);
tokio::time::sleep(Duration::from_millis(220)).await;
handle.abort();
let _ = handle.await;
server.abort();
let _ = server.await;
let snap = state.snapshot(crate::score::scaphandre::state::monotonic_ms(), 60_000);
assert!(snap.is_empty(), "500 scrapes must not populate state");
}
#[tokio::test]
async fn spawn_scraper_invalid_uri_exits_cleanly() {
use std::sync::Arc;
let cfg = ScaphandreConfig {
endpoint: "not a valid :: uri".to_string(),
scrape_interval: Duration::from_secs(5),
process_map: HashMap::new(),
auth_header: None,
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
let handle = spawn_scraper(cfg, state, metrics);
let result = tokio::time::timeout(Duration::from_secs(2), handle).await;
assert!(
result.is_ok(),
"scraper should exit cleanly on invalid URI, got: {result:?}"
);
}
#[tokio::test]
async fn spawn_scraper_unreachable_endpoint_keeps_running() {
use std::sync::Arc;
let cfg = ScaphandreConfig {
endpoint: "http://127.0.0.1:1/metrics".to_string(),
scrape_interval: Duration::from_millis(50),
process_map: HashMap::new(),
auth_header: None,
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
let handle = spawn_scraper(cfg, state, metrics);
tokio::time::sleep(Duration::from_millis(150)).await;
handle.abort();
let _ = handle.await;
}
#[tokio::test]
async fn fetch_metrics_once_rejects_oversized_body() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await;
let header = "HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
Content-Length: 9437184\r\n\
Connection: close\r\n\
\r\n";
let _ = socket.write_all(header.as_bytes()).await;
let chunk = vec![b'x'; 65536];
for _ in 0..(9 * 16) {
if socket.write_all(&chunk).await.is_err() {
break;
}
}
let _ = socket.shutdown().await;
});
let client = build_scraper_client();
let uri = <hyper::Uri as std::str::FromStr>::from_str(&endpoint).unwrap();
let err = fetch_metrics_once(&client, &uri, None)
.await
.expect_err("oversized body must fail with BodyRead (LengthLimit)");
server.await.unwrap();
match err {
ScraperError::Fetch(crate::http_client::FetchError::BodyRead(msg)) => {
assert!(
msg.to_ascii_lowercase().contains("length") || msg.contains("limit"),
"expected length-limit error, got: {msg}"
);
}
other => panic!("expected Fetch(BodyRead), got {other:?}"),
}
}
#[test]
fn scraper_error_display_messages_are_informative() {
use crate::http_client::FetchError;
let e1 = ScraperError::Fetch(FetchError::BodyRead("oops".to_string()));
let e2 = ScraperError::Fetch(FetchError::HttpStatus(418));
let e3 = ScraperError::Fetch(FetchError::Timeout);
assert!(format!("{e1}").contains("fetch failed"));
assert!(format!("{e2}").contains("fetch failed"));
assert!(format!("{e3}").contains("fetch failed"));
}
#[tokio::test]
async fn scaphandre_scraper_sends_auth_header_on_wire() {
use crate::ingest::auth_header::AuthHeader;
let response = crate::test_helpers::http_200_text("text/plain", "");
let (endpoint, mut rx, server) = crate::test_helpers::spawn_capture_server(response).await;
let client = build_scraper_client();
let uri = <hyper::Uri as std::str::FromStr>::from_str(&format!("{endpoint}/metrics")).unwrap();
let auth = AuthHeader::parse("Authorization: Bearer topsecret").expect("valid");
let body = fetch_metrics_once(&client, &uri, Some(&auth))
.await
.expect("fetch_metrics_once must succeed");
assert_eq!(body, "");
let captured = rx.recv().await.expect("captured request");
let text = std::str::from_utf8(&captured).expect("utf8");
assert!(
text.contains("authorization: Bearer topsecret")
|| text.contains("Authorization: Bearer topsecret"),
"auth header missing from request, got:\n{text}"
);
server.await.expect("server join");
}
#[test]
fn scraper_error_reason_maps_invalid_utf8() {
use crate::report::metrics::ScaphandreScrapeReason;
let utf8_err = String::from_utf8(vec![0xFF, 0xFE]).expect_err("invalid utf-8");
let err = ScraperError::Utf8(utf8_err);
assert_eq!(
scraper_error_reason(&err),
ScaphandreScrapeReason::InvalidUtf8
);
}
#[test]
fn scraper_error_reason_maps_fetch_timeout() {
use crate::http_client::FetchError;
use crate::report::metrics::ScaphandreScrapeReason;
let err = ScraperError::Fetch(FetchError::Timeout);
assert_eq!(scraper_error_reason(&err), ScaphandreScrapeReason::Timeout);
}
#[test]
fn scraper_error_reason_maps_fetch_http_status() {
use crate::http_client::FetchError;
use crate::report::metrics::ScaphandreScrapeReason;
let err = ScraperError::Fetch(FetchError::HttpStatus(503));
assert_eq!(
scraper_error_reason(&err),
ScaphandreScrapeReason::HttpError
);
}
#[test]
fn scraper_error_reason_maps_fetch_body_read() {
use crate::http_client::FetchError;
use crate::report::metrics::ScaphandreScrapeReason;
let err = ScraperError::Fetch(FetchError::BodyRead("connection reset".to_string()));
assert_eq!(
scraper_error_reason(&err),
ScaphandreScrapeReason::BodyReadError
);
}
#[test]
fn scraper_error_reason_maps_invalid_uri_to_request_error() {
use crate::report::metrics::ScaphandreScrapeReason;
let invalid_uri =
<hyper::Uri as std::str::FromStr>::from_str("not a uri").expect_err("uri parse must fail");
let err = ScraperError::InvalidUri {
endpoint: "not a uri".to_string(),
source: invalid_uri,
};
assert_eq!(
scraper_error_reason(&err),
ScaphandreScrapeReason::RequestError
);
}
#[test]
fn metrics_state_pre_warms_scaphandre_scrape_status_labels() {
use crate::report::metrics::MetricsState;
let state = MetricsState::new();
let output = state.render();
for status in ["success", "failed"] {
assert!(
output.contains(&format!(
"perf_sentinel_scaphandre_scrape_total{{status=\"{status}\"}} 0"
)),
"pre-warmed line for status={status} should appear in /metrics, got: {output}"
);
}
}
#[test]
fn metrics_state_pre_warms_scaphandre_scrape_failed_reasons() {
use crate::report::metrics::{MetricsState, ScaphandreScrapeReason};
let state = MetricsState::new();
let output = state.render();
for reason in &ScaphandreScrapeReason::ALL {
let label = reason.as_str();
assert!(
output.contains(&format!(
"perf_sentinel_scaphandre_scrape_failed_total{{reason=\"{label}\"}} 0"
)),
"pre-warmed line for reason={label} should appear in /metrics, got: {output}"
);
}
}
#[test]
fn metrics_state_scrape_success_counter_increments() {
use crate::report::metrics::MetricsState;
let state = MetricsState::new();
state.scaphandre_scrape_success.inc();
state.scaphandre_scrape_success.inc();
let output = state.render();
assert!(
output.contains("perf_sentinel_scaphandre_scrape_total{status=\"success\"} 2\n"),
"success counter must report 2 after two inc() calls, got: {output}"
);
assert!(
output.contains("perf_sentinel_scaphandre_scrape_total{status=\"failed\"} 0\n"),
"failed counter must remain 0, got: {output}"
);
}
#[test]
fn metrics_state_scrape_failed_reason_counter_increments() {
use crate::report::metrics::{MetricsState, ScaphandreScrapeReason};
let state = MetricsState::new();
state.scaphandre_scrape_failed.inc();
state
.scaphandre_scrape_failed_total
.with_label_values(&[ScaphandreScrapeReason::Timeout.as_str()])
.inc();
let output = state.render();
assert!(
output.contains("perf_sentinel_scaphandre_scrape_total{status=\"failed\"} 1\n"),
"failed status counter must report 1, got: {output}"
);
assert!(
output.contains("perf_sentinel_scaphandre_scrape_failed_total{reason=\"timeout\"} 1\n"),
"timeout reason counter must report 1, got: {output}"
);
assert!(
output.contains("perf_sentinel_scaphandre_scrape_failed_total{reason=\"unreachable\"} 0\n"),
"other reason counters must remain 0, got: {output}"
);
}
#[test]
fn track_zero_sample_streak_does_not_warn_under_threshold() {
let mut count: u32 = 0;
let mut warned = false;
for _ in 0..2 {
track_zero_sample_streak(0, 0, "http://redacted/metrics", &mut count, &mut warned);
}
assert_eq!(count, 2);
assert!(
!warned,
"warn flag must stay false until the 3rd zero-sample tick"
);
}
#[test]
fn track_zero_sample_streak_warns_after_three_consecutive_empty_ticks() {
let mut count: u32 = 0;
let mut warned = false;
for _ in 0..3 {
track_zero_sample_streak(0, 0, "http://redacted/metrics", &mut count, &mut warned);
}
assert_eq!(count, 3);
assert!(
warned,
"3rd consecutive zero-sample tick must trip the warn flag"
);
}
#[test]
fn track_zero_sample_streak_warns_only_once_per_streak() {
let mut count: u32 = 0;
let mut warned = false;
for _ in 0..10 {
track_zero_sample_streak(0, 0, "http://redacted/metrics", &mut count, &mut warned);
}
assert_eq!(count, 10);
assert!(warned);
}
#[test]
fn track_zero_sample_streak_resets_on_non_empty_scrape() {
let mut count: u32 = 5;
let mut warned = true;
track_zero_sample_streak(
7, 2,
"http://redacted/metrics",
&mut count,
&mut warned,
);
assert_eq!(count, 0);
assert!(!warned, "non-empty scrape must reset the warn latch");
}
#[test]
fn zero_sample_warn_marker_is_stable() {
assert_eq!(
ZERO_SAMPLE_WARN_MARKER,
"no samples matched the configured metric"
);
}