use std::collections::HashMap;
use std::time::Duration;
use crate::http_client::build_client as build_scraper_client;
use super::config::ScaphandreConfig;
use super::ops::{OpsSnapshotDiff, apply_scrape, compute_energy_per_op_kwh};
use super::parser::{ProcessPower, parse_scaphandre_metrics};
use super::scraper::{ScraperError, fetch_metrics_once, spawn_scraper};
use super::state::ScaphandreState;
#[test]
fn parse_empty_body() {
assert!(parse_scaphandre_metrics("").is_empty());
}
#[test]
fn parse_comments_only() {
let body = "# HELP scaph_host_power_microwatts host power\n\
# TYPE scaph_host_power_microwatts gauge\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_single_process_power() {
let body = r#"# HELP scaph_process_power_consumption_microwatts per-process power
# TYPE scaph_process_power_consumption_microwatts gauge
scaph_process_power_consumption_microwatts{exe="java",cmdline="java -jar app.jar",pid="1234"} 12500000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "java");
assert!((parsed[0].power_microwatts - 12_500_000.0).abs() < f64::EPSILON);
}
#[test]
fn parse_skips_other_metrics() {
let body = r#"scaph_host_power_microwatts 50000000.0
scaph_socket_power_microwatts{socket_id="0"} 25000000.0
scaph_process_power_consumption_microwatts{exe="java"} 8000000.0
scaph_process_power_consumption_microwatts{exe="dotnet"} 3000000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 2);
assert!(parsed.iter().any(|p| p.exe == "java"));
assert!(parsed.iter().any(|p| p.exe == "dotnet"));
}
#[test]
fn parse_handles_escaped_quotes_in_cmdline() {
let body = r#"scaph_process_power_consumption_microwatts{exe="java",cmdline="java -Dfoo=\"bar baz\" -jar app.jar"} 12000000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "java");
}
#[test]
fn parse_handles_escaped_backslash() {
let body = r#"scaph_process_power_consumption_microwatts{exe="weird\\path",cmdline="..."} 100.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "weird\\path");
}
#[test]
fn parse_skips_malformed_lines() {
let body = r#"scaph_process_power_consumption_microwatts{exe="java"} not_a_number
scaph_process_power_consumption_microwatts broken no braces
scaph_process_power_consumption_microwatts{exe="dotnet"} 5000000.0
"#;
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "dotnet");
}
#[test]
fn parse_unmatched_brace_is_skipped() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\",cmdline=\"broken 100.0\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_unescapes_newline_escape() {
let body = "scaph_process_power_consumption_microwatts{exe=\"multi\\nline\"} 1.0\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "multi\nline");
}
#[test]
fn compute_energy_per_op_basic() {
let got = compute_energy_per_op_kwh(12_000_000.0, 5.0, 8000).unwrap();
let expected = (12.0 * 5.0 / 3_600_000.0) / 8000.0;
assert!((got - expected).abs() < 1e-18);
}
#[test]
fn compute_energy_per_op_zero_ops_returns_none() {
assert!(compute_energy_per_op_kwh(12_000_000.0, 5.0, 0).is_none());
}
#[test]
fn compute_energy_per_op_negative_power_returns_none() {
assert!(compute_energy_per_op_kwh(-1.0, 5.0, 100).is_none());
}
#[test]
fn compute_energy_per_op_nan_returns_none() {
assert!(compute_energy_per_op_kwh(f64::NAN, 5.0, 100).is_none());
assert!(compute_energy_per_op_kwh(f64::INFINITY, 5.0, 100).is_none());
}
#[test]
fn ops_snapshot_diff_first_call_counts_all() {
let mut diff = OpsSnapshotDiff::default();
let mut current = HashMap::new();
current.insert("order-svc".to_string(), 100u64);
current.insert("chat-svc".to_string(), 50u64);
let deltas = diff.delta_and_advance(current);
assert_eq!(deltas.get("order-svc"), Some(&100));
assert_eq!(deltas.get("chat-svc"), Some(&50));
}
#[test]
fn ops_snapshot_diff_second_call_subtracts() {
let mut diff = OpsSnapshotDiff::default();
let mut first = HashMap::new();
first.insert("order-svc".to_string(), 100u64);
diff.delta_and_advance(first);
let mut second = HashMap::new();
second.insert("order-svc".to_string(), 160u64);
let deltas = diff.delta_and_advance(second);
assert_eq!(deltas.get("order-svc"), Some(&60));
}
#[test]
fn ops_snapshot_diff_no_change_produces_empty() {
let mut diff = OpsSnapshotDiff::default();
let mut first = HashMap::new();
first.insert("order-svc".to_string(), 100u64);
diff.delta_and_advance(first.clone());
let deltas = diff.delta_and_advance(first);
assert!(deltas.is_empty());
}
#[test]
fn ops_snapshot_diff_counter_reset_produces_zero_delta() {
let mut diff = OpsSnapshotDiff::default();
let mut first = HashMap::new();
first.insert("order-svc".to_string(), 100u64);
diff.delta_and_advance(first);
let mut second = HashMap::new();
second.insert("order-svc".to_string(), 10u64);
let deltas = diff.delta_and_advance(second);
assert!(!deltas.contains_key("order-svc"));
}
fn test_scrape_fixture(
power_microwatts: f64,
ops: u64,
) -> (
ScaphandreState,
Vec<ProcessPower>,
HashMap<String, u64>,
ScaphandreConfig,
) {
let state = ScaphandreState::default();
let readings = vec![ProcessPower {
exe: "java".to_string(),
power_microwatts,
}];
let mut deltas = HashMap::new();
if ops > 0 {
deltas.insert("order-svc".to_string(), ops);
}
let mut process_map = HashMap::new();
process_map.insert("order-svc".to_string(), "java".to_string());
let cfg = ScaphandreConfig {
endpoint: "http://localhost:8080/metrics".to_string(),
scrape_interval: Duration::from_secs(5),
process_map,
};
(state, readings, deltas, cfg)
}
#[test]
fn apply_scrape_updates_mapped_service() {
let (state, readings, deltas, cfg) = test_scrape_fixture(12_000_000.0, 8000);
apply_scrape(&state, &readings, &deltas, &cfg, 100);
let snap = state.snapshot(100, 60_000);
let got = *snap.get("order-svc").unwrap();
let expected = (12.0 * 5.0 / 3_600_000.0) / 8000.0;
assert!((got - expected).abs() < 1e-18);
}
#[test]
fn apply_scrape_keeps_previous_when_ops_zero() {
let (state, readings, _, cfg) = test_scrape_fixture(10_000_000.0, 5000);
let mut deltas = HashMap::new();
deltas.insert("order-svc".to_string(), 5000u64);
apply_scrape(&state, &readings, &deltas, &cfg, 100);
let first = *state.snapshot(100, 60_000).get("order-svc").unwrap();
apply_scrape(&state, &readings, &HashMap::new(), &cfg, 110);
let second = *state.snapshot(110, 60_000).get("order-svc").unwrap();
assert!((first - second).abs() < f64::EPSILON);
}
#[test]
fn scaphandre_state_snapshot_filters_stale() {
let state = ScaphandreState::default();
state.insert_for_test("order-svc".to_string(), 1e-7, 100);
let snap = state.snapshot(1000, 500);
assert!(snap.is_empty());
let snap = state.snapshot(1000, 2000);
assert_eq!(snap.len(), 1);
}
#[test]
fn scaphandre_state_snapshot_clock_skew_kept_as_fresh() {
let state = ScaphandreState::default();
state.insert_for_test("order-svc".to_string(), 1e-7, 1000);
let snap = state.snapshot(500, 100);
assert_eq!(snap.len(), 1);
}
#[tokio::test]
async fn fetch_metrics_once_reads_from_fake_server() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let body = "# HELP scaph_process_power_consumption_microwatts per-process power\n\
# TYPE scaph_process_power_consumption_microwatts gauge\n\
scaph_host_power_microwatts 50000000.0\n\
scaph_process_power_consumption_microwatts{exe=\"java\",pid=\"1234\"} 12000000.0\n\
scaph_process_power_consumption_microwatts{exe=\"dotnet\",pid=\"5678\"} 3000000.0\n";
let response_owned = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain; version=0.0.4\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{body}",
body.len()
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
socket.write_all(response_owned.as_bytes()).await.unwrap();
socket.flush().await.unwrap();
let _ = socket.shutdown().await;
});
let client = build_scraper_client();
let uri = <hyper::Uri as std::str::FromStr>::from_str(&endpoint).unwrap();
let fetched = fetch_metrics_once(&client, &uri)
.await
.expect("fake server fetch should succeed");
server.await.unwrap();
let readings = parse_scaphandre_metrics(&fetched);
assert_eq!(readings.len(), 2);
let java = readings.iter().find(|p| p.exe == "java").unwrap();
assert!((java.power_microwatts - 12_000_000.0).abs() < f64::EPSILON);
let dotnet = readings.iter().find(|p| p.exe == "dotnet").unwrap();
assert!((dotnet.power_microwatts - 3_000_000.0).abs() < f64::EPSILON);
let state = ScaphandreState::default();
let mut deltas = HashMap::new();
deltas.insert("order-svc".to_string(), 10_000u64);
let mut process_map = HashMap::new();
process_map.insert("order-svc".to_string(), "java".to_string());
let cfg = ScaphandreConfig {
endpoint,
scrape_interval: Duration::from_secs(5),
process_map,
};
apply_scrape(&state, &readings, &deltas, &cfg, 1_000);
let snap = state.snapshot(1_000, 60_000);
let got = *snap.get("order-svc").unwrap();
let expected = (12.0 * 5.0 / 3_600_000.0) / 10_000.0;
assert!(
(got - expected).abs() < 1e-18,
"expected {expected}, got {got}"
);
}
#[tokio::test]
async fn fetch_metrics_once_surfaces_http_error_status() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
let resp = "HTTP/1.1 500 Internal Server Error\r\n\
Content-Length: 0\r\n\
Connection: close\r\n\
\r\n";
socket.write_all(resp.as_bytes()).await.unwrap();
let _ = socket.shutdown().await;
});
let client = build_scraper_client();
let uri = <hyper::Uri as std::str::FromStr>::from_str(&endpoint).unwrap();
let err = fetch_metrics_once(&client, &uri)
.await
.expect_err("500 should error");
server.await.unwrap();
match err {
ScraperError::Fetch(crate::http_client::FetchError::HttpStatus(500)) => {}
other => panic!("expected Fetch(HttpStatus(500)), got {other:?}"),
}
}
#[test]
fn parse_rejects_prefix_collision_metric() {
let body = "scaph_process_power_consumption_microwatts_total{exe=\"java\"} 123\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_skips_metric_without_exe_label() {
let body =
"scaph_process_power_consumption_microwatts{pid=\"1234\",cmdline=\"java -jar\"} 50000\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_extracts_exe_when_not_first_label() {
let body = "scaph_process_power_consumption_microwatts{pid=\"42\",exe=\"postgres\"} 12345\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "postgres");
assert!((parsed[0].power_microwatts - 12345.0).abs() < f64::EPSILON);
}
#[test]
fn parse_rejects_label_with_unterminated_value() {
let body = "scaph_process_power_consumption_microwatts{exe=\"unclosed} 100\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_skips_unmatched_label_block() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\",pid=\"1 20000\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_skips_line_with_non_numeric_value() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\"} not-a-number\n";
assert!(parse_scaphandre_metrics(body).is_empty());
}
#[test]
fn parse_unescapes_quote_backslash_and_newline_in_exe_label() {
let body = "scaph_process_power_consumption_microwatts{exe=\"a\\\"b\"} 10\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "a\"b");
let body = "scaph_process_power_consumption_microwatts{exe=\"a\\\\b\"} 20\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed[0].exe, "a\\b");
let body = "scaph_process_power_consumption_microwatts{exe=\"line1\\nline2\"} 30\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed[0].exe, "line1\nline2");
}
#[test]
fn parse_preserves_unknown_escape_in_exe_label() {
let body = "scaph_process_power_consumption_microwatts{exe=\"a\\tb\"} 5\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].exe, "a\\tb");
}
#[test]
fn parse_value_with_trailing_timestamp() {
let body = "scaph_process_power_consumption_microwatts{exe=\"java\"} 12345 1700000000\n";
let parsed = parse_scaphandre_metrics(body);
assert_eq!(parsed.len(), 1);
assert!((parsed[0].power_microwatts - 12345.0).abs() < f64::EPSILON);
}
#[tokio::test]
async fn spawn_scraper_happy_path_updates_state() {
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let body = "scaph_process_power_consumption_microwatts{exe=\"java\"} 10000000\n";
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{body}",
body.len()
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let response_arc = Arc::new(response);
let server = tokio::spawn(async move {
loop {
let Ok((mut socket, _)) = listener.accept().await else {
return;
};
let resp = response_arc.clone();
tokio::spawn(async move {
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await;
let _ = socket.write_all(resp.as_bytes()).await;
let _ = socket.shutdown().await;
});
}
});
let mut process_map = HashMap::new();
process_map.insert("order-svc".to_string(), "java".to_string());
let cfg = ScaphandreConfig {
endpoint,
scrape_interval: Duration::from_millis(50),
process_map,
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
metrics
.service_io_ops_total
.with_label_values(&["order-svc"])
.inc_by(5_000.0);
let handle = spawn_scraper(cfg, state.clone(), metrics.clone());
tokio::time::sleep(Duration::from_millis(220)).await;
handle.abort();
let _ = handle.await;
server.abort();
let _ = server.await;
let snap = state.snapshot(crate::score::scaphandre::state::monotonic_ms(), 60_000);
assert!(
snap.contains_key("order-svc"),
"state should have been populated for order-svc; got {snap:?}"
);
}
#[tokio::test]
async fn spawn_scraper_500_keeps_running_and_state_empty() {
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
loop {
let Ok((mut socket, _)) = listener.accept().await else {
return;
};
tokio::spawn(async move {
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await;
let resp = "HTTP/1.1 500 Internal Server Error\r\n\
Content-Length: 0\r\n\
Connection: close\r\n\
\r\n";
let _ = socket.write_all(resp.as_bytes()).await;
let _ = socket.shutdown().await;
});
}
});
let cfg = ScaphandreConfig {
endpoint,
scrape_interval: Duration::from_millis(40),
process_map: HashMap::new(),
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
let handle = spawn_scraper(cfg, state.clone(), metrics);
tokio::time::sleep(Duration::from_millis(220)).await;
handle.abort();
let _ = handle.await;
server.abort();
let _ = server.await;
let snap = state.snapshot(crate::score::scaphandre::state::monotonic_ms(), 60_000);
assert!(snap.is_empty(), "500 scrapes must not populate state");
}
#[tokio::test]
async fn spawn_scraper_invalid_uri_exits_cleanly() {
use std::sync::Arc;
let cfg = ScaphandreConfig {
endpoint: "not a valid :: uri".to_string(),
scrape_interval: Duration::from_secs(5),
process_map: HashMap::new(),
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
let handle = spawn_scraper(cfg, state, metrics);
let result = tokio::time::timeout(Duration::from_secs(2), handle).await;
assert!(
result.is_ok(),
"scraper should exit cleanly on invalid URI, got: {result:?}"
);
}
#[tokio::test]
async fn spawn_scraper_unreachable_endpoint_keeps_running() {
use std::sync::Arc;
let cfg = ScaphandreConfig {
endpoint: "http://127.0.0.1:1/metrics".to_string(),
scrape_interval: Duration::from_millis(50),
process_map: HashMap::new(),
};
let state = Arc::new(ScaphandreState::default());
let metrics = Arc::new(crate::report::metrics::MetricsState::default());
let handle = spawn_scraper(cfg, state, metrics);
tokio::time::sleep(Duration::from_millis(150)).await;
handle.abort();
let _ = handle.await;
}
#[tokio::test]
async fn fetch_metrics_once_rejects_oversized_body() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/metrics");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await;
let header = "HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
Content-Length: 9437184\r\n\
Connection: close\r\n\
\r\n";
let _ = socket.write_all(header.as_bytes()).await;
let chunk = vec![b'x'; 65536];
for _ in 0..(9 * 16) {
if socket.write_all(&chunk).await.is_err() {
break;
}
}
let _ = socket.shutdown().await;
});
let client = build_scraper_client();
let uri = <hyper::Uri as std::str::FromStr>::from_str(&endpoint).unwrap();
let err = fetch_metrics_once(&client, &uri)
.await
.expect_err("oversized body must fail with BodyRead (LengthLimit)");
server.await.unwrap();
match err {
ScraperError::Fetch(crate::http_client::FetchError::BodyRead(msg)) => {
assert!(
msg.to_ascii_lowercase().contains("length") || msg.contains("limit"),
"expected length-limit error, got: {msg}"
);
}
other => panic!("expected Fetch(BodyRead), got {other:?}"),
}
}
#[test]
fn scraper_error_display_messages_are_informative() {
use crate::http_client::FetchError;
let e1 = ScraperError::Fetch(FetchError::BodyRead("oops".to_string()));
let e2 = ScraperError::Fetch(FetchError::HttpStatus(418));
let e3 = ScraperError::Fetch(FetchError::Timeout);
assert!(format!("{e1}").contains("fetch failed"));
assert!(format!("{e2}").contains("fetch failed"));
assert!(format!("{e3}").contains("fetch failed"));
}